Skip to content

Commit

Permalink
enable batching multiple sign calls in one batch transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
ppca committed Oct 25, 2024
1 parent 20aacf6 commit 0e87e27
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 72 deletions.
1 change: 1 addition & 0 deletions chain-signatures/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions chain-signatures/contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,23 @@ impl Default for VersionedMpcContract {
#[derive(BorshDeserialize, BorshSerialize, Debug)]
pub struct MpcContract {
protocol_state: ProtocolContractState,
pending_requests: LookupMap<SignatureRequest, YieldIndex>,
pending_requests: LookupMap<SignatureRequest, Option<YieldIndex>>,
request_counter: u32,
proposed_updates: ProposedUpdates,
config: Config,
}

impl MpcContract {
fn mark_request_received(&mut self, request: &SignatureRequest) {
if self.pending_requests.insert(request, &None).is_none() {
self.request_counter += 1;
}
}

fn add_request(&mut self, request: &SignatureRequest, data_id: CryptoHash) {
if self
.pending_requests
.insert(request, &YieldIndex { data_id })
.insert(request, &Some(YieldIndex { data_id }))
.is_none()
{
self.request_counter += 1;
Expand Down Expand Up @@ -166,6 +172,7 @@ impl VersionedMpcContract {
"sign: predecessor={predecessor}, payload={payload:?}, path={path:?}, key_version={key_version}",
);
env::log_str(&serde_json::to_string(&near_sdk::env::random_seed_array()).unwrap());
self.mark_request_received(&request);
let contract_signature_request = ContractSignatureRequest {
request,
requester: predecessor,
Expand Down Expand Up @@ -275,7 +282,7 @@ impl VersionedMpcContract {

match self {
Self::V0(mpc_contract) => {
if let Some(YieldIndex { data_id }) =
if let Some(Some(YieldIndex { data_id })) =
mpc_contract.pending_requests.get(&request)
{
env::promise_yield_resume(
Expand Down Expand Up @@ -803,6 +810,12 @@ impl VersionedMpcContract {
}
}

fn mark_request_received(&mut self, request: &SignatureRequest) {
match self {
Self::V0(ref mut mpc_contract) => mpc_contract.mark_request_received(request),
}
}

fn threshold(&self) -> Result<usize, Error> {
match self {
Self::V0(contract) => match &contract.protocol_state {
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ aws-sdk-s3 = "1.29"
aws-types = "1.2"
axum = { version = "0.6.19" }
axum-extra = "0.7"
borsh = "1.5.0"
cait-sith = { git = "https://github.com/LIT-Protocol/cait-sith.git", features = [
"k256",
], rev = "8ad2316" }
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl MessageQueue {
}

if uncompacted > 0 {
tracing::info!(
tracing::debug!(
uncompacted,
compacted,
"{from:?} sent messages in {:?};",
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Mesh {
pub fn all_active_participants(&self) -> Participants {
let mut participants = self.active_participants.clone();
let active = self.active_potential_participants.keys_vec();
tracing::info!(?active, "Getting potentially active participants");
tracing::debug!(?active, "Getting potentially active participants");
for (participant, info) in self.active_potential_participants.iter() {
if !participants.contains_key(participant) {
participants.insert(participant, info.clone());
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl CryptographicProtocol for GeneratingState {
mut self,
mut ctx: C,
) -> Result<NodeState, CryptographicError> {
tracing::info!(active = ?ctx.mesh().active_participants().keys_vec(), "generating: progressing key generation");
tracing::debug!(active = ?ctx.mesh().active_participants().keys_vec(), "generating: progressing key generation");
let mut protocol = self.protocol.write().await;
loop {
let action = match protocol.poke() {
Expand Down
27 changes: 18 additions & 9 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::cryptography::CryptographicError;
use super::presignature::{GenerationError, PresignatureId};
use super::signature::SignRequestIdentifier;
use super::state::{GeneratingState, NodeState, ResharingState, RunningState};
use super::triple::TripleId;
use crate::gcp::error::SecretStorageError;
Expand Down Expand Up @@ -103,7 +104,7 @@ pub struct MpcMessageQueue {
resharing_bins: HashMap<u64, VecDeque<ResharingMessage>>,
triple_bins: HashMap<u64, HashMap<TripleId, VecDeque<TripleMessage>>>,
presignature_bins: HashMap<u64, HashMap<PresignatureId, VecDeque<PresignatureMessage>>>,
signature_bins: HashMap<u64, HashMap<CryptoHash, VecDeque<SignatureMessage>>>,
signature_bins: HashMap<u64, HashMap<SignRequestIdentifier, VecDeque<SignatureMessage>>>,
}

impl MpcMessageQueue {
Expand Down Expand Up @@ -133,7 +134,11 @@ impl MpcMessageQueue {
.signature_bins
.entry(message.epoch)
.or_default()
.entry(message.receipt_id)
.entry(SignRequestIdentifier::new(
message.receipt_id,
message.epsilon,
message.request.payload,
))
.or_default()
.push_back(message),
}
Expand Down Expand Up @@ -366,7 +371,7 @@ impl MessageHandler for RunningState {

let mut signature_manager = self.signature_manager.write().await;
let signature_messages = queue.signature_bins.entry(self.epoch).or_default();
signature_messages.retain(|receipt_id, queue| {
signature_messages.retain(|sign_request_identifier, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
Expand All @@ -379,9 +384,9 @@ impl MessageHandler for RunningState {
return false;
}

!signature_manager.refresh_gc(receipt_id)
!signature_manager.refresh_gc(sign_request_identifier)
});
for (receipt_id, queue) in signature_messages {
for (sign_request_identifier, queue) in signature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
let SignatureMessage {
proposer,
Expand Down Expand Up @@ -414,7 +419,7 @@ impl MessageHandler for RunningState {
// TODO: Validate that the message matches our sign_queue
let protocol = match signature_manager.get_or_generate(
participants,
*receipt_id,
sign_request_identifier.receipt_id,
*proposer,
*presignature_id,
request,
Expand All @@ -437,15 +442,19 @@ impl MessageHandler for RunningState {
// and have the other nodes timeout in the following cases:
// - If a presignature is in GC, then it was used already or failed to be produced.
// - If a presignature is missing, that means our system cannot process this signature.
tracing::warn!(%receipt_id, ?err, "signature cannot be generated");
tracing::warn!(
?sign_request_identifier,
?err,
"signature cannot be generated"
);
queue.clear();
continue;
}
Err(GenerationError::CaitSithInitializationError(error)) => {
// ignore the whole of the messages since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
?receipt_id,
?sign_request_identifier,
presignature_id,
?error,
"unable to initialize incoming signature protocol"
Expand All @@ -455,7 +464,7 @@ impl MessageHandler for RunningState {
}
Err(err) => {
tracing::warn!(
?receipt_id,
?sign_request_identifier,
?err,
"Unexpected error encounted while generating signature"
);
Expand Down
6 changes: 3 additions & 3 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl PresignatureManager {
)));
}

tracing::info!(id, "starting protocol to generate a new presignature");
tracing::debug!(id, "starting protocol to generate a new presignature");
let generator = Self::generate_internal(
participants,
self.me,
Expand Down Expand Up @@ -341,7 +341,7 @@ impl PresignatureManager {
} else {
match self.generators.entry(id) {
Entry::Vacant(entry) => {
tracing::info!(id, "joining protocol to generate a new presignature");
tracing::debug!(id, "joining protocol to generate a new presignature");
let (triple0, triple1) = match triple_manager.take_two(triple0, triple1).await {
Ok(result) => result,
Err(error) => match error {
Expand Down Expand Up @@ -509,7 +509,7 @@ impl PresignatureManager {
},
);
if generator.mine {
tracing::info!(id, "assigning presignature to myself");
tracing::debug!(id, "assigning presignature to myself");
self.mine.push_back(*id);
crate::metrics::NUM_TOTAL_HISTORICAL_PRESIGNATURE_GENERATORS_MINE_SUCCESS
.with_label_values(&[self.my_account_id.as_str()])
Expand Down
Loading

0 comments on commit 0e87e27

Please sign in to comment.