diff --git a/crates/core_types/src/bits_n_pieces.rs b/crates/core_types/src/bits_n_pieces.rs index 4d142a5c1e..b3ae1608b0 100644 --- a/crates/core_types/src/bits_n_pieces.rs +++ b/crates/core_types/src/bits_n_pieces.rs @@ -8,7 +8,7 @@ pub fn u32_high_bits(i: u32) -> u16 { /// returns the u16 low bits from a u32 by doing a lossy cast pub fn u32_low_bits(i: u32) -> u16 { - (i as u16) + i as u16 } /// splits the high and low bits of u32 into a tuple of u16, for destructuring convenience @@ -26,7 +26,7 @@ pub fn u64_high_bits(i: u64) -> u32 { } pub fn u64_low_bits(i: u64) -> u32 { - (i as u32) + i as u32 } pub fn u64_split_bits(i: u64) -> (u32, u32) { diff --git a/crates/net/src/sim2h_worker.rs b/crates/net/src/sim2h_worker.rs index 5c6c5a204a..541c3cce68 100644 --- a/crates/net/src/sim2h_worker.rs +++ b/crates/net/src/sim2h_worker.rs @@ -28,7 +28,11 @@ use sim2h::{ crypto::{Provenance, SignedWireMessage}, generate_ack_receipt_hash, TcpWss, WireError, WireMessage, WIRE_VERSION, }; -use std::{convert::TryFrom, time::Instant}; +use std::{ + collections::{HashMap, VecDeque}, + convert::TryFrom, + time::Instant, +}; use url::Url; use url2::prelude::*; @@ -51,9 +55,10 @@ pub struct Sim2hConfig { pub sim2h_url: String, } +#[derive(Clone)] struct BufferedMessage { pub wire_message: WireMessage, - pub hash: u64, + //pub hash: u64, pub last_sent: Option, } @@ -61,7 +66,7 @@ impl From for BufferedMessage { fn from(wire_message: WireMessage) -> BufferedMessage { BufferedMessage { wire_message, - hash: 0, + //hash: 0, last_sent: None, } } @@ -82,7 +87,8 @@ pub struct Sim2hWorker { connection_timeout_backoff: u64, reconnect_interval: Duration, metric_publisher: std::sync::Arc>, - outgoing_message_buffer: Vec, + outgoing_message_buffer: VecDeque, + outgoing_ack: HashMap, ws_frame: Option, initial_authoring_list: Option, initial_gossiping_list: Option, @@ -123,7 +129,8 @@ impl Sim2hWorker { metric_publisher: std::sync::Arc::new(std::sync::RwLock::new( DefaultMetricPublisher::default(), )), - outgoing_message_buffer: Vec::new(), + outgoing_message_buffer: VecDeque::new(), + outgoing_ack: HashMap::new(), ws_frame: None, initial_authoring_list: None, initial_gossiping_list: None, @@ -225,6 +232,17 @@ impl Sim2hWorker { } } + fn check_resend(&mut self) { + for msg in self.outgoing_ack.values_mut() { + if let Some(instant_last_sent) = msg.last_sent { + if instant_last_sent.elapsed() >= Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { + msg.last_sent = None; + self.outgoing_message_buffer.push_back(msg.clone()); + } + } + } + } + /// if we have queued wire messages and our connection is ready, /// try to send one /// return if we did something @@ -232,12 +250,17 @@ impl Sim2hWorker { if self.outgoing_message_buffer.is_empty() || !self.connection_ready() { return false; } + /* + let len = self.outgoing_message_buffer.len(); let buffered_message = self.outgoing_message_buffer.get_mut(0).unwrap(); if let Some(instant_last_sent) = buffered_message.last_sent { if instant_last_sent.elapsed() < Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { return false; } } + */ + self.check_resend(); + let mut buffered_message = self.outgoing_message_buffer.pop_front().unwrap(); let message = &buffered_message.wire_message; debug!("WireMessage: preparing to send {:?}", message); let payload: String = message.clone().into(); @@ -272,8 +295,9 @@ impl Sim2hWorker { self.check_reconnect(); return true; } - buffered_message.hash = generate_ack_receipt_hash(&payload); + let hash = generate_ack_receipt_hash(&payload); buffered_message.last_sent = Some(Instant::now()); + self.outgoing_ack.insert(hash, buffered_message); true } @@ -293,7 +317,7 @@ impl Sim2hWorker { // we always put messages in the outgoing buffer, // they'll be sent when the connection is ready debug!("WireMessage: queueing {:?}", message); - self.outgoing_message_buffer.push(message.into()); + self.outgoing_message_buffer.push_back(message.into()); Ok(()) } @@ -551,22 +575,19 @@ impl Sim2hWorker { } WireMessage::StatusResponse(_) => error!("Got a StatusResponse from the Sim2h server, weird! Ignoring (I use Hello not Status)"), WireMessage::Ack(hash) => { - if self.outgoing_message_buffer - .first() - .and_then(|buffered_message| Some(buffered_message.hash == hash)) - .unwrap_or(false) + if self.outgoing_ack + .remove(&hash) + .map(|buffered_message| { + debug!("got_ack {:?}, {:?}", buffered_message.wire_message, buffered_message.last_sent); + buffered_message + }) + .is_some() { debug!("WireMessage::Ack received => dequeuing sent message {:?}", message); - // if we made it here, we successfully sent the first message - // we can remove it from the outgoing buffer queue - self.outgoing_message_buffer.remove(0); } else { warn!( - "WireMessage::Ack received that came out of order! Got hash: {}, have top hash: {:?}", - hash, - self.outgoing_message_buffer - .first() - .map(|buffered_message| buffered_message.hash) + "WireMessage::Ack received that came but wasn't held! Got hash: {}", + hash ); } }