Skip to content

异步编程

你会学习到:

  • epoll/kqueue等事件队列/多路复用机制

    • 原理,好处

    • 使用&抽象方法(rust编码实例)

    • mio的抽象方法(着重于关键抽象)

    • 为stackless协程铺垫

  • stackful协程

异步模型

  • 协作方式

    • 抢占式

    • 非抢占式

  • 是否使用栈空间


操作系统层面

本节会介绍操作系统层面的知识:

  • 固件

  • 事件队列

固件

简单来说,固件就像是嵌入在硬件设备内部的“迷你”软件,它直接控制着该硬件的基本操作和功能。它不像操作系统或应用程序那样安装在硬盘上,而是通常固化在硬件的只读存储器(ROM)或闪存中。

固件在异步编程中的作用和工作方式:

  1. 硬件的直接控制者:

    • 像网卡、硬盘控制器、显卡(的BIOS部分)、主板(BIOS/UEFI)等计算机部件都有自己的固件。这些固件负责最底层的操作,比如网卡固件知道如何接收和发送网络数据包,硬盘固件知道如何读取和写入磁盘扇区。

    • 很多时候,这些硬件设备本身就带有一个小型的、专用的处理器(微控制器),固件就运行在这个微控制器上。这很重要,因为它意味着硬件设备可以在不占用主CPU的情况下独立完成一些工作。(书中 P22 提到)

  2. 事件检测与发起通知:

    • 这是固件在异步I/O中最关键的作用。当一个外部事件发生时(例如,网卡接收到数据包,硬盘完成了数据读取),是运行在设备微控制器上的固件首先检测到这个状态变化。

    • 固件检测到事件后,它并不是直接通知你的应用程序。相反,它会采取一种更高效的方式来通知系统的其他部分。

  3. 触发中断(Interrupts):

    • 固件检测到关键事件(如数据准备好、操作完成)后,它会指示硬件向主CPU发送一个硬件中断信号(一个电信号,通过中断请求线IRQ发送)。(书中 P20-P21 描述了这个流程)

    • 这个中断信号会打断CPU当前正在执行的任务,强制CPU去处理这个中断。CPU会根据中断信号找到操作系统预设好的中断处理程序(Interrupt Handler,通常是设备驱动程序的一部分)。

  4. 配合DMA(直接内存访问):

    • 为了进一步提高效率,很多现代硬件(如网卡、硬盘)的固件会配合DMA控制器工作。当数据需要传输时(比如从网卡接收数据到内存),固件会设置好DMA控制器,让数据直接在设备和主内存之间传输,绕过CPU,CPU无需介入每一个字节的传输。

    • 传输完成后,固件(或DMA控制器)才会触发一个中断,通知CPU“数据已经准备好了”。

  5. 实现异步的关键——“通知”而非“轮询”:

    • 想象一下,如果没有固件和中断机制,操作系统或应用程序想要知道网卡有没有收到数据,就只能不断地去轮询(Polling)网卡:“数据来了吗?数据来了吗?”。这会极大地浪费CPU资源,因为CPU大部分时间都在做无意义的查询。

    • 固件的角色就是让设备具备了主动通知的能力。固件在设备内部可能自己进行某种形式的“轮询”或等待,但这发生在设备自己的微控制器上。一旦有事发生,它通过中断机制“通知”操作系统。

    • 操作系统收到了这个硬件中断(由固件发起),就知道某个I/O操作有了进展(比如数据可读了)。然后操作系统会查找是哪个应用程序的哪个任务在等待这个事件,并唤醒这个等待的任务(在Rust的异步模型中,就是调用对应的Waker),让异步运行时(Executor)可以重新调度执行这个任务。(书中 P22 强调了“通知”优于“轮询”的效率优势)

总结来说:

在异步编程的场景下,固件是硬件设备的“大脑”,负责:

  • 直接操作硬件完成底层任务(收发数据、读写磁盘等)。

  • 检测硬件状态的变化和外部事件的发生。

  • 通过硬件中断机制,高效地将事件的发生通知给操作系统和CPU,而不是让CPU或操作系统浪费资源去反复轮询。

  • (可选)配合DMA控制器,在不占用主CPU的情况下完成数据传输。

正是因为有了固件及其配合的中断、DMA等机制,硬件才能在完成任务后主动通知系统,操作系统才能据此唤醒等待的异步任务,从而实现了非阻塞I/O,让CPU可以在等待I/O期间去执行其他计算任务,这就是异步编程提高效率的基础。没有固件的这种底层工作机制,高效的异步编程几乎是不可能实现的。


事件队列

事件队列是操作系统用于管理异步事件的数据结构,其核心功能是:

  • 事件存储:记录来自硬件(如网卡中断、磁盘I/O完成)或软件(如定时器、进程间通信)的事件通知。
  • 事件调度:按优先级或时间顺序将事件传递给应用程序,例如通过回调函数或唤醒阻塞线程147
  • 解耦生产与消费:事件的产生(如网络数据到达)与处理(如应用程序读取数据)通过队列分离,避免阻塞7

典型应用场景

  • GUI事件处理:用户点击、键盘输入等事件通过队列传递给应用程序7
  • 异步I/O:如网络通信中,网卡接收数据后触发中断,内核将事件加入队列,通知应用程序读取17
  • 定时任务:定时器到期事件通过队列调度46

epoll/kqueue/IOCP

机制操作系统核心原理与事件队列的关联
epollLinux基于红黑树+就绪链表,仅返回活跃的文件描述符(FD)。内核维护一个I/O事件队列,epoll通过epoll_wait从队列中获取就绪事件47
kqueueBSD/macOS支持多种事件类型(文件、信号、定时器等),通过事件过滤器管理。内核将事件分类存储到多个队列,kqueue通过kevent统一监听和提取事件47
IOCPWindows基于完成端口模型,与线程池结合实现异步I/O。内核将I/O操作结果直接写入完成队列,应用程序通过GetQueuedCompletionStatus轮询队列7

以网络服务器接收连接为例,结合事件队列和epoll的工作流程:

  1. 注册事件:应用程序通过epoll_ctl将Socket FD注册到epoll实例,内核将其加入监听队列
  2. 事件触发:网卡收到数据后触发中断,内核将FD标记为就绪,移入就绪队列
  3. 事件获取epoll_wait从就绪队列中取出事件,返回给应用程序。
  4. 事件处理:应用程序根据事件类型(如可读、可写)执行回调或线程任务47

这一过程通过内核与用户态协作,事件队列作为中间层,实现了高效的资源调度。


readiness-based event queue

epoll和kqueue是基于准备状态的事件队列,我们从一个socket应用出发,构建一个工作流:

  • 通过epoll_create创建一个事件队列

  • 获取socket的fd

  • 通过系统调用注册读取事件

  • 调用epoll_wait或者kevent来等待事件,这将会使得他被调用的线程阻塞

  • 当事件被触发了,线程将会被释放

那么问题来了,异步是如何实现的呢?

一个案例,使用epoll实现TCP监听服务器

c
// 创建 epoll 实例
int epfd = epoll_create1(0);

// 监听 socket 初始化(略)
int listen_sock = socket(...);
bind(...);
listen(...);

// 注册监听 socket 到 epoll
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 监听可读事件(边缘触发)
ev.data.fd = listen_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev);

// 事件循环
struct epoll_event events[MAX_EVENTS];
while (1) {
    int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
    for (int i = 0; i < nfds; i++) {
        if (events[i].data.fd == listen_sock) {
            // 接受新连接并注册到 epoll(略)
        } else {
            // 处理数据或关闭连接(略)
        }
    }
}

创建自己的异步库

创建自己的事件队列

rust的异步生态中有人尽皆知的tokio,其底层依赖的事件队列框架其实是mio,这类似于python中uvloop背后的libuv,它们本质上都是对系统提供的事件队列接口进行封装,当我们学会如何自己封装,我们就掌握了一些底层机制了,具体来说,我们要掌握:

  • 如何基于epoll来创建自己的事件队列

  • 为什么我们要创建抽象

poll&registry

标题的这两个名词代表着mio中最终要的两类抽象:

rust
// 在根目录下的poll.rs中,Poll是事件队列本身的抽象
pub struct Poll {
    registry: Registry,
}


// 三个关键接口:新建事件队列,返回注册表,轮询poll


// 注册表
pub struct Registry {
    selector: sys::Selector,
    /// Whether this selector currently has an associated waker.
    #[cfg(all(debug_assertions, not(target_os = "wasi")))]
    has_waker: Arc<AtomicBool>,
}

//那么为何要有这个结构体呢?
//该问题可以在下文的并发支持中获得解答

ET vsLT

特性水平触发 (LT) - 默认边缘触发 (ET) - EPOLLET
触发条件只要文件描述符处于就绪状态文件描述符状态从未就绪变为就绪的那一刻
通知次数只要状态满足,每次epoll_wait都可能通知状态变化时仅通知一次
数据处理可以不一次性读/写完所有数据,下次还会通知必须一次性读/写完所有数据,直到返回错误(EAGAIN/EWOULDBLOCK)
编程复杂度相对简单,不易丢失事件较高,必须正确处理循环读/写,否则可能丢失数据
效率可能有冗余通知,效率稍低通知次数少,效率通常更高
配合模式可与阻塞或非阻塞I/O配合,但非阻塞更常用必须与非阻塞I/O配合

你自己的事件队列

这一章的核心目标是从头开始构建一个基于 Linux epoll 机制的简单事件队列,并且其API设计会参考流行的底层I/O库 mio。

