Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:near/mpc-recovery into phuong/fix/contract-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest committed Oct 24, 2024
2 parents 4878f1a + 20aacf6 commit e61f53e
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 64 deletions.
14 changes: 13 additions & 1 deletion chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::storage::triple_storage::LockTripleNodeStorageBox;
use crate::{indexer, storage, web};
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
use local_ip_address::local_ip;
use near_account_id::AccountId;
Expand Down Expand Up @@ -63,6 +63,10 @@ pub enum Cli {
/// referer header for mainnet whitelist
#[arg(long, env("MPC_CLIENT_HEADER_REFERER"), default_value(None))]
client_header_referer: Option<String>,
#[clap(flatten)]
mesh_options: mesh::Options,
#[clap(flatten)]
message_options: http_client::Options,
},
}

Expand All @@ -83,6 +87,8 @@ impl Cli {
storage_options,
override_config,
client_header_referer,
mesh_options,
message_options,
} => {
let mut args = vec![
"start".to_string(),
Expand Down Expand Up @@ -120,6 +126,8 @@ impl Cli {

args.extend(indexer_options.into_str_args());
args.extend(storage_options.into_str_args());
args.extend(mesh_options.into_str_args());
args.extend(message_options.into_str_args());
args
}
}
Expand Down Expand Up @@ -176,6 +184,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
storage_options,
override_config,
client_header_referer,
mesh_options,
message_options,
} => {
let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -237,6 +247,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
sign_sk,
},
}),
mesh_options,
message_options,
);

rt.block_on(async {
Expand Down
52 changes: 43 additions & 9 deletions chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// CLI/env options controlling outbound p2p message sending; flattened into the
// node's `start` subcommand arguments via `#[clap(flatten)]`.
#[derive(Debug, Clone, clap::Parser)]
#[group(id = "message_options")]
pub struct Options {
// Per-request send timeout in milliseconds (converted with
// `Duration::from_millis` when `MessageQueue` sends encrypted messages).
#[clap(long, env("MPC_MESSAGE_TIMEOUT"), default_value = "1000")]
pub timeout: u64,
}

impl Options {
    /// Serialize these options back into their CLI argument form, so a parsed
    /// `Options` can be round-tripped into args for spawning a node process.
    pub fn into_str_args(self) -> Vec<String> {
        let mut args = Vec::with_capacity(2);
        args.push("--timeout".to_string());
        args.push(self.timeout.to_string());
        args
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SendError {
#[error("http request was unsuccessful: {0}")]
Expand All @@ -36,19 +49,25 @@ async fn send_encrypted<U: IntoUrl>(
client: &Client,
url: U,
message: Vec<Ciphered>,
request_timeout: Duration,
) -> Result<(), SendError> {
let _span = tracing::info_span!("message_request");
let mut url = url.into_url()?;
url.set_path("msg");
tracing::debug!(?from, to = %url, "making http request: sending encrypted message");
let action = || async {
let response = client
.post(url.clone())
.header("content-type", "application/json")
.json(&message)
.send()
.await
.map_err(SendError::ReqwestClientError)?;
let response = tokio::time::timeout(
request_timeout,
client
.post(url.clone())
.header("content-type", "application/json")
.json(&message)
.send(),
)
.await
.map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))?
.map_err(SendError::ReqwestClientError)?;

let status = response.status();
let response_bytes = response
.bytes()
Expand All @@ -75,13 +94,21 @@ async fn send_encrypted<U: IntoUrl>(

// TODO: add in retry logic either in struct or at call site.
// TODO: add check for participant list to see if the messages to be sent are still valid.
// Outgoing message queue: buffers encrypted MPC messages destined for other
// participants until they are sent over HTTP.
#[derive(Default)]
pub struct MessageQueue {
// FIFO of (recipient info, message payload, time the entry was enqueued).
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
// NOTE(review): appears to track already-seen message descriptions for
// dedup/reporting — confirm intended semantics against the rest of this
// impl, which is not fully visible here.
seen_counts: HashSet<String>,
// HTTP send options (e.g. per-request timeout in milliseconds).
message_options: Options,
}

impl MessageQueue {
/// Construct an empty queue configured with the given message options.
pub fn new(options: Options) -> Self {
    Self {
        message_options: options,
        deque: VecDeque::new(),
        seen_counts: HashSet::new(),
    }
}

// Number of queued, not-yet-sent messages.
pub fn len(&self) -> usize {
self.deque.len()
}
Expand Down Expand Up @@ -147,7 +174,14 @@ impl MessageQueue {
crate::metrics::NUM_SEND_ENCRYPTED_TOTAL
.with_label_values(&[account_id.as_str()])
.inc();
if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await
if let Err(err) = send_encrypted(
from,
client,
&info.url,
encrypted_partition,
Duration::from_millis(self.message_options.timeout),
)
.await
{
crate::metrics::NUM_SEND_ENCRYPTED_FAILURE
.with_label_values(&[account_id.as_str()])
Expand Down
121 changes: 74 additions & 47 deletions chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use tokio::sync::RwLock;
use url::Url;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ParticipantInfo;
use crate::protocol::ProtocolState;
use crate::web::StateView;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);

// TODO: this is a basic connection pool and does not do most of the work yet. This is
// mostly here just to facilitate offline node handling for now.
// TODO/NOTE: we can use libp2p to facilitate most the of low level TCP connection work.
Expand All @@ -25,12 +24,43 @@ pub struct Pool {
current_active: RwLock<Option<(Participants, Instant)>>,
// Potentially active participants that we can use to establish a connection in the next epoch.
potential_active: RwLock<Option<(Participants, Instant)>>,
fetch_participant_timeout: Duration,
refresh_active_timeout: Duration,
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum FetchParticipantError {
#[error("request timed out")]
Timeout,
#[error("Response cannot be converted to JSON")]
JsonConversion,
#[error("Invalid URL")]
InvalidUrl,
#[error("Network error: {0}")]
NetworkError(String),
}

impl Pool {
/// Build a connection pool with the given timeout configuration and no
/// known connections or cached active sets yet.
pub fn new(fetch_participant_timeout: Duration, refresh_active_timeout: Duration) -> Self {
    tracing::info!(
        ?fetch_participant_timeout,
        ?refresh_active_timeout,
        "creating a new pool"
    );
    Self {
        http: reqwest::Client::new(),
        connections: RwLock::new(Participants::default()),
        potential_connections: RwLock::new(Participants::default()),
        status: RwLock::new(HashMap::new()),
        current_active: RwLock::new(None),
        potential_active: RwLock::new(None),
        fetch_participant_timeout,
        refresh_active_timeout,
    }
}
pub async fn ping(&self) -> Participants {
if let Some((ref active, timestamp)) = *self.current_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
if timestamp.elapsed() < self.refresh_active_timeout {
return active.clone();
}
}
Expand All @@ -40,35 +70,15 @@ impl Pool {
let mut status = self.status.write().await;
let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
tracing::error!(
"Pool.ping url is invalid participant {:?} url {} /state",
participant,
info.url
);
continue;
};

let Ok(resp) = self.http.get(url.clone()).send().await else {
tracing::warn!(
"Pool.ping resp err participant {:?} url {}",
participant,
url
);
continue;
};

let Ok(state): Result<StateView, _> = resp.json().await else {
tracing::warn!(
"Pool.ping state view err participant {:?} url {}",
participant,
url
);
continue;
};

status.insert(*participant, state);
participants.insert(participant, info.clone());
match self.fetch_participant_state(info).await {
Ok(state) => {
status.insert(*participant, state);
participants.insert(participant, info.clone());
}
Err(e) => {
tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url);
}
}
}
drop(status);

Expand All @@ -79,7 +89,7 @@ impl Pool {

pub async fn ping_potential(&self) -> Participants {
if let Some((ref active, timestamp)) = *self.potential_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
if timestamp.elapsed() < self.refresh_active_timeout {
return active.clone();
}
}
Expand All @@ -89,20 +99,15 @@ impl Pool {
let mut status = self.status.write().await;
let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
continue;
};

let Ok(resp) = self.http.get(url).send().await else {
continue;
};

let Ok(state): Result<StateView, _> = resp.json().await else {
continue;
};

status.insert(*participant, state);
participants.insert(participant, info.clone());
match self.fetch_participant_state(info).await {
Ok(state) => {
status.insert(*participant, state);
participants.insert(participant, info.clone());
}
Err(e) => {
tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url);
}
}
}
drop(status);

Expand Down Expand Up @@ -159,4 +164,26 @@ impl Pool {
_ => false,
})
}

/// Fetch the `/state` view of a single participant over HTTP.
///
/// The request is bounded by `self.fetch_participant_timeout`. Errors
/// distinguish a malformed URL, a transport failure, a timeout, and a
/// response body that does not deserialize into a `StateView`.
async fn fetch_participant_state(
    &self,
    participant_info: &ParticipantInfo,
) -> Result<StateView, FetchParticipantError> {
    // Build `<participant-url>/state`; both parsing and joining must succeed.
    let Ok(Ok(url)) = Url::parse(&participant_info.url).map(|url| url.join("/state")) else {
        return Err(FetchParticipantError::InvalidUrl);
    };
    // `url` is consumed here — the previous `url.clone()` was redundant.
    match tokio::time::timeout(self.fetch_participant_timeout, self.http.get(url).send()).await {
        Ok(Ok(resp)) => resp
            .json::<StateView>()
            .await
            .map_err(|_| FetchParticipantError::JsonConversion),
        Ok(Err(e)) => Err(FetchParticipantError::NetworkError(e.to_string())),
        Err(_) => Err(FetchParticipantError::Timeout),
    }
}
}
38 changes: 37 additions & 1 deletion chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
use std::time::Duration;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ProtocolState;

pub mod connection;

#[derive(Default)]
// CLI/env options controlling mesh connection timeouts; flattened into the
// node's `start` subcommand arguments via `#[clap(flatten)]`.
#[derive(Debug, Clone, clap::Parser)]
#[group(id = "mesh_options")]
pub struct Options {
// Timeout in milliseconds for fetching one participant's `/state` view
// (converted with `Duration::from_millis` in `Mesh::new`).
#[clap(
long,
env("MPC_MESH_FETCH_PARTICIPANT_TIMEOUT"),
default_value = "1000"
)]
pub fetch_participant_timeout: u64,
// Max age in milliseconds of the cached active-participants list before
// the pool re-pings everyone (also converted in `Mesh::new`).
#[clap(long, env("MPC_MESH_REFRESH_ACTIVE_TIMEOUT"), default_value = "1000")]
pub refresh_active_timeout: u64,
}

impl Options {
    /// Serialize these options back into their CLI argument form, so a parsed
    /// `Options` can be round-tripped into args for spawning a node process.
    pub fn into_str_args(self) -> Vec<String> {
        let mut args = Vec::with_capacity(4);
        args.push("--fetch-participant-timeout".to_string());
        args.push(self.fetch_participant_timeout.to_string());
        args.push("--refresh-active-timeout".to_string());
        args.push(self.refresh_active_timeout.to_string());
        args
    }
}

pub struct Mesh {
/// Pool of connections to participants. Used to check who is alive in the network.
pub connections: connection::Pool,
Expand All @@ -17,6 +42,17 @@ pub struct Mesh {
}

impl Mesh {
/// Create a mesh with a fresh connection pool (configured from `options`,
/// whose timeouts are in milliseconds) and no known participants yet.
pub fn new(options: Options) -> Self {
    let fetch_timeout = Duration::from_millis(options.fetch_participant_timeout);
    let refresh_timeout = Duration::from_millis(options.refresh_active_timeout);
    Self {
        connections: connection::Pool::new(fetch_timeout, refresh_timeout),
        active_participants: Participants::default(),
        active_potential_participants: Participants::default(),
    }
}

/// Participants that are active at the beginning of each protocol loop.
pub fn active_participants(&self) -> &Participants {
&self.active_participants
Expand Down
Loading

0 comments on commit e61f53e

Please sign in to comment.