Files
mini-redis/Tutorial/MiniTTokio.rs
2021-01-19 16:03:55 -03:00

111 lines
2.9 KiB
Rust

#![allow(non_snake_case)]
use futures::task;
use futures::task::ArcWake;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
impl MiniTokio {
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new taks with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}