主要的编程例子(通常在 main.rs 文件中)会做以下事情:

  1. 启动一个辅助服务器:例子需要一个 delayserver 在后台运行。这个服务器的作用是模拟网络延迟,当你向它发送请求时,它会等待指定的毫秒数后再返回响应。

  2. 创建多个网络连接:程序会创建多个 TcpStream(TCP流),同时连接到本地运行的 delayserver。

  3. 设置非阻塞模式:将每个 TcpStream 设置为非阻塞模式。这是异步I/O的基础,意味着读写操作不会卡住线程。

  4. 创建并使用自定义事件队列

    • 初始化在 poll.rs 中定义的 Poll 结构体实例。这个 Poll 结构体封装了底层的 epoll 实例(通过 epoll_create 创建)。

    • 对于每一个 TcpStream,程序会通过 Poll 实例中的 Registry(注册表)来注册对读事件(Read events)的兴趣。这通常会调用底层的 epoll_ctl 函数,将流的文件描述符(file descriptor)添加到 epoll 实例的监听列表中。

    • 注册时,会关联一个唯一的令牌(Token),通常是一个数字(比如循环的索引 i),用来在之后事件发生时识别是哪个流准备好了。

    • 书中提到,例子会使用边缘触发(Edge-Triggered, ET)模式(通过 EPOLLET 标志),这是 mio 的常用模式,也更接近硬件中断的处理方式。

  5. 发送HTTP请求:向每个 TcpStream 写入一个简单的 HTTP GET 请求,请求中会包含不同的延迟时间和标识符。

  6. 进入事件循环

    • 程序进入一个主循环。

    • 在循环中,调用 Poll::poll() 方法(内部调用 epoll_wait)。这个调用会阻塞当前线程,直到至少有一个已注册的流发生了我们感兴趣的事件(这里是可读事件),或者超时(例子中可能设置为永不超时或一个很长的时间)。这是关键的等待点

    • 当 Poll::poll() 返回时,它会带回一个包含就绪事件的列表。

    • 程序遍历这些事件。

    • 通过事件中包含的令牌(Token),程序能准确地知道是哪个 TcpStream 准备好了数据。

    • 从对应的 TcpStream 中读取数据(HTTP响应)。由于使用了边缘触发,这里需要循环读取,直到读完所有可用数据或遇到 WouldBlock 错误,以确保不会错过后续数据(因为边缘触发只通知一次状态变化)。

    • 处理(比如打印)读取到的响应。

    • 程序会持续这个循环,直到所有请求都收到了响应。

这个例子最精妙/最有启发性的地方

这个例子之所以精妙和有启发性,主要在于它揭示了现代高性能异步I/O的核心工作机制,并且是通过自己动手实现来达成的:

  1. 从抽象到底层:它不是直接使用高级的异步框架(如 Tokio 或 async-std),而是直接深入到操作系统提供的底层机制 (epoll)。通过 FFI (Foreign Function Interface) 调用 C 库函数 (libc) 中的 epoll 相关系统调用,让你真切地看到异步运行时与操作系统是如何交互的。这打破了异步编程的“魔法”感。

  2. 单线程处理高并发的基石:这个例子(很可能)只使用单个线程就管理了多个并发的网络连接和I/O操作。它清晰地展示了事件驱动模型:线程只在真正有事可做(I/O 事件就绪)时才工作,在等待期间通过 epoll_wait 将自己阻塞挂起,让出CPU,而不是空转轮询。这是 Node.js、Nginx 以及 Rust 中 Tokio 等高性能框架能够用较少线程处理大量并发连接的根本原因。

  3. 具体化了“事件循环”:我们常听说“事件循环”,这个例子就构建了一个最基础的事件循环。Poll::poll() 的阻塞等待和返回后处理就绪事件的循环,就是事件循环的核心骨架。它将抽象的概念落实到了具体的代码和系统调用上。

  4. 暴露了底层细节和挑战:为了让 epoll 正确工作,例子中必须处理一些底层细节,比如:

    • 将套接字设为非阻塞

    • 理解并正确处理边缘触发(需要循环读取直到 WouldBlock)。

    • 使用令牌(Token)来关联事件和具体的I/O源。
      这些都是在使用底层API时必须面对的问题,理解它们有助于更好地使用上层抽象。

  5. 连接了 mio 和 epoll:通过模仿 mio 的 API (Poll, Registry, Token, Interest) 来封装 epoll,这个例子让你不仅学会了 epoll,还大致理解了像 mio 这样的底层I/O库是如何对不同平台的事件通知机制(epoll, kqueue, IOCP)进行抽象和封装的,虽然这一章只实现了 epoll 部分。

总结来说,这个例子最精妙之处在于它不是教你“怎么用异步”,而是通过让你亲手“构建异步的核心引擎(事件队列)”,揭示了“异步为什么能高效工作”的底层原理。它将操作系统、系统调用、并发模型和编程实践结合起来,提供了一个理解高性能网络编程基础的绝佳切入点。 完成这个例子后,你会对异步运行时(Reactor 部分)的工作方式有更深刻的理解。


并发支持

  1. 分离“等待”和“注册”操作的生命周期和所有权需求

    • 等待事件 (Poll::poll) 这个操作通常只应该在一个专门的线程(事件循环线程)中进行。这个线程负责驱动整个事件循环,阻塞等待 OS 的通知。因此,Poll 实例通常由这个事件循环线程拥有或独占访问。poll() 方法需要 &mut self(可变借用)是很自然的,因为它会接收并可能处理内核返回的事件数据。

    • 注册/注销事件 (Registry::register/deregister) 这个操作可能需要从多个不同的线程发起。想象一个多线程的 Web 服务器,一个新的连接可能在任何一个工作线程中被接受(accept),然后这个工作线程需要将这个新的连接注册到共享的事件队列中。

  2. 线程安全

    • 如果只有一个结构体,并且 poll() 需要 &mut self,那么其他线程就无法在 poll() 阻塞期间调用 register() 或 deregister()(因为它们也需要访问同一个结构体,可能也需要 &mut self 或者至少 &self,但可变借用期间不允许其他借用)。

    • 通过将注册功能分离到 Registry 中,mio 可以让 Registry 可以被安全地克隆 (try_clone) 并在多个线程之间共享(通常包裹在 Arc 中)。Registry 内部只需要包含事件队列的句柄副本(一个整数 fd 或 kq),以及可能需要的线程安全机制(如内部锁,尽管在 mio 的某些实现中,注册操作本身可能是线程安全的系统调用)。

    • 这样,事件循环线程可以持有 Poll 实例并调用 poll(&mut self) 进行阻塞等待,而其他工作线程可以持有 Registry 的克隆副本或 Arc<Registry>,并在任何时候调用 register() 或 deregister() 来管理它们各自的 I/O 资源。这两个操作可以并发进行(或者至少看起来是并发的,底层系统调用可能仍需同步)。

  3. API 设计的清晰性

    • 将职责分开使得 API 更加清晰。Poll 专注于“等待和接收事件通知”,而 Registry 专注于“管理对事件源的兴趣”。这符合单一职责原则。

mio 的 try_clone

Registry::try_clone 这个方法。这正是实现上述多线程注册的关键:

  • 事件循环线程创建 Poll 实例。

  • 事件循环线程通过 poll.registry() 获取一个 &Registry 引用。

  • 这个 &Registry 可以通过 try_clone() 创建出拥有独立所有权的 Registry 实例副本。

  • 这些独立的 Registry 副本可以被发送到其他线程(或者包裹在 Arc 中共享)。

  • 其他线程使用它们自己的 Registry 副本进行注册/注销操作,而事件循环线程继续使用原始的 Poll 实例进行 poll() 等待。

下面我们来直接看看当前最新版本的mio是如何实现的吧

rust
    pub fn try_clone(&self) -> io::Result<Registry> {
        self.selector.try_clone().map(|selector| Registry {
            selector,
            #[cfg(all(debug_assertions, not(target_os = "wasi")))]
            has_waker: Arc::clone(&self.has_waker),
        })
    }

当你调用try_clone,它首先会调用其内部 selector 字段的 try_clone() 方法。这里的 self.selector 是 sys::Selector 类型,在 kqueue 环境下,它实际上是 src/sys/unix/selector/kqueue.rs 中定义的 Selector 类型。

rust
// Inside Selector::try_clone in src/sys/unix/selector/kqueue.rs
pub fn try_clone(&self) -> io::Result<Selector> {
    self.kq.try_clone().map(|kq| Selector {
        // It's the same selector, so we use the same id.
        #[cfg(debug_assertions)]
        id: self.id,
        kq, // The cloned OwnedFd
    })
}
  • kqueue::Selector 结构体包含一个 kq: OwnedFd 字段。这个 OwnedFd 是一个封装了文件描述符(File Descriptor, fd)的类型,这个 fd 指向的是通过 kqueue() 系统调用创建的内核 kqueue 实例。
  • 这里的关键是调用 self.kq.try_clone()OwnedFd 是标准库 std::os::fd 提供的类型,它的 try_clone() 方法会执行 dup() 系统调用
  • dup() 系统调用的作用是:创建一个新的文件描述符,这个新的文件描述符与旧的文件描述符指向同一个底层的内核文件表项(file table entry)。在这个场景下,这个底层内核对象就是那个特定的 kqueue 实例。
  • 因此,self.kq.try_clone() 成功后会返回一个新的 OwnedFd,它包含了一个新的 fd,但这个新 fd 和旧 fd 都指向同一个内核 kqueue 实例
  • 最后,kqueue::Selector::try_clone() 用这个新克隆的 OwnedFd 创建并返回一个新的 kqueue::Selector 实例。注意,调试用的 id 保持不变,表示它们逻辑上是同一个 selector。

实现goroutine

概述

动画内容
  • 一个依托于“栈”的协程的的工作过程(栈式非抢占协程)

  • 展示代码为rust(非常简单,几乎等同于伪代码);默认cpu为x86-64平台(重要,决定了寄存器的类型和名称)

  • 主函数启动了两个协程,每个协程每次完成一个阶段的工作后,就yield回runtime(案例中的代码会贴在简介,也会放在下期视频中讲解)

  • callee-saved寄存器中rbx,rbp,r12-r15实际上都会变化(动画中忘了展示)

  • 如果看完还有疑惑不要紧,将在下期视频得到解答

  • 栈式协程的例子,最常见的便是go语言的goroutine


