From c3281be4195c31ed1f09cfa57ed517c7a98ca535 Mon Sep 17 00:00:00 2001 From: Tom <25043847+Douile@users.noreply.github.com> Date: Mon, 25 Sep 2023 19:12:54 +0000 Subject: [PATCH] [Protocol] Retry failed requests (#95) * Add retry count to TimeoutSettings This can be used to specify how many times to re-send requests that fail. The default value is "1" so the if the first request fails, 1 more attempt will be made. * Add retries to valve queries * [Protocol] &Optional add get_retries_or_default Allow fetching the number of retries or the default retries value from a borrowed optional TimeoutSettings. * [Protocol] Add retries to minecraft protocol * [Protocol] Add retries to quake * [Protocol] Add retries to gamespy * [Protocol] Update TimeoutSettings docs, and change default retries to 0 * Remove logging from retry_on_timeout * [Protocol] TimeoutSettings make retries non-optional * [Protocol] Move retry logic into lower level query functions Retries are now implemented as wrappers on the single function that would need to be retried on timeout. In order to avoid cloning of TimeoutSettings, Socket::apply_timeouts() was changed to accept a borrowed TimeoutSettings. And extra helpers were added to the TimeoutSettings impl to reduce repetition. * [Examples] Add retries to the generic example * Also retry on PacketSend error Sending packets could also timeout and until error_generic_member_access is stable we have no way of determining the type of the underlying `std::error::Error`. * Add retry unit tests * [Docs] Update changelog --- CHANGELOG.md | 4 + examples/generic.rs | 26 +++++- .../gamespy/protocols/one/protocol.rs | 14 +++- .../gamespy/protocols/three/protocol.rs | 17 +++- .../gamespy/protocols/two/protocol.rs | 18 +++- src/protocols/minecraft/protocol/bedrock.rs | 18 +++- src/protocols/minecraft/protocol/java.rs | 14 +++- .../minecraft/protocol/legacy_bv1_8.rs | 18 +++- .../minecraft/protocol/legacy_v1_4.rs | 18 +++- .../minecraft/protocol/legacy_v1_6.rs | 18 +++- src/protocols/quake/client.rs | 17 +++- src/protocols/types.rs | 57 +++++++++++-- src/protocols/valve/protocol.rs | 28 ++++++- src/services/valve_master_server/service.rs | 2 +- src/socket.rs | 18 ++-- src/utils.rs | 82 ++++++++++++++++++- 16 files changed, 323 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5652443..77464e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ Protocols: Crate: - Rich errors, capturing backtrace is done on `RUST_BACKTRACE=1`. - Applied some nursery Clippy lints. +- The `retries` field was added to `TimeoutSettings` that specifies the number of times to retry a failed request (request being individual send, receive sequence, some protocols can include multiple requests in a single query) + - By default `retries` is set to `0`, meaning no retries will be attempted Generics: - Added `ExtraRequestSettings` containing all possible extra request settings. @@ -22,6 +24,8 @@ Generics: Crate: - The enum used for errors, `GDError` has been renamed to `GDErrorKind`. - `GDError` is now a struct that holds its kind, the source and a backtrace. +- The `Socket::apply_timeout` method now borrows `TimeoutSettings` (`&Option`) + - To make this easier to work with a new method was added to `TimeoutSettings`: `TimeoutSettings::get_read_and_write_or_defaults` this takes a borrowed optional `TimeoutSettings` and returns the contained read and write durations or the default read and write durations. Generics: - Renamed `CommonResponseJson`'s `game` field (and the function) to `game_mode`. diff --git a/examples/generic.rs b/examples/generic.rs index e74b133..3c0650c 100644 --- a/examples/generic.rs +++ b/examples/generic.rs @@ -44,13 +44,27 @@ fn main() { .expect("Could not lookup host"); let port: Option = args.next().map(|s| s.parse().unwrap()); + let timeout_settings = TimeoutSettings::new( + TimeoutSettings::default().get_read(), + TimeoutSettings::default().get_write(), + 2, + ) + .unwrap(); + let extra_settings = ExtraRequestSettings::default() .set_hostname(hostname.to_string()) .set_gather_rules(true) .set_gather_players(true) .set_check_app_id(false); - generic_query(&game_name, &addr.ip(), port, None, Some(extra_settings)).unwrap(); + generic_query( + &game_name, + &addr.ip(), + port, + Some(timeout_settings), + Some(extra_settings), + ) + .unwrap(); } else { // Without arguments print a list of games @@ -73,8 +87,14 @@ mod test { const ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); fn test_game(game_name: &str) { - let timeout_settings = - Some(TimeoutSettings::new(Some(Duration::from_nanos(1)), Some(Duration::from_nanos(1))).unwrap()); + let timeout_settings = Some( + TimeoutSettings::new( + Some(Duration::from_nanos(1)), + Some(Duration::from_nanos(1)), + 0, + ) + .unwrap(), + ); assert!(generic_query(game_name, &ADDR, None, timeout_settings, None).is_err()); } diff --git a/src/protocols/gamespy/protocols/one/protocol.rs b/src/protocols/gamespy/protocols/one/protocol.rs index 1baaed3..cc0c67f 100644 --- a/src/protocols/gamespy/protocols/one/protocol.rs +++ b/src/protocols/gamespy/protocols/one/protocol.rs @@ -4,6 +4,7 @@ use crate::buffer::Utf8Decoder; use crate::protocols::gamespy::common::has_password; use crate::GDErrorKind::TypeParse; +use crate::utils::retry_on_timeout; use crate::{ buffer::Buffer, protocols::{ @@ -17,13 +18,22 @@ use crate::{ use std::collections::HashMap; use std::net::SocketAddr; +/// Send status request, and parse response into HashMap. +/// This function will retry fetch on timeouts. fn get_server_values( address: &SocketAddr, - timeout_settings: Option, + timeout_settings: &Option, ) -> GDResult> { let mut socket = UdpSocket::new(address)?; socket.apply_timeout(timeout_settings)?; + retry_on_timeout( + TimeoutSettings::get_retries_or_default(timeout_settings), + move || get_server_values_impl(&mut socket), + ) +} +/// Send status request, and parse response into HashMap (without retry logic). +fn get_server_values_impl(socket: &mut UdpSocket) -> GDResult> { socket.send(b"\\status\\xserverquery")?; let mut received_query_id: Option = None; @@ -191,7 +201,7 @@ pub fn query_vars( address: &SocketAddr, timeout_settings: Option, ) -> GDResult> { - get_server_values(address, timeout_settings) + get_server_values(address, &timeout_settings) } /// Query a server by providing the address, the port and timeout settings. diff --git a/src/protocols/gamespy/protocols/three/protocol.rs b/src/protocols/gamespy/protocols/three/protocol.rs index 6d9d194..d868d6b 100644 --- a/src/protocols/gamespy/protocols/three/protocol.rs +++ b/src/protocols/gamespy/protocols/three/protocol.rs @@ -5,6 +5,7 @@ use crate::protocols::gamespy::common::has_password; use crate::protocols::gamespy::three::{Player, Response, Team}; use crate::protocols::types::TimeoutSettings; use crate::socket::{Socket, UdpSocket}; +use crate::utils::retry_on_timeout; use crate::GDErrorKind::{PacketBad, TypeParse}; use crate::{GDErrorKind, GDResult}; use std::collections::HashMap; @@ -43,6 +44,7 @@ pub(crate) struct GameSpy3 { socket: UdpSocket, payload: [u8; 4], single_packets: bool, + retry_count: usize, } const PACKET_SIZE: usize = 2048; @@ -51,12 +53,14 @@ const DEFAULT_PAYLOAD: [u8; 4] = [0xFF, 0xFF, 0xFF, 0x01]; impl GameSpy3 { fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = UdpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + socket.apply_timeout(&timeout_settings)?; Ok(Self { socket, payload: DEFAULT_PAYLOAD, single_packets: false, + retry_count, }) } @@ -67,12 +71,14 @@ impl GameSpy3 { single_packets: bool, ) -> GDResult { let socket = UdpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + socket.apply_timeout(&timeout_settings)?; Ok(Self { socket, payload, single_packets, + retry_count, }) } @@ -130,7 +136,14 @@ impl GameSpy3 { ) } + /// Fetch packets from server and store in buffer. + /// This function will retry fetch on timeouts. pub(crate) fn get_server_packets(&mut self) -> GDResult>> { + retry_on_timeout(self.retry_count, move || self.get_server_packets_impl()) + } + + /// Fetch packets from server and store in buffer (without retry logic). + fn get_server_packets_impl(&mut self) -> GDResult>> { let challenge = self.make_initial_handshake()?; self.send_data_request(challenge)?; diff --git a/src/protocols/gamespy/protocols/two/protocol.rs b/src/protocols/gamespy/protocols/two/protocol.rs index 93a8aa8..81904db 100644 --- a/src/protocols/gamespy/protocols/two/protocol.rs +++ b/src/protocols/gamespy/protocols/two/protocol.rs @@ -2,6 +2,7 @@ use crate::buffer::{Buffer, Utf8Decoder}; use crate::protocols::gamespy::two::{Player, Response, Team}; use crate::protocols::types::TimeoutSettings; use crate::socket::{Socket, UdpSocket}; +use crate::utils::retry_on_timeout; use crate::GDErrorKind::{PacketBad, TypeParse}; use crate::{GDErrorKind, GDResult}; use byteorder::BigEndian; @@ -10,6 +11,7 @@ use std::net::SocketAddr; struct GameSpy2 { socket: UdpSocket, + retry_count: usize, } macro_rules! table_extract { @@ -78,12 +80,24 @@ fn data_as_table(data: &mut Buffer) -> GDResult<(HashMap) -> GDResult { let socket = UdpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + Ok(Self { + socket, + retry_count, + }) } + /// Send fetch request to server and store result in buffer. + /// This function will retry fetch on timeouts. fn request_data(&mut self) -> GDResult<(Vec, usize)> { + retry_on_timeout(self.retry_count, move || self.request_data_impl()) + } + + /// Send fetch request to server and store result in buffer (without retry + /// logic). + fn request_data_impl(&mut self) -> GDResult<(Vec, usize)> { self.socket .send(&[0xFE, 0xFD, 0x00, 0x00, 0x00, 0x00, 0x01, 0xFF, 0xFF, 0xFF])?; diff --git a/src/protocols/minecraft/protocol/bedrock.rs b/src/protocols/minecraft/protocol/bedrock.rs index db9f87b..60efc36 100644 --- a/src/protocols/minecraft/protocol/bedrock.rs +++ b/src/protocols/minecraft/protocol/bedrock.rs @@ -7,7 +7,7 @@ use crate::{ types::TimeoutSettings, }, socket::{Socket, UdpSocket}, - utils::error_by_expected_size, + utils::{error_by_expected_size, retry_on_timeout}, GDErrorKind::{PacketBad, TypeParse}, GDResult, }; @@ -18,14 +18,19 @@ use byteorder::LittleEndian; pub struct Bedrock { socket: UdpSocket, + retry_count: usize, } impl Bedrock { fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = UdpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + Ok(Self { + socket, + retry_count, + }) } fn send_status_request(&mut self) -> GDResult<()> { @@ -39,7 +44,14 @@ impl Bedrock { Ok(()) } + /// Send a status request, and parse the response. + /// This function will retry fetch on timeouts. fn get_info(&mut self) -> GDResult { + retry_on_timeout(self.retry_count, move || self.get_info_impl()) + } + + /// Send a status request, and parse the response (without retry logic). + fn get_info_impl(&mut self) -> GDResult { self.send_status_request()?; let received = self.socket.receive(None)?; diff --git a/src/protocols/minecraft/protocol/java.rs b/src/protocols/minecraft/protocol/java.rs index bed40e6..d3406c3 100644 --- a/src/protocols/minecraft/protocol/java.rs +++ b/src/protocols/minecraft/protocol/java.rs @@ -5,6 +5,7 @@ use crate::{ types::TimeoutSettings, }, socket::{Socket, TcpSocket}, + utils::retry_on_timeout, GDErrorKind::{JsonParse, PacketBad}, GDResult, }; @@ -18,6 +19,7 @@ use serde_json::Value; pub struct Java { socket: TcpSocket, request_settings: RequestSettings, + retry_count: usize, } impl Java { @@ -27,11 +29,13 @@ impl Java { request_settings: Option, ) -> GDResult { let socket = TcpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + socket.apply_timeout(&timeout_settings)?; + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); Ok(Self { socket, request_settings: request_settings.unwrap_or_default(), + retry_count, }) } @@ -92,7 +96,15 @@ impl Java { Ok(()) } + /// Send minecraft ping request and parse the response. + /// This function will retry fetch on timeouts. fn get_info(&mut self) -> GDResult { + retry_on_timeout(self.retry_count, move || self.get_info_impl()) + } + + /// Send minecraft ping request and parse the response (without retry + /// logic). + fn get_info_impl(&mut self) -> GDResult { self.send_handshake()?; self.send_status_request()?; self.send_ping_request()?; diff --git a/src/protocols/minecraft/protocol/legacy_bv1_8.rs b/src/protocols/minecraft/protocol/legacy_bv1_8.rs index ec50486..b2140e2 100644 --- a/src/protocols/minecraft/protocol/legacy_bv1_8.rs +++ b/src/protocols/minecraft/protocol/legacy_bv1_8.rs @@ -5,7 +5,7 @@ use crate::{ types::TimeoutSettings, }, socket::{Socket, TcpSocket}, - utils::error_by_expected_size, + utils::{error_by_expected_size, retry_on_timeout}, GDErrorKind::{PacketBad, ProtocolFormat}, GDResult, }; @@ -16,19 +16,31 @@ use byteorder::BigEndian; pub struct LegacyBV1_8 { socket: TcpSocket, + retry_count: usize, } impl LegacyBV1_8 { fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = TcpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + Ok(Self { + socket, + retry_count, + }) } fn send_initial_request(&mut self) -> GDResult<()> { self.socket.send(&[0xFE]) } + /// Send request for info and parse response. + /// This function will retry fetch on timeouts. fn get_info(&mut self) -> GDResult { + retry_on_timeout(self.retry_count, move || self.get_info_impl()) + } + + /// Send request for info and parse response (without retry logic). + fn get_info_impl(&mut self) -> GDResult { self.send_initial_request()?; let data = self.socket.receive(None)?; diff --git a/src/protocols/minecraft/protocol/legacy_v1_4.rs b/src/protocols/minecraft/protocol/legacy_v1_4.rs index bab741b..ba4733e 100644 --- a/src/protocols/minecraft/protocol/legacy_v1_4.rs +++ b/src/protocols/minecraft/protocol/legacy_v1_4.rs @@ -7,7 +7,7 @@ use crate::{ types::TimeoutSettings, }, socket::{Socket, TcpSocket}, - utils::error_by_expected_size, + utils::{error_by_expected_size, retry_on_timeout}, GDErrorKind::{PacketBad, ProtocolFormat}, GDResult, }; @@ -15,19 +15,31 @@ use std::net::SocketAddr; pub struct LegacyV1_4 { socket: TcpSocket, + retry_count: usize, } impl LegacyV1_4 { fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = TcpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + Ok(Self { + socket, + retry_count, + }) } fn send_initial_request(&mut self) -> GDResult<()> { self.socket.send(&[0xFE, 0x01]) } + /// Send info request and parse response. + /// This function will retry fetch on timeouts. fn get_info(&mut self) -> GDResult { + retry_on_timeout(self.retry_count, move || self.get_info_impl()) + } + + /// Send info request and parse response (without retry logic). + fn get_info_impl(&mut self) -> GDResult { self.send_initial_request()?; let data = self.socket.receive(None)?; diff --git a/src/protocols/minecraft/protocol/legacy_v1_6.rs b/src/protocols/minecraft/protocol/legacy_v1_6.rs index ce1486b..0def533 100644 --- a/src/protocols/minecraft/protocol/legacy_v1_6.rs +++ b/src/protocols/minecraft/protocol/legacy_v1_6.rs @@ -7,7 +7,7 @@ use crate::{ types::TimeoutSettings, }, socket::{Socket, TcpSocket}, - utils::error_by_expected_size, + utils::{error_by_expected_size, retry_on_timeout}, GDErrorKind::{PacketBad, ProtocolFormat}, GDResult, }; @@ -15,14 +15,19 @@ use std::net::SocketAddr; pub struct LegacyV1_6 { socket: TcpSocket, + retry_count: usize, } impl LegacyV1_6 { fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = TcpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + let retry_count = TimeoutSettings::get_retries_or_default(&timeout_settings); + Ok(Self { + socket, + retry_count, + }) } fn send_initial_request(&mut self) -> GDResult<()> { @@ -81,7 +86,14 @@ impl LegacyV1_6 { }) } + /// Send info request and parse response. + /// This function will retry fetch on timeouts. fn get_info(&mut self) -> GDResult { + retry_on_timeout(self.retry_count, move || self.get_info_impl()) + } + + /// Send info request and parse response (without retry logic). + fn get_info_impl(&mut self) -> GDResult { self.send_initial_request()?; let data = self.socket.receive(None)?; diff --git a/src/protocols/quake/client.rs b/src/protocols/quake/client.rs index b0e3108..79c65f4 100644 --- a/src/protocols/quake/client.rs +++ b/src/protocols/quake/client.rs @@ -4,6 +4,7 @@ use crate::buffer::{Buffer, Utf8Decoder}; use crate::protocols::quake::types::Response; use crate::protocols::types::TimeoutSettings; use crate::socket::{Socket, UdpSocket}; +use crate::utils::retry_on_timeout; use crate::GDErrorKind::{PacketBad, TypeParse}; use crate::{GDErrorKind, GDResult}; use std::collections::HashMap; @@ -18,10 +19,22 @@ pub trait QuakeClient { fn parse_player_string(data: Iter<&str>) -> GDResult; } -fn get_data(address: &SocketAddr, timeout_settings: Option) -> GDResult> { +/// Send request and return result buffer. +/// This function will retry fetch on timeouts. +fn get_data( + address: &SocketAddr, + timeout_settings: &Option, +) -> GDResult> { let mut socket = UdpSocket::new(address)?; socket.apply_timeout(timeout_settings)?; + retry_on_timeout( + TimeoutSettings::get_retries_or_default(timeout_settings), + move || get_data_impl::(&mut socket), + ) +} +/// Send request and return result buffer (without retry logic). +fn get_data_impl(socket: &mut UdpSocket) -> GDResult> { socket.send( &[ &[0xFF, 0xFF, 0xFF, 0xFF], @@ -95,7 +108,7 @@ pub fn client_query( address: &SocketAddr, timeout_settings: Option, ) -> GDResult> { - let data = get_data::(address, timeout_settings)?; + let data = get_data::(address, &timeout_settings)?; let mut bufferer = Buffer::::new(&data); let mut server_vars = get_server_values(&mut bufferer)?; diff --git a/src/protocols/types.rs b/src/protocols/types.rs index 93d096e..a2855eb 100644 --- a/src/protocols/types.rs +++ b/src/protocols/types.rs @@ -146,12 +146,19 @@ pub struct CommonPlayerJson<'a> { pub struct TimeoutSettings { read: Option, write: Option, + retries: usize, } impl TimeoutSettings { /// Construct new settings, passing None will block indefinitely. /// Passing zero Duration throws GDErrorKind::[InvalidInput]. - pub fn new(read: Option, write: Option) -> GDResult { + /// + /// The retry count is the number of extra tries once the original request + /// fails, so a value of "0" will only make a single request, whereas + /// "1" will try the request again once if it fails. + /// The retry count is per-request so for multi-request queries (valve) if a + /// single part fails that part can be retried up to `retries` times. + pub fn new(read: Option, write: Option, retries: usize) -> GDResult { if let Some(read_duration) = read { if read_duration == Duration::new(0, 0) { return Err(InvalidInput.context("Read duration must not be 0")); @@ -164,7 +171,11 @@ impl TimeoutSettings { } } - Ok(Self { read, write }) + Ok(Self { + read, + write, + retries, + }) } /// Get the read timeout. @@ -172,18 +183,48 @@ impl TimeoutSettings { /// Get the write timeout. pub const fn get_write(&self) -> Option { self.write } -} -impl Default for TimeoutSettings { - /// Default values are 4 seconds for both read and write. - fn default() -> Self { + /// Get number of retries + pub const fn get_retries(&self) -> usize { self.retries } + + /// Get the number of retries if there are timeout settings else fall back + /// to the default + pub const fn get_retries_or_default(timeout_settings: &Option) -> usize { + if let Some(timeout_settings) = timeout_settings { + timeout_settings.get_retries() + } else { + TimeoutSettings::const_default().get_retries() + } + } + + /// Get the read and write durations if there are timeout settings else fall + /// back to the defaults + pub const fn get_read_and_write_or_defaults( + timeout_settings: &Option, + ) -> (Option, Option) { + if let Some(timeout_settings) = timeout_settings { + (timeout_settings.get_read(), timeout_settings.get_write()) + } else { + let default = TimeoutSettings::const_default(); + (default.get_read(), default.get_write()) + } + } + + /// Default values are 4 seconds for both read and write, no retries. + pub const fn const_default() -> Self { Self { read: Some(Duration::from_secs(4)), write: Some(Duration::from_secs(4)), + retries: 0, } } } +impl Default for TimeoutSettings { + /// Default values are 4 seconds for both read and write, no retries. + fn default() -> Self { Self::const_default() } +} + /// Generic extra request settings /// /// Fields of this struct may not be used depending on which protocol @@ -273,7 +314,7 @@ mod tests { let write_duration = Duration::from_secs(2); // Create new TimeoutSettings with the valid durations - let timeout_settings = TimeoutSettings::new(Some(read_duration), Some(write_duration))?; + let timeout_settings = TimeoutSettings::new(Some(read_duration), Some(write_duration), 0)?; // Verify that the get_read and get_write methods return the expected values assert_eq!(timeout_settings.get_read(), Some(read_duration)); @@ -291,7 +332,7 @@ mod tests { // Try to create new TimeoutSettings with the zero read duration (this should // fail) - let result = TimeoutSettings::new(Some(read_duration), Some(write_duration)); + let result = TimeoutSettings::new(Some(read_duration), Some(write_duration), 0); // Verify that the function returned an error and that the error type is // InvalidInput diff --git a/src/protocols/valve/protocol.rs b/src/protocols/valve/protocol.rs index 873f534..4fd95a1 100644 --- a/src/protocols/valve/protocol.rs +++ b/src/protocols/valve/protocol.rs @@ -20,7 +20,7 @@ use crate::{ }, }, socket::{Socket, UdpSocket}, - utils::u8_lower_upper, + utils::{retry_on_timeout, u8_lower_upper}, GDErrorKind::{BadGame, Decompress, UnknownEnumCast}, GDResult, }; @@ -120,6 +120,7 @@ impl SplitPacket { pub(crate) struct ValveProtocol { socket: UdpSocket, + retry_count: usize, } static PACKET_SIZE: usize = 6144; @@ -127,9 +128,16 @@ static PACKET_SIZE: usize = 6144; impl ValveProtocol { pub fn new(address: &SocketAddr, timeout_settings: Option) -> GDResult { let socket = UdpSocket::new(address)?; - socket.apply_timeout(timeout_settings)?; + let retry_count = timeout_settings + .as_ref() + .map(|t| t.get_retries()) + .unwrap_or_else(|| TimeoutSettings::default().get_retries()); + socket.apply_timeout(&timeout_settings)?; - Ok(Self { socket }) + Ok(Self { + socket, + retry_count, + }) } fn receive(&mut self, engine: &Engine, protocol: u8, buffer_size: usize) -> GDResult { @@ -170,7 +178,21 @@ impl ValveProtocol { } /// Ask for a specific request only. + /// This function will retry fetch on timeouts. pub fn get_request_data(&mut self, engine: &Engine, protocol: u8, kind: u8, payload: Vec) -> GDResult> { + retry_on_timeout(self.retry_count, || { + self.get_request_data_impl(engine, protocol, kind, payload.clone()) + }) + } + + /// Ask for a specific request only (without retry logic). + fn get_request_data_impl( + &mut self, + engine: &Engine, + protocol: u8, + kind: u8, + payload: Vec, + ) -> GDResult> { let request_initial_packet = Packet::new(kind, payload).to_bytes(); self.socket.send(&request_initial_packet)?; diff --git a/src/services/valve_master_server/service.rs b/src/services/valve_master_server/service.rs index 98d6161..ac44dfa 100644 --- a/src/services/valve_master_server/service.rs +++ b/src/services/valve_master_server/service.rs @@ -50,7 +50,7 @@ impl ValveMasterServer { /// Construct a new struct. pub fn new(master_address: &SocketAddr) -> GDResult { let socket = UdpSocket::new(master_address)?; - socket.apply_timeout(None)?; + socket.apply_timeout(&None)?; Ok(Self { socket }) } diff --git a/src/socket.rs b/src/socket.rs index 042b6b4..36c4dca 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -16,7 +16,7 @@ pub trait Socket { fn new(address: &SocketAddr) -> GDResult where Self: Sized; - fn apply_timeout(&self, timeout_settings: Option) -> GDResult<()>; + fn apply_timeout(&self, timeout_settings: &Option) -> GDResult<()>; fn send(&mut self, data: &[u8]) -> GDResult<()>; fn receive(&mut self, size: Option) -> GDResult>; @@ -37,10 +37,10 @@ impl Socket for TcpSocket { }) } - fn apply_timeout(&self, timeout_settings: Option) -> GDResult<()> { - let settings = timeout_settings.unwrap_or_default(); - self.socket.set_read_timeout(settings.get_read()).unwrap(); // unwrapping because TimeoutSettings::new - self.socket.set_write_timeout(settings.get_write()).unwrap(); // checks if these are 0 and throws an error + fn apply_timeout(&self, timeout_settings: &Option) -> GDResult<()> { + let (read, write) = TimeoutSettings::get_read_and_write_or_defaults(timeout_settings); + self.socket.set_read_timeout(read).unwrap(); // unwrapping because TimeoutSettings::new + self.socket.set_write_timeout(write).unwrap(); // checks if these are 0 and throws an error Ok(()) } @@ -77,10 +77,10 @@ impl Socket for UdpSocket { }) } - fn apply_timeout(&self, timeout_settings: Option) -> GDResult<()> { - let settings = timeout_settings.unwrap_or_default(); - self.socket.set_read_timeout(settings.get_read()).unwrap(); // unwrapping because TimeoutSettings::new - self.socket.set_write_timeout(settings.get_write()).unwrap(); // checks if these are 0 and throws an error + fn apply_timeout(&self, timeout_settings: &Option) -> GDResult<()> { + let (read, write) = TimeoutSettings::get_read_and_write_or_defaults(timeout_settings); + self.socket.set_read_timeout(read).unwrap(); // unwrapping because TimeoutSettings::new + self.socket.set_write_timeout(write).unwrap(); // checks if these are 0 and throws an error Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index b06a75d..7c4a6f2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -use crate::GDErrorKind::{PacketOverflow, PacketUnderflow}; +use crate::GDErrorKind::{PacketOverflow, PacketReceive, PacketSend, PacketUnderflow}; use crate::GDResult; use std::cmp::Ordering; @@ -12,8 +12,31 @@ pub fn error_by_expected_size(expected: usize, size: usize) -> GDResult<()> { pub const fn u8_lower_upper(n: u8) -> (u8, u8) { (n & 15, n >> 4) } +/// Run a closure `retry_count+1` times while it returns [PacketReceive] or +/// [PacketSend] errors, returning the first success, other Error, or after +/// `retry_count+1` tries the last [PacketReceive] or [PacketSend] error. +pub fn retry_on_timeout(mut retry_count: usize, mut fetch: impl FnMut() -> GDResult) -> GDResult { + let mut last_err = PacketReceive.context("Retry count was 0"); + retry_count += 1; + while retry_count > 0 { + last_err = match fetch() { + Ok(r) => return Ok(r), + Err(e) if e.kind == PacketReceive || e.kind == PacketSend => e, + Err(e) => return Err(e), + }; + retry_count -= 1; + } + Err(last_err) +} + #[cfg(test)] mod tests { + use super::retry_on_timeout; + use crate::{ + GDErrorKind::{PacketBad, PacketReceive, PacketSend}, + GDResult, + }; + #[test] fn u8_lower_upper() { assert_eq!(super::u8_lower_upper(171), (11, 10)); @@ -25,4 +48,61 @@ mod tests { assert!(super::error_by_expected_size(69, 68).is_err()); assert!(super::error_by_expected_size(69, 70).is_err()); } + + #[test] + fn retry_success_on_first() { + let r = retry_on_timeout(0, || Ok(())); + assert!(r.is_ok()); + } + + #[test] + fn retry_no_success() { + let r: GDResult<()> = retry_on_timeout(100, || Err(PacketSend.context("test"))); + assert!(r.is_err()); + assert_eq!(r.unwrap_err().kind, PacketSend); + } + + #[test] + fn retry_success_on_third() { + let mut i = 0u8; + let r = retry_on_timeout(2, || { + i += 1; + if i < 3 { + Err(PacketReceive.context("test")) + } else { + Ok(()) + } + }); + assert!(r.is_ok()); + } + + #[test] + fn retry_success_on_third_but_less_retries() { + let mut i = 0u8; + let r = retry_on_timeout(1, || { + i += 1; + if i < 3 { + Err(PacketReceive.context("test")) + } else { + Ok(()) + } + }); + assert!(r.is_err()); + assert_eq!(r.unwrap_err().kind, PacketReceive); + } + + #[test] + fn retry_with_non_timeout_error() { + let mut i = 0u8; + let r = retry_on_timeout(50, || { + i += 1; + match i { + 1 => Err(PacketSend.context("test")), + 2 => Err(PacketBad.context("test")), + _ => Ok(()), + } + }); + assert!(r.is_err()); + assert_eq!(r.unwrap_err().kind, PacketBad); + } }