Files
mini-redis/Source/Core/Connection.rs
2021-01-19 16:03:02 -03:00

119 lines
3.9 KiB
Rust

use bytes::{Buf, BytesMut};
use mini_redis::frame::Error::Incomplete;
use mini_redis::{Frame, Result};
use std::io::Cursor;
use tokio::io::{self, AsyncReadExt, BufWriter};
use tokio::net::TcpStream;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
loop {
// Attempt to parse a frame from the buffered data. If
// enough data has been buffered, the frame is
// returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// There is not enough buffered data to read a frame.
// Attempt to read more data from the socket.
//
// On success, the number of bytes is returned. `0`
// indicates "end of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, this means that the
// peer closed the socket while sending a frame.
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
/// Write a frame to the connection.
async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}
self.stream.flush().await;
Ok(())
}
fn parse_frame(&mut self) -> Result<Option<Frame>> {
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}
}