Implement a shared state between tasks

This commit is contained in:
GuilhermeWerner
2021-01-19 12:24:50 -03:00
parent 751a8d2dfb
commit 81f8aed54d
4 changed files with 27 additions and 12 deletions

View File

@ -19,6 +19,7 @@ path = "Source/Server.rs"
[dependencies] [dependencies]
tokio = { version = "1.0.2", features = ["full"] } tokio = { version = "1.0.2", features = ["full"] }
mini-redis = "0.4" mini-redis = "0.4"
bytes = "1.0.1"
[[example]] [[example]]
name="Hello" name="Hello"

View File

@ -1,3 +1,5 @@
#![allow(non_snake_case)]
use mini_redis::{client, Result}; use mini_redis::{client, Result};
#[tokio::main] #[tokio::main]

View File

@ -1 +1,3 @@
#![allow(non_snake_case)]
fn main() {} fn main() {}

View File

@ -1,29 +1,41 @@
#![allow(non_snake_case)]
use bytes::Bytes;
use mini_redis::{Connection, Frame}; use mini_redis::{Connection, Frame};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
type DataBase = Arc<Mutex<HashMap<String, Bytes>>>;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Bind the listener to the address // Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listen {}", 6379);
let db = Arc::new(Mutex::new(HashMap::new()));
loop { loop {
// The second item contains the IP and port of the new connection. // The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap(); let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Fetched");
// A new task is spawned for each inbound socket. The socket is // A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there. // moved to the new task and processed there.
tokio::spawn(async move { tokio::spawn(async move {
process(socket).await; process(socket, db).await;
}); });
} }
} }
async fn process(socket: TcpStream) { async fn process(socket: TcpStream, db: DataBase) {
use mini_redis::Command::{self, Get, Set}; use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from // Connection, provided by `mini-redis`, handles parsing frames from
// the socket // the socket
@ -33,16 +45,14 @@ async fn process(socket: TcpStream) {
while let Some(frame) = connection.read_frame().await.unwrap() { while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() { let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => { Set(cmd) => {
// The value is stored as `Vec<u8>` let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().to_vec()); db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string()) Frame::Simple("OK".to_string())
} }
Get(cmd) => { Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) { if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This Frame::Bulk(value.clone())
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else { } else {
Frame::Null Frame::Null
} }