diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 85042f6a..83131e2d 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -1,8 +1,6 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; use crate::protocol::{MpcSignProtocol, SignQueue}; -use crate::storage::presignature_storage::LockPresignatureRedisStorage; -use crate::storage::triple_storage::LockTripleRedisStorage; use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; use deadpool_redis::Runtime; @@ -213,14 +211,9 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let redis_cfg = deadpool_redis::Config::from_url(redis_url); let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); - - let triple_storage: LockTripleRedisStorage = Arc::new(RwLock::new( - storage::triple_storage::init(redis_pool.clone(), &account_id), - )); - - let presignature_storage: LockPresignatureRedisStorage = Arc::new(RwLock::new( - storage::presignature_storage::init(redis_pool.clone(), &account_id), - )); + let triple_storage = storage::triple_storage::init(&redis_pool, &account_id); + let presignature_storage = + storage::presignature_storage::init(&redis_pool, &account_id); let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone()); let my_address = my_address diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index af070acb..d50fd031 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -12,9 +12,9 @@ use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; -use crate::storage::presignature_storage::LockPresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureRedisStorage; use crate::storage::secret_storage::SecretNodeStorageBox; -use crate::storage::triple_storage::LockTripleRedisStorage; +use crate::storage::triple_storage::TripleRedisStorage; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use crate::util::AffinePointExt; use crate::{http_client, rpc_client}; @@ -40,8 +40,8 @@ pub trait ConsensusCtx { fn my_address(&self) -> &Url; fn sign_queue(&self) -> Arc>; fn secret_storage(&self) -> &SecretNodeStorageBox; - fn triple_storage(&self) -> LockTripleRedisStorage; - fn presignature_storage(&self) -> LockPresignatureRedisStorage; + fn triple_storage(&self) -> &TripleRedisStorage; + fn presignature_storage(&self) -> &PresignatureRedisStorage; fn cfg(&self) -> &Config; fn message_options(&self) -> http_client::Options; } @@ -361,14 +361,14 @@ impl ConsensusProtocol for WaitingForConsensusState { // Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated // triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is - if let Err(err) = ctx.triple_storage().write().await.clear().await { + if let Err(err) = ctx.triple_storage().clear().await { tracing::error!( ?err, "failed to clear triples from storage on new epoch start" ); } - if let Err(err) = ctx.presignature_storage().write().await.clear().await { + if let Err(err) = ctx.presignature_storage().clear().await { tracing::error!( ?err, "failed to clear presignatures from storage on new epoch start" diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 48bc8938..7109e7c4 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -29,9 +29,9 @@ use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; use crate::protocol::message::{MessageHandler, MpcMessageQueue}; use crate::rpc_client; -use crate::storage::presignature_storage::LockPresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureRedisStorage; use crate::storage::secret_storage::SecretNodeStorageBox; -use crate::storage::triple_storage::LockTripleRedisStorage; +use crate::storage::triple_storage::TripleRedisStorage; use cait_sith::protocol::Participant; use near_account_id::AccountId; @@ -53,8 +53,8 @@ struct Ctx { http_client: reqwest::Client, sign_queue: Arc>, secret_storage: SecretNodeStorageBox, - triple_storage: LockTripleRedisStorage, - presignature_storage: LockPresignatureRedisStorage, + triple_storage: TripleRedisStorage, + presignature_storage: PresignatureRedisStorage, cfg: Config, mesh: Mesh, message_options: http_client::Options, @@ -97,12 +97,12 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.cfg } - fn triple_storage(&self) -> LockTripleRedisStorage { - self.ctx.triple_storage.clone() + fn triple_storage(&self) -> &TripleRedisStorage { + &self.ctx.triple_storage } - fn presignature_storage(&self) -> LockPresignatureRedisStorage { - self.ctx.presignature_storage.clone() + fn presignature_storage(&self) -> &PresignatureRedisStorage { + &self.ctx.presignature_storage } fn message_options(&self) -> http_client::Options { @@ -177,8 +177,8 @@ impl MpcSignProtocol { receiver: mpsc::Receiver, sign_queue: Arc>, secret_storage: SecretNodeStorageBox, - triple_storage: LockTripleRedisStorage, - presignature_storage: LockPresignatureRedisStorage, + triple_storage: TripleRedisStorage, + presignature_storage: PresignatureRedisStorage, cfg: Config, mesh_options: mesh::Options, message_options: http_client::Options, diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 16aca0a2..6aecae80 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -1,7 +1,7 @@ use super::message::PresignatureMessage; use super::triple::{Triple, TripleId, TripleManager}; use crate::protocol::contract::primitives::Participants; -use crate::storage::presignature_storage::LockPresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureRedisStorage; use crate::types::{PresignatureProtocol, SecretKeyShare}; use crate::util::AffinePointExt; @@ -150,7 +150,7 @@ pub enum GenerationError { /// Abstracts how triples are generated by providing a way to request a new triple that will be /// complete some time in the future and a way to take an already generated triple. pub struct PresignatureManager { - presignature_storage: LockPresignatureRedisStorage, + presignature_storage: PresignatureRedisStorage, /// Ongoing presignature generation protocols. generators: HashMap, /// The set of presignatures that were introduced to the system by the current node. @@ -171,10 +171,10 @@ impl PresignatureManager { threshold: usize, epoch: u64, my_account_id: &AccountId, - presignature_storage: LockPresignatureRedisStorage, + storage: &PresignatureRedisStorage, ) -> Self { Self { - presignature_storage, + presignature_storage: storage.clone(), generators: HashMap::new(), introduced: HashSet::new(), gc: HashMap::new(), @@ -189,13 +189,7 @@ impl PresignatureManager { tracing::debug!(id = ?presignature.id, "inserting presignature"); // Remove from taken list if it was there self.gc.remove(&presignature.id); - if let Err(e) = self - .presignature_storage - .write() - .await - .insert(presignature) - .await - { + if let Err(e) = self.presignature_storage.insert(presignature).await { tracing::error!(?e, "failed to insert presignature"); } } @@ -204,13 +198,7 @@ impl PresignatureManager { tracing::debug!(id = ?presignature.id, "inserting mine presignature"); // Remove from taken list if it was there self.gc.remove(&presignature.id); - if let Err(e) = self - .presignature_storage - .write() - .await - .insert_mine(presignature) - .await - { + if let Err(e) = self.presignature_storage.insert_mine(presignature).await { tracing::error!(?e, "failed to insert mine presignature"); } } @@ -218,8 +206,6 @@ impl PresignatureManager { /// Returns true if the presignature with the given id is already generated pub async fn contains(&self, id: &PresignatureId) -> bool { self.presignature_storage - .read() - .await .contains(id) .await .map_err(|e| { @@ -231,8 +217,6 @@ impl PresignatureManager { /// Returns true if the mine presignature with the given id is already generated pub async fn contains_mine(&self, id: &PresignatureId) -> bool { self.presignature_storage - .read() - .await .contains_mine(id) .await .map_err(|e| { @@ -242,17 +226,10 @@ impl PresignatureManager { } pub async fn take(&mut self, id: PresignatureId) -> Result { - if let Some(presignature) = self - .presignature_storage - .write() - .await - .take(&id) - .await - .map_err(|e| { - tracing::error!(?e, "failed to look for presignature"); - GenerationError::PresignatureIsMissing(id) - })? - { + if let Some(presignature) = self.presignature_storage.take(&id).await.map_err(|e| { + tracing::error!(?e, "failed to look for presignature"); + GenerationError::PresignatureIsMissing(id) + })? { self.gc.insert(id, Instant::now()); tracing::debug!(id, "took presignature"); return Ok(presignature); @@ -273,8 +250,6 @@ impl PresignatureManager { pub async fn take_mine(&mut self) -> Option { if let Some(presignature) = self .presignature_storage - .write() - .await .take_mine() .await .map_err(|e| { @@ -291,8 +266,6 @@ impl PresignatureManager { /// Returns the number of unspent presignatures available in the manager. pub async fn len_generated(&self) -> usize { self.presignature_storage - .read() - .await .len_generated() .await .map_err(|e| { @@ -304,8 +277,6 @@ impl PresignatureManager { /// Returns the number of unspent presignatures assigned to this node. pub async fn len_mine(&self) -> usize { self.presignature_storage - .read() - .await .len_mine() .await .map_err(|e| { diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 327d543f..798b4d35 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -2,7 +2,7 @@ use super::contract::primitives::Participants; use super::cryptography::CryptographicError; use super::message::TripleMessage; use super::presignature::GenerationError; -use crate::storage::triple_storage::LockTripleRedisStorage; +use crate::storage::triple_storage::TripleRedisStorage; use crate::types::TripleProtocol; use crate::util::AffinePointExt; @@ -80,7 +80,7 @@ impl TripleGenerator { /// complete some time in the future and a way to take an already generated triple. pub struct TripleManager { /// Triple Storage - pub triple_storage: LockTripleRedisStorage, + pub triple_storage: TripleRedisStorage, /// The pool of triple protocols that have yet to be completed. pub generators: HashMap, @@ -128,7 +128,7 @@ impl TripleManager { threshold: usize, epoch: u64, my_account_id: &AccountId, - triple_storage: LockTripleRedisStorage, + storage: &TripleRedisStorage, ) -> Self { Self { generators: HashMap::new(), @@ -139,7 +139,7 @@ impl TripleManager { me, threshold, epoch, - triple_storage, + triple_storage: storage.clone(), my_account_id: my_account_id.clone(), } } @@ -147,7 +147,7 @@ impl TripleManager { pub async fn insert(&mut self, triple: Triple) { tracing::debug!(id = triple.id, "inserting triple"); self.gc.remove(&triple.id); - if let Err(e) = self.triple_storage.write().await.insert(triple).await { + if let Err(e) = self.triple_storage.insert(triple).await { tracing::warn!(?e, "failed to insert triple"); } } @@ -155,15 +155,13 @@ impl TripleManager { pub async fn insert_mine(&mut self, triple: Triple) { tracing::debug!(id = triple.id, "inserting mine triple"); self.gc.remove(&triple.id); - if let Err(e) = self.triple_storage.write().await.insert_mine(triple).await { + if let Err(e) = self.triple_storage.insert_mine(triple).await { tracing::warn!(?e, "failed to insert mine triple"); } } pub async fn contains(&self, id: &TripleId) -> bool { self.triple_storage - .read() - .await .contains(id) .await .map_err(|e| tracing::warn!(?e, "failed to check if triple exists")) @@ -172,8 +170,6 @@ impl TripleManager { pub async fn contains_mine(&self, id: &TripleId) -> bool { self.triple_storage - .read() - .await .contains_mine(id) .await .map_err(|e| tracing::warn!(?e, "failed to check if mine triple exists")) @@ -189,7 +185,7 @@ impl TripleManager { id0: TripleId, id1: TripleId, ) -> Result<(Triple, Triple), GenerationError> { - let mut triples = self.triple_storage.write().await; + let triples = &self.triple_storage; let triple_0 = match triples.take(&id0).await { Ok(Some(triple)) => triple, Ok(None) => { @@ -248,7 +244,7 @@ impl TripleManager { /// It is very important to NOT reuse the same triple twice for two different /// protocols. pub async fn take_two_mine(&mut self) -> Option<(Triple, Triple)> { - let mut triples = self.triple_storage.write().await; + let triples = &self.triple_storage; if triples.len_mine().await.unwrap_or(0) < 2 { tracing::warn!("not enough mine triples"); return None; @@ -268,10 +264,7 @@ impl TripleManager { let triple_1 = match triples.take_mine().await { Ok(Some(triple)) => triple, Ok(None) => { - if let Err(e) = triples - .insert_mine(triple_0) - .await - { + if let Err(e) = triples.insert_mine(triple_0).await { tracing::warn!(?e, "failed to insert mine triple back"); } tracing::warn!("no mine triple left"); @@ -279,10 +272,7 @@ impl TripleManager { } Err(e) => { tracing::warn!(?e, "failed to take mine triple"); - if let Err(e) = triples - .insert_mine(triple_0) - .await - { + if let Err(e) = triples.insert_mine(triple_0).await { tracing::warn!(?e, "failed to insert mine triple back"); } return None; @@ -299,22 +289,12 @@ impl TripleManager { /// Returns the number of unspent triples available in the manager. pub async fn len_generated(&self) -> usize { - self.triple_storage - .read() - .await - .len_generated() - .await - .unwrap_or(0) + self.triple_storage.len_generated().await.unwrap_or(0) } /// Returns the number of unspent triples assigned to this node. pub async fn len_mine(&self) -> usize { - self.triple_storage - .read() - .await - .len_mine() - .await - .unwrap_or(0) + self.triple_storage.len_mine().await.unwrap_or(0) } /// Returns if there's any unspent triple in the manager. diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index 7720976b..0f750185 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -1,33 +1,30 @@ -use std::sync::Arc; - use anyhow::Ok; use deadpool_redis::Pool; use near_sdk::AccountId; use redis::{AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs}; -use tokio::sync::RwLock; use crate::protocol::presignature::{Presignature, PresignatureId}; type PresigResult = std::result::Result; -pub type LockPresignatureRedisStorage = Arc>; // Can be used to "clear" redis storage in case of a breaking change const PRESIGNATURE_STORAGE_VERSION: &str = "v1"; -pub fn init(redis_pool: Pool, node_account_id: &AccountId) -> PresignatureRedisStorage { +pub fn init(pool: &Pool, node_account_id: &AccountId) -> PresignatureRedisStorage { PresignatureRedisStorage { - redis_pool, + redis_pool: pool.clone(), node_account_id: node_account_id.clone(), } } +#[derive(Clone)] pub struct PresignatureRedisStorage { redis_pool: Pool, node_account_id: AccountId, } impl PresignatureRedisStorage { - pub async fn insert(&mut self, presignature: Presignature) -> PresigResult<()> { + pub async fn insert(&self, presignature: Presignature) -> PresigResult<()> { let mut connection = self.redis_pool.get().await?; connection .hset::<&str, PresignatureId, Presignature, ()>( @@ -39,7 +36,7 @@ impl PresignatureRedisStorage { Ok(()) } - pub async fn insert_mine(&mut self, presignature: Presignature) -> PresigResult<()> { + pub async fn insert_mine(&self, presignature: Presignature) -> PresigResult<()> { let mut connection = self.redis_pool.get().await?; connection .sadd::<&str, PresignatureId, ()>(&self.mine_key(), presignature.id) @@ -60,7 +57,7 @@ impl PresignatureRedisStorage { Ok(result) } - pub async fn take(&mut self, id: &PresignatureId) -> PresigResult> { + pub async fn take(&self, id: &PresignatureId) -> PresigResult> { let mut connection = self.redis_pool.get().await?; if self.contains_mine(id).await? { tracing::error!("Can not take mine presignature as foreign: {:?}", id); @@ -78,7 +75,7 @@ impl PresignatureRedisStorage { } } - pub async fn take_mine(&mut self) -> PresigResult> { + pub async fn take_mine(&self) -> PresigResult> { let mut connection = self.redis_pool.get().await?; let id: Option = connection.spop(self.mine_key()).await?; match id { @@ -99,7 +96,7 @@ impl PresignatureRedisStorage { Ok(result) } - pub async fn clear(&mut self) -> PresigResult<()> { + pub async fn clear(&self) -> PresigResult<()> { let mut connection = self.redis_pool.get().await?; connection.del::<&str, ()>(&self.presig_key()).await?; connection.del::<&str, ()>(&self.mine_key()).await?; diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index 89c02151..4a76c336 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -1,39 +1,37 @@ use crate::protocol::triple::{Triple, TripleId}; -use std::sync::Arc; use deadpool_redis::Pool; use redis::{AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs}; -use tokio::sync::RwLock; use near_account_id::AccountId; -pub type LockTripleRedisStorage = Arc>; type TripleResult = std::result::Result; // Can be used to "clear" redis storage in case of a breaking change const TRIPLE_STORAGE_VERSION: &str = "v1"; -pub fn init(redis_pool: Pool, account_id: &AccountId) -> TripleRedisStorage { +pub fn init(pool: &Pool, account_id: &AccountId) -> TripleRedisStorage { TripleRedisStorage { - redis_pool, + redis_pool: pool.clone(), node_account_id: account_id.clone(), } } +#[derive(Clone)] pub struct TripleRedisStorage { redis_pool: Pool, node_account_id: AccountId, } impl TripleRedisStorage { - pub async fn insert(&mut self, triple: Triple) -> TripleResult<()> { + pub async fn insert(&self, triple: Triple) -> TripleResult<()> { let mut conn = self.redis_pool.get().await?; conn.hset::<&str, TripleId, Triple, ()>(&self.triple_key(), triple.id, triple) .await?; Ok(()) } - pub async fn insert_mine(&mut self, triple: Triple) -> TripleResult<()> { + pub async fn insert_mine(&self, triple: Triple) -> TripleResult<()> { let mut conn = self.redis_pool.get().await?; conn.sadd::<&str, TripleId, ()>(&self.mine_key(), triple.id) .await?; @@ -53,7 +51,7 @@ impl TripleRedisStorage { Ok(result) } - pub async fn take(&mut self, id: &TripleId) -> TripleResult> { + pub async fn take(&self, id: &TripleId) -> TripleResult> { let mut conn = self.redis_pool.get().await?; if self.contains_mine(id).await? { tracing::error!("Can not take mine triple as foreign: {:?}", id); @@ -70,7 +68,7 @@ impl TripleRedisStorage { } } - pub async fn take_mine(&mut self) -> TripleResult> { + pub async fn take_mine(&self) -> TripleResult> { let mut conn = self.redis_pool.get().await?; let id: Option = conn.spop(self.mine_key()).await?; match id { @@ -91,7 +89,7 @@ impl TripleRedisStorage { Ok(result) } - pub async fn clear(&mut self) -> TripleResult<()> { + pub async fn clear(&self) -> TripleResult<()> { let mut conn = self.redis_pool.get().await?; conn.del::<&str, ()>(&self.triple_key()).await?; conn.del::<&str, ()>(&self.mine_key()).await?; diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index 8413af52..77295a44 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -20,7 +20,6 @@ use mpc_node::kdf::into_eth_sig; use mpc_node::protocol::presignature::{Presignature, PresignatureId, PresignatureManager}; use mpc_node::protocol::triple::{Triple, TripleManager}; use mpc_node::storage; -use mpc_node::storage::presignature_storage::LockPresignatureRedisStorage; use mpc_node::types::LatestBlockHeight; use mpc_node::util::NearPublicKeyExt; use near_account_id::AccountId; @@ -220,11 +219,10 @@ async fn test_triple_persistence() -> anyhow::Result<()> { let redis_url = Url::parse(redis.internal_address.as_str())?; let redis_cfg = deadpool_redis::Config::from_url(redis_url); let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); - let triple_storage: storage::triple_storage::LockTripleRedisStorage = - Arc::new(RwLock::new(storage::triple_storage::init( - redis_pool.clone(), - &AccountId::from_str("test.near").unwrap(), - ))); + let triple_storage = storage::triple_storage::init( + redis_pool.clone(), + &AccountId::from_str("test.near").unwrap(), + ); let mut triple_manager = TripleManager::new( Participant::from(0), @@ -311,17 +309,16 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { let redis_url = Url::parse(redis.internal_address.as_str())?; let redis_cfg = deadpool_redis::Config::from_url(redis_url); let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); - let presignature_storage: LockPresignatureRedisStorage = - Arc::new(RwLock::new(storage::presignature_storage::init( - redis_pool.clone(), - &AccountId::from_str("test.near").unwrap(), - ))); + let presignature_storage = storage::presignature_storage::init( + &redis_pool, + &AccountId::from_str("test.near").unwrap(), + ); let mut presignature_manager = PresignatureManager::new( Participant::from(0), 5, 123, &AccountId::from_str("test.near").unwrap(), - presignature_storage, + &presignature_storage, ); let presignature = dummy_presignature();