Rust Async

最近在看 rust 的异步内容,主要对如何推动 future 前进感到疑惑,在重温了一遍 asyncbook 第二章之后,有了一点点思路,大概就是 future 就是task,他背后有一双手推动他走,注意这双手不是 executor,一般都是 epoll、kqueue 等或者是另一个后台线程(or协程),他们在到达完成状态(数据到达、定时器到期等)时通过调用wake()来告知executor future 到达了完成状态。对于 epoll 等方式我们可以设置回调函数为 wake(),剩下的就是内核的任务了;对于后台线程就在业务逻辑执行完调用一下 wake()即可。wake()核心的作用就是将 future 放到一个就绪队列中,队列哪里来的呢?executor 提供的,exectuor 会不断轮询这个队列,一旦有数据就取出来,然后 poll 一下,很多人都说 poll 是推动 future 前进,但是我理解 poll 只是表面上看着是推动了,但实际上就是一个状态检查,真正推动的还是前面的 epoll 等。然后教程上说的 future 是惰性的,只有被 poll 才会运行,也要分情况,假如我在 new future 的时候启动了一个后台线程去做别的事情,其实 future 已经在被默默推动了,只不过要等 poll才知道它有没有完成。还有一点是只有 tokio::spawn 或者 thread::spawn 才等同于go func();

更新

异步Runtime,由两部分组成:

  • Executor
    • 这个前面说过了,就是 poll 一下 future
  • Reactor
    • 这个其实前面也讲了,但是没有一个官方定义,最近看资料才发现原来叫反应器,实际就是epoll啥的,轮询并唤醒挂载的事件,并执行对应的wake方法

最后贴段代码供自己以后复习使用

lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use std::thread;
use std::sync::{Arc, Mutex};
use std::task::{Waker, Poll};
use std::future::Future;
use std::time::Duration;

pub struct VecFuture<T> {
data: Arc<Mutex<VecData<T>>>
}

struct VecData<T> {
v: Vec<T>,
waker: Option<Waker>
}

impl<T: std::marker::Send + 'static + Clone> VecFuture<T> {
pub fn new(data: T, delay: u32) -> Self {
let a = VecFuture { data: Arc::new(Mutex::new(VecData { v: Vec::new(), waker: None })) };
let b = a.data.clone();
thread::spawn(move || {
thread::sleep(Duration::from_secs(delay.into()));
let mut d = b.lock().unwrap();
d.v.push(data);
if let Some(waker) = d.waker.take() {
waker.wake();
}
});
a
}
}

impl<T: std::marker::Send + 'static + Clone> Future for VecFuture<T> {
type Output = Vec<T>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let mut data = self.data.lock().unwrap();
if !data.v.is_empty() {
Poll::Ready(data.v.clone())
} else {
data.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}

main.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::{sync::{mpsc::{Receiver, SyncSender, sync_channel}, Arc, Mutex}, task::Context, thread};

use futures::{future::BoxFuture, task::{waker_ref, ArcWake}, FutureExt};
use std::future::Future;
use vec_future::VecFuture;

struct Executor<T> {
ready_queue: Receiver<Arc<Task<T>>>
}

#[derive(Clone)]
struct Spawner<T> {
task_sender: SyncSender<Arc<Task<T>>>
}

struct Task<T> {
future: Mutex<Option<BoxFuture<'static, T>>>,
task_sender: SyncSender<Arc<Task<T>>>,
}

fn new_executor_and_spawner<T>() -> (Executor<T>, Spawner<T>) {
const MAX_QUEUE_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUE_TASKS);
(Executor{ready_queue}, Spawner{task_sender})
}

impl<T> Spawner<T> {
fn spawn(&self, future: impl Future<Output = T> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
println!("first dispatch the future task to executor.");
self.task_sender.send(task).expect("too many tasks queued.");
}
}

/// 实现ArcWake,表明怎么去唤醒任务去调度执行。
impl<T> ArcWake for Task<T> {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}

impl<T> Executor<T> {
// 实际运行具体的Future任务,不断的接收Future task执行。
fn run(&self) {
let mut count = 0;
while let Ok(task) = self.ready_queue.recv() {
count = count + 1;
println!("received task. {}", count);
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
println!("executor run the future task, but is not ready, create a future again.");
// Future还没执行完,因此将它放回任务中,等待下次被poll
*future_slot = Some(future);
} else {
println!("executor run the future task, is ready. the future task is done.");
}
}
}
}
}

fn main() {
let (exec, spawner) = new_executor_and_spawner();

spawner.spawn(async {
let a = VecFuture::new(String::from("hello world"), 10).await;
println!("{:?}", a);
});

drop(spawner);
exec.run();
}