内容
  • abi和寄存器

  • 代码演示什么是栈跳转/上下文切换

  • 分解一个极简调度器的实现(动画内容中的)

  • 围绕generator和may的技术性分解

    • generator:上下文表示方法;上下文切换;栈的实现

ABI含义和寄存器

  • ABI 定义了二进制级别的接口。它规定了编译后的机器代码如何交互,包括:

    • 数据类型的大小、布局和对齐方式: 比如 int 类型占多少字节,结构体成员在内存中如何排列,内存地址需要按多少字节对齐。

    • 函数调用约定 (Calling Convention): 这是 ABI 的核心部分。它规定了:

      • 函数参数如何传递(是通过寄存器还是通过栈?如果是寄存器,用哪些?如果是栈,参数的入栈顺序是什么?)。

      • 返回值如何传递(通过哪个寄存器?)。

      • 调用函数前后,哪些 CPU 寄存器的值必须由调用者 (caller) 保存和恢复,哪些必须由被调用者 (callee) 保存和恢复(这就是所谓的 caller-saved 和 callee-saved 寄存器)。

      • 函数调用时栈如何设置和清理。

    • 系统调用的方式: 如何从用户程序陷入内核执行系统调用。

    • 目标文件的格式、程序库的格式等等。

简单来说,ABI 确保了由不同编译器编译(甚至用不同语言编写,只要它们遵循相同的 ABI)的目标文件、库文件和可执行文件能够正确地链接和运行在一起。没有统一的 ABI,A 编译器编译的库就无法被 B 编译器编译的程序调用,因为它们可能对函数参数如何传递、寄存器如何使用等基本问题有不同的“约定”。


我们的 swap_registers 函数(或者说,进行上下文切换的这段代码)可以被看作一个特殊的“被调用者”(callee)。调用它的“代码”(即协程 A 运行到一半,需要切换到协程 B)期望在未来某个时刻恢复执行时,某些寄存器的值保持不变,就像调用一个普通函数返回后一样。ABI 规定了哪些寄存器是需要被调用者(swap_registers)负责保存和恢复的,这些就是 callee-saved 寄存器。如果 swap_registers 修改了这些寄存器而没有恢复,那么当协程 A 恢复执行时,它依赖的 callee-saved 寄存器值就可能被破坏,导致程序错误。


需要的汇编知识

  • 数据移动

  • 栈空间存储什么


Rust内联汇编宏

我们使用了x86-64的cpu架构和systemV,intel汇编。

rust一般有两种方式可以内嵌汇编代码:

  • asm!宏

  • 外部汇编文件结合extern"C"调用

前者和后者有一个非常关键的区别:当你使用外部汇编实现switch时,你不需要显示地告知编译器不需要生成前奏和尾声相关的指令,但前者需要,比如:

rust
#[cfg(not(target_os = "windows"))]
#[naked]
#[no_mangle]
#[cfg_attr(target_os = "macos", export_name = "\x01switch")]
unsafe extern "C" fn switch() {
    naked_asm!(
        "mov [rdi + 0x00], rsp",
        "mov [rdi + 0x08], r15",
        "mov [rdi + 0x10], r14",
        "mov [rdi + 0x18], r13",
        "mov [rdi + 0x20], r12",
        "mov [rdi + 0x28], rbx",
        "mov [rdi + 0x30], rbp",
        "mov rsp, [rsi + 0x00]",
        "mov r15, [rsi + 0x08]",
        "mov r14, [rsi + 0x10]",
        "mov r13, [rsi + 0x18]",
        "mov r12, [rsi + 0x20]",
        "mov rbx, [rsi + 0x28]",
        "mov rbp, [rsi + 0x30]",
        "ret"
    );
}

栈跳转

rust
use core::arch::asm; // 导入内联汇编宏

const SSIZE: isize = 48; // 定义一个非常小的栈大小(仅用于演示)

// 代表 CPU 状态的结构体(极简版,只关心栈指针)
#[derive(Debug, Default)]
#[repr(C)] // 保证内存布局与 C 语言兼容
struct ThreadContext {
    rsp: u64, // rsp 寄存器是 x86-64 的栈指针
}

// 我们想在新栈上执行的目标函数
fn hello() -> ! { // "->" 表示这个函数永不返回
    println!("I LOVE WAKING UP ON A NEW STACK!");
    loop {} // 永远循环,模拟任务执行
}

// 实现“切换”到新栈的函数(核心部分)
unsafe fn gt_switch(new: *const ThreadContext) {
    asm!(
        // 将 new 指针指向的 ThreadContext 结构体中的 rsp 成员的值,
        // 移动到 CPU 的 rsp 寄存器中。
        // [{0} + 0x00] 的意思是:取第 0 个输入参数(new 指针)的值,
        // 加上 0 的偏移量(也就是不加偏移),解引用得到内存地址,
        // 再从这个内存地址加载 8 字节(因为 rsp 是 u64)到 rsp 寄存器。
        // 简单说就是: mov rsp, [new]
        "mov rsp, [{0} + 0x00]",
        // 执行 ret 指令。
        "ret",
        // 将 new (指向 ThreadContext 的指针) 作为输入,
        // 让编译器选择一个通用寄存器(reg)来存放它,这个寄存器对应模板中的 {0}。
        in(reg) new,
        // 告诉编译器我们可能修改了内存,并且这个汇编块不会返回(noreturn)。
        // options(nostack) 也可以考虑,明确告知不依赖当前栈。
        options(noreturn)
    );
}

fn main() {
    // 创建一个默认的 ThreadContext 实例
    let mut ctx = ThreadContext::default();
    // 在当前(main 函数)栈上分配一块内存作为“新栈”
    // 这里用 Vec<u8> 模拟,大小为 SSIZE (48字节)
    let mut stack = vec![0_u8; SSIZE as usize];

    unsafe {
        // 计算栈底地址。Vec 的指针指向起始(低地址),栈向下增长,所以栈底是高地址。
        let stack_bottom = stack.as_mut_ptr().offset(SSIZE);
        // 对栈底地址进行 16 字节对齐(System V ABI 要求)。
        // (addr & !15) 是一种常用的向下对齐到 16 倍数地址的技巧。
        let sb_aligned = (stack_bottom as usize & !15) as *mut u8;

        // 在对齐后的栈底向下偏移 16 字节的位置,写入 hello 函数的地址。
        // -16 的位置是 CPU 执行 ret 指令时期望找到返回地址的地方。
        // 我们把 hello 函数的地址伪装成“返回地址”放在这里。
        std::ptr::write(sb_aligned.offset(-16) as *mut u64, hello as u64);

        // 将这个伪造的“返回地址”所在的地址(即新栈的栈顶)
        // 保存到 ctx 的 rsp 字段中。
        ctx.rsp = sb_aligned.offset(-16) as u64;

        // 调用切换函数,传入包含新栈顶指针的 ctx
        gt_switch(&ctx);
        // gt_switch 设置了 noreturn,所以理论上代码不会执行到这里。
    }
}
  • ThreadContext 的极简化: 这里只定义了 rsp 字段。因为这个最简单的例子只关心如何设置新栈的指针,暂时不关心保存旧栈的状态。完整纤程切换需要保存更多寄存器。#[repr(C)] 确保 rsp 字段就在结构体的开头,方便汇编代码通过 [{0} + 0x00] 访问。

  • 模拟新栈 (Vec<u8>): 作者用 Vec<u8> 在当前函数(main)的栈(或堆,取决于 Vec 的大小和分配策略,但概念上是在当前执行流的内存空间)中分配了一块连续内存,来模拟一个独立的栈空间。

  • 栈地址和对齐:

    • 计算 stack_bottom:获取 Vec 内存的末尾地址(高地址)。

    • 计算 sb_aligned:因为 System V ABI 要求栈指针(rsp)在函数调用(特别是 call 指令之后,ret 指令之前)必须是 16 字节对齐的,所以作者必须确保我们即将设置给 rsp 的地址满足这个对齐要求。(addr & !15) 是一个位运算技巧,将地址强制向下对齐到最近的 16 的倍数。!15 的二进制是 ...11110000,与地址进行按位与操作会清除低 4 位,从而实现 16 字节对齐。

  • 在“新栈”上伪造“返回地址”: 这是这个例子最核心的技巧。

    • std::ptr::write(sb_aligned.offset(-16) as *mut u64, hello as u64);

    • 我们知道栈是向下增长的。sb_aligned 是对齐后的栈底(最高有效地址)。

    • sb_aligned.offset(-16) 指向的是从对齐栈底向下(向低地址)移动 16 个字节的位置。

    • hello as u64 将 hello 函数的地址转换成一个 64 位整数。

    • 这行代码的作用就是,在我们准备好的新栈内存的特定位置(栈顶-16字节处)写入 hello 函数的地址。

    • 为什么是 -16 字节? 因为 ret 指令会从 rsp 指向的地址弹出一个 8 字节的值(在 x86-64 中,地址是 8 字节)作为返回地址,然后跳转到这个地址。并且,在 ret 执行之前,rsp 通常指向的是栈上的最后一个值(比如 push 的最后一个参数或保存的寄存器),而返回地址位于 rsp + 8 的位置(对于标准 call 指令压栈的情况)。但这里我们是直接设置 rsp,然后立刻 ret。所以,我们需要让 rsp 直接指向我们存放 hello 函数地址的那个 8 字节内存。同时考虑到 16 字节对齐要求,以及我们可能需要在 hello 函数地址之上(更高地址)预留空间(例如 red zone,虽然这个简单例子没用),将 hello 地址放在 -16 的位置,并将 rsp 指向它,是一个相对安全且符合 ABI 潜在要求的做法。最关键的是,当 ret 执行时,rsp 必须指向存放 hello 地址的地方。

  • 设置 ctx.rsp:

    • ctx.rsp = sb_aligned.offset(-16) as u64;

    • 我们将刚才存放 hello 函数地址的那个内存地址(也就是我们希望 rsp 最终指向的位置)保存到 ctx 结构体的 rsp 字段中。

  • gt_switch 函数的核心逻辑:

    • mov rsp, [{0} + 0x00]:这行汇编代码读取 ctx.rsp 的值(我们刚刚存入的、指向新栈上 hello 函数地址的那个地址),并将其写入 CPU 的 rsp 寄存器。执行完这句后,CPU 的栈指针就指向了我们新栈上的特定位置!

    • ret:CPU 执行 ret 指令。它会:

      1. 从当前 rsp 指向的内存地址(也就是我们新栈上存放 hello 地址的地方)读取 8 字节。

      2. 将这 8 字节的值(也就是 hello 函数的地址)加载到指令指针寄存器 (rip) 中。

      3. 增加 rsp 的值(弹出操作)。

    • 结果: rip 现在指向了 hello 函数的第一条指令。CPU 的下一个指令周期就会去执行 hello 函数的代码。并且,由于 rsp 已经被设置为指向我们的新栈,hello 函数内部的所有栈操作(比如 println! 可能需要的栈空间,虽然这个例子简单,但原理如此)都会发生在我们手动创建的 stack (Vec) 上,而不是原来的 main 函数的栈上。

  • unsafe 关键字: 整个操作涉及直接读写内存指针 (std::ptr::write) 和执行内联汇编 (asm!) 来修改 CPU 核心寄存器 (rsp) 以及控制流 (ret 技巧),这些都是 Rust 无法保证内存安全的,所以必须在 unsafe 块中进行。

