From 81f8aed54d033ca4fe1b5b88ca20f4265bb3649c Mon Sep 17 00:00:00 2001 From: GuilhermeWerner Date: Tue, 19 Jan 2021 12:24:50 -0300 Subject: [PATCH] Implement a shared state between tasks --- Cargo.toml | 1 + Examples/Hello.rs | 2 ++ Source/Client.rs | 2 ++ Source/Server.rs | 34 ++++++++++++++++++++++------------ 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a699de2..1d33fa5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ path = "Source/Server.rs" [dependencies] tokio = { version = "1.0.2", features = ["full"] } mini-redis = "0.4" +bytes = "1.0.1" [[example]] name="Hello" diff --git a/Examples/Hello.rs b/Examples/Hello.rs index 3fa16ce..e17a73a 100644 --- a/Examples/Hello.rs +++ b/Examples/Hello.rs @@ -1,3 +1,5 @@ +#![allow(non_snake_case)] + use mini_redis::{client, Result}; #[tokio::main] diff --git a/Source/Client.rs b/Source/Client.rs index f328e4d..87a8fb3 100644 --- a/Source/Client.rs +++ b/Source/Client.rs @@ -1 +1,3 @@ +#![allow(non_snake_case)] + fn main() {} diff --git a/Source/Server.rs b/Source/Server.rs index d9c7d83..5867e41 100644 --- a/Source/Server.rs +++ b/Source/Server.rs @@ -1,29 +1,41 @@ +#![allow(non_snake_case)] + +use bytes::Bytes; use mini_redis::{Connection, Frame}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use tokio::net::{TcpListener, TcpStream}; +type DataBase = Arc>>; + #[tokio::main] async fn main() { // Bind the listener to the address let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); + println!("Listen {}", 6379); + + let db = Arc::new(Mutex::new(HashMap::new())); + loop { // The second item contains the IP and port of the new connection. let (socket, _) = listener.accept().await.unwrap(); + // Clone the handle to the hash map. + let db = db.clone(); + + println!("Fetched"); + // A new task is spawned for each inbound socket. The socket is // moved to the new task and processed there. tokio::spawn(async move { - process(socket).await; + process(socket, db).await; }); } } -async fn process(socket: TcpStream) { +async fn process(socket: TcpStream, db: DataBase) { use mini_redis::Command::{self, Get, Set}; - use std::collections::HashMap; - - // A hashmap is used to store data - let mut db = HashMap::new(); // Connection, provided by `mini-redis`, handles parsing frames from // the socket @@ -33,16 +45,14 @@ async fn process(socket: TcpStream) { while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { - // The value is stored as `Vec` - db.insert(cmd.key().to_string(), cmd.value().to_vec()); + let mut db = db.lock().unwrap(); + db.insert(cmd.key().to_string(), cmd.value().clone()); Frame::Simple("OK".to_string()) } Get(cmd) => { + let db = db.lock().unwrap(); if let Some(value) = db.get(cmd.key()) { - // `Frame::Bulk` expects data to be of type `Bytes`. This - // type will be covered later in the tutorial. For now, - // `&Vec` is converted to `Bytes` using `into()`. - Frame::Bulk(value.clone().into()) + Frame::Bulk(value.clone()) } else { Frame::Null }