From 477c86040dbd1ea9acd851c2dbfef1ad4a96e4df Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Thu, 24 Oct 2024 16:28:09 -0700 Subject: [PATCH] enable batching multiple sign calls in one batch transaction --- chain-signatures/Cargo.lock | 1 + chain-signatures/contract/src/lib.rs | 19 ++- chain-signatures/node/Cargo.toml | 1 + chain-signatures/node/src/protocol/message.rs | 27 ++-- .../node/src/protocol/presignature.rs | 2 +- .../node/src/protocol/signature.rs | 115 ++++++++++-------- chain-signatures/node/src/protocol/triple.rs | 2 +- integration-tests/chain-signatures/Cargo.lock | 2 + integration-tests/chain-signatures/Cargo.toml | 1 + .../chain-signatures/tests/actions/mod.rs | 56 +++++++++ .../tests/actions/wait_for.rs | 96 +++++++++++++++ .../chain-signatures/tests/cases/mod.rs | 13 ++ 12 files changed, 269 insertions(+), 66 deletions(-) diff --git a/chain-signatures/Cargo.lock b/chain-signatures/Cargo.lock index 5df45e303..5a730725d 100644 --- a/chain-signatures/Cargo.lock +++ b/chain-signatures/Cargo.lock @@ -3170,6 +3170,7 @@ dependencies = [ "aws-types", "axum", "axum-extra", + "borsh", "cait-sith", "chrono", "clap", diff --git a/chain-signatures/contract/src/lib.rs b/chain-signatures/contract/src/lib.rs index c75fccb90..cc4402c48 100644 --- a/chain-signatures/contract/src/lib.rs +++ b/chain-signatures/contract/src/lib.rs @@ -64,17 +64,23 @@ impl Default for VersionedMpcContract { #[derive(BorshDeserialize, BorshSerialize, Debug)] pub struct MpcContract { protocol_state: ProtocolContractState, - pending_requests: LookupMap, + pending_requests: LookupMap>, request_counter: u32, proposed_updates: ProposedUpdates, config: Config, } impl MpcContract { + fn mark_request_received(&mut self, request: &SignatureRequest) { + if self.pending_requests.insert(request, &None).is_none() { + self.request_counter += 1; + } + } + fn add_request(&mut self, request: &SignatureRequest, data_id: CryptoHash) { if self .pending_requests - .insert(request, &YieldIndex { data_id }) + .insert(request, &Some(YieldIndex { data_id })) .is_none() { self.request_counter += 1; @@ -166,6 +172,7 @@ impl VersionedMpcContract { "sign: predecessor={predecessor}, payload={payload:?}, path={path:?}, key_version={key_version}", ); env::log_str(&serde_json::to_string(&near_sdk::env::random_seed_array()).unwrap()); + self.mark_request_received(&request); let contract_signature_request = ContractSignatureRequest { request, requester: predecessor, @@ -275,7 +282,7 @@ impl VersionedMpcContract { match self { Self::V0(mpc_contract) => { - if let Some(YieldIndex { data_id }) = + if let Some(Some(YieldIndex { data_id })) = mpc_contract.pending_requests.get(&request) { env::promise_yield_resume( @@ -803,6 +810,12 @@ impl VersionedMpcContract { } } + fn mark_request_received(&mut self, request: &SignatureRequest) { + match self { + Self::V0(ref mut mpc_contract) => mpc_contract.mark_request_received(request), + } + } + fn threshold(&self) -> Result { match self { Self::V0(contract) => match &contract.protocol_state { diff --git a/chain-signatures/node/Cargo.toml b/chain-signatures/node/Cargo.toml index 68b91637d..819038dae 100644 --- a/chain-signatures/node/Cargo.toml +++ b/chain-signatures/node/Cargo.toml @@ -15,6 +15,7 @@ aws-sdk-s3 = "1.29" aws-types = "1.2" axum = { version = "0.6.19" } axum-extra = "0.7" +borsh = "1.5.0" cait-sith = { git = "https://github.com/LIT-Protocol/cait-sith.git", features = [ "k256", ], rev = "8ad2316" } diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index 1d91071c8..f6a89666d 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -1,5 +1,6 @@ use super::cryptography::CryptographicError; use super::presignature::{GenerationError, PresignatureId}; +use super::signature::SignRequestIdentifier; use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; use super::triple::TripleId; use crate::gcp::error::SecretStorageError; @@ -103,7 +104,7 @@ pub struct MpcMessageQueue { resharing_bins: HashMap>, triple_bins: HashMap>>, presignature_bins: HashMap>>, - signature_bins: HashMap>>, + signature_bins: HashMap>>, } impl MpcMessageQueue { @@ -133,7 +134,11 @@ impl MpcMessageQueue { .signature_bins .entry(message.epoch) .or_default() - .entry(message.receipt_id) + .entry(SignRequestIdentifier::new( + message.receipt_id, + message.epsilon, + message.request.payload, + )) .or_default() .push_back(message), } @@ -366,7 +371,7 @@ impl MessageHandler for RunningState { let mut signature_manager = self.signature_manager.write().await; let signature_messages = queue.signature_bins.entry(self.epoch).or_default(); - signature_messages.retain(|receipt_id, queue| { + signature_messages.retain(|sign_request_identifier, queue| { // Skip message if it already timed out if queue.is_empty() || queue.iter().any(|msg| { @@ -379,9 +384,9 @@ impl MessageHandler for RunningState { return false; } - !signature_manager.refresh_gc(receipt_id) + !signature_manager.refresh_gc(sign_request_identifier) }); - for (receipt_id, queue) in signature_messages { + for (sign_request_identifier, queue) in signature_messages { // SAFETY: this unwrap() is safe since we have already checked that the queue is not empty. let SignatureMessage { proposer, @@ -414,7 +419,7 @@ impl MessageHandler for RunningState { // TODO: Validate that the message matches our sign_queue let protocol = match signature_manager.get_or_generate( participants, - *receipt_id, + sign_request_identifier.receipt_id, *proposer, *presignature_id, request, @@ -437,7 +442,11 @@ impl MessageHandler for RunningState { // and have the other nodes timeout in the following cases: // - If a presignature is in GC, then it was used already or failed to be produced. // - If a presignature is missing, that means our system cannot process this signature. - tracing::warn!(%receipt_id, ?err, "signature cannot be generated"); + tracing::warn!( + ?sign_request_identifier, + ?err, + "signature cannot be generated" + ); queue.clear(); continue; } @@ -445,7 +454,7 @@ impl MessageHandler for RunningState { // ignore the whole of the messages since the generation had bad parameters. Also have the other node who // initiated the protocol resend the message or have it timeout on their side. tracing::warn!( - ?receipt_id, + ?sign_request_identifier, presignature_id, ?error, "unable to initialize incoming signature protocol" @@ -455,7 +464,7 @@ impl MessageHandler for RunningState { } Err(err) => { tracing::warn!( - ?receipt_id, + ?sign_request_identifier, ?err, "Unexpected error encounted while generating signature" ); diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index b0158b2ed..35bb200a7 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -234,7 +234,7 @@ impl PresignatureManager { ))); } - tracing::info!(id, "starting protocol to generate a new presignature"); + tracing::debug!(id, "starting protocol to generate a new presignature"); let generator = Self::generate_internal( participants, self.me, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index e148ca107..5584c3a22 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -38,18 +38,12 @@ pub struct SignRequest { /// Type that preserves the insertion order of requests. #[derive(Default)] pub struct ParticipantRequests { - requests: HashMap, - order: VecDeque, + requests: VecDeque, } impl ParticipantRequests { - fn insert(&mut self, receipt_id: ReceiptId, request: SignRequest) { - self.requests.insert(receipt_id, request); - self.order.push_back(receipt_id); - } - - fn contains_key(&self, receipt_id: &ReceiptId) -> bool { - self.requests.contains_key(receipt_id) + fn insert(&mut self, request: SignRequest) { + self.requests.push_back(request); } pub fn len(&self) -> usize { @@ -60,11 +54,8 @@ impl ParticipantRequests { self.len() == 0 } - pub fn pop_front(&mut self) -> Option<(ReceiptId, SignRequest)> { - let receipt_id = self.order.pop_front()?; - self.requests - .remove(&receipt_id) - .map(|req| (receipt_id, req)) + pub fn pop_front(&mut self) -> Option { + self.requests.pop_front() } } @@ -127,7 +118,7 @@ impl SignQueue { "saving sign request: node is in the signer subset" ); let proposer_requests = self.requests.entry(proposer).or_default(); - proposer_requests.insert(request.receipt_id, request); + proposer_requests.insert(request); if is_mine { crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) @@ -145,13 +136,6 @@ impl SignQueue { } } - pub fn contains(&self, participant: Participant, receipt_id: ReceiptId) -> bool { - let Some(participant_requests) = self.requests.get(&participant) else { - return false; - }; - participant_requests.contains_key(&receipt_id) - } - pub fn my_requests(&mut self, me: Participant) -> &mut ParticipantRequests { self.requests.entry(me).or_default() } @@ -232,13 +216,30 @@ pub struct GenerationRequest { pub sign_request_timestamp: Instant, } +#[derive(Debug, Clone, Eq, Hash, PartialEq)] +pub struct SignRequestIdentifier { + pub receipt_id: ReceiptId, + pub epsilon: Vec, + pub payload: Vec, +} + +impl SignRequestIdentifier { + pub fn new(receipt_id: ReceiptId, epsilon: Scalar, payload: Scalar) -> Self { + Self { + receipt_id, + epsilon: borsh::to_vec(&SerializableScalar { scalar: epsilon }).unwrap(), + payload: borsh::to_vec(&SerializableScalar { scalar: payload }).unwrap(), + } + } +} + pub struct SignatureManager { /// Ongoing signature generation protocols. - generators: HashMap, + generators: HashMap, /// Failed signatures awaiting to be retried. - failed: VecDeque<(ReceiptId, GenerationRequest)>, + failed: VecDeque<(SignRequestIdentifier, GenerationRequest)>, /// Set of completed signatures - completed: HashMap, + completed: HashMap, /// Generated signatures assigned to the current node that are yet to be published. /// Vec<(receipt_id, msg_hash, timestamp, output)> signatures: Vec, @@ -356,13 +357,13 @@ impl SignatureManager { #[allow(clippy::result_large_err)] fn retry_failed_generation( &mut self, - receipt_id: ReceiptId, + sign_request_identifier: SignRequestIdentifier, req: GenerationRequest, presignature: Presignature, participants: &Participants, cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { - tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); + tracing::info!(sign_request_identifier = ?sign_request_identifier, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); let generator = Self::generate_internal( participants, self.me, @@ -374,7 +375,7 @@ impl SignatureManager { crate::metrics::NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); - self.generators.insert(receipt_id, generator); + self.generators.insert(sign_request_identifier, generator); Ok(()) } @@ -392,8 +393,10 @@ impl SignatureManager { sign_request_timestamp: Instant, cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { + let sign_request_identifier = + SignRequestIdentifier::new(receipt_id, epsilon, request.payload); tracing::info!( - %receipt_id, + ?sign_request_identifier, me = ?self.me, presignature_id = presignature.id, participants = ?participants.keys_vec(), @@ -417,7 +420,7 @@ impl SignatureManager { crate::metrics::NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); - self.generators.insert(receipt_id, generator); + self.generators.insert(sign_request_identifier, generator); Ok(()) } @@ -440,13 +443,15 @@ impl SignatureManager { presignature_manager: &mut PresignatureManager, cfg: &ProtocolConfig, ) -> Result<&mut SignatureProtocol, GenerationError> { - if self.completed.contains_key(&receipt_id) { - tracing::warn!(%receipt_id, presignature_id, "presignature has already been used to generate a signature"); + let sign_request_identifier = + SignRequestIdentifier::new(receipt_id, epsilon, request.payload); + if self.completed.contains_key(&sign_request_identifier) { + tracing::warn!(sign_request_identifier = ?sign_request_identifier.clone(), presignature_id, "presignature has already been used to generate a signature"); return Err(GenerationError::AlreadyGenerated); } - match self.generators.entry(receipt_id) { + match self.generators.entry(sign_request_identifier.clone()) { Entry::Vacant(entry) => { - tracing::info!(%receipt_id, me = ?self.me, presignature_id, "joining protocol to generate a new signature"); + tracing::info!(sign_request_identifier = ?sign_request_identifier.clone(), me = ?self.me, presignature_id, "joining protocol to generate a new signature"); let presignature = match presignature_manager.take(presignature_id) { Ok(presignature) => presignature, Err(err @ GenerationError::PresignatureIsGenerating(_)) => { @@ -502,7 +507,7 @@ impl SignatureManager { /// An empty vector means we cannot progress until we receive a new message. pub fn poke(&mut self) -> Vec<(Participant, SignatureMessage)> { let mut messages = Vec::new(); - self.generators.retain(|receipt_id, generator| { + self.generators.retain(|sign_request_identifier, generator| { loop { let action = match generator.poke() { Ok(action) => action, @@ -516,7 +521,7 @@ impl SignatureManager { // only retry the signature generation if it was initially proposed by us. We do not // want any nodes to be proposing the same signature multiple times. self.failed.push_back(( - *receipt_id, + sign_request_identifier.clone(), GenerationRequest { proposer: generator.proposer, request: generator.request.clone(), @@ -527,7 +532,7 @@ impl SignatureManager { }, )); } else { - self.completed.insert(*receipt_id, Instant::now()); + self.completed.insert(sign_request_identifier.clone(), Instant::now()); crate::metrics::SIGNATURE_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); @@ -548,7 +553,7 @@ impl SignatureManager { messages.push(( *p, SignatureMessage { - receipt_id: *receipt_id, + receipt_id: sign_request_identifier.receipt_id, proposer: generator.proposer, presignature_id: generator.presignature_id, request: generator.request.clone(), @@ -565,7 +570,7 @@ impl SignatureManager { Action::SendPrivate(p, data) => messages.push(( p, SignatureMessage { - receipt_id: *receipt_id, + receipt_id: sign_request_identifier.receipt_id, proposer: generator.proposer, presignature_id: generator.presignature_id, request: generator.request.clone(), @@ -579,21 +584,21 @@ impl SignatureManager { )), Action::Return(output) => { tracing::info!( - ?receipt_id, + sign_request_identifier =?sign_request_identifier.clone(), me = ?self.me, presignature_id = generator.presignature_id, big_r = ?output.big_r.to_base58(), s = ?output.s, "completed signature generation" ); - self.completed.insert(*receipt_id, Instant::now()); + self.completed.insert(sign_request_identifier.clone(), Instant::now()); let request = SignatureRequest { epsilon: SerializableScalar {scalar: generator.epsilon}, payload_hash: generator.request.payload.into(), }; if generator.proposer == self.me { self.signatures - .push(ToPublish::new(*receipt_id, request, generator.sign_request_timestamp, output)); + .push(ToPublish::new(sign_request_identifier.receipt_id, request, generator.sign_request_timestamp, output)); } // Do not retain the protocol return false; @@ -644,17 +649,22 @@ impl SignatureManager { // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of // when the request made it into the NEAR network. // issue: https://github.com/near/mpc-recovery/issues/596 - if let Some((receipt_id, failed_req)) = self.failed.pop_front() { + if let Some((sign_request_identifier, failed_req)) = self.failed.pop_front() { if let Err((presignature, InitializationError::BadParameters(err))) = self .retry_failed_generation( - receipt_id, + sign_request_identifier.clone(), failed_req, presignature, &sig_participants, cfg, ) { - tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature"); + tracing::warn!( + ?sign_request_identifier, + presig_id, + ?err, + "failed to retry signature generation: trashing presignature" + ); failed_presigs.push(presignature); continue; } @@ -666,13 +676,14 @@ impl SignatureManager { } } - let Some((receipt_id, my_request)) = my_requests.pop_front() else { + let Some(my_request) = my_requests.pop_front() else { failed_presigs.push(presignature); continue; }; + if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( &sig_participants, - receipt_id, + my_request.receipt_id, presignature, my_request.request, my_request.epsilon, @@ -681,7 +692,7 @@ impl SignatureManager { cfg, ) { failed_presigs.push(presignature); - tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature"); + tracing::warn!(%my_request.receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature"); continue; } } @@ -733,7 +744,7 @@ impl SignatureManager { { Ok(response) => response, Err(err) => { - tracing::error!(%receipt_id, error = ?err, "Failed to publish the signature"); + tracing::error!(%receipt_id, request = ?request, error = ?err, "Failed to publish the signature"); crate::metrics::SIGNATURE_PUBLISH_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); @@ -748,7 +759,7 @@ impl SignatureManager { match response.json() { Ok(()) => { - tracing::info!(%receipt_id, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, "published signature sucessfully") + tracing::info!(%receipt_id, request = ?request, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, "published signature sucessfully") } Err(err) => { tracing::error!(%receipt_id, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, error = ?err, "smart contract threw error"); @@ -790,10 +801,10 @@ impl SignatureManager { } } - pub fn refresh_gc(&mut self, id: &ReceiptId) -> bool { + pub fn refresh_gc(&mut self, id: &SignRequestIdentifier) -> bool { let entry = self .completed - .entry(*id) + .entry(id.clone()) .and_modify(|e| *e = Instant::now()); matches!(entry, Entry::Occupied(_)) } diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 2bcca581b..c76ac16b6 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -226,7 +226,7 @@ impl TripleManager { ))); } - tracing::info!(id, "starting protocol to generate a new triple"); + tracing::debug!(id, "starting protocol to generate a new triple"); let participants: Vec<_> = participants.keys().cloned().collect(); let protocol: TripleProtocol = Box::new(cait_sith::triples::generate_triple::( &participants, diff --git a/integration-tests/chain-signatures/Cargo.lock b/integration-tests/chain-signatures/Cargo.lock index e76ab76df..ce810acb0 100644 --- a/integration-tests/chain-signatures/Cargo.lock +++ b/integration-tests/chain-signatures/Cargo.lock @@ -3264,6 +3264,7 @@ dependencies = [ "anyhow", "async-process", "backon", + "base64 0.22.1", "bollard", "cait-sith", "clap", @@ -3695,6 +3696,7 @@ dependencies = [ "aws-types", "axum", "axum-extra", + "borsh", "cait-sith", "chrono", "clap", diff --git a/integration-tests/chain-signatures/Cargo.toml b/integration-tests/chain-signatures/Cargo.toml index 0712fba45..63dd5131a 100644 --- a/integration-tests/chain-signatures/Cargo.toml +++ b/integration-tests/chain-signatures/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] anyhow = { version = "1.0", features = ["backtrace"] } async-process = "1" +base64 = "0.22.1" bollard = "0.13" clap = { version = "4.5.4", features = ["derive"] } futures = "0.3" diff --git a/integration-tests/chain-signatures/tests/actions/mod.rs b/integration-tests/chain-signatures/tests/actions/mod.rs index fa2236ba4..5cd6b4f2a 100644 --- a/integration-tests/chain-signatures/tests/actions/mod.rs +++ b/integration-tests/chain-signatures/tests/actions/mod.rs @@ -20,6 +20,7 @@ use mpc_contract::RunningContractState; use mpc_node::kdf::into_eth_sig; use near_crypto::InMemorySigner; use near_fetch::ops::AsyncTransactionStatus; +use near_fetch::ops::Function; use near_workspaces::types::Gas; use near_workspaces::types::NearToken; use near_workspaces::Account; @@ -71,6 +72,42 @@ pub async fn request_sign( Ok((payload, payload_hashed, account, status)) } +pub async fn request_batch_sign( + ctx: &MultichainTestContext<'_>, +) -> anyhow::Result<(Vec<([u8; 32], [u8; 32])>, Account, AsyncTransactionStatus)> { + let worker = &ctx.nodes.ctx().worker; + let account = worker.dev_create_account().await?; + let signer = InMemorySigner { + account_id: account.id().clone(), + public_key: account.secret_key().public_key().to_string().parse()?, + secret_key: account.secret_key().to_string().parse()?, + }; + + let mut payloads: Vec<([u8; 32], [u8; 32])> = vec![]; + let mut tx = ctx.rpc_client.batch(&signer, ctx.contract().id()); + for _ in 0..3 { + let payload: [u8; 32] = rand::thread_rng().gen(); + let payload_hashed = web3::signing::keccak256(&payload); + payloads.push((payload, payload_hashed)); + let request = SignRequest { + payload: payload_hashed, + path: "test".to_string(), + key_version: 0, + }; + let function = Function::new("sign") + .args_json(serde_json::json!({ + "request": request, + })) + .gas(Gas::from_tgas(50)) + .deposit(NearToken::from_yoctonear(1)); + tx = tx.call(function); + } + + let status = tx.transact_async().await?; + tokio::time::sleep(Duration::from_secs(3)).await; + Ok((payloads, account, status)) +} + pub async fn assert_signature( account_id: &near_workspaces::AccountId, mpc_pk_bytes: &[u8], @@ -303,6 +340,25 @@ pub async fn clear_toxics() -> anyhow::Result<()> { Ok(()) } +pub async fn batch_signature_production( + ctx: &MultichainTestContext<'_>, + state: &RunningContractState, +) -> anyhow::Result<()> { + let (payloads, account, status) = request_batch_sign(ctx).await?; + let signatures = wait_for::batch_signature_responded(status).await?; + + let mut mpc_pk_bytes = vec![0x04]; + mpc_pk_bytes.extend_from_slice(&state.public_key.as_bytes()[1..]); + assert_eq!(payloads.len(), signatures.len()); + for i in 0..payloads.len() { + let (_, payload_hash) = payloads.get(i).unwrap(); + let signature = signatures.get(i).unwrap(); + assert_signature(account.id(), &mpc_pk_bytes, *payload_hash, signature).await; + } + + Ok(()) +} + // This test hardcodes the output of the signing process and checks that everything verifies as expected // If you find yourself changing the constants in this test you are likely breaking backwards compatibility #[tokio::test] diff --git a/integration-tests/chain-signatures/tests/actions/wait_for.rs b/integration-tests/chain-signatures/tests/actions/wait_for.rs index ac2e00693..82245609d 100644 --- a/integration-tests/chain-signatures/tests/actions/wait_for.rs +++ b/integration-tests/chain-signatures/tests/actions/wait_for.rs @@ -14,9 +14,13 @@ use mpc_contract::ProtocolContractState; use mpc_contract::RunningContractState; use mpc_node::web::StateView; use near_fetch::ops::AsyncTransactionStatus; +use near_lake_primitives::CryptoHash; use near_primitives::errors::ActionErrorKind; +use near_primitives::views::ExecutionOutcomeWithIdView; +use near_primitives::views::ExecutionStatusView; use near_primitives::views::FinalExecutionStatus; use near_workspaces::Account; +use std::collections::HashMap; use url::Url; pub async fn running_mpc<'a>( @@ -256,6 +260,7 @@ pub enum WaitForError { enum Outcome { Signature(FullSignature), Failed(String), + Signatures(Vec>), } pub async fn signature_responded( @@ -290,6 +295,9 @@ pub async fn signature_responded( match is_tx_ready.retry(&strategy).await? { Outcome::Signature(signature) => Ok(signature), Outcome::Failed(err) => Err(WaitForError::Signature(SignatureError::Failed(err))), + _ => Err(WaitForError::Signature(SignatureError::Failed( + "Should not return more than one signature".to_string(), + ))), } } @@ -366,3 +374,91 @@ pub async fn rogue_message_responded(status: AsyncTransactionStatus) -> anyhow:: Ok(signature.clone()) } + +pub async fn batch_signature_responded( + status: AsyncTransactionStatus, +) -> Result>, WaitForError> { + let is_tx_ready = || async { + let Poll::Ready(outcome) = status + .status() + .await + .map_err(|err| WaitForError::JsonRpc(format!("{err:?}")))? + else { + return Err(WaitForError::Signature(SignatureError::NotYetAvailable)); + }; + + if !outcome.is_success() { + return Err(WaitForError::Signature(SignatureError::Failed(format!( + "status: {:?}", + outcome.status() + )))); + } + + let receipt_outcomes = outcome.details.receipt_outcomes(); + let mut result_receipts: HashMap> = HashMap::new(); + for receipt_outcome in receipt_outcomes { + result_receipts + .entry(receipt_outcome.id) + .or_insert(receipt_outcome.outcome.receipt_ids.clone()); + } + let mut receipt_outcomes_keyed: HashMap = + HashMap::new(); + for receipt_outcome in receipt_outcomes { + receipt_outcomes_keyed + .entry(receipt_outcome.id) + .or_insert(receipt_outcome); + } + + let starting_receipts = &receipt_outcomes.get(0).unwrap().outcome.receipt_ids; + + let mut signatures: Vec> = vec![]; + for receipt_id in starting_receipts { + if !result_receipts.contains_key(receipt_id) { + break; + } + let sign_receipt_id = receipt_id; + for receipt_id in result_receipts.get(sign_receipt_id).unwrap() { + let receipt_outcome = receipt_outcomes_keyed + .get(receipt_id) + .unwrap() + .outcome + .clone(); + if receipt_outcome + .logs + .contains(&"Signature is ready.".to_string()) + { + match receipt_outcome.status { + ExecutionStatusView::SuccessValue(value) => { + let result: SignatureResponse = serde_json::from_slice(&value) + .map_err(|err| WaitForError::SerdeJson(format!("{err:?}")))?; + let signature = cait_sith::FullSignature:: { + big_r: result.big_r.affine_point, + s: result.s.scalar, + }; + signatures.push(signature); + } + _ => { + return Err(WaitForError::Signature(SignatureError::Failed( + "one signature not done.".to_string(), + ))) + } + } + } + } + } + + Ok(Outcome::Signatures(signatures)) + }; + + let strategy = ConstantBuilder::default() + .with_delay(Duration::from_secs(20)) + .with_max_times(5); + + match is_tx_ready.retry(&strategy).await? { + Outcome::Signature(_) => Err(WaitForError::Signature(SignatureError::Failed( + "Should not return just 1 signature".to_string(), + ))), + Outcome::Failed(err) => Err(WaitForError::Signature(SignatureError::Failed(err))), + Outcome::Signatures(signatures) => Ok(signatures), + } +} diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index a166d7d23..f12755257 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -392,3 +392,16 @@ async fn test_multichain_update_contract() -> anyhow::Result<()> { }) .await } + +#[test(tokio::test)] +async fn test_batch_signature() -> anyhow::Result<()> { + with_multichain_nodes(MultichainConfig::default(), |ctx| { + Box::pin(async move { + let state_0 = wait_for::running_mpc(&ctx, Some(0)).await?; + assert_eq!(state_0.participants.len(), 3); + actions::batch_signature_production(&ctx, &state_0).await?; + Ok(()) + }) + }) + .await +}