mirror of
https://github.com/guilhermewerner/mini-redis
synced 2025-06-15 14:35:13 +00:00
111 lines
2.9 KiB
Rust
111 lines
2.9 KiB
Rust
#![allow(non_snake_case)]
|
|
|
|
use futures::task;
|
|
use futures::task::ArcWake;
|
|
use std::collections::VecDeque;
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::task::{Context, Poll};
|
|
use std::time::{Duration, Instant};
|
|
|
|
fn main() {
|
|
let mut mini_tokio = MiniTokio::new();
|
|
|
|
mini_tokio.spawn(async {
|
|
let when = Instant::now() + Duration::from_millis(10);
|
|
let future = Delay { when };
|
|
|
|
let out = future.await;
|
|
assert_eq!(out, "done");
|
|
});
|
|
|
|
mini_tokio.run();
|
|
}
|
|
|
|
struct MiniTokio {
|
|
scheduled: channel::Receiver<Arc<Task>>,
|
|
sender: channel::Sender<Arc<Task>>,
|
|
}
|
|
|
|
struct Task {
|
|
// The `Mutex` is to make `Task` implement `Sync`. Only
|
|
// one thread accesses `future` at any given time. The
|
|
// `Mutex` is not required for correctness. Real Tokio
|
|
// does not use a mutex here, but real Tokio has
|
|
// more lines of code than can fit in a single tutorial
|
|
// page.
|
|
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
|
executor: channel::Sender<Arc<Task>>,
|
|
}
|
|
|
|
impl Task {
|
|
fn schedule(self: &Arc<Self>) {
|
|
self.executor.send(self.clone());
|
|
}
|
|
}
|
|
|
|
impl ArcWake for Task {
|
|
fn wake_by_ref(arc_self: &Arc<Self>) {
|
|
arc_self.schedule();
|
|
}
|
|
}
|
|
|
|
impl MiniTokio {
|
|
/// Initialize a new mini-tokio instance.
|
|
fn new() -> MiniTokio {
|
|
let (sender, scheduled) = channel::unbounded();
|
|
|
|
MiniTokio { scheduled, sender }
|
|
}
|
|
|
|
/// Spawn a future onto the mini-tokio instance.
|
|
///
|
|
/// The given future is wrapped with the `Task` harness and pushed into the
|
|
/// `scheduled` queue. The future will be executed when `run` is called.
|
|
fn spawn<F>(&self, future: F)
|
|
where
|
|
F: Future<Output = ()> + Send + 'static,
|
|
{
|
|
Task::spawn(future, &self.sender);
|
|
}
|
|
|
|
fn run(&self) {
|
|
while let Ok(task) = self.scheduled.recv() {
|
|
task.poll();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Task {
|
|
fn poll(self: Arc<Self>) {
|
|
// Create a waker from the `Task` instance. This
|
|
// uses the `ArcWake` impl from above.
|
|
let waker = task::waker(self.clone());
|
|
let mut cx = Context::from_waker(&waker);
|
|
|
|
// No other thread ever tries to lock the future
|
|
let mut future = self.future.try_lock().unwrap();
|
|
|
|
// Poll the future
|
|
let _ = future.as_mut().poll(&mut cx);
|
|
}
|
|
|
|
// Spawns a new taks with the given future.
|
|
//
|
|
// Initializes a new Task harness containing the given future and pushes it
|
|
// onto `sender`. The receiver half of the channel will get the task and
|
|
// execute it.
|
|
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
|
|
where
|
|
F: Future<Output = ()> + Send + 'static,
|
|
{
|
|
let task = Arc::new(Task {
|
|
future: Mutex::new(Box::pin(future)),
|
|
executor: sender.clone(),
|
|
});
|
|
|
|
let _ = sender.send(task);
|
|
}
|
|
}
|