Implement Some Tokio Tutorials

This commit is contained in:
GuilhermeWerner
2021-01-19 16:03:55 -03:00
parent 2e7b35e32d
commit 055be51002
12 changed files with 415 additions and 0 deletions

View File

@ -20,7 +20,49 @@ path = "Source/Server.rs"
tokio = { version = "1.0.2", features = ["full"] }
mini-redis = "0.4"
bytes = "1.0.1"
crossbeam = "0.7"
futures = "0.3"
[[example]]
name="Hello"
path = "Examples/Hello.rs"
[[example]]
name="ReadPart"
path = "Tutorial/ReadPart.rs"
[[example]]
name="ReadFile"
path = "Tutorial/ReadFile.rs"
[[example]]
name="WritePart"
path = "Tutorial/WritePart.rs"
[[example]]
name="WriteFile"
path = "Tutorial/WriteFile.rs"
[[example]]
name="CreateFile"
path = "Tutorial/CreateFile.rs"
[[example]]
name="Echo"
path = "Tutorial/Echo.rs"
[[example]]
name="EchoCopy"
path = "Tutorial/EchoCopy.rs"
[[example]]
name="Future"
path = "Tutorial/Future.rs"
[[example]]
name="MiniTokio"
path = "Tutorial/MiniTokio.rs"
[[example]]
name="Waker"
path = "Tutorial/Waker.rs"

14
Tutorial/CreateFile.rs Normal file
View File

@ -0,0 +1,14 @@
#![allow(non_snake_case)]
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("File.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}

24
Tutorial/Delay.rs Normal file
View File

@ -0,0 +1,24 @@
#![allow(non_snake_case)]
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify2.notify_one();
});
notify.notified().await;
}

38
Tutorial/Echo.rs Normal file
View File

@ -0,0 +1,38 @@
#![allow(non_snake_case)]
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}

21
Tutorial/EchoCopy.rs Normal file
View File

@ -0,0 +1,21 @@
#![allow(non_snake_case)]
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
}
}

55
Tutorial/Future.rs Normal file
View File

@ -0,0 +1,55 @@
#![allow(non_snake_case)]
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
})
.await;
/*
let when = Instant::now() + Duration::from_millis(1000);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
*/
}

110
Tutorial/MiniTTokio.rs Normal file
View File

@ -0,0 +1,110 @@
#![allow(non_snake_case)]
use futures::task;
use futures::task::ArcWake;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
impl MiniTokio {
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new taks with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}

17
Tutorial/ReadFile.rs Normal file
View File

@ -0,0 +1,17 @@
#![allow(non_snake_case)]
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = Vec::new();
let mut file = File::open("README.md").await?;
// read the whole file
file.read_to_end(&mut buffer).await?;
println!("The bytes: {:?}", &buffer);
Ok(())
}

17
Tutorial/ReadPart.rs Normal file
View File

@ -0,0 +1,17 @@
#![allow(non_snake_case)]
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = [1; 10];
let mut file = File::open("README.md").await?;
// read up to 10 bytes
let n = file.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}

48
Tutorial/Waker.rs Normal file
View File

@ -0,0 +1,48 @@
#![allow(non_snake_case)]
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(100);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}

13
Tutorial/WriteFile.rs Normal file
View File

@ -0,0 +1,13 @@
#![allow(non_snake_case)]
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("File.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}

16
Tutorial/WritePart.rs Normal file
View File

@ -0,0 +1,16 @@
#![allow(non_snake_case)]
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("File.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}