From 57cc0c6345b075196e93fc793691501c7d9afbd7 Mon Sep 17 00:00:00 2001 From: freesig Date: Tue, 17 Mar 2020 18:18:34 +1100 Subject: [PATCH 1/3] parallel out --- crates/net/src/sim2h_worker.rs | 54 ++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/crates/net/src/sim2h_worker.rs b/crates/net/src/sim2h_worker.rs index 5c6c5a204a..d9d0f5b16d 100644 --- a/crates/net/src/sim2h_worker.rs +++ b/crates/net/src/sim2h_worker.rs @@ -29,6 +29,7 @@ use sim2h::{ generate_ack_receipt_hash, TcpWss, WireError, WireMessage, WIRE_VERSION, }; use std::{convert::TryFrom, time::Instant}; +use std::collections::{HashMap, VecDeque}; use url::Url; use url2::prelude::*; @@ -51,9 +52,10 @@ pub struct Sim2hConfig { pub sim2h_url: String, } +#[derive(Clone)] struct BufferedMessage { pub wire_message: WireMessage, - pub hash: u64, + //pub hash: u64, pub last_sent: Option, } @@ -61,7 +63,7 @@ impl From for BufferedMessage { fn from(wire_message: WireMessage) -> BufferedMessage { BufferedMessage { wire_message, - hash: 0, + //hash: 0, last_sent: None, } } @@ -82,7 +84,8 @@ pub struct Sim2hWorker { connection_timeout_backoff: u64, reconnect_interval: Duration, metric_publisher: std::sync::Arc>, - outgoing_message_buffer: Vec, + outgoing_message_buffer: VecDeque, + outgoing_ack: HashMap, ws_frame: Option, initial_authoring_list: Option, initial_gossiping_list: Option, @@ -123,7 +126,8 @@ impl Sim2hWorker { metric_publisher: std::sync::Arc::new(std::sync::RwLock::new( DefaultMetricPublisher::default(), )), - outgoing_message_buffer: Vec::new(), + outgoing_message_buffer: VecDeque::new(), + outgoing_ack: HashMap::new(), ws_frame: None, initial_authoring_list: None, initial_gossiping_list: None, @@ -225,6 +229,17 @@ impl Sim2hWorker { } } + fn check_resend(&mut self) { + for msg in self.outgoing_ack.values_mut() { + if let Some(instant_last_sent) = msg.last_sent { + if instant_last_sent.elapsed() < Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { + msg.last_sent = None; + self.outgoing_message_buffer.push_back(msg.clone()); + } + } + } + } + /// if we have queued wire messages and our connection is ready, /// try to send one /// return if we did something @@ -232,12 +247,17 @@ impl Sim2hWorker { if self.outgoing_message_buffer.is_empty() || !self.connection_ready() { return false; } + /* + let len = self.outgoing_message_buffer.len(); let buffered_message = self.outgoing_message_buffer.get_mut(0).unwrap(); if let Some(instant_last_sent) = buffered_message.last_sent { if instant_last_sent.elapsed() < Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { return false; } } + */ + self.check_resend(); + let mut buffered_message = self.outgoing_message_buffer.pop_front().unwrap(); let message = &buffered_message.wire_message; debug!("WireMessage: preparing to send {:?}", message); let payload: String = message.clone().into(); @@ -272,8 +292,9 @@ impl Sim2hWorker { self.check_reconnect(); return true; } - buffered_message.hash = generate_ack_receipt_hash(&payload); + let hash = generate_ack_receipt_hash(&payload); buffered_message.last_sent = Some(Instant::now()); + self.outgoing_ack.insert(hash, buffered_message); true } @@ -293,7 +314,7 @@ impl Sim2hWorker { // we always put messages in the outgoing buffer, // they'll be sent when the connection is ready debug!("WireMessage: queueing {:?}", message); - self.outgoing_message_buffer.push(message.into()); + self.outgoing_message_buffer.push_back(message.into()); Ok(()) } @@ -551,22 +572,19 @@ impl Sim2hWorker { } WireMessage::StatusResponse(_) => error!("Got a StatusResponse from the Sim2h server, weird! Ignoring (I use Hello not Status)"), WireMessage::Ack(hash) => { - if self.outgoing_message_buffer - .first() - .and_then(|buffered_message| Some(buffered_message.hash == hash)) - .unwrap_or(false) + if self.outgoing_ack + .remove(&hash) + .map(|buffered_message| { + tracing::debug!(got_ack = true, ?buffered_message.wire_message, ?buffered_message.last_sent); + buffered_message + }) + .is_some() { debug!("WireMessage::Ack received => dequeuing sent message {:?}", message); - // if we made it here, we successfully sent the first message - // we can remove it from the outgoing buffer queue - self.outgoing_message_buffer.remove(0); } else { warn!( - "WireMessage::Ack received that came out of order! Got hash: {}, have top hash: {:?}", - hash, - self.outgoing_message_buffer - .first() - .map(|buffered_message| buffered_message.hash) + "WireMessage::Ack received that came but wasn't held! Got hash: {}", + hash ); } } From f61081377d62a51056c64b429f38ff10c540235f Mon Sep 17 00:00:00 2001 From: freesig Date: Wed, 25 Mar 2020 09:46:00 +1100 Subject: [PATCH 2/3] clean up --- crates/core_types/src/bits_n_pieces.rs | 4 ++-- crates/net/src/sim2h_worker.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core_types/src/bits_n_pieces.rs b/crates/core_types/src/bits_n_pieces.rs index 4d142a5c1e..b3ae1608b0 100644 --- a/crates/core_types/src/bits_n_pieces.rs +++ b/crates/core_types/src/bits_n_pieces.rs @@ -8,7 +8,7 @@ pub fn u32_high_bits(i: u32) -> u16 { /// returns the u16 low bits from a u32 by doing a lossy cast pub fn u32_low_bits(i: u32) -> u16 { - (i as u16) + i as u16 } /// splits the high and low bits of u32 into a tuple of u16, for destructuring convenience @@ -26,7 +26,7 @@ pub fn u64_high_bits(i: u64) -> u32 { } pub fn u64_low_bits(i: u64) -> u32 { - (i as u32) + i as u32 } pub fn u64_split_bits(i: u64) -> (u32, u32) { diff --git a/crates/net/src/sim2h_worker.rs b/crates/net/src/sim2h_worker.rs index d9d0f5b16d..840b5b4563 100644 --- a/crates/net/src/sim2h_worker.rs +++ b/crates/net/src/sim2h_worker.rs @@ -575,7 +575,7 @@ impl Sim2hWorker { if self.outgoing_ack .remove(&hash) .map(|buffered_message| { - tracing::debug!(got_ack = true, ?buffered_message.wire_message, ?buffered_message.last_sent); + debug!("got_ack {:?}, {:?}", buffered_message.wire_message, buffered_message.last_sent); buffered_message }) .is_some() From b04b6c1184393bd9967407f503c1fb0efa0c1b18 Mon Sep 17 00:00:00 2001 From: freesig Date: Wed, 25 Mar 2020 09:47:44 +1100 Subject: [PATCH 3/3] gt --- crates/net/src/sim2h_worker.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/net/src/sim2h_worker.rs b/crates/net/src/sim2h_worker.rs index 840b5b4563..541c3cce68 100644 --- a/crates/net/src/sim2h_worker.rs +++ b/crates/net/src/sim2h_worker.rs @@ -28,8 +28,11 @@ use sim2h::{ crypto::{Provenance, SignedWireMessage}, generate_ack_receipt_hash, TcpWss, WireError, WireMessage, WIRE_VERSION, }; -use std::{convert::TryFrom, time::Instant}; -use std::collections::{HashMap, VecDeque}; +use std::{ + collections::{HashMap, VecDeque}, + convert::TryFrom, + time::Instant, +}; use url::Url; use url2::prelude::*; @@ -232,7 +235,7 @@ impl Sim2hWorker { fn check_resend(&mut self) { for msg in self.outgoing_ack.values_mut() { if let Some(instant_last_sent) = msg.last_sent { - if instant_last_sent.elapsed() < Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { + if instant_last_sent.elapsed() >= Duration::from_millis(RESEND_WIRE_MESSAGE_MS) { msg.last_sent = None; self.outgoing_message_buffer.push_back(msg.clone()); }