diff --git a/Cargo.toml b/Cargo.toml index 1d33fa5..6e15ce3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,49 @@ path = "Source/Server.rs" tokio = { version = "1.0.2", features = ["full"] } mini-redis = "0.4" bytes = "1.0.1" +crossbeam = "0.7" +futures = "0.3" [[example]] name="Hello" path = "Examples/Hello.rs" + +[[example]] +name="ReadPart" +path = "Tutorial/ReadPart.rs" + +[[example]] +name="ReadFile" +path = "Tutorial/ReadFile.rs" + +[[example]] +name="WritePart" +path = "Tutorial/WritePart.rs" + +[[example]] +name="WriteFile" +path = "Tutorial/WriteFile.rs" + +[[example]] +name="CreateFile" +path = "Tutorial/CreateFile.rs" + +[[example]] +name="Echo" +path = "Tutorial/Echo.rs" + +[[example]] +name="EchoCopy" +path = "Tutorial/EchoCopy.rs" + +[[example]] +name="Future" +path = "Tutorial/Future.rs" + +[[example]] +name="MiniTokio" +path = "Tutorial/MiniTokio.rs" + +[[example]] +name="Waker" +path = "Tutorial/Waker.rs" diff --git a/Tutorial/CreateFile.rs b/Tutorial/CreateFile.rs new file mode 100644 index 0000000..24d36ae --- /dev/null +++ b/Tutorial/CreateFile.rs @@ -0,0 +1,14 @@ +#![allow(non_snake_case)] + +use tokio::fs::File; +use tokio::io; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut reader: &[u8] = b"hello"; + let mut file = File::create("File.txt").await?; + + io::copy(&mut reader, &mut file).await?; + + Ok(()) +} diff --git a/Tutorial/Delay.rs b/Tutorial/Delay.rs new file mode 100644 index 0000000..491cbf0 --- /dev/null +++ b/Tutorial/Delay.rs @@ -0,0 +1,24 @@ +#![allow(non_snake_case)] + +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; +use tokio::sync::Notify; + +async fn delay(dur: Duration) { + let when = Instant::now() + dur; + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + + thread::spawn(move || { + let now = Instant::now(); + + if now < when { + thread::sleep(when - now); + } + + notify2.notify_one(); + }); + + notify.notified().await; +} diff --git a/Tutorial/Echo.rs b/Tutorial/Echo.rs new file mode 100644 index 0000000..afbee36 --- /dev/null +++ b/Tutorial/Echo.rs @@ -0,0 +1,38 @@ +#![allow(non_snake_case)] + +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap(); + + loop { + let (mut socket, _) = listener.accept().await?; + + tokio::spawn(async move { + let mut buf = vec![0; 1024]; + + loop { + match socket.read(&mut buf).await { + // Return value of `Ok(0)` signifies that the remote has + // closed + Ok(0) => return, + Ok(n) => { + // Copy the data back to socket + if socket.write_all(&buf[..n]).await.is_err() { + // Unexpected socket error. There isn't much we can + // do here so just stop processing. + return; + } + } + Err(_) => { + // Unexpected socket error. There isn't much we can do + // here so just stop processing. + return; + } + } + } + }); + } +} diff --git a/Tutorial/EchoCopy.rs b/Tutorial/EchoCopy.rs new file mode 100644 index 0000000..06d0f2c --- /dev/null +++ b/Tutorial/EchoCopy.rs @@ -0,0 +1,21 @@ +#![allow(non_snake_case)] + +use tokio::io; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap(); + + loop { + let (mut socket, _) = listener.accept().await?; + + tokio::spawn(async move { + let (mut rd, mut wr) = socket.split(); + + if io::copy(&mut rd, &mut wr).await.is_err() { + eprintln!("failed to copy"); + } + }); + } +} diff --git a/Tutorial/Future.rs b/Tutorial/Future.rs new file mode 100644 index 0000000..05df8d0 --- /dev/null +++ b/Tutorial/Future.rs @@ -0,0 +1,55 @@ +#![allow(non_snake_case)] + +use futures::future::poll_fn; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +struct Delay { + when: Instant, +} + +impl Future for Delay { + type Output = &'static str; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { + if Instant::now() >= self.when { + println!("Hello world"); + Poll::Ready("done") + } else { + // Ignore this line for now. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +#[tokio::main] +async fn main() { + let when = Instant::now() + Duration::from_millis(10); + let mut delay = Some(Delay { when }); + + poll_fn(move |cx| { + let mut delay = delay.take().unwrap(); + let res = Pin::new(&mut delay).poll(cx); + assert!(res.is_pending()); + tokio::spawn(async move { + delay.await; + }); + + Poll::Ready(()) + }) + .await; + + /* + let when = Instant::now() + Duration::from_millis(1000); + let future = Delay { when }; + + let out = future.await; + assert_eq!(out, "done"); + + let when = Instant::now() + Duration::from_millis(10); + let mut delay = Some(Delay { when }); + */ +} diff --git a/Tutorial/MiniTTokio.rs b/Tutorial/MiniTTokio.rs new file mode 100644 index 0000000..dc1aa6c --- /dev/null +++ b/Tutorial/MiniTTokio.rs @@ -0,0 +1,110 @@ +#![allow(non_snake_case)] + +use futures::task; +use futures::task::ArcWake; +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +fn main() { + let mut mini_tokio = MiniTokio::new(); + + mini_tokio.spawn(async { + let when = Instant::now() + Duration::from_millis(10); + let future = Delay { when }; + + let out = future.await; + assert_eq!(out, "done"); + }); + + mini_tokio.run(); +} + +struct MiniTokio { + scheduled: channel::Receiver>, + sender: channel::Sender>, +} + +struct Task { + // The `Mutex` is to make `Task` implement `Sync`. Only + // one thread accesses `future` at any given time. The + // `Mutex` is not required for correctness. Real Tokio + // does not use a mutex here, but real Tokio has + // more lines of code than can fit in a single tutorial + // page. + future: Mutex + Send>>>, + executor: channel::Sender>, +} + +impl Task { + fn schedule(self: &Arc) { + self.executor.send(self.clone()); + } +} + +impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc) { + arc_self.schedule(); + } +} + +impl MiniTokio { + /// Initialize a new mini-tokio instance. + fn new() -> MiniTokio { + let (sender, scheduled) = channel::unbounded(); + + MiniTokio { scheduled, sender } + } + + /// Spawn a future onto the mini-tokio instance. + /// + /// The given future is wrapped with the `Task` harness and pushed into the + /// `scheduled` queue. The future will be executed when `run` is called. + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { + Task::spawn(future, &self.sender); + } + + fn run(&self) { + while let Ok(task) = self.scheduled.recv() { + task.poll(); + } + } +} + +impl Task { + fn poll(self: Arc) { + // Create a waker from the `Task` instance. This + // uses the `ArcWake` impl from above. + let waker = task::waker(self.clone()); + let mut cx = Context::from_waker(&waker); + + // No other thread ever tries to lock the future + let mut future = self.future.try_lock().unwrap(); + + // Poll the future + let _ = future.as_mut().poll(&mut cx); + } + + // Spawns a new taks with the given future. + // + // Initializes a new Task harness containing the given future and pushes it + // onto `sender`. The receiver half of the channel will get the task and + // execute it. + fn spawn(future: F, sender: &channel::Sender>) + where + F: Future + Send + 'static, + { + let task = Arc::new(Task { + future: Mutex::new(Box::pin(future)), + executor: sender.clone(), + }); + + let _ = sender.send(task); + } +} diff --git a/Tutorial/ReadFile.rs b/Tutorial/ReadFile.rs new file mode 100644 index 0000000..7a0506e --- /dev/null +++ b/Tutorial/ReadFile.rs @@ -0,0 +1,17 @@ +#![allow(non_snake_case)] + +use tokio::fs::File; +use tokio::io::{self, AsyncReadExt}; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut buffer = Vec::new(); + let mut file = File::open("README.md").await?; + + // read the whole file + file.read_to_end(&mut buffer).await?; + + println!("The bytes: {:?}", &buffer); + + Ok(()) +} diff --git a/Tutorial/ReadPart.rs b/Tutorial/ReadPart.rs new file mode 100644 index 0000000..9fe4ef3 --- /dev/null +++ b/Tutorial/ReadPart.rs @@ -0,0 +1,17 @@ +#![allow(non_snake_case)] + +use tokio::fs::File; +use tokio::io::{self, AsyncReadExt}; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut buffer = [1; 10]; + let mut file = File::open("README.md").await?; + + // read up to 10 bytes + let n = file.read(&mut buffer[..]).await?; + + println!("The bytes: {:?}", &buffer[..n]); + + Ok(()) +} diff --git a/Tutorial/Waker.rs b/Tutorial/Waker.rs new file mode 100644 index 0000000..d1bad5f --- /dev/null +++ b/Tutorial/Waker.rs @@ -0,0 +1,48 @@ +#![allow(non_snake_case)] + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::{Duration, Instant}; + +struct Delay { + when: Instant, +} + +impl Future for Delay { + type Output = &'static str; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { + if Instant::now() >= self.when { + println!("Hello world"); + Poll::Ready("done") + } else { + // Get a handle to the waker for the current task + let waker = cx.waker().clone(); + let when = self.when; + + // Spawn a timer thread. + thread::spawn(move || { + let now = Instant::now(); + + if now < when { + thread::sleep(when - now); + } + + waker.wake(); + }); + + Poll::Pending + } + } +} + +#[tokio::main] +async fn main() { + let when = Instant::now() + Duration::from_millis(100); + let future = Delay { when }; + + let out = future.await; + assert_eq!(out, "done"); +} diff --git a/Tutorial/WriteFile.rs b/Tutorial/WriteFile.rs new file mode 100644 index 0000000..885b280 --- /dev/null +++ b/Tutorial/WriteFile.rs @@ -0,0 +1,13 @@ +#![allow(non_snake_case)] + +use tokio::fs::File; +use tokio::io::{self, AsyncWriteExt}; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut buffer = File::create("File.txt").await?; + + buffer.write_all(b"some bytes").await?; + + Ok(()) +} diff --git a/Tutorial/WritePart.rs b/Tutorial/WritePart.rs new file mode 100644 index 0000000..1121994 --- /dev/null +++ b/Tutorial/WritePart.rs @@ -0,0 +1,16 @@ +#![allow(non_snake_case)] + +use tokio::fs::File; +use tokio::io::{self, AsyncWriteExt}; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut file = File::create("File.txt").await?; + + // Writes some prefix of the byte string, but not necessarily all of it. + let n = file.write(b"some bytes").await?; + + println!("Wrote the first {} bytes of 'some bytes'.", n); + + Ok(()) +}