如果仍然理解不了上述代码,可以在"switch"前加入下面代码来可视化栈空间:

rust
for i in 0..SSIZE {
 println!("mem: {}, val: {}",
 sb_aligned.offset(-i as isize) as usize,
 *sb_aligned.offset(-i as isize))
}

用户态线程/纤程的实现

下面说说要实现一个用户态线程,我们可能可以想象到的内容:

  1. 创建多个纤程: 每个纤程有自己的栈和执行上下文。

  2. 保存和恢复上下文: 在纤程之间切换时,能够完整地保存当前纤程的执行状态(必要的 CPU 寄存器),并恢复下一个纤程的状态。

  3. 调度: 实现一个(虽然非常简单的)调度器,决定下一个应该运行哪个纤程。

  4. Yield (让出)机制: 提供一种方式让当前运行的纤程主动放弃 CPU 控制权,让调度器选择其他纤程运行。

  5. 纤程生命周期管理: 处理纤程的创建、运行、就绪、结束等状态。

结合栈跳转的例子,先构思一下该如何实现


generator crate

generator-rs 库是一个非常典型的、基于栈(Stackful)的协程实现

  • 上下文表示 (RegContext, detail/asm/*.rs, detail/<arch>_<os>.rs):

    • 定义了 Registers 结构体(在 detail/<arch>_<os>.rs 中),根据不同的 CPU 架构(x86_64, aarch64, riscv64, arm, loongarch64)和操作系统(Unix-like, Windows)包含了需要保存的 callee-saved 寄存器(包括通用寄存器和浮点寄存器)。
    • RegContext 结构体包装了 Registers。

    • 这个库更完整,考虑了更多寄存器(尤其是浮点寄存器)和特定平台(如 Windows x86_64 的 TEB 结构)的需求。

  • 上下文切换 (detail/asm/*.S, swap_registers):

    • 汇编代码实现了完整的保存旧上下文(将 CPU 寄存器值写入 out_regs 指针指向的内存)和加载新上下文(从 in_regs 指针指向的内存读取值并加载到 CPU 寄存器)。
    • 跳转: x86_64 版本使用了 pop rax 后 jmp rax 的方式跳转,而 ARM/AArch64/RISC-V 等则通常通过修改链接寄存器(lr/ra)或使用特定的跳转指令(如 br/jr)实现。
  • 栈管理 (stack/mod.rs, stack/sys.rs 及平台特定文件):

    • 上个例子: 使用 Vec<u8> 模拟栈,手动计算对齐和栈底/栈顶。
    • 此库实现:

    • 系统级栈分配: 使用操作系统提供的内存映射功能(如 mmap on Unix, VirtualAlloc on Windows)来分配真正的、页面对齐的栈内存 (SysStack)。这比 Vec 更接近真实线程栈。

    • 栈保护 (Guard Page): 在栈的底部(低地址端)设置一个保护页 (mprotect on Unix, VirtualProtect on Windows 设置 PAGE_GUARD)。当发生栈溢出,访问到这个保护页时,会触发一个硬件异常(段错误/访问冲突)。

    • 栈溢出处理 (stack/overflow_*.rs): 注册信号处理器(Unix)或向量化异常处理器(Windows),捕获由访问保护页引起的异常。处理器判断是否确实是栈溢出,如果是,则设置错误状态并安全地切换回父上下文,而不是让程序直接崩溃。这是比书中例子健壮得多的关键特性。

    • StackBox<T>: 提供了一个类似 Box<T> 但在协程栈上分配内存的智能指针,并自动管理栈上数据的生命周期和栈内存的回收(在其 Drop 实现中)。


stack overflow(选读)

1. overflow_*.rs 文件主要解决什么问题?

  • 问题核心: 这个库为每个生成器(协程)分配了固定大小的独立栈空间。与普通线程栈不同(操作系统通常可以按需增长),生成器的栈是预先分配好的,大小有限。如果生成器内部的函数调用层级过深、递归没有终止条件,或者在栈上分配了非常大的局部变量,就可能耗尽这个固定大小的栈,导致栈溢出 (Stack Overflow)
  • 危险性: 传统的栈溢出会覆盖栈以外的内存(例如,返回地址、其他变量),导致程序崩溃、数据损坏甚至安全漏洞。
  • 解决方案: 为了安全地检测和处理这种栈溢出,而不是让它破坏程序状态,库采用了栈保护页 (Guard Page) 机制。
    • 在分配的栈内存底部(内存地址最低处,因为栈通常向下增长)紧邻着设置一个或多个特殊的内存页。
    • 这些页被标记为不可访问(例如,只读或无访问权限)。
    • 当栈增长超出其分配的有效范围,尝试写入或读取这个保护页时,CPU 会触发一个硬件异常(在 Windows 上是 EXCEPTION_STACK_OVERFLOW,在 Unix 上通常是 SIGSEGV - 段错误)。
  • overflow_*.rs 的作用: 这些文件包含了平台特定的代码,用于:
    • 设置异常/信号处理程序: 注册一个自定义的处理函数,以便在发生上述硬件异常时能够捕获它。
    • 识别栈溢出: 在处理函数中,判断捕获到的异常/信号是否确实是由访问我们设置的保护页引起的(而不是其他内存访问错误)。
    • 安全地处理溢出: 如果确认是生成器的栈溢出,处理程序会:
      • 阻止程序直接崩溃。
      • 记录一个特定的错误状态(Error::StackErr)到发生溢出的生成器的上下文中。
      • 修改当前的执行上下文(CPU 寄存器状态),使得当异常处理返回时,程序能够安全地回到调用生成器的地方,并将 Error::StackErr 作为结果(通常通过 panic 机制)传递回去。

2. 栈溢出一般在什么情况下可能出现?

  • 无限递归或过深递归: 函数不断调用自身,每次调用都在栈上分配新的栈帧,最终耗尽栈空间。

    rust
    fn recursive() {
        recursive(); // 无终止条件
    }
    // 在生成器内调用 recursive()
  • 非常深的函数调用链: 函数 A 调用 B,B 调用 C……每一层调用都需要栈空间,如果调用链太长,也会溢出。

  • 在栈上分配过大的数据结构:

    rust
    fn large_array() {
        let buffer: [u8; 1024 * 1024] = [0; 1024 * 1024]; // 尝试在栈上分配 1MB
        // ... 使用 buffer ...
    }
    // 在生成器内调用 large_array(),如果生成器栈小于 1MB 就可能溢出

3. test_overflow 是如何通过测试去验证功能的?

我们再看一下 src/rt.rs 中的 test_overflow

rust
#[test]
fn test_overflow() {
    // ... setup ...
    let result = catch_unwind(|| { // 1. 捕获 panic
        let mut g = Gn::new_scoped(move |_s: Scope<(), ()>| {
            let guard = super::guard::current(); // 2. 获取保护页范围

            // 3. 故意访问保护页的起始地址
            std::hint::black_box(unsafe { *(guard.start as *const usize) });

            // 4. 如果能执行到这里,说明处理失败,强制终止
            eprintln!("entered unreachable code");
            std::process::abort();
        });
        g.next(); // 5. 运行生成器,触发访问
    });

    // 6. 断言捕获到的 panic 是我们预期的 StackErr
    assert!(matches!(
        result.map_err(|err| *err.downcast::<Error>().unwrap()),
        Err(Error::StackErr)
    ));
}
  • 步骤 2 & 3: 测试代码获取计算出的保护页范围 (guard),然后故意去读取保护页的第一个字节 (guard.start)。这一定会触发硬件异常。
  • 步骤 5: 调用 g.next() 启动生成器,执行到访问保护页的代码。
  • 预期流程:
    • 硬件异常被触发。
    • overflow_windows.rs 中的 vectored_handler 被调用。
    • vectored_handler 检查异常代码是否为 EXCEPTION_STACK_OVERFLOW,并检查发生异常时的栈指针 (context.Sp 或 context.Rsp) 是否落在 guard::current() 计算出的范围内。
    • 如果检查通过,vectored_handler 将 Error::StackErr 设置到当前生成器的 cur.err 字段。
    • vectored_handler 调用 context_init,这个函数修改 context(异常发生时的 CPU 状态),主要是设置 Pc (程序计数器) 指向一个能触发 panic 的路径(通常是恢复到调用 swap_registers 之后,然后检查 cur.err 并 resume_unwind)。
    • vectored_handler 返回 EXCEPTION_CONTINUE_EXECUTION,操作系统根据修改后的 context 恢复执行。
    • 执行流回到 gen_impl.rs 中的 resume_gen 函数,检测到 cur.err 中有错误,于是调用 panic::resume_unwind(err)
  • 步骤 1 & 6: catch_unwind 捕获到这个由 Error::StackErr 引发的 panic,测试断言捕获到的错误确实是 Error::StackErr,从而验证整个栈溢出检测和处理机制按预期工作。
  • 步骤 4: 如果异常处理失败(例如,vectored_handler 没有被调用,或者 context_init 没能正确引导 panic 流程),那么访问保护页后的代码会被意外执行,eprintln! 和 abort() 会确保测试在这种情况下失败。

4. Unix 和 Windows 两个平台的差异点在于什么?

主要的差异在于操作系统提供的异常/信号处理机制不同

  • 异常/信号类型:
    • Windows: 使用异常 (Exception) 机制。访问保护页触发 EXCEPTION_STACK_OVERFLOW 硬件异常。
    • Unix: 使用信号 (Signal) 机制。访问保护页(通常用 mprotect 设置为 PROT_NONE)会触发 SIGSEGV (Segmentation Fault) 信号。
  • 处理程序注册:
    • Windows: 使用 AddVectoredExceptionHandler 注册一个向量异常处理程序 (VEH)。VEH 是一个链式处理机制,允许在标准的结构化异常处理 (SEH) 之前捕获异常。
    • Unix: 使用 sigaction 系统调用来注册一个信号处理函数,指定处理 SIGSEGV 信号。通常还需要设置 SA_SIGINFO 标志以接收更详细的信号信息,并可能使用 sigaltstack 来确保信号处理函数本身在一个安全的栈上运行(防止处理函数自己也栈溢出)。
  • 获取上下文信息:
    • Windows: VEH 处理函数接收一个 EXCEPTION_POINTERS 指针,其中包含 ExceptionRecord (异常信息) 和 ContextRecord (发生异常时的 CPU 寄存器状态,即 CONTEXT 结构)。
    • Unix: sigaction 注册的信号处理函数(如果设置了 SA_SIGINFO)接收信号编号、一个 siginfo_t 结构(包含信号的详细原因,如出错的内存地址 si_addr)和一个 ucontext_t 指针(包含信号发生时的 CPU 寄存器状态)。
  • 恢复执行/修改上下文:
    • Windows: vectored_handler 通过修改传入的 CONTEXT 结构体(特别是 PcSp 等),然后返回 EXCEPTION_CONTINUE_EXECUTION 来控制异常处理后的执行流程。
    • Unix: 信号处理函数通常通过修改传入的 ucontext_t 结构体中的寄存器值(如 uc_mcontext.gregs[REG_RIP/REG_PC]uc_mcontext.gregs[REG_RSP/REG_SP] 等)来改变信号处理函数返回后的执行点。或者,在某些情况下,可以使用 setcontext 来直接切换到另一个上下文。
  • 栈保护实现:
    • Windows: 通常依赖操作系统在分配栈时自动处理保护页,或者使用 VirtualProtect 配合 PAGE_GUARD 标志。
    • Unix: 通常使用 mmap 分配内存,然后使用 mprotect 将栈底部的页面标记为 PROT_NONE(不可访问)。

总的来说,目标都是一样的:捕获访问保护页的事件,确认是栈溢出,然后安全地将错误传递回调用者。但实现的技术细节和依赖的操作系统 API 完全不同。

现在你对这部分的逻辑应该有了更清晰的理解。test_overflow 在 Windows ARM64 上失败,很可能意味着 vectored_handler 没有被正确触发、没有正确识别溢出、或者 context_init 在恢复 ARM64 上下文时存在问题,导致无法正确地将 Error::StackErr panic 传递出来。

区别

generator-rs 库是一个非常典型的、基于栈(Stackful)的协程实现。它提供了比示例更完整、更健壮、跨平台性也更好的实现。

generator-rs 库完成的功能(对照例子中内容):

  1. 上下文表示 (RegContext, detail/asm/*.rs, detail/_.rs):

    • 例子实现: 定义了 ThreadContext 结构体来保存 callee-saved 寄存器。

    • 此库实现:

      • 定义了 Registers 结构体(在 detail/_.rs 中),根据不同的 CPU 架构(x86_64, aarch64, riscv64, arm, loongarch64)和操作系统(Unix-like, Windows)包含了需要保存的 callee-saved 寄存器(包括通用寄存器和浮点寄存器)。

      • RegContext 结构体包装了 Registers。

      • 这个库的实现比书中例子更完整,考虑了更多寄存器(尤其是浮点寄存器)和特定平台(如 Windows x86_64 的 TEB 结构)的需求。

  2. 上下文切换 (detail/asm/*.S, swap_registers):

    • 例子实现: 使用 Rust 内联汇编 asm! 宏编写 switch 函数,手动 mov 寄存器值并使用 ret 跳转。

    • 此库实现:

      • 更专业的方式: 使用外部汇编文件 (.S 文件) 来编写上下文切换的核心逻辑 (swap_registers),并通过 extern "C" 在 Rust 代码中声明和调用。这种方式通常比内联汇编更稳定、更易于维护,尤其对于复杂的、跨平台的汇编代码。

      • 汇编代码实现了完整的保存旧上下文(将 CPU 寄存器值写入 out_regs 指针指向的内存)和加载新上下文(从 in_regs 指针指向的内存读取值并加载到 CPU 寄存器)。

      • 跳转: x86_64 版本使用了 pop rax 后 jmp rax 的方式跳转,而 ARM/AArch64/RISC-V 等则通常通过修改链接寄存器(lr/ra)或使用特定的跳转指令(如 br/jr)实现。

  3. 栈管理 (stack/mod.rs, stack/sys.rs 及平台特定文件):

    • 例子实现: 使用 Vec 模拟栈,手动计算对齐和栈底/栈顶。

    • 此库实现:

      • 系统级栈分配: 使用操作系统提供的内存映射功能(如 mmap on Unix, VirtualAlloc on Windows)来分配真正的、页面对齐的栈内存 (SysStack)。这比 Vec 更接近真实线程栈。

      • 栈保护 (Guard Page): 在栈的底部(低地址端)设置一个保护页 (mprotect on Unix, VirtualProtect on Windows 设置 PAGE_GUARD)。当发生栈溢出,访问到这个保护页时,会触发一个硬件异常(段错误/访问冲突)。

      • 栈溢出处理 (stack/overflow_*.rs): 注册信号处理器(Unix)或向量化异常处理器(Windows),捕获由访问保护页引起的异常。处理器判断是否确实是栈溢出,如果是,则设置错误状态并安全地切换回父上下文,而不是让程序直接崩溃。这是比书中例子健壮得多的关键特性。

      • StackBox: 提供了一个类似 Box 但在协程栈上分配内存的智能指针,并自动管理栈上数据的生命周期和栈内存的回收(在其 Drop 实现中)。

  4. 协程初始化 (initialize_call_frame, gen_init, bootstrap_green_task):

    • 例子实现: 手动在 spawn 函数中计算偏移量,使用 std::ptr::write 将 guard 和 f 的地址写入栈中,并设置初始 rsp。

    • 此库实现:

      • 定义了一个标准的入口函数类型 InitFn (gen_init)。

      • initialize_call_frame 函数负责在新栈上构建初始栈帧。它会将实际要执行的函数指针(f 对应的包装闭包)和参数(虽然例子中参数用法较复杂)保存到特定的寄存器(如 x86_64 的 r12, r13, r14),并将启动代码 (bootstrap_green_task) 的地址设置为初始的“返回地址”(写入栈上或 lr/ra 寄存器)。

      • bootstrap_green_task 是一小段汇编代码,它负责将保存在特定寄存器中的参数移动到 ABI 规定的函数调用参数寄存器(如 x86_64 的 rdi, rsi),然后跳转到实际的用户函数 (fptr,最终会调用到 gen_init_impl)。

      • gen_init_impl (detail/gen.rs) 负责调用用户提供的闭包,并通过 catch_unwind 捕获 panic,最后调用 yield_now 结束协程。

  5. 运行时和调度 (rt.rs, yield_.rs):

    • 例子实现: 实现了一个简单的 Runtime 结构体和 t_yield 函数进行轮询调度。

    • 此库实现:

      • 使用 thread_local! 维护一个上下文栈 (ContextStack),允许多个协程嵌套调用(虽然通常不这么用)。Context 结构体除了包含 RegContext,还保存了父子关系、传递参数(para)和返回值(ret)的指针、引用计数(_ref 用于判断状态)以及错误信息。

      • Yield 实现 (yield_now, raw_yield_now): 核心是获取当前上下文和父上下文,然后调用 RegContext::swap 进行切换。

      • 提供了更丰富的 yield 形式,如 yield_with(传递返回值)、get_yield(获取传入参数)、yield_(结合两者)、yield_from(委托给另一个生成器)。

      • 没有显式的调度器: 这个库更侧重于提供上下文切换和 yield 的机制,它本身不包含像书中 Runtime::t_yield 那样的显式调度循环。它依赖于调用者(使用 Generator 的代码)来决定何时以及如何恢复协程(通过调用 resume 或 send)。

  6. 对外接口 (lib.rs, gen_impl.rs, scope.rs):

    • 例子实现: 直接使用 Runtime 和 spawn。

    • 此库实现:

      • 提供了更友好的 Generator 和 LocalGenerator 类型 (GeneratorObj) 来封装底层的 GeneratorImpl 和 StackBox。

      • 提供了 Gn::new_scoped 等构造函数,使用作用域闭包 (Scoped Closure) (Scope<'scope, 'a, A, T>) 来创建协程。这是一种更安全的方式,可以更好地处理借用和生命周期问题,避免直接传递函数指针。

      • 提供了 Iterator 实现,使得无参数输入的生成器可以像迭代器一样使用 next() 来驱动


May赏析

  1. 这是一个什么项目? may 是一个用于 Rust 语言的栈式协程库 (Stackful Coroutine Library)。从 src/lib.rs 的文档注释和 Cargo.toml 的描述来看,它的目标是提供一个类似于 Go 语言 Goroutine 的并发编程模型,让开发者能够更容易地编写和维护大规模并发程序。

  2. 它有什么样的作用? may 库的主要作用是提供一套高效、易用的并发原语和运行时环境,其核心特性包括:

    • 栈式协程: 基于 generator crate 实现,协程拥有自己的栈,可以在任意函数调用点挂起和恢复,简化异步逻辑编写。
    • 调度器: 支持将协程调度到可配置数量的线程上执行,利用多核 CPU 提高性能。支持本地队列和全局队列,并可选地启用工作窃取 (Work Stealing) 策略来平衡负载 (scheduler.rscrossbeam_queue_shim.rs)。
    • 协程本地存储 (CLS): 提供了类似线程本地存储 (TLS) 的机制,但作用域是协程级别 (local.rscoroutine_local! 宏)。
    • 异步 I/O: 集成了高效的异步网络 I/O (如 TCP, UDP) 和可能的其他 I/O 操作,与协程模型无缝结合 (src/io/src/net/)。
    • 定时器: 提供了高效的定时器管理,用于实现 sleep 或其他需要超时的操作 (timeout_list.rssleep.rs)。
    • 同步原语: 提供了协程版本的标准同步原语,如 MPSC/SPMC 队列 (may_queue/), 互斥锁, 条件变量, 信号量等 (src/sync/)。
    • 协程取消: 支持取消正在运行或挂起的协程 (cancel.rs)。
    • Panic 处理: 优雅地处理协程内部的 panic,避免影响其他协程。
    • 作用域协程: 支持创建保证在特定作用域结束前完成的协程 (scoped.rs)。
    • 通用 Select: 提供了 select! 宏,可以同时等待多个不同的异步操作完成 (cqueue.rsmacros.rs)。
  3. 代码实现中最精妙的部份 这个项目有很多设计精巧的地方,要选出“最”精妙的部分比较主观,但我认为以下几个方面特别值得关注:

    • 高效的定时器管理 (src/timeout_list.rs): TimeOutList 的实现非常巧妙。它没有使用简单的线性列表或单一的优先队列来管理所有定时器,而是结合了 哈希表 (按时间间隔分组)最小堆 (管理不同间隔列表的最近到期时间)。每个时间间隔对应一个独立的 MPSC 队列 (TimeoutQueueWrapper)。这种设计:

      • 减少了锁竞争: 不同间隔的定时器操作主要在各自的队列上进行。
      • 提高了查找效率: 通过哈希表快速定位到特定间隔的队列。
      • 优化了近期事件处理: 最小堆使得获取下一个最近到期的事件非常高效。
      • 这种分层和分组的设计在处理大量、不同周期的定时事件时,能显著提升性能和扩展性。
    • 灵活的调度器与工作窃取 (src/scheduler.rs): 调度器是协程库的核心。may 的调度器设计考虑了多核环境下的性能:

      • 混合队列: 每个工作线程有自己的本地队列 (SPSC 或 SPMC),同时还有全局队列 (MPSC) 用于跨线程调度。
      • 可选的工作窃取: 当启用 work_steal 特性时,空闲的线程可以从其他线程的本地队列中“窃取”任务来执行,这是一种成熟且高效的负载均衡策略,能有效提高 CPU 利用率。crossbeam_queue_shim.rs 或 may_queue::spmc 提供了实现工作窃取所需的数据结构。
    • 通用的 cqueueselect! 宏 (src/cqueue.rs, src/macros.rs): 不同于 Go 语言主要基于 channel 的 select,mayselect! 宏更加通用。它通过 Cqueue (Coroutine Queue) 来实现:

      • select! 的每个分支被包装成一个独立的“选择协程”。
      • 这些选择协程执行其“上半部分”逻辑(例如发起一个异步操作)。
      • 当操作可能阻塞时,选择协程通过 EventSender::send 发送一个事件到 Cqueue 并挂起自己。
      • 调用 cqueue.poll 的主协程会等待 Cqueue 中的事件。
      • 一旦收到事件,poll 会恢复对应的选择协程,让它执行“下半部分”逻辑。
      • 这种机制允许在任意异步操作(只要能包装成 FnOnce(EventSender))上进行 select,非常灵活和强大。
    • 栈式协程与 generator 的结合 (src/coroutine_impl.rs): 虽然底层的 generator 是外部库,但 may 将其与调度、本地存储、取消、JoinHandle 等机制无缝集成,构建了一个完整且易用的栈式协程运行时,这是整个库的基础,也是其区别于 async/await 的核心所在。


rust-future

概览

  • 无栈协程通识:future-状态机/async-await模型

  • 什么是future,leaf future

  • 为什么都使用async和await

  • 嵌套与数据依赖


  • 讲解代码主要为rust,但讲解对象是rust和python

  • 共同点和不同点


这一章是一个过渡章节,从我们之前深入研究的 Stackful 协程(纤程/绿色线程)转向 Rust 异步编程的核心抽象——Future trait 以及与之相关的概念。

这一章的目的不是深入实现细节(那将是后续章节的任务),而是为你建立一个关于 Rust 异步模型的高层次理解和心智模型。它为你学习后面章节如何手动实现基于 Future 的 coroutine 和 runtime 打下概念基础。

1. 核心概念引入:Rust 的 Future

  • 目标: 理解 Future 在 Rust 异步编程中的核心地位。
  • 内容:
    • 什么是 Future 它代表一个尚未完成的异步计算。它不是计算结果本身,而是一个最终会产生结果(或错误)的操作句柄。
    • 基于 Poll 的模型: Rust 的 Future 采用轮询 (Polling) 模型。这意味着你需要反复“询问” Future:“你完成了吗?” 直到它最终完成。这与 Stackful 协程那种可以任意暂停和恢复的方式有本质区别。
    • 三个阶段:
      • Poll (轮询) 阶段: 执行器 (Executor) 调用 Future 的 poll 方法,推动计算前进,直到遇到无法立即完成的点(如等待 I/O)。
      • Wait (等待) 阶段: 当 Future 无法继续前进时,它会安排一个“唤醒”机制(通过 Waker),通常是注册到反应器 (Reactor) 上,等待某个外部事件(如 I/O 就绪)。此时 Future 返回 Pending
      • Wake (唤醒) 阶段: 当外部事件发生,Reactor 触发之前注册的 WakerWaker 通知 Executor。
      • 再次 Poll: Executor 收到通知后,会再次调度并调用 Future 的 poll 方法,继续计算,直到最终返回 Ready(result)
  • 重要性: 这是理解后续所有 Rust 异步内容的基础。

2. Leaf Futures vs. Non-leaf Futures

  • 目标: 区分两种不同类型的 Future,理解它们在异步系统中的不同角色。
  • 内容:
    • Leaf Futures (叶子 Future):
      • 通常由运行时底层 I/O 库提供。
      • 代表具体的异步资源或操作,如一个 TCP 连接 (TcpStream)、一个 UDP Socket、一个定时器。
      • 它们的 poll 方法通常会与操作系统的 I/O 事件(如 Reactor)交互。
      • 我们通常不直接实现它们,除非在编写运行时或底层库。
      • 例子:tokio::net::TcpStream::connect() 返回的 Future。
    • Non-leaf Futures (非叶子 Future):
      • 我们通常用 async 块或 async fn 编写的就是这种 Future
      • 代表一系列组合的操作,是一个可以暂停的计算流程。
      • 它们的 poll 方法通常会驱动内部逻辑,并最终轮询 (poll) 其他 Future(可能是叶子 Future,也可能是其他非叶子 Future)。
      • 当内部的 Future 返回 Pending 时,它通常也会返回 Pending。当内部的 Future 返回 Ready 时,它会继续执行后续操作。
      • 例子:async { let stream = TcpStream::connect(...).await; stream.read(...).await; } 这个 async 块本身就是一个 Non-leaf Future。

3. Runtimes (运行时) 的必要性

  • 目标: 理解为什么 Rust 标准库不提供异步运行时,以及运行时在 Rust 异步生态中的作用。
  • 内容:
    • Rust 标准库只提供基础抽象: Future trait, Waker, Context, Poll enum, 以及 async/await 语法糖。它不包含实际执行 Future、调度任务、处理 I/O 事件的机制。
    • 运行时的职责:
      • Executor (执行器): 负责调度轮询 (poll) Futures,推动它们执行直到完成。
      • Reactor (反应器): 负责与操作系统交互,监听 I/O 事件(使用 epoll, kqueue, iocp 等),并在事件就绪时唤醒 (wake) 对应的 Future (通过 Waker)。
    • 需要选择: 因为标准库不提供,开发者必须选择一个第三方运行时库(如 Tokio, async-std, smol 等)来实际运行异步代码。
    • 对比其他语言: 这与 Go, C#, JavaScript 等自带运行时的语言不同。

4. 异步运行时的高层心智模型 (Reactor-Executor 模型)

  • 目标: 建立一个关于典型 Rust 异步运行时如何工作的简化模型。
  • 内容:
    • Executor 持有 Futures: Executor 管理一组待处理的顶层 Futures (Tasks)。
    • Executor Polls Future with Waker: Executor 调用 Future 的 poll 方法,并传入一个 Waker
    • Future Returns Pending: 如果 Future 未完成(例如等待 I/O),它返回 Pending,并且(通常在内部)将 Waker 注册到 Reactor。
    • Reactor Waits for Events: Reactor 监听底层 I/O 事件。
    • Reactor Wakes Future: 当事件发生,Reactor 找到对应的 Waker 并调用 wake()
    • Executor Reschedules: wake() 通知 Executor 该 Future 已准备好再次被 poll。Executor 将其放入就绪队列。
    • Executor Polls Again: Executor 再次调用 Future 的 poll 方法。
    • 循环直到 Ready: 这个过程重复进行,直到 Future 返回 Ready(result)
  • 关键交互: Executor 与 Future 交互(通过 pollWaker),Reactor 与 Future 交互(通过注册 Waker 和底层事件),Reactor 最终通过 Waker 通知 Executor。Executor 和 Reactor 通常不直接通信。

5. I/O 密集型 vs. CPU 密集型任务

  • 目标: 理解在 async 代码中如何处理长时间运行的计算密集型任务,以避免阻塞 Executor。
  • 内容:
    • async 块/函数中的代码(在 await 点之间)是同步执行的,并且通常运行在 Executor 所在的线程上。
    • 如果这部分代码包含 CPU 密集型计算,它会阻塞 Executor,使其无法调度和轮询其他 Futures,降低并发性能。
    • 解决方案: 通常是将 CPU 密集型任务移出 Executor 线程,例如:
      • 使用运行时提供的 spawn_blocking (如 Tokio) 将任务放到一个专门的阻塞线程池中执行,并返回一个 Future 来等待其完成。
      • (理论上)运行时可以将 Executor 自身移动到不同线程(较少见)。
      • (理论上)创建自定义的 Reactor 来处理计算任务(更复杂)。

Future构造实例

见视频&代码讲解

python和rust的async模型

相似点 (Core Concepts are Shared):
  1. async/await 语法糖 (Syntactic Sugar):

    • 两者都使用 async 关键字定义异步函数/代码块,使用 await 关键字暂停执行并等待一个异步操作完成。
    • 核心作用相同: 都是为了让开发者能够以更接近同步代码的风格编写异步逻辑,避免回调地狱。
    • 底层转换: 无论是 Python 的 async def 还是 Rust 的 async fn,编译器(或解释器在某种程度上)都会将这种语法转换为某种形式的状态机生成器,使其能够暂停和恢复。
  2. 核心异步单元 (Coroutine/Future/Awaitable):

    • Python: async def 函数返回一个协程对象 (coroutine object),它是一个可等待对象 (awaitable)。
    • Rust: async fnasync {} 块返回一个实现了 Future trait 的匿名类型。
    • 概念共通: 这两者都代表一个尚未完成的、可以被暂停和恢复的计算单元。它们是异步操作的基本抽象。
  3. 事件循环/调度器 (Event Loop/Executor):

    • 两者都需要一个事件循环/调度器来驱动异步任务的执行。
    • Python (asyncio): asyncio 提供了 EventLoop,负责管理和调度协程。你通过 asyncio.run()loop.create_task() 将协程提交给事件循环。
    • Rust: Rust 标准库不提供事件循环或 Executor。你需要选择一个第三方运行时库(如 Tokio, async-std, smol)来提供 Executor 和 Reactor。
    • 职责相似: 无论是 Python 的 Event Loop 还是 Rust 的 Executor,它们都负责轮询(或被通知)就绪的异步单元,并调用其恢复执行的方法(Python 协程的 send() 或 Rust Future 的 poll())。
  4. 基于 Poll/Resume 的模型(广义上):

    • 虽然具体机制不同(Python 是 send(),Rust 是 poll()),但两者都依赖于一种“检查点”,在这些点上异步单元可以暂停,并将控制权交还给事件循环/调度器。当异步单元准备好继续时,事件循环/调度器会再次调用相应的方法来恢复它。
  5. Leaf Future / 底层异步操作:

    • 两者都有封装底层异步 I/O 操作的“叶子”单元。
    • Python (asyncio): 标准库的 asyncio.open_connectionasyncio.sleep,以及第三方库如 aiohttp 提供的 session.get() 等返回的都是可等待对象,它们内部与事件循环和操作系统的非阻塞 I/O 交互。
    • Rust: 运行时库(如 Tokio)提供的 TcpStream::connect()sleep() 等返回的是实现了 Future trait 的类型,它们是与 Reactor 交互的 Leaf Future。HttpGetFuture 就是这种 Leaf Future 的一个模拟。
  6. 非阻塞 I/O 为基础:

    • 为了实现真正的并发,两者的异步模型都依赖于底层的非阻塞 I/O 操作。当一个 I/O 操作不能立即完成时,它不会阻塞整个线程,而是允许事件循环/调度器去执行其他任务。

不同点 (Implementation Details and Language Philosophy):
  1. 运行时 (Runtime) 的提供方式:

    • Python (asyncio): 标准库内置。 asyncio 是 Python 标准库的一部分,提供了开箱即用的事件循环和异步原语。
    • Rust: 标准库不提供,由第三方库实现。 这是 Rust 的一个重要设计哲学——保持标准库小而核心,将更复杂的或有多种实现方式的功能留给生态系统。这导致了 Rust 中有多个可选的异步运行时。
  2. 内存管理和所有权:

    • Python: 垃圾回收 (GC)。 对象的生命周期由 GC 管理,这在编写异步代码时,对于共享状态和闭包捕获等情况处理起来相对简单。
    • Rust: 所有权和借用检查。 Rust 的严格内存安全模型对异步编程带来了挑战,尤其是在处理跨 await 点的引用和生命周期时。这直接导致了 Pin 的出现(第九章内容)。
      • 自引用结构 (Self-Referential Structs): Rust 的 async/await 生成的状态机很容易变成自引用结构(状态机内部的字段可能引用状态机自身内存中的其他字段,特别是当 await 发生在持有引用的变量的作用域内时)。Python 由于 GC 和引用计数,这类问题不明显或以不同方式处理。
      • Pin<T> Rust 引入 Pin 来保证某些数据(如 Future 状态机)在内存中的位置不会被移动,从而安全地处理自引用。Python 不需要这种显式的 pinning 机制。
  3. 并发与并行模型:

    • Python (asyncio): 主要是单线程并发asyncio 的事件循环通常运行在一个 OS 线程上。虽然可以通过 loop.run_in_executor() 将 CPU 密集型任务分派到线程池,但 asyncio 本身的核心是单线程的协作式多任务。GIL(全局解释器锁)也限制了 Python 在 CPU 密集型任务上利用多核的能力。
    • Rust: 异步运行时(如 Tokio)通常提供多线程 Executor,可以将异步任务调度到多个 OS 线程上执行,从而实现真正的并行(如果任务是 CPU 绑定的且可以并行化)。Rust 没有 GIL,因此可以更好地利用多核 CPU。
  4. 错误处理:

    • Python: 主要使用异常 (Exceptions)try...except 块用于捕获异步操作中可能发生的错误。
    • Rust: 主要使用 Result<T, E> 枚举。异步函数通常返回 Resultawait 之后需要显式处理 OkErr.unwrap()? 操作符是常用的处理方式。
  5. 取消 (Cancellation):

    • Python (asyncio): 协程任务 (Task) 可以被取消 (task.cancel())。取消会向协程内部注入一个 CancelledError 异常,协程可以通过 try...finally 来处理清理工作。
    • Rust: Future 本身没有内置的取消机制。当一个 Future 不再被 poll(例如它的句柄被 drop 掉),它关联的异步操作可能会在后台继续进行,也可能因为没有被 poll 而永远无法完成。运行时或更高级别的抽象(如 Tokio 的 JoinHandle)可能会提供取消功能,但这通常是通过某种信道或状态标记来实现的,而不是语言层面的特性。generator-rs 提供的 Cancel 机制是库层面的实现。
  6. 底层抽象和性能:

    • Python: 更高层次的抽象,动态类型,解释执行(或 JIT)。性能通常不如 Rust。
    • Rust: 更接近底层,静态类型,编译到机器码。Rust 的目标之一是提供“零成本抽象”,其异步模型在设计上非常注重性能和内存效率。Future trait 和 poll 模型的设计就是为了最小化开销。

总结:

特性Python (asyncio)Rust
核心语法async/awaitasync/await
异步单元协程对象 (Generator-like)实现了 Future trait 的匿名类型 (状态机)
驱动机制Event Loop (标准库提供)Executor (第三方库提供)
调度方式协作式,事件驱动协作式,事件驱动
Leaf 操作标准库/第三方库提供的 awaitable I/O 函数运行时/第三方库提供的 Leaf Future
内存管理垃圾回收所有权、借用检查、Pin
并发/并行主要是单线程并发,可通过线程池实现部分并行通常支持多线程 Executor,可实现真正的并行
错误处理异常 (Exceptions)Result<T, E>
取消机制Task.cancel() 注入 CancelledErrorFuture 本身无内置取消,依赖运行时或上层抽象
性能/底层较高层抽象,动态,受 GIL 影响更底层,静态,注重零成本抽象和性能

通过对比,你可以看到,尽管实现细节和语言特性有所不同,但构建异步编程模型的基本思想(状态机、事件驱动、非阻塞I/O)是共通的。Rust 的模型由于其对内存安全和性能的极致追求,引入了像 Pin 这样在其他语言中不常见的概念,。


嵌套与数据依赖

协程之间互相嵌套是一种常见的场景,对于无栈协程,调用栈上并没有真正的“嵌套协程调用”。实际上是父 Future 在其 poll 方法中调用子 Future 的 poll 方法。控制权是通过 Poll::Pending 和 Poll::Ready 以及 Waker 来回传递的。以具体的代码为例,我们去揭示几种常见的嵌套形式:

协程间存在数据依赖
rust
async fn task_a() -> ResultTypeA {
    // 假设 some_async_op_a() 返回一个 Future<Output = ResultTypeA>
    some_async_op_a().await
}

async fn task_b(input_from_a: ResultTypeA) -> ResultTypeB {
    // ... 使用 input_from_a 执行异步操作 b ...
    // 假设 some_async_op_b(input_from_a) 返回一个 Future<Output = ResultTypeB>
    some_async_op_b(input_from_a).await
}

async fn main_task() -> ResultTypeB {
    println!("Starting main task...");

    // 1. 调用并等待 task_a 完成
    let result_a = task_a().await;
    println!("Task A completed with result: {:?}", result_a); // 假设 ResultTypeA实现了Debug

    // 2. 将 task_a 的结果传递给 task_b,并等待 task_b 完成
    let result_b = task_b(result_a).await;
    println!("Task B completed with result: {:?}", result_b); // 假设 ResultTypeB实现了Debug

    result_b // 返回 task_b 的结果
}

// fn main() {
//     let runtime = MyRuntime::new(); // 假设你有一个运行时
//     runtime.block_on(main_task());
// }

实际生成的代码可能是这样的:

rust
enum MainTaskState {
    Start,
    WaitingA(Box<dyn Future<Output = ResultTypeA>>),
    WaitingB {
        result_from_a: ResultTypeA, // 需要保存A的结果
        future_b: Box<dyn Future<Output = ResultTypeB>>,
    },
    Done,
}

struct MainTaskFuture {
    state: MainTaskState,
}

impl MainTaskFuture {
    fn new() -> Self {
        MainTaskFuture {
            state: MainTaskState::Start,
        }
    }
}

impl Future for MainTaskFuture {
    type Output = ResultTypeB; // 假设最终返回 B 的结果

    fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
        loop { // 内部循环驱动状态转换
            match self.state {
                MainTaskState::Start => {
                    println!("Starting main task...");
                    let future_a = Box::new(task_a_future()); // 创建 task_a 的 Future
                    self.state = MainTaskState::WaitingA(future_a);
                    // 继续 loop,尝试 poll future_a
                }
                MainTaskState::WaitingA(ref mut future_a) => {
                    match future_a.as_mut().poll(waker) { // Poll 子 Future A
                        Poll::Ready(result_a) => {
                            println!("Task A completed with result: {:?}", result_a);
                            let future_b = Box::new(task_b_future(result_a.clone())); // 使用 A 的结果创建 B
                            self.state = MainTaskState::WaitingB {
                                result_from_a: result_a, // 保存A的结果,如果后续还需用
                                future_b,
                            };
                            // 继续 loop,尝试 poll future_b
                        }
                        Poll::Pending => {
                            return Poll::Pending; // 子 Future A 未就绪,整个任务也未就绪
                        }
                    }
                }
                MainTaskState::WaitingB { ref result_from_a, ref mut future_b } => {
                    match future_b.as_mut().poll(waker) { // Poll 子 Future B
                        Poll::Ready(result_b) => {
                            println!("Task B completed with result: {:?}", result_b);
                            self.state = MainTaskState::Done;
                            return Poll::Ready(result_b); // 整个任务完成,返回B的结果
                        }
                        Poll::Pending => {
                            return Poll::Pending; // 子 Future B 未就绪,整个任务也未就绪
                        }
                    }
                }
                MainTaskState::Done => {
                    panic!("Polled a completed future");
                }
            }
        }
    }
}
没有依赖,但需要同步-join
rust
async fn task_a() -> ResultTypeA {
    println!("Task A started");
    // ... 执行异步操作 a ...
    some_async_op_a().await
}

async fn task_b() -> ResultTypeB {
    println!("Task B started");
    // ... 执行异步操作 b ...
    some_async_op_b().await
}

async fn main_task() {
    println!("Starting main task...");

    // 使用 join! 同时启动并等待 task_a 和 task_b
    // join! 会同时 poll future_a 和 future_b
    let (result_a, result_b) = tokio::join!(task_a(), task_b());
    // 对于标准库的 futures::join! 或其他类似宏,用法可能略有不同

    println!("Task A completed with result: {:?}", result_a);
    println!("Task B completed with result: {:?}", result_b);
    println!("Main task completed.");
}

// 在你的运行时中驱动 main_task
// fn main() {
//     tokio::runtime::Runtime::new().unwrap().block_on(main_task());
// }
顶层任务
rust
async fn task_a() -> ResultTypeA { /* ... */ }
async fn task_b() -> ResultTypeB { /* ... */ }

