Skip to content

Commit

Permalink
Removed Arc and RwLock for triple/presignature storage
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest committed Oct 30, 2024
1 parent 566a134 commit e190515
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 130 deletions.
13 changes: 3 additions & 10 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::storage::presignature_storage::LockPresignatureRedisStorage;
use crate::storage::triple_storage::LockTripleRedisStorage;
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
use deadpool_redis::Runtime;
Expand Down Expand Up @@ -213,14 +211,9 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {

let redis_cfg = deadpool_redis::Config::from_url(redis_url);
let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap();

let triple_storage: LockTripleRedisStorage = Arc::new(RwLock::new(
storage::triple_storage::init(redis_pool.clone(), &account_id),
));

let presignature_storage: LockPresignatureRedisStorage = Arc::new(RwLock::new(
storage::presignature_storage::init(redis_pool.clone(), &account_id),
));
let triple_storage = storage::triple_storage::init(&redis_pool, &account_id);
let presignature_storage =
storage::presignature_storage::init(&redis_pool, &account_id);

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
Expand Down
12 changes: 6 additions & 6 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::protocol::presignature::PresignatureManager;
use crate::protocol::signature::SignatureManager;
use crate::protocol::state::{GeneratingState, ResharingState};
use crate::protocol::triple::TripleManager;
use crate::storage::presignature_storage::LockPresignatureRedisStorage;
use crate::storage::presignature_storage::PresignatureRedisStorage;
use crate::storage::secret_storage::SecretNodeStorageBox;
use crate::storage::triple_storage::LockTripleRedisStorage;
use crate::storage::triple_storage::TripleRedisStorage;
use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare};
use crate::util::AffinePointExt;
use crate::{http_client, rpc_client};
Expand All @@ -40,8 +40,8 @@ pub trait ConsensusCtx {
fn my_address(&self) -> &Url;
fn sign_queue(&self) -> Arc<RwLock<SignQueue>>;
fn secret_storage(&self) -> &SecretNodeStorageBox;
fn triple_storage(&self) -> LockTripleRedisStorage;
fn presignature_storage(&self) -> LockPresignatureRedisStorage;
fn triple_storage(&self) -> &TripleRedisStorage;
fn presignature_storage(&self) -> &PresignatureRedisStorage;
fn cfg(&self) -> &Config;
fn message_options(&self) -> http_client::Options;
}
Expand Down Expand Up @@ -361,14 +361,14 @@ impl ConsensusProtocol for WaitingForConsensusState {

// Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated
// triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is
if let Err(err) = ctx.triple_storage().write().await.clear().await {
if let Err(err) = ctx.triple_storage().clear().await {
tracing::error!(
?err,
"failed to clear triples from storage on new epoch start"
);
}

if let Err(err) = ctx.presignature_storage().write().await.clear().await {
if let Err(err) = ctx.presignature_storage().clear().await {
tracing::error!(
?err,
"failed to clear presignatures from storage on new epoch start"
Expand Down
20 changes: 10 additions & 10 deletions chain-signatures/node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use crate::protocol::consensus::ConsensusProtocol;
use crate::protocol::cryptography::CryptographicProtocol;
use crate::protocol::message::{MessageHandler, MpcMessageQueue};
use crate::rpc_client;
use crate::storage::presignature_storage::LockPresignatureRedisStorage;
use crate::storage::presignature_storage::PresignatureRedisStorage;
use crate::storage::secret_storage::SecretNodeStorageBox;
use crate::storage::triple_storage::LockTripleRedisStorage;
use crate::storage::triple_storage::TripleRedisStorage;

use cait_sith::protocol::Participant;
use near_account_id::AccountId;
Expand All @@ -53,8 +53,8 @@ struct Ctx {
http_client: reqwest::Client,
sign_queue: Arc<RwLock<SignQueue>>,
secret_storage: SecretNodeStorageBox,
triple_storage: LockTripleRedisStorage,
presignature_storage: LockPresignatureRedisStorage,
triple_storage: TripleRedisStorage,
presignature_storage: PresignatureRedisStorage,
cfg: Config,
mesh: Mesh,
message_options: http_client::Options,
Expand Down Expand Up @@ -97,12 +97,12 @@ impl ConsensusCtx for &mut MpcSignProtocol {
&self.ctx.cfg
}

fn triple_storage(&self) -> LockTripleRedisStorage {
self.ctx.triple_storage.clone()
fn triple_storage(&self) -> &TripleRedisStorage {
&self.ctx.triple_storage
}

fn presignature_storage(&self) -> LockPresignatureRedisStorage {
self.ctx.presignature_storage.clone()
fn presignature_storage(&self) -> &PresignatureRedisStorage {
&self.ctx.presignature_storage
}

fn message_options(&self) -> http_client::Options {
Expand Down Expand Up @@ -177,8 +177,8 @@ impl MpcSignProtocol {
receiver: mpsc::Receiver<MpcMessage>,
sign_queue: Arc<RwLock<SignQueue>>,
secret_storage: SecretNodeStorageBox,
triple_storage: LockTripleRedisStorage,
presignature_storage: LockPresignatureRedisStorage,
triple_storage: TripleRedisStorage,
presignature_storage: PresignatureRedisStorage,
cfg: Config,
mesh_options: mesh::Options,
message_options: http_client::Options,
Expand Down
49 changes: 10 additions & 39 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::message::PresignatureMessage;
use super::triple::{Triple, TripleId, TripleManager};
use crate::protocol::contract::primitives::Participants;
use crate::storage::presignature_storage::LockPresignatureRedisStorage;
use crate::storage::presignature_storage::PresignatureRedisStorage;
use crate::types::{PresignatureProtocol, SecretKeyShare};
use crate::util::AffinePointExt;

Expand Down Expand Up @@ -150,7 +150,7 @@ pub enum GenerationError {
/// Abstracts how triples are generated by providing a way to request a new triple that will be
/// complete some time in the future and a way to take an already generated triple.
pub struct PresignatureManager {
presignature_storage: LockPresignatureRedisStorage,
presignature_storage: PresignatureRedisStorage,
/// Ongoing presignature generation protocols.
generators: HashMap<PresignatureId, PresignatureGenerator>,
/// The set of presignatures that were introduced to the system by the current node.
Expand All @@ -171,10 +171,10 @@ impl PresignatureManager {
threshold: usize,
epoch: u64,
my_account_id: &AccountId,
presignature_storage: LockPresignatureRedisStorage,
storage: &PresignatureRedisStorage,
) -> Self {
Self {
presignature_storage,
presignature_storage: storage.clone(),
generators: HashMap::new(),
introduced: HashSet::new(),
gc: HashMap::new(),
Expand All @@ -189,13 +189,7 @@ impl PresignatureManager {
tracing::debug!(id = ?presignature.id, "inserting presignature");
// Remove from taken list if it was there
self.gc.remove(&presignature.id);
if let Err(e) = self
.presignature_storage
.write()
.await
.insert(presignature)
.await
{
if let Err(e) = self.presignature_storage.insert(presignature).await {
tracing::error!(?e, "failed to insert presignature");
}
}
Expand All @@ -204,22 +198,14 @@ impl PresignatureManager {
tracing::debug!(id = ?presignature.id, "inserting mine presignature");
// Remove from taken list if it was there
self.gc.remove(&presignature.id);
if let Err(e) = self
.presignature_storage
.write()
.await
.insert_mine(presignature)
.await
{
if let Err(e) = self.presignature_storage.insert_mine(presignature).await {
tracing::error!(?e, "failed to insert mine presignature");
}
}

/// Returns true if the presignature with the given id is already generated
pub async fn contains(&self, id: &PresignatureId) -> bool {
self.presignature_storage
.read()
.await
.contains(id)
.await
.map_err(|e| {
Expand All @@ -231,8 +217,6 @@ impl PresignatureManager {
/// Returns true if the mine presignature with the given id is already generated
pub async fn contains_mine(&self, id: &PresignatureId) -> bool {
self.presignature_storage
.read()
.await
.contains_mine(id)
.await
.map_err(|e| {
Expand All @@ -242,17 +226,10 @@ impl PresignatureManager {
}

pub async fn take(&mut self, id: PresignatureId) -> Result<Presignature, GenerationError> {
if let Some(presignature) = self
.presignature_storage
.write()
.await
.take(&id)
.await
.map_err(|e| {
tracing::error!(?e, "failed to look for presignature");
GenerationError::PresignatureIsMissing(id)
})?
{
if let Some(presignature) = self.presignature_storage.take(&id).await.map_err(|e| {
tracing::error!(?e, "failed to look for presignature");
GenerationError::PresignatureIsMissing(id)
})? {
self.gc.insert(id, Instant::now());
tracing::debug!(id, "took presignature");
return Ok(presignature);
Expand All @@ -273,8 +250,6 @@ impl PresignatureManager {
pub async fn take_mine(&mut self) -> Option<Presignature> {
if let Some(presignature) = self
.presignature_storage
.write()
.await
.take_mine()
.await
.map_err(|e| {
Expand All @@ -291,8 +266,6 @@ impl PresignatureManager {
/// Returns the number of unspent presignatures available in the manager.
pub async fn len_generated(&self) -> usize {
self.presignature_storage
.read()
.await
.len_generated()
.await
.map_err(|e| {
Expand All @@ -304,8 +277,6 @@ impl PresignatureManager {
/// Returns the number of unspent presignatures assigned to this node.
pub async fn len_mine(&self) -> usize {
self.presignature_storage
.read()
.await
.len_mine()
.await
.map_err(|e| {
Expand Down
44 changes: 12 additions & 32 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::contract::primitives::Participants;
use super::cryptography::CryptographicError;
use super::message::TripleMessage;
use super::presignature::GenerationError;
use crate::storage::triple_storage::LockTripleRedisStorage;
use crate::storage::triple_storage::TripleRedisStorage;
use crate::types::TripleProtocol;
use crate::util::AffinePointExt;

Expand Down Expand Up @@ -80,7 +80,7 @@ impl TripleGenerator {
/// complete some time in the future and a way to take an already generated triple.
pub struct TripleManager {
/// Triple Storage
pub triple_storage: LockTripleRedisStorage,
pub triple_storage: TripleRedisStorage,

/// The pool of triple protocols that have yet to be completed.
pub generators: HashMap<TripleId, TripleGenerator>,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl TripleManager {
threshold: usize,
epoch: u64,
my_account_id: &AccountId,
triple_storage: LockTripleRedisStorage,
storage: &TripleRedisStorage,
) -> Self {
Self {
generators: HashMap::new(),
Expand All @@ -139,31 +139,29 @@ impl TripleManager {
me,
threshold,
epoch,
triple_storage,
triple_storage: storage.clone(),
my_account_id: my_account_id.clone(),
}
}

pub async fn insert(&mut self, triple: Triple) {
tracing::debug!(id = triple.id, "inserting triple");
self.gc.remove(&triple.id);
if let Err(e) = self.triple_storage.write().await.insert(triple).await {
if let Err(e) = self.triple_storage.insert(triple).await {
tracing::warn!(?e, "failed to insert triple");
}
}

pub async fn insert_mine(&mut self, triple: Triple) {
tracing::debug!(id = triple.id, "inserting mine triple");
self.gc.remove(&triple.id);
if let Err(e) = self.triple_storage.write().await.insert_mine(triple).await {
if let Err(e) = self.triple_storage.insert_mine(triple).await {
tracing::warn!(?e, "failed to insert mine triple");
}
}

pub async fn contains(&self, id: &TripleId) -> bool {
self.triple_storage
.read()
.await
.contains(id)
.await
.map_err(|e| tracing::warn!(?e, "failed to check if triple exists"))
Expand All @@ -172,8 +170,6 @@ impl TripleManager {

pub async fn contains_mine(&self, id: &TripleId) -> bool {
self.triple_storage
.read()
.await
.contains_mine(id)
.await
.map_err(|e| tracing::warn!(?e, "failed to check if mine triple exists"))
Expand All @@ -189,7 +185,7 @@ impl TripleManager {
id0: TripleId,
id1: TripleId,
) -> Result<(Triple, Triple), GenerationError> {
let mut triples = self.triple_storage.write().await;
let triples = &self.triple_storage;
let triple_0 = match triples.take(&id0).await {
Ok(Some(triple)) => triple,
Ok(None) => {
Expand Down Expand Up @@ -248,7 +244,7 @@ impl TripleManager {
/// It is very important to NOT reuse the same triple twice for two different
/// protocols.
pub async fn take_two_mine(&mut self) -> Option<(Triple, Triple)> {
let mut triples = self.triple_storage.write().await;
let triples = &self.triple_storage;
if triples.len_mine().await.unwrap_or(0) < 2 {
tracing::warn!("not enough mine triples");
return None;
Expand All @@ -268,21 +264,15 @@ impl TripleManager {
let triple_1 = match triples.take_mine().await {
Ok(Some(triple)) => triple,
Ok(None) => {
if let Err(e) = triples
.insert_mine(triple_0)
.await
{
if let Err(e) = triples.insert_mine(triple_0).await {
tracing::warn!(?e, "failed to insert mine triple back");
}
tracing::warn!("no mine triple left");
return None;
}
Err(e) => {
tracing::warn!(?e, "failed to take mine triple");
if let Err(e) = triples
.insert_mine(triple_0)
.await
{
if let Err(e) = triples.insert_mine(triple_0).await {
tracing::warn!(?e, "failed to insert mine triple back");
}
return None;
Expand All @@ -299,22 +289,12 @@ impl TripleManager {

/// Returns the number of unspent triples available in the manager.
pub async fn len_generated(&self) -> usize {
self.triple_storage
.read()
.await
.len_generated()
.await
.unwrap_or(0)
self.triple_storage.len_generated().await.unwrap_or(0)
}

/// Returns the number of unspent triples assigned to this node.
pub async fn len_mine(&self) -> usize {
self.triple_storage
.read()
.await
.len_mine()
.await
.unwrap_or(0)
self.triple_storage.len_mine().await.unwrap_or(0)
}

/// Returns if there's any unspent triple in the manager.
Expand Down
Loading

0 comments on commit e190515

Please sign in to comment.