Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest committed Jan 16, 2025
1 parent d43649e commit eb18367
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 32 deletions.
8 changes: 4 additions & 4 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,10 @@ impl ConsensusProtocol for RunningState {
ProtocolState::Running(contract_state) => match contract_state.epoch.cmp(&self.epoch) {
Ordering::Greater => {
tracing::warn!(
"running(running): our current epoch is {} while contract state's is {}, trying to rejoin as a new participant",
self.epoch,
contract_state.epoch
);
"running(running): our current epoch is {} while contract state's is {}, trying to rejoin as a new participant",
self.epoch,
contract_state.epoch
);

Ok(NodeState::Joining(JoiningState {
participants: contract_state.participants,
Expand Down
27 changes: 14 additions & 13 deletions chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl CryptographicProtocol for ResharingState {
// TODO: we are not using active potential participants here, but we should in the future.
// Currently the resharing protocol does not time out and restart with a new set of participants,
// so if it picks up a participant that is not active, it will never be able to send a message to it.
let active = mesh_state.active.and(&mesh_state.potential);
tracing::info!(active = ?active.keys().collect::<Vec<_>>(), "progressing key reshare");
let active = mesh_state.active.and(&mesh_state.active_potential);
tracing::info!(active = ?active.keys_vec(), "progressing key reshare");
let mut protocol = self.protocol.write().await;
loop {
let action = match protocol.poke() {
Expand Down Expand Up @@ -199,18 +199,19 @@ impl CryptographicProtocol for ResharingState {
tracing::debug!("resharing: sending a private message to {to:?}");
if self.new_participants.get(&to).is_none() {
tracing::error!("resharing: send_private unknown participant {to:?}");
} else {
ctx.channel()
.send(
self.me,
to,
ResharingMessage {
epoch: self.old_epoch,
from: self.me,
data,
},
)
.await;
}
ctx.channel()
.send(
self.me,
to,
ResharingMessage {
epoch: self.old_epoch,
from: self.me,
data,
},
)
.await;
}
Action::Return(private_share) => {
tracing::debug!("resharing: successfully completed key reshare");
Expand Down
58 changes: 49 additions & 9 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ impl Message {

#[derive(Default)]
pub struct MessageInbox {
/// Encrypted messages that are pending decryption. These are messages received from other
/// nodes that could not be processed yet due to missing info, such as the participant id in
/// the case of slow resharing.
retry_decrypt: VecDeque<(Ciphered, Instant)>,

generating: VecDeque<GeneratingMessage>,
resharing: HashMap<Epoch, VecDeque<ResharingMessage>>,
triple: HashMap<Epoch, HashMap<TripleId, VecDeque<TripleMessage>>>,
Expand Down Expand Up @@ -188,13 +193,40 @@ impl MessageInbox {
}
}

/// Drops queued retry-decrypt entries that have outlived the configured message timeout.
///
/// An entry is kept only while the time elapsed since its recorded timestamp is strictly
/// less than `protocol.message_timeout`, interpreted as a number of seconds.
pub fn expire(&mut self, protocol: &ProtocolConfig) {
    // The limit is the same for every entry, so build it once up front.
    let max_age = Duration::from_secs(protocol.message_timeout);
    self.retry_decrypt
        .retain(|(_, queued_at)| queued_at.elapsed() < max_age);
}

pub async fn extend_decrypt(
&mut self,
cipher_sk: &hpke::SecretKey,
protocol_state: &Arc<RwLock<NodeState>>,
incoming: &mut mpsc::Receiver<Ciphered>,
) -> usize {
let mut count = 0;
let mut retry = Vec::new();
while let Some((encrypted, timestamp)) = self.retry_decrypt.pop_front() {
let messages: Vec<Message> =
match SignedMessage::decrypt(cipher_sk, protocol_state, &encrypted).await {
Ok(msg) => msg,
Err(err) => {
if matches!(err, MessageError::UnknownParticipant(_)) {
retry.push((encrypted, timestamp));
} else {
tracing::error!(?err, "failed to decrypt/verify retried messages");
}
continue;
}
};

count += messages.len();
for msg in messages {
self.push(msg);
}
}

loop {
let encrypted = match incoming.try_recv() {
Ok(msg) => msg,
Expand All @@ -210,10 +242,14 @@ impl MessageInbox {
};

let messages: Vec<Message> =
match SignedMessage::decrypt(cipher_sk, protocol_state, encrypted).await {
match SignedMessage::decrypt(cipher_sk, protocol_state, &encrypted).await {
Ok(msg) => msg,
Err(err) => {
tracing::error!(?err, "failed to decrypt or verify an encrypted message");
if matches!(err, MessageError::UnknownParticipant(_)) {
retry.push((encrypted, Instant::now()));
} else {
tracing::error!(?err, "failed to decrypt/verify received messages");
}
continue;
}
};
Expand All @@ -223,6 +259,8 @@ impl MessageInbox {
self.push(msg);
}
}

self.retry_decrypt.extend(retry);
count
}
}
Expand Down Expand Up @@ -252,11 +290,13 @@ impl MessageExecutor {
)
};

self.inbox
.write()
.await
.extend_decrypt(&cipher_sk, &self.protocol_state, &mut self.incoming)
.await;
{
let mut inbox = self.inbox.write().await;
inbox
.extend_decrypt(&cipher_sk, &self.protocol_state, &mut self.incoming)
.await;
inbox.expire(&protocol);
}

let active = {
let mesh_state = self.mesh_state.read().await;
Expand Down Expand Up @@ -709,10 +749,10 @@ impl SignedMessage {
pub async fn decrypt<T: DeserializeOwned>(
cipher_sk: &hpke::SecretKey,
protocol_state: &Arc<RwLock<NodeState>>,
encrypted: Ciphered,
encrypted: &Ciphered,
) -> Result<T, MessageError> {
let msg = cipher_sk
.decrypt(&encrypted, Self::ASSOCIATED_DATA)
.decrypt(encrypted, Self::ASSOCIATED_DATA)
.map_err(|err| {
tracing::error!(error = ?err, "failed to decrypt message");
MessageError::Encryption(err.to_string())
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/actions/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ async fn require_node_state(nodes: &Cluster, state: NodeState, id: usize) -> any
};

if node_state != state {
anyhow::bail!("node not ready yet {:?} != {:?}", node_state, state);
anyhow::bail!("node(id={id}) not ready yet {node_state:?} != {state:?}",);
}

Ok(state)
Expand Down
8 changes: 4 additions & 4 deletions integration-tests/tests/cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,22 +433,22 @@ async fn test_multichain_reshare_with_lake_congestion() -> anyhow::Result<()> {
// this fails if the latency above is too long (10s)
nodes.leave(None).await.unwrap();

let state = nodes.wait().running().await?;
let state = nodes.expect_running().await?;
assert!(state.participants.len() == 2);

// Going below T should error out
nodes.leave(None).await.unwrap_err();
let state = nodes.wait().running().await?;
let state = nodes.expect_running().await?;
assert!(state.participants.len() == 2);

nodes.join(None).await.unwrap();
// add latency to node2->rpc
add_latency(&nodes.nodes.proxy_name_for_node(2), true, 1.0, 1_000, 100).await?;
let state = nodes.wait().running().await?;
let state = nodes.expect_running().await?;
assert!(state.participants.len() == 3);

nodes.leave(None).await.unwrap();
let state = nodes.wait().running().await?;
let state = nodes.expect_running().await?;
assert!(state.participants.len() == 2);

// make sure signing works after reshare
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/cluster/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl IntoFuture for ClusterSpawner {
};

if self.wait_for_running {
cluster.wait().running().await?;
cluster.wait().running().nodes_running().await?;
}

Ok(cluster)
Expand Down
1 change: 1 addition & 0 deletions integration-tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl Cluster {
.running_on_epoch(state.epoch + 1)
.participant_missing(&kick)
.await?;

tracing::info!(
"Getting new state, old {} {:?}, new {} {:?}",
state.participants.len(),
Expand Down

0 comments on commit eb18367

Please sign in to comment.