async fn main_task() {
    println!("Starting main task...");

    // 将 task_a 和 task_b 作为独立的任务提交给运行时调度器
    let handle_a = tokio::spawn(task_a());
    let handle_b = tokio::spawn(task_b());

    println!("Tasks A and B spawned.");

    // main_task 可以继续做其他事情,或者在需要时等待结果
    // 例如,等待 task_a 完成:
    let result_a = handle_a.await.unwrap(); // unwrap 处理 JoinError
    println!("Task A completed with result: {:?}", result_a);

    // 或者等待 task_b 完成:
    let result_b = handle_b.await.unwrap();
    println!("Task B completed with result: {:?}", result_b);

    println!("Main task completed.");
}

reactor-executor模式

这一章节我们需要将loop based的Future的poll方法改写为基于事件队列的reactor-executor模式的实现。

首先我们回忆一下两个知识点:

  • 事件队列如何“赋能”异步

  • Future状态机


如果你记得之前的内容,你大概能在大脑中想象一个现代异步框架是如何工作的:我们之前视频中的事件流转实际是基于轮训+检查模式的,通过事件队列,我们可以在注册了我们的等待任务之后,实现一个全局函数,该函数的任务是poll(等待外部回信,并在收到信息后将信息保存起来),对于一个顶部的Future的poll调用,当接收到回信就立刻去处理对应的子Future(一个信息总会对应到一个叶子Future)

