From 63b05dacc9959954de05cf986607d1296cfd5412 Mon Sep 17 00:00:00 2001 From: yse <70684173+hydra-yse@users.noreply.github.com> Date: Thu, 5 Dec 2024 02:36:03 +0100 Subject: [PATCH] feat(rt-sync): add pull and merge (#556) --- lib/core/Cargo.toml | 5 +- lib/core/src/model.rs | 8 +- lib/core/src/persist/sync.rs | 8 +- lib/core/src/signer.rs | 21 +- lib/core/src/sync/client.rs | 59 ++++++ lib/core/src/sync/mod.rs | 317 ++++++++++++++++++++++++++++++ lib/core/src/sync/model/client.rs | 31 +++ lib/core/src/sync/model/data.rs | 174 ++++++++++++++++ lib/core/src/sync/model/mod.rs | 85 ++++++++ lib/core/src/test_utils/mod.rs | 1 + lib/core/src/test_utils/sync.rs | 102 ++++++++++ lib/core/src/test_utils/wallet.rs | 41 +++- 12 files changed, 842 insertions(+), 10 deletions(-) create mode 100644 lib/core/src/sync/client.rs create mode 100644 lib/core/src/sync/model/client.rs create mode 100644 lib/core/src/sync/model/data.rs create mode 100644 lib/core/src/test_utils/sync.rs diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index b426ff3e6..e6f2f5e35 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -56,10 +56,11 @@ x509-parser = { version = "0.16.0" } tempfile = "3" tonic = { version = "0.12.3", features = ["tls"] } prost = "0.13.3" -uuid = { version = "1.8.0", features = ["v4"] } +ecies = "0.2.7" +semver = "1.0.23" +lazy_static = "1.5.0" [dev-dependencies] -lazy_static = "1.5.0" paste = "1.0.15" tempdir = "0.3.7" uuid = { version = "1.8.0", features = ["v4"] } diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index 608ee507a..973260b99 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -264,6 +264,12 @@ pub trait Signer: Send + Sync { /// HMAC-SHA256 using the private key derived from the given derivation path /// This is used to calculate the linking key of lnurl-auth specification: https://github.com/lnurl/luds/blob/luds/05.md fn hmac_sha256(&self, msg: Vec, derivation_path: String) -> Result, SignerError>; + + /// Encrypts a message using (ECIES)[ecies::encrypt] + fn ecies_encrypt(&self, msg: &[u8]) -> Result, SignerError>; + + /// Decrypts a message using (ECIES)[ecies::decrypt] + fn ecies_decrypt(&self, msg: &[u8]) -> Result, SignerError>; } /// An argument when calling [crate::sdk::LiquidSdk::connect]. @@ -623,7 +629,7 @@ impl SwapScriptV2 { } } -#[derive(Debug, Copy, Clone, PartialEq, Serialize)] +#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] pub enum Direction { Incoming = 0, Outgoing = 1, diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index 956a9c162..72a9d1724 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -3,9 +3,13 @@ use std::collections::HashMap; use anyhow::Result; use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior}; -use super::Persister; +use super::{PaymentState, Persister}; use crate::{ - sync::model::{sync::Record, RecordType, SyncOutgoingChanges, SyncSettings, SyncState}, + sync::model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData}, + sync::Record, + RecordType, SyncOutgoingChanges, SyncSettings, SyncState, + }, utils, }; diff --git a/lib/core/src/signer.rs b/lib/core/src/signer.rs index cbb2a622a..1746bf4c5 100644 --- a/lib/core/src/signer.rs +++ b/lib/core/src/signer.rs @@ -6,7 +6,7 @@ use boltz_client::PublicKey; use lwk_common::Signer as LwkSigner; use lwk_wollet::bitcoin::bip32::Xpriv; use lwk_wollet::bitcoin::Network; -use lwk_wollet::elements_miniscript; +use lwk_wollet::elements_miniscript::{self, ToPublicKey as _}; use lwk_wollet::elements_miniscript::{ bitcoin::{self, bip32::DerivationPath}, elements::{ @@ -253,6 +253,25 @@ impl Signer for SdkSigner { .as_byte_array() .to_vec()) } + + fn ecies_encrypt(&self, msg: &[u8]) -> Result, SignerError> { + let keypair = self.xprv.to_keypair(&self.secp); + let rc_pub = keypair.public_key().to_public_key().to_bytes(); + Ok( + ecies::encrypt(&rc_pub, msg).map_err(|err| SignerError::Generic { + err: format!("Could not encrypt data: {err}"), + })?, + ) + } + + fn ecies_decrypt(&self, msg: &[u8]) -> Result, SignerError> { + let rc_prv = self.xprv.to_priv().to_bytes(); + Ok( + ecies::decrypt(&rc_prv, msg).map_err(|err| SignerError::Generic { + err: format!("Could not decrypt data: {err}"), + })?, + ) + } } #[cfg(test)] diff --git a/lib/core/src/sync/client.rs b/lib/core/src/sync/client.rs new file mode 100644 index 000000000..8f9682e69 --- /dev/null +++ b/lib/core/src/sync/client.rs @@ -0,0 +1,59 @@ +use anyhow::{anyhow, Result}; + +use async_trait::async_trait; +use log::debug; +use tokio::sync::Mutex; + +use super::model::sync::{ + syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest, + SetRecordReply, SetRecordRequest, +}; + +#[async_trait] +pub(crate) trait SyncerClient: Send + Sync { + async fn connect(&self, connect_url: String) -> Result<()>; + async fn push(&self, req: SetRecordRequest) -> Result; + async fn pull(&self, req: ListChangesRequest) -> Result; + async fn disconnect(&self) -> Result<()>; +} + +pub(crate) struct BreezSyncerClient { + inner: Mutex>>, +} + +impl BreezSyncerClient { + pub(crate) fn new() -> Self { + Self { + inner: Default::default(), + } + } +} + +#[async_trait] +impl SyncerClient for BreezSyncerClient { + async fn connect(&self, connect_url: String) -> Result<()> { + let mut client = self.inner.lock().await; + *client = Some(ProtoSyncerClient::connect(connect_url.clone()).await?); + debug!("Successfully connected to {connect_url}"); + Ok(()) + } + + async fn push(&self, req: SetRecordRequest) -> Result { + let Some(mut client) = self.inner.lock().await.clone() else { + return Err(anyhow!("Cannot run `set_record`: client not connected")); + }; + Ok(client.set_record(req).await?.into_inner()) + } + async fn pull(&self, req: ListChangesRequest) -> Result { + let Some(mut client) = self.inner.lock().await.clone() else { + return Err(anyhow!("Cannot run `list_changes`: client not connected")); + }; + Ok(client.list_changes(req).await?.into_inner()) + } + + async fn disconnect(&self) -> Result<()> { + let mut client = self.inner.lock().await; + *client = None; + Ok(()) + } +} diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index d9a52514a..2eb80240d 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -1 +1,318 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; + +use crate::{persist::Persister, prelude::Signer}; + +use self::client::SyncerClient; +use self::model::sync::Record; +use self::model::DecryptedRecord; +use self::model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData}, + sync::ListChangesRequest, + RecordType, SyncState, +}; + +pub(crate) mod client; pub(crate) mod model; + +pub(crate) struct SyncService { + remote_url: String, + persister: Arc, + signer: Arc>, + client: Box, +} + +impl SyncService { + pub(crate) fn new( + remote_url: String, + persister: Arc, + signer: Arc>, + client: Box, + ) -> Self { + Self { + remote_url, + persister, + signer, + client, + } + } + + fn commit_record( + &self, + decrypted_record: &DecryptedRecord, + sync_state: SyncState, + is_update: bool, + last_commit_time: Option, + ) -> Result<()> { + match decrypted_record.data.clone() { + SyncData::Chain(chain_data) => self.persister.commit_incoming_chain_swap( + &chain_data, + sync_state, + is_update, + last_commit_time, + )?, + SyncData::Send(send_data) => self.persister.commit_incoming_send_swap( + &send_data, + sync_state, + is_update, + last_commit_time, + )?, + SyncData::Receive(receive_data) => self.persister.commit_incoming_receive_swap( + &receive_data, + sync_state, + is_update, + last_commit_time, + )?, + } + Ok(()) + } + + fn load_sync_data(&self, data_id: &str, record_type: RecordType) -> Result { + let data = match record_type { + RecordType::Receive => { + let receive_data: ReceiveSyncData = self + .persister + .fetch_receive_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Receive swap {data_id}"))? + .into(); + SyncData::Receive(receive_data) + } + RecordType::Send => { + let send_data: SendSyncData = self + .persister + .fetch_send_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Send swap {data_id}"))? + .into(); + SyncData::Send(send_data) + } + RecordType::Chain => { + let chain_data: ChainSyncData = self + .persister + .fetch_chain_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Chain swap {data_id}"))? + .into(); + SyncData::Chain(chain_data) + } + }; + Ok(data) + } + + async fn fetch_and_save_records(&self) -> Result<()> { + log::info!("Initiating record pull"); + + let local_latest_revision = self + .persister + .get_sync_settings()? + .latest_revision + .unwrap_or(0); + let req = ListChangesRequest::new(local_latest_revision, self.signer.clone())?; + let incoming_records = self.client.pull(req).await?.changes; + + self.persister.set_incoming_records(&incoming_records)?; + let remote_latest_revision = incoming_records.last().map(|record| record.revision); + if let Some(latest_revision) = remote_latest_revision { + self.persister.set_sync_settings(HashMap::from([( + "latest_revision", + latest_revision.to_string(), + )]))?; + log::info!( + "Successfully pulled and persisted records. New latest revision: {latest_revision}" + ); + } else { + log::info!("No new records found. Local latest revision: {local_latest_revision}"); + } + + Ok(()) + } + + async fn handle_pull(&self, new_record: Record) -> Result<()> { + log::info!("Handling pull for record record_id {}", &new_record.id); + + // Step 3: Check whether or not record is applicable (from its schema_version) + if !new_record.is_applicable()? { + return Err(anyhow!("Record is not applicable: schema_version too high")); + } + + // Step 4: Check whether we already have this record, and if the revision is newer + let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?; + if let Some(sync_state) = &maybe_sync_state { + if sync_state.record_revision >= new_record.revision { + log::info!("Remote record revision is lower or equal to the persisted one. Skipping update."); + return Ok(()); + } + } + + // Step 5: Decrypt the incoming record + let mut decrypted_record = new_record.decrypt(self.signer.clone())?; + + // Step 6: Merge with outgoing records, if present + let maybe_outgoing_changes = self + .persister + .get_sync_outgoing_changes_by_id(&decrypted_record.id)?; + + if let Some(outgoing_changes) = &maybe_outgoing_changes { + if let Some(updated_fields) = &outgoing_changes.updated_fields { + let local_data = + self.load_sync_data(decrypted_record.data.id(), outgoing_changes.record_type)?; + decrypted_record.data.merge(&local_data, updated_fields)?; + } + } + + // Step 7: Apply the changes and update sync state + let new_sync_state = SyncState { + data_id: decrypted_record.data.id().to_string(), + record_id: decrypted_record.id.clone(), + record_revision: decrypted_record.revision, + is_local: maybe_sync_state + .as_ref() + .map(|state| state.is_local) + .unwrap_or(false), + }; + let is_update = maybe_sync_state.is_some(); + let last_commit_time = maybe_outgoing_changes.map(|details| details.commit_time); + self.commit_record( + &decrypted_record, + new_sync_state, + is_update, + last_commit_time, + )?; + + log::info!( + "Successfully pulled record record_id {}", + &decrypted_record.id + ); + + Ok(()) + } + + async fn pull(&self) -> Result<()> { + // Step 1: Fetch and save incoming records from remote, then update local tip + self.fetch_and_save_records().await?; + + // Step 2: Grab all pending incoming records from the database, merge them with + // outgoing if necessary, then apply + let mut succeded = vec![]; + let incoming_records = self.persister.get_incoming_records()?; + for new_record in incoming_records { + let record_id = new_record.id.clone(); + if let Err(err) = self.handle_pull(new_record).await { + log::debug!("Could not handle incoming record {record_id}: {err:?}"); + continue; + } + succeded.push(record_id); + } + + if !succeded.is_empty() { + self.persister.remove_incoming_records(succeded)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use anyhow::{anyhow, Result}; + use std::sync::Arc; + use tokio::sync::mpsc; + + use crate::{ + prelude::Signer, + test_utils::{ + persist::new_persister, + sync::{ + new_chain_sync_data, new_receive_sync_data, new_send_sync_data, MockSyncerClient, + }, + wallet::MockSigner, + }, + }; + + use super::{ + model::{data::SyncData, sync::Record}, + SyncService, + }; + + #[tokio::test] + async fn test_incoming_sync_create_and_update() -> Result<()> { + let (_temp_dir, persister) = new_persister()?; + let persister = Arc::new(persister); + + let signer: Arc> = Arc::new(Box::new(MockSigner::new())); + + let sync_data = vec![ + SyncData::Receive(new_receive_sync_data()), + SyncData::Send(new_send_sync_data(None)), + SyncData::Chain(new_chain_sync_data(None)), + ]; + let incoming_records = vec![ + Record::new(sync_data[0].clone(), 1, signer.clone())?, + Record::new(sync_data[1].clone(), 2, signer.clone())?, + Record::new(sync_data[2].clone(), 3, signer.clone())?, + ]; + + let (incoming_tx, incoming_rx) = mpsc::channel::(10); + let client = Box::new(MockSyncerClient::new(incoming_rx)); + let sync_service = + SyncService::new("".to_string(), persister.clone(), signer.clone(), client); + + for record in incoming_records { + incoming_tx.send(record).await?; + } + sync_service.pull().await?; + + if let Some(receive_swap) = persister.fetch_receive_swap_by_id(&sync_data[0].id())? { + assert!(receive_swap.description.is_none()); + assert!(receive_swap.payment_hash.is_none()); + } else { + return Err(anyhow!("Receive swap not found")); + } + if let Some(send_swap) = persister.fetch_send_swap_by_id(&sync_data[1].id())? { + assert!(send_swap.preimage.is_none()); + assert!(send_swap.description.is_none()); + assert!(send_swap.payment_hash.is_none()); + } else { + return Err(anyhow!("Send swap not found")); + } + if let Some(chain_swap) = persister.fetch_chain_swap_by_id(&sync_data[2].id())? { + assert!(chain_swap.claim_address.is_none()); + assert!(chain_swap.description.is_none()); + assert!(chain_swap.accept_zero_conf.eq(&true)); + } else { + return Err(anyhow!("Chain swap not found")); + } + + let new_preimage = Some("preimage".to_string()); + let new_accept_zero_conf = false; + let new_server_lockup_tx_id = Some("server_lockup_tx_id".to_string()); + let sync_data = vec![ + SyncData::Send(new_send_sync_data(new_preimage.clone())), + SyncData::Chain(new_chain_sync_data(Some(new_accept_zero_conf))), + ]; + let incoming_records = vec![ + Record::new(sync_data[0].clone(), 4, signer.clone())?, + Record::new(sync_data[1].clone(), 5, signer.clone())?, + Record::new(sync_data[2].clone(), 6, signer.clone())?, + ]; + + for record in incoming_records { + incoming_tx.send(record).await?; + } + sync_service.pull().await?; + + if let Some(send_swap) = persister.fetch_send_swap_by_id(&sync_data[1].id())? { + assert_eq!(send_swap.preimage, new_preimage); + } else { + return Err(anyhow!("Send swap not found")); + } + if let Some(chain_swap) = persister.fetch_chain_swap_by_id(&sync_data[2].id())? { + assert_eq!(chain_swap.accept_zero_conf, new_accept_zero_conf); + assert_eq!(chain_swap.server_lockup_tx_id, new_server_lockup_tx_id); + } else { + return Err(anyhow!("Chain swap not found")); + } + + Ok(()) + } +} diff --git a/lib/core/src/sync/model/client.rs b/lib/core/src/sync/model/client.rs new file mode 100644 index 000000000..84f053b90 --- /dev/null +++ b/lib/core/src/sync/model/client.rs @@ -0,0 +1,31 @@ +use anyhow::Result; +use openssl::sha::sha256; +use std::sync::Arc; + +use crate::{ + prelude::{Signer, SignerError}, + utils, +}; + +use super::{sync::ListChangesRequest, MESSAGE_PREFIX}; + +fn sign_message(msg: &[u8], signer: Arc>) -> Result { + let msg = [MESSAGE_PREFIX, msg].concat(); + let digest = sha256(&sha256(&msg)); + signer + .sign_ecdsa_recoverable(digest.into()) + .map(|bytes| zbase32::encode_full_bytes(&bytes)) +} + +impl ListChangesRequest { + pub(crate) fn new(since_revision: u64, signer: Arc>) -> Result { + let request_time = utils::now(); + let msg = format!("{}-{}", since_revision, request_time); + let signature = sign_message(msg.as_bytes(), signer)?; + Ok(Self { + since_revision, + request_time, + signature, + }) + } +} diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs new file mode 100644 index 000000000..e80881e90 --- /dev/null +++ b/lib/core/src/sync/model/data.rs @@ -0,0 +1,174 @@ +use serde::{Deserialize, Serialize}; + +use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub(crate) struct ChainSyncData { + pub(crate) swap_id: String, + pub(crate) preimage: String, + pub(crate) create_response_json: String, + pub(crate) direction: Direction, + pub(crate) lockup_address: String, + pub(crate) claim_fees_sat: u64, + pub(crate) claim_private_key: String, + pub(crate) refund_private_key: String, + pub(crate) timeout_block_height: u32, + pub(crate) payer_amount_sat: u64, + pub(crate) receiver_amount_sat: u64, + pub(crate) accept_zero_conf: bool, + pub(crate) created_at: u32, + pub(crate) description: Option, +} + +impl ChainSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) { + for field in updated_fields { + match field.as_str() { + "accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf, + _ => continue, + } + } + } +} + +impl From for ChainSyncData { + fn from(value: ChainSwap) -> Self { + Self { + swap_id: value.id, + preimage: value.preimage, + create_response_json: value.create_response_json, + direction: value.direction, + lockup_address: value.lockup_address, + claim_fees_sat: value.claim_fees_sat, + claim_private_key: value.claim_private_key, + refund_private_key: value.refund_private_key, + timeout_block_height: value.timeout_block_height, + payer_amount_sat: value.payer_amount_sat, + receiver_amount_sat: value.receiver_amount_sat, + accept_zero_conf: value.accept_zero_conf, + created_at: value.created_at, + description: value.description, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub(crate) struct SendSyncData { + pub(crate) swap_id: String, + pub(crate) invoice: String, + pub(crate) create_response_json: String, + pub(crate) refund_private_key: String, + pub(crate) payer_amount_sat: u64, + pub(crate) receiver_amount_sat: u64, + pub(crate) created_at: u32, + pub(crate) preimage: Option, + pub(crate) payment_hash: Option, + pub(crate) description: Option, +} + +impl SendSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) { + for field in updated_fields { + match field.as_str() { + "preimage" => clone_if_set(&mut self.preimage, &other.preimage), + _ => continue, + } + } + } +} + +impl From for SendSyncData { + fn from(value: SendSwap) -> Self { + Self { + swap_id: value.id, + payment_hash: value.payment_hash, + invoice: value.invoice, + create_response_json: value.create_response_json, + refund_private_key: value.refund_private_key, + payer_amount_sat: value.payer_amount_sat, + receiver_amount_sat: value.receiver_amount_sat, + created_at: value.created_at, + preimage: value.preimage, + description: value.description, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub(crate) struct ReceiveSyncData { + pub(crate) swap_id: String, + pub(crate) invoice: String, + pub(crate) preimage: String, + pub(crate) create_response_json: String, + pub(crate) claim_fees_sat: u64, + pub(crate) claim_private_key: String, + pub(crate) payer_amount_sat: u64, + pub(crate) receiver_amount_sat: u64, + pub(crate) mrh_address: String, + pub(crate) created_at: u32, + pub(crate) payment_hash: Option, + pub(crate) description: Option, +} + +impl From for ReceiveSyncData { + fn from(value: ReceiveSwap) -> Self { + Self { + swap_id: value.id, + payment_hash: value.payment_hash, + invoice: value.invoice, + preimage: value.preimage, + create_response_json: value.create_response_json, + claim_fees_sat: value.claim_fees_sat, + claim_private_key: value.claim_private_key, + payer_amount_sat: value.payer_amount_sat, + receiver_amount_sat: value.receiver_amount_sat, + mrh_address: value.mrh_address, + created_at: value.created_at, + description: value.description, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "data_type", content = "data")] +pub(crate) enum SyncData { + Chain(ChainSyncData), + Send(SendSyncData), + Receive(ReceiveSyncData), +} + +impl SyncData { + pub(crate) fn id(&self) -> &str { + match self { + SyncData::Chain(chain_data) => &chain_data.swap_id, + SyncData::Send(send_data) => &send_data.swap_id, + SyncData::Receive(receive_data) => &receive_data.swap_id, + } + } + + pub(crate) fn to_bytes(&self) -> serde_json::Result> { + serde_json::to_vec(self) + } + + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) -> anyhow::Result<()> { + match (self, other) { + (SyncData::Chain(ref mut base), SyncData::Chain(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Send(ref mut base), SyncData::Send(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Receive(ref mut _base), SyncData::Receive(_other)) => { + log::warn!("Attempting to merge for unnecessary type SyncData::Receive"); + } + _ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")), + }; + Ok(()) + } +} + +fn clone_if_set(s: &mut Option, other: &Option) { + if other.is_some() { + s.clone_from(other) + } +} diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs index fac48b461..98e460bce 100644 --- a/lib/core/src/sync/model/mod.rs +++ b/lib/core/src/sync/model/mod.rs @@ -1,10 +1,26 @@ +use std::sync::Arc; + +use self::{data::SyncData, sync::Record}; +use crate::prelude::Signer; +use anyhow::Result; +use lazy_static::lazy_static; +use lwk_wollet::hashes::hex::DisplayHex; +use openssl::sha::sha256; use rusqlite::{ types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef}, ToSql, }; +use semver::Version; +pub(crate) mod client; +pub(crate) mod data; pub(crate) mod sync; +const MESSAGE_PREFIX: &[u8; 13] = b"realtimesync:"; +lazy_static! { + static ref CURRENT_SCHEMA_VERSION: Version = Version::parse("0.0.1").unwrap(); +} + #[derive(Copy, Clone)] pub(crate) enum RecordType { Receive = 0, @@ -51,3 +67,72 @@ pub(crate) struct SyncOutgoingChanges { pub(crate) commit_time: u32, pub(crate) updated_fields: Option>, } + +pub(crate) struct DecryptedRecord { + pub(crate) revision: u64, + pub(crate) id: String, + #[allow(dead_code)] + pub(crate) schema_version: String, + pub(crate) data: SyncData, +} + +impl Record { + pub(crate) fn new( + data: SyncData, + revision: u64, + signer: Arc>, + ) -> Result { + let id = Self::get_id_from_sync_data(&data); + let data = data.to_bytes()?; + let data = signer + .ecies_encrypt(&data) + .map_err(|err| anyhow::anyhow!("Could not encrypt sync data: {err:?}"))?; + let schema_version = CURRENT_SCHEMA_VERSION.to_string(); + Ok(Self { + id, + revision, + schema_version, + data, + }) + } + + fn id(prefix: String, data_id: &str) -> String { + sha256((prefix + ":" + data_id).as_bytes()).to_lower_hex_string() + } + + pub(crate) fn get_id_from_sync_data(data: &SyncData) -> String { + let prefix = match data { + SyncData::Chain(_) => "chain-swap", + SyncData::Send(_) => "send-swap", + SyncData::Receive(_) => "receive-swap", + } + .to_string(); + Self::id(prefix, data.id()) + } + + pub(crate) fn get_id_from_record_type(record_type: RecordType, data_id: &str) -> String { + let prefix = match record_type { + RecordType::Chain => "chain-swap", + RecordType::Send => "send-swap", + RecordType::Receive => "receive-swap", + } + .to_string(); + Self::id(prefix, data_id) + } + + pub(crate) fn is_applicable(&self) -> Result { + let record_version = Version::parse(&self.schema_version)?; + Ok(CURRENT_SCHEMA_VERSION.major >= record_version.major) + } + + pub(crate) fn decrypt(self, signer: Arc>) -> Result { + let dec_data = signer.ecies_decrypt(self.data.as_slice())?; + let data = serde_json::from_slice(&dec_data)?; + Ok(DecryptedRecord { + id: self.id, + revision: self.revision, + schema_version: self.schema_version, + data, + }) + } +} diff --git a/lib/core/src/test_utils/mod.rs b/lib/core/src/test_utils/mod.rs index 9e71df43e..9312f0b2a 100644 --- a/lib/core/src/test_utils/mod.rs +++ b/lib/core/src/test_utils/mod.rs @@ -10,6 +10,7 @@ pub(crate) mod sdk; pub(crate) mod send_swap; pub(crate) mod status_stream; pub(crate) mod swapper; +pub(crate) mod sync; pub(crate) mod wallet; pub(crate) fn generate_random_string(size: usize) -> String { diff --git a/lib/core/src/test_utils/sync.rs b/lib/core/src/test_utils/sync.rs new file mode 100644 index 000000000..aaa5d8027 --- /dev/null +++ b/lib/core/src/test_utils/sync.rs @@ -0,0 +1,102 @@ +#![cfg(test)] + +use crate::{ + prelude::Direction, + sync::{ + client::SyncerClient, + model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData}, + sync::{ + ListChangesReply, ListChangesRequest, Record, SetRecordReply, SetRecordRequest, + }, + }, + }, +}; +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::{mpsc::Receiver, Mutex}; + +pub(crate) struct MockSyncerClient { + pub(crate) incoming_rx: Mutex>, +} + +impl MockSyncerClient { + pub(crate) fn new(incoming_rx: Receiver) -> Self { + Self { + incoming_rx: Mutex::new(incoming_rx), + } + } +} + +#[async_trait] +impl SyncerClient for MockSyncerClient { + async fn connect(&self, _connect_url: String) -> Result<()> { + todo!() + } + + async fn push(&self, _req: SetRecordRequest) -> Result { + todo!() + } + + async fn pull(&self, _req: ListChangesRequest) -> Result { + let mut rx = self.incoming_rx.lock().await; + let mut changes = Vec::with_capacity(3); + rx.recv_many(&mut changes, 3).await; + Ok(ListChangesReply { changes }) + } + + async fn disconnect(&self) -> Result<()> { + todo!() + } +} + +pub(crate) fn new_receive_sync_data() -> ReceiveSyncData { + ReceiveSyncData { + swap_id: "receive-swap".to_string(), + invoice: "".to_string(), + create_response_json: "".to_string(), + payer_amount_sat: 0, + receiver_amount_sat: 0, + created_at: 0, + claim_fees_sat: 0, + claim_private_key: "".to_string(), + mrh_address: "".to_string(), + preimage: "".to_string(), + payment_hash: None, + description: None, + } +} + +pub(crate) fn new_send_sync_data(preimage: Option) -> SendSyncData { + SendSyncData { + swap_id: "send-swap".to_string(), + invoice: "".to_string(), + create_response_json: "".to_string(), + refund_private_key: "".to_string(), + payer_amount_sat: 0, + receiver_amount_sat: 0, + created_at: 0, + preimage, + payment_hash: None, + description: None, + } +} + +pub(crate) fn new_chain_sync_data(accept_zero_conf: Option) -> ChainSyncData { + ChainSyncData { + swap_id: "chain-swap".to_string(), + preimage: "".to_string(), + create_response_json: "".to_string(), + direction: Direction::Incoming, + lockup_address: "".to_string(), + claim_fees_sat: 0, + claim_private_key: "".to_string(), + refund_private_key: "".to_string(), + timeout_block_height: 0, + payer_amount_sat: 0, + receiver_amount_sat: 0, + accept_zero_conf: accept_zero_conf.unwrap_or(true), + created_at: 0, + description: None, + } +} diff --git a/lib/core/src/test_utils/wallet.rs b/lib/core/src/test_utils/wallet.rs index cd7a6c254..db32c6060 100644 --- a/lib/core/src/test_utils/wallet.rs +++ b/lib/core/src/test_utils/wallet.rs @@ -10,9 +10,12 @@ use crate::{ }; use anyhow::Result; use async_trait::async_trait; +use boltz_client::{Keypair, Secp256k1}; use lazy_static::lazy_static; use lwk_wollet::{ elements::{Address, Transaction}, + elements_miniscript::ToPublicKey as _, + secp256k1::Message, Tip, WalletTx, }; @@ -91,11 +94,15 @@ impl OnchainWallet for MockWallet { } } -pub(crate) struct MockSigner {} +pub(crate) struct MockSigner { + keypair: Keypair, +} impl MockSigner { pub(crate) fn new() -> Self { - Self {} + let secp = Secp256k1::new(); + let keypair = Keypair::new(&secp, &mut bip39::rand::thread_rng()); + Self { keypair } } } @@ -112,8 +119,16 @@ impl Signer for MockSigner { todo!() } - fn sign_ecdsa_recoverable(&self, _msg: Vec) -> Result, SignerError> { - todo!() + fn sign_ecdsa_recoverable(&self, msg: Vec) -> Result, SignerError> { + let secp = Secp256k1::new(); + let msg: Message = Message::from_digest_slice(msg.as_slice()) + .map_err(|e| SignerError::Generic { err: e.to_string() })?; + // Get message signature and encode to zbase32 + let recoverable_sig = secp.sign_ecdsa_recoverable(&msg, &self.keypair.secret_key()); + let (recovery_id, sig) = recoverable_sig.serialize_compact(); + let mut complete_signature = vec![31 + recovery_id.to_i32() as u8]; + complete_signature.extend_from_slice(&sig); + Ok(complete_signature) } fn slip77_master_blinding_key(&self) -> Result, SignerError> { @@ -123,4 +138,22 @@ impl Signer for MockSigner { fn hmac_sha256(&self, _msg: Vec, _derivation_path: String) -> Result, SignerError> { todo!() } + + fn ecies_encrypt(&self, msg: &[u8]) -> Result, SignerError> { + let rc_pub = self.keypair.public_key().to_public_key().to_bytes(); + Ok( + ecies::encrypt(&rc_pub, msg).map_err(|err| SignerError::Generic { + err: format!("Could not encrypt data: {err}"), + })?, + ) + } + + fn ecies_decrypt(&self, msg: &[u8]) -> Result, SignerError> { + let rc_prv = self.keypair.secret_bytes(); + Ok( + ecies::decrypt(&rc_prv, msg).map_err(|err| SignerError::Generic { + err: format!("Could not decrypt data: {err}"), + })?, + ) + } }