From 9a43ba8862892ca1d3fe13be0972150bc365151f Mon Sep 17 00:00:00 2001 From: yse Date: Sun, 10 Nov 2024 16:42:08 +0100 Subject: [PATCH] feat: add `pull` method Uses the `pull_changes` method to grab the remote records, merges them with outgoing sync data (if necessary) and persists them --- lib/core/src/persist/chain.rs | 2 +- lib/core/src/persist/receive.rs | 2 +- lib/core/src/persist/send.rs | 2 +- lib/core/src/sync/mod.rs | 117 +++++++++++++++++++++++++++ lib/core/src/sync/model/data.rs | 137 +++++++++++++++++++++++++++++++- lib/core/src/sync/model/mod.rs | 58 ++++++++++++++ 6 files changed, 314 insertions(+), 4 deletions(-) diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index 2425b9c42..c5ae501ea 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -19,7 +19,7 @@ impl Persister { // so we split up the insert into two statements. let mut stmt = con.prepare( " - INSERT INTO chain_swaps ( + INSERT OR REPLACE INTO chain_swaps ( id, id_hash, direction, diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index 13a31f004..92b9c13fb 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -17,7 +17,7 @@ impl Persister { let mut stmt = con.prepare( " - INSERT INTO receive_swaps ( + INSERT OR REPLACE INTO receive_swaps ( id, id_hash, preimage, diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index f956d1714..a93d8ac13 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -17,7 +17,7 @@ impl Persister { let mut stmt = con.prepare( " - INSERT INTO send_swaps ( + INSERT OR REPLACE INTO send_swaps ( id, id_hash, invoice, diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index fafc8d037..6ef2e61ed 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use anyhow::{anyhow, Result}; @@ -5,6 +6,13 @@ use anyhow::{anyhow, Result}; use crate::{persist::Persister, prelude::Signer}; use self::client::SyncerClient; +use self::model::DecryptedRecord; +use self::model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData}, + sync::ListChangesRequest, + RecordType, SyncState, +}; + pub(crate) mod client; pub(crate) mod model; @@ -30,4 +38,113 @@ impl SyncService { } } + fn apply_record(&self, decrypted_record: &DecryptedRecord) -> Result<()> { + match decrypted_record.data.clone() { + SyncData::Chain(chain_data) => self.persister.insert_chain_swap(&chain_data.into())?, + SyncData::Send(send_data) => self.persister.insert_send_swap(&send_data.into())?, + SyncData::Receive(receive_data) => { + self.persister.insert_receive_swap(&receive_data.into())? + } + } + Ok(()) + } + + fn fetch_sync_data(&self, data_id: &str, record_type: RecordType) -> Result { + let data = match record_type { + RecordType::Receive => { + let receive_data: ReceiveSyncData = self + .persister + .fetch_receive_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Receive swap {data_id}"))? + .into(); + SyncData::Receive(receive_data) + } + RecordType::Send => { + let send_data: SendSyncData = self + .persister + .fetch_send_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Send swap {data_id}"))? + .into(); + SyncData::Send(send_data) + } + RecordType::Chain => { + let chain_data: ChainSyncData = self + .persister + .fetch_chain_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Chain swap {data_id}"))? + .into(); + SyncData::Chain(chain_data) + } + }; + Ok(data) + } + + async fn pull(&self) -> Result<()> { + let latest_revision = self + .persister + .get_sync_settings()? + .latest_revision + .unwrap_or(0); + + let req = ListChangesRequest::new(latest_revision, self.signer.clone())?; + let incoming_records = self.client.pull(req).await?.changes; + let latest_revision = incoming_records.last().map(|record| record.revision); + + for new_record in incoming_records { + // Step 1: Check whether or not record is updatable (from its schema_version) + if new_record.is_major_update()? { + self.persister.set_incoming_record(&new_record)?; + continue; + } + + // Step 2: Check whether we already have this record, and if the revision is newer + let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?; + if let Some(sync_state) = &maybe_sync_state { + if sync_state.record_revision > new_record.revision { + continue; + } + } + + // Step 3: Decrypt the incoming record + let mut decrypted_record = new_record.decrypt(self.signer.clone())?; + + // Step 4: Merge with outgoing records, if present + let maybe_outgoing_details = self + .persister + .get_sync_outgoing_details(&decrypted_record.id)?; + + if let Some(outgoing_details) = maybe_outgoing_details { + if let Some(updated_fields) = outgoing_details.updated_fields { + let local_data = self.fetch_sync_data( + &decrypted_record.data.id(), + outgoing_details.record_type, + )?; + decrypted_record.data.merge(&local_data, updated_fields)?; + } + } + + // Step 5: Apply the changes + self.apply_record(&decrypted_record)?; + + // Step 6: Update sync state + self.persister.set_sync_state(SyncState { + data_id: decrypted_record.data.id().to_string(), + record_id: decrypted_record.id, + record_revision: decrypted_record.revision, + is_local: maybe_sync_state + .map(|state| state.is_local) + .unwrap_or(false), + })?; + } + + // Step 7: Update local tip + if let Some(latest_revision) = latest_revision { + self.persister.set_sync_settings(HashMap::from([ + ("remote_url", self.remote_url.clone()), + ("latest_revision", latest_revision.to_string()), + ]))?; + } + + Ok(()) + } } diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs index 9c4d65596..dcc86cd39 100644 --- a/lib/core/src/sync/model/data.rs +++ b/lib/core/src/sync/model/data.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap}; +use crate::prelude::{ChainSwap, Direction, PaymentState, ReceiveSwap, SendSwap}; #[derive(Serialize, Deserialize, Clone, Debug)] pub(crate) struct ChainSyncData { @@ -21,6 +21,46 @@ pub(crate) struct ChainSyncData { pub(crate) claim_address: Option, } +impl ChainSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec) { + for field in updated_fields { + match field.as_str() { + "description" => clone_if_changed(&mut self.description, &other.description), + "claim_address" => clone_if_changed(&mut self.claim_address, &other.claim_address), + "accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf, + _ => continue, + } + } + } +} + +impl Into for ChainSyncData { + fn into(self) -> ChainSwap { + ChainSwap { + id: self.swap_id, + preimage: self.preimage, + direction: self.direction, + create_response_json: self.create_response_json, + claim_private_key: self.claim_private_key, + refund_private_key: self.refund_private_key, + payer_amount_sat: self.payer_amount_sat, + receiver_amount_sat: self.receiver_amount_sat, + claim_fees_sat: self.claim_fees_sat, + claim_address: self.claim_address, + lockup_address: self.lockup_address, + timeout_block_height: self.timeout_block_height, + accept_zero_conf: self.accept_zero_conf, + description: self.description, + server_lockup_tx_id: None, + user_lockup_tx_id: None, + claim_tx_id: None, + refund_tx_id: None, + created_at: self.created_at, + state: PaymentState::Created, + } + } +} + impl From for ChainSyncData { fn from(value: ChainSwap) -> Self { Self { @@ -57,6 +97,39 @@ pub(crate) struct SendSyncData { pub(crate) description: Option, } +impl SendSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec) { + for field in updated_fields { + match field.as_str() { + "preimage" => clone_if_changed(&mut self.preimage, &other.preimage), + "payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash), + "description" => clone_if_changed(&mut self.description, &other.description), + _ => continue, + } + } + } +} + +impl Into for SendSyncData { + fn into(self) -> SendSwap { + SendSwap { + id: self.swap_id, + preimage: self.preimage, + create_response_json: self.create_response_json, + invoice: self.invoice, + payment_hash: self.payment_hash, + description: self.description, + payer_amount_sat: self.payer_amount_sat, + receiver_amount_sat: self.receiver_amount_sat, + refund_private_key: self.refund_private_key, + lockup_tx_id: None, + refund_tx_id: None, + created_at: self.created_at, + state: PaymentState::Created, + } + } +} + impl From for SendSyncData { fn from(value: SendSwap) -> Self { Self { @@ -91,6 +164,42 @@ pub(crate) struct ReceiveSyncData { pub(crate) description: Option, } +impl ReceiveSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec) { + for field in updated_fields { + match field.as_str() { + "payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash), + "description" => clone_if_changed(&mut self.description, &other.description), + _ => continue, + } + } + } +} + +impl Into for ReceiveSyncData { + fn into(self) -> ReceiveSwap { + ReceiveSwap { + id: self.swap_id, + preimage: self.preimage, + create_response_json: self.create_response_json, + claim_private_key: self.claim_private_key, + invoice: self.invoice, + payment_hash: self.payment_hash, + description: self.description, + payer_amount_sat: self.payer_amount_sat, + receiver_amount_sat: self.receiver_amount_sat, + claim_fees_sat: self.claim_fees_sat, + mrh_address: self.mrh_address, + mrh_script_pubkey: self.mrh_script_pubkey, + mrh_tx_id: None, + claim_tx_id: None, + lockup_tx_id: None, + created_at: self.created_at, + state: PaymentState::Created, + } + } +} + impl From for ReceiveSyncData { fn from(value: ReceiveSwap) -> Self { Self { @@ -131,4 +240,30 @@ impl SyncData { pub(crate) fn to_bytes(&self) -> serde_json::Result> { serde_json::to_vec(self) } + + pub(crate) fn merge( + &mut self, + other: &Self, + updated_fields: Vec, + ) -> anyhow::Result<()> { + match (self, other) { + (SyncData::Chain(ref mut base), SyncData::Chain(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Send(ref mut base), SyncData::Send(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Receive(ref mut base), SyncData::Receive(other)) => { + base.merge(other, updated_fields) + } + _ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")), + }; + Ok(()) + } +} + +fn clone_if_changed(s: &mut Option, other: &Option) { + if other.is_some() { + s.clone_from(other) + } } diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs index 87b3bcd33..e4c31e354 100644 --- a/lib/core/src/sync/model/mod.rs +++ b/lib/core/src/sync/model/mod.rs @@ -1,3 +1,8 @@ +use std::sync::Arc; + +use self::{data::SyncData, sync::Record}; +use crate::prelude::Signer; +use anyhow::Result; use rusqlite::{ types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef}, ToSql, @@ -8,6 +13,7 @@ pub(crate) mod data; pub(crate) mod sync; const MESSAGE_PREFIX: &[u8; 13] = b"realtimesync:"; +const CURRENT_SCHEMA_VERSION: f32 = 0.01; #[derive(Copy, Clone)] pub(crate) enum RecordType { @@ -53,3 +59,55 @@ pub(crate) struct SyncOutgoingDetails { pub(crate) record_type: RecordType, pub(crate) updated_fields: Option>, } + +pub(crate) struct DecryptedRecord { + pub(crate) revision: u64, + pub(crate) id: String, + pub(crate) schema_version: String, + pub(crate) data: SyncData, +} + +impl DecryptedRecord { + pub(crate) fn encrypt(self, signer: Arc>) -> Result { + Record::new(self.id, self.data, self.revision, signer) + } +} + +impl Record { + pub(crate) fn new( + id: String, + data: SyncData, + revision: u64, + signer: Arc>, + ) -> Result { + let data = data.to_bytes()?; + let data = signer + .ecies_encrypt(&data) + .map_err(|err| anyhow::anyhow!("Could not encrypt sync data: {err:?}"))?; + Ok(Self { + id, + revision, + schema_version: format!("{CURRENT_SCHEMA_VERSION:.2}"), + data, + }) + } + + pub(crate) fn is_major_update(&self) -> Result { + Ok(self + .schema_version + .parse::()? + .floor() + .le(&CURRENT_SCHEMA_VERSION.floor())) + } + + pub(crate) fn decrypt(self, signer: Arc>) -> Result { + let dec_data = signer.ecies_decrypt(self.data.as_slice())?; + let data = serde_json::from_slice(&dec_data)?; + Ok(DecryptedRecord { + id: self.id, + revision: self.revision, + schema_version: self.schema_version, + data, + }) + } +}