1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| use std::{sync::{mpsc::{Receiver, SyncSender, sync_channel}, Arc, Mutex}, task::Context, thread};
use futures::{future::BoxFuture, task::{waker_ref, ArcWake}, FutureExt}; use std::future::Future; use vec_future::VecFuture;
struct Executor<T> { ready_queue: Receiver<Arc<Task<T>>> }
#[derive(Clone)] struct Spawner<T> { task_sender: SyncSender<Arc<Task<T>>> }
struct Task<T> { future: Mutex<Option<BoxFuture<'static, T>>>, task_sender: SyncSender<Arc<Task<T>>>, }
fn new_executor_and_spawner<T>() -> (Executor<T>, Spawner<T>) { const MAX_QUEUE_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUE_TASKS); (Executor{ready_queue}, Spawner{task_sender}) }
impl<T> Spawner<T> { fn spawn(&self, future: impl Future<Output = T> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); println!("first dispatch the future task to executor."); self.task_sender.send(task).expect("too many tasks queued."); } }
impl<T> ArcWake for Task<T> { fn wake_by_ref(arc_self: &Arc<Self>) { let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("too many tasks queued"); } }
impl<T> Executor<T> { fn run(&self) { let mut count = 0; while let Ok(task) = self.ready_queue.recv() { count = count + 1; println!("received task. {}", count); let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); if future.as_mut().poll(context).is_pending() { println!("executor run the future task, but is not ready, create a future again."); *future_slot = Some(future); } else { println!("executor run the future task, is ready. the future task is done."); } } } } }
fn main() { let (exec, spawner) = new_executor_and_spawner();
spawner.spawn(async { let a = VecFuture::new(String::from("hello world"), 10).await; println!("{:?}", a); });
drop(spawner); exec.run(); }
|