现在的问题在于:

  • 单线程能够保证多个Future被正常调度吗

  • 当需要等待io时,当前Future如何释放cpu

  • 当poll()返回时,reactor如何让调度中心将cpu提供给对应的Future

为了实现可以调度多个任务的,基于事件响应的系统,我们必须了解一些控制线程状态的api


线程控制

我们想要的实际是对“线程时间片”的控制,毕竟在我们设计这个系统的过程中,几乎一定会遇到所有任务都在等待外部系统响应的情况(在一个io密集型任务中这很常见),为此我们不能让cpu自旋,但是需要让Reactor或者其他什么东西能够在接收到响应时立刻告知我们,由于第一个收到外部系统消息的一定是Reactor(poll()方法),所以从直觉上来说,reactor应该可以直接或者间接地控制线程的handler,让执行者在leaf future中“暂停”线程,而让reactor在收到消息后通过handler让该线程重新工作。

因此,既然我们一定需要它们,那么它们大概是存在的:

  • 标准库的Thread对象,即handler

  • std::thread::park(),暂停操作

  • std::thread::unPark()

  • Waker对象,让reactor能够提醒executor thread的关键


简单的多线程方案

基于我们上面的讨论,我们来重构一个更合理的多线程异步系统:

首先我们需要明确:如果线程之间能够互相通信甚至是“偷走”对方的任务,那么整个系统一定会变得非常复杂,我们先考虑一个简单的系统,即各个线程只关心自己手头上的任务。我们来分别看看系统中最重要的几个部分:

  • Executor/ThreadManager:线程的指挥官,它负责推动最上层提供的任务(那么任务是?),另一方面,他一定要能控制线程的handler,他决策的依据是什么呢,显然得有队列这样的数据结构来存储要做的任务就绪的任务这两种东西

  • Task:从之前我们讨论不难看出任务就是顶层Future

  • reactor:专门接听外部设备回信的收信员,有了前面的铺垫,它的存在极其合理

目前来看,这样就够了,Executor推动它所在的线程完成Future树的流转,工作到达叶子节点这一步需要真正干活的阶段时,叶子节点的poll方法中需要有:reactor实例来完成注册,另外需要一个方法来获取reactor的响应,很多人这个时候会想到回调——当收到消息了,让reactor调用一个函数来改变executor的就绪队列。但这样是不足的:假如当前线程发现任务队列中已经没有工作时,它就应该让当前线程休眠了,这意味着reactor要有该线程的handler的能力,除此之外还有任务的id以及就绪队列的共享引用(且要保证线程安全),显然它们可以被封装为一个数据结构,只要将前文提到的回调内化为这个结构的方法,一种通信数据结构就诞生了,由于它存在就是为了对工作线程进行唤醒的,我们不妨称他为waker吧。。


Waker

整个系统状态的切换依赖于上述模块的相互联动以及future之间的调用链条,不难发现,在执行线程这一侧的最小粒度是poll方法,并且waker和任务是一对一关联的,异步事件是否完成的职责现在落到了waker身上,另外,future的推进一定是从上至下的(状态机本体实质是顶部feature),这帮我们再次澄清了:存储任务的数据结构一定存储的是顶部feature。另一点很重要的结论是:waker是胶水 + poll链推动状态变换 => waker要通过poll来传递,到这一步,整个reactor-executor架构就清晰了。


自引用数据结构和pinning

代码案例

python

下列是一段获取用户的数据库模型(sqlmodel)实例的代码片段

python
db_url = os.getenv("DATABASE_URL", "sqlite:///./local.db")

async_engine = create_async_engine(
    db_url.replace('sqlite:///', 'sqlite+aiosqlite:///'),
    echo=False,
    connect_args={"check_same_thread": False}
)

from app.utils.auth import get_current_user
async with AsyncSession(async_engine) as session:
    user = await get_current_user(token, session)

知识在于积累