use core::{ future::Future, pin::Pin, sync::atomic::{AtomicU64, Ordering}, task::{Context, Poll, Waker}, }; use alloc::{ boxed::Box, collections::{BTreeMap, VecDeque}, sync::Arc, task::Wake, }; use crate::{sync::Mutex, syscall}; #[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] struct TaskId(u64); impl TaskId { pub fn new() -> TaskId { static NEXT_ID: AtomicU64 = AtomicU64::new(0); TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) } } pub struct Task { id: TaskId, future: Pin + Send>>, } impl Task { pub fn new(future: impl Future + Sync + Send + 'static) -> Task { Task { id: TaskId::new(), future: Box::pin(future), } } pub fn poll(&mut self, context: &mut Context) -> Poll<()> { self.future.as_mut().poll(context) } } struct TaskWaker { task_id: TaskId, task_queue: Arc>>, } impl TaskWaker { fn create_waker(task_id: TaskId, task_queue: Arc>>) -> Waker { Waker::from(Arc::new(TaskWaker { task_id, task_queue, })) } fn wake_task(&self) { self.task_queue.lock().push_back(self.task_id); } } impl Wake for TaskWaker { fn wake(self: Arc) { self.wake_task(); } fn wake_by_ref(self: &Arc) { self.wake_task(); } } #[derive(Default)] pub struct Executor { tasks: Arc>>, // TODO: Consider a better datastructure for this. task_queue: Arc>>, waker_cache: BTreeMap, } impl Executor { pub fn new() -> Executor { Executor::default() } pub fn spawn(&mut self, task: Task) { let task_id = task.id; if self.tasks.lock().insert(task_id, task).is_some() { panic!("Task is already existed in executor map"); } self.task_queue.lock().push_back(task_id); } fn run_ready_tasks(&mut self) { while let Some(task_id) = self.task_queue.lock().pop_front() { let mut tasks = self.tasks.lock(); let task = tasks.get_mut(&task_id).unwrap(); let waker = self .waker_cache .entry(task_id) .or_insert_with(|| TaskWaker::create_waker(task_id, self.task_queue.clone())); let mut ctx = Context::from_waker(waker); match task.poll(&mut ctx) { Poll::Ready(()) => { tasks.remove(&task_id); self.waker_cache.remove(&task_id); } Poll::Pending => {} }; } } pub fn run(&mut self) { loop { self.run_ready_tasks(); // TODO: We need some sort of semaphore wait here. syscall::thread_sleep(10).unwrap(); } } pub fn new_spawner(&self) -> Spawner { Spawner::new(self.tasks.clone(), self.task_queue.clone()) } } pub struct Spawner { tasks: Arc>>, task_queue: Arc>>, } impl Spawner { fn new( tasks: Arc>>, task_queue: Arc>>, ) -> Self { Spawner { tasks, task_queue } } pub fn spawn(&self, task: Task) { let task_id = task.id; if self.tasks.lock().insert(task_id, task).is_some() { panic!("Task is already existed in executor map"); } self.task_queue.lock().push_back(task_id); } }