From 2e7b35e32d304651253f610a04a9cfc8fd2272cd Mon Sep 17 00:00:00 2001 From: GuilhermeWerner Date: Tue, 19 Jan 2021 16:03:02 -0300 Subject: [PATCH] Initial Implementation on Connections and Frames --- Source/Client.rs | 93 +++++++++++++++++++++++++++++- Source/Core/Connection.rs | 118 ++++++++++++++++++++++++++++++++++++++ Source/Core/Frame.rs | 10 ++++ 3 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 Source/Core/Connection.rs create mode 100644 Source/Core/Frame.rs diff --git a/Source/Client.rs b/Source/Client.rs index 87a8fb3..21abfc3 100644 --- a/Source/Client.rs +++ b/Source/Client.rs @@ -1,3 +1,94 @@ #![allow(non_snake_case)] -fn main() {} +use bytes::Bytes; +use mini_redis::client; +use tokio::sync::{mpsc, oneshot}; + +/// Multiple different commands are multiplexed over a single channel. +#[derive(Debug)] +enum Command { + Get { + key: String, + resp: Responder>, + }, + Set { + key: String, + val: Vec, + resp: Responder<()>, + }, +} + +/// Provided by the requester and used by the manager task to send the command +/// response back to the requester. +type Responder = oneshot::Sender>; + +#[tokio::main] +async fn main() { + let (tx, mut rx) = mpsc::channel(32); + + // Clone a `tx` handle for the second f + let tx2 = tx.clone(); + + let manager = tokio::spawn(async move { + // Open a connection to the mini-redis address. + let mut client = client::connect("127.0.0.1:6379").await.unwrap(); + + while let Some(cmd) = rx.recv().await { + match cmd { + Command::Get { key, resp } => { + let res = client.get(&key).await; + // Ignore errors + let _ = resp.send(res); + } + Command::Set { key, val, resp } => { + let res = client.set(&key, val.into()).await; + // Ignore errors + let _ = resp.send(res); + } + } + } + }); + + // Spawn two tasks, each setting a value + let t1 = tokio::spawn(async move { + let (resp_tx, resp_rx) = oneshot::channel(); + let cmd = Command::Get { + key: "hello".to_string(), + resp: resp_tx, + }; + + // Send the GET request + if tx.send(cmd).await.is_err() { + eprintln!("connection task shutdown"); + return; + } + + // Await the response + let res = resp_rx.await; + println!("GOT = {:?}", res); + }); + + let t2 = tokio::spawn(async move { + let (resp_tx, resp_rx) = oneshot::channel(); + let cmd = Command::Set { + key: "foo".to_string(), + val: b"bar".to_vec(), + resp: resp_tx, + }; + + // Send the SET request + if tx2.send(cmd).await.is_err() { + eprintln!("connection task shutdown"); + return; + } + + // Await the response + let res = resp_rx.await; + println!("GOT = {:?}", res) + }); + + t1.await.unwrap(); + t2.await.unwrap(); + + manager.await.unwrap(); +} diff --git a/Source/Core/Connection.rs b/Source/Core/Connection.rs new file mode 100644 index 0000000..b6f26dd --- /dev/null +++ b/Source/Core/Connection.rs @@ -0,0 +1,118 @@ +use bytes::{Buf, BytesMut}; +use mini_redis::frame::Error::Incomplete; +use mini_redis::{Frame, Result}; +use std::io::Cursor; +use tokio::io::{self, AsyncReadExt, BufWriter}; +use tokio::net::TcpStream; + +pub struct Connection { + stream: BufWriter, + buffer: BytesMut, +} + +impl Connection { + pub fn new(stream: TcpStream) -> Connection { + Connection { + stream: BufWriter::new(stream), + // Allocate the buffer with 4kb of capacity. + buffer: BytesMut::with_capacity(4096), + } + } + + /// Read a frame from the connection. + /// + /// Returns `None` if EOF is reached + pub async fn read_frame(&mut self) -> Result> { + loop { + // Attempt to parse a frame from the buffered data. If + // enough data has been buffered, the frame is + // returned. + if let Some(frame) = self.parse_frame()? { + return Ok(Some(frame)); + } + + // There is not enough buffered data to read a frame. + // Attempt to read more data from the socket. + // + // On success, the number of bytes is returned. `0` + // indicates "end of stream". + if 0 == self.stream.read_buf(&mut self.buffer).await? { + // The remote closed the connection. For this to be + // a clean shutdown, there should be no data in the + // read buffer. If there is, this means that the + // peer closed the socket while sending a frame. + if self.buffer.is_empty() { + return Ok(None); + } else { + return Err("connection reset by peer".into()); + } + } + } + } + + /// Write a frame to the connection. + async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { + match frame { + Frame::Simple(val) => { + self.stream.write_u8(b'+').await?; + self.stream.write_all(val.as_bytes()).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Error(val) => { + self.stream.write_u8(b'-').await?; + self.stream.write_all(val.as_bytes()).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Integer(val) => { + self.stream.write_u8(b':').await?; + self.write_decimal(*val).await?; + } + Frame::Null => { + self.stream.write_all(b"$-1\r\n").await?; + } + Frame::Bulk(val) => { + let len = val.len(); + + self.stream.write_u8(b'$').await?; + self.write_decimal(len as u64).await?; + self.stream.write_all(val).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Array(_val) => unimplemented!(), + } + + self.stream.flush().await; + + Ok(()) + } + + fn parse_frame(&mut self) -> Result> { + // Create the `T: Buf` type. + let mut buf = Cursor::new(&self.buffer[..]); + + // Check whether a full frame is available + match Frame::check(&mut buf) { + Ok(_) => { + // Get the byte length of the frame + let len = buf.position() as usize; + + // Reset the internal cursor for the + // call to `parse`. + buf.set_position(0); + + // Parse the frame + let frame = Frame::parse(&mut buf)?; + + // Discard the frame from the buffer + self.buffer.advance(len); + + // Return the frame to the caller. + Ok(Some(frame)) + } + // Not enough data has been buffered + Err(Incomplete) => Ok(None), + // An error was encountered + Err(e) => Err(e.into()), + } + } +} diff --git a/Source/Core/Frame.rs b/Source/Core/Frame.rs new file mode 100644 index 0000000..6e3c203 --- /dev/null +++ b/Source/Core/Frame.rs @@ -0,0 +1,10 @@ +use bytes::Bytes; + +enum Frame { + Simple(String), + Error(String), + Integer(u64), + Bulk(Bytes), + Null, + Array(Vec), +}