Skip to content

Commit

Permalink
feat: add pull method
Browse files Browse the repository at this point in the history
Uses the `pull_changes` method to grab the remote records, merges them
with outgoing sync data (if necessary) and persists them
  • Loading branch information
hydra-yse committed Nov 12, 2024
1 parent 547cf5e commit 9a43ba8
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/core/src/persist/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Persister {
// so we split up the insert into two statements.
let mut stmt = con.prepare(
"
INSERT INTO chain_swaps (
INSERT OR REPLACE INTO chain_swaps (
id,
id_hash,
direction,
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Persister {

let mut stmt = con.prepare(
"
INSERT INTO receive_swaps (
INSERT OR REPLACE INTO receive_swaps (
id,
id_hash,
preimage,
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Persister {

let mut stmt = con.prepare(
"
INSERT INTO send_swaps (
INSERT OR REPLACE INTO send_swaps (
id,
id_hash,
invoice,
Expand Down
117 changes: 117 additions & 0 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{anyhow, Result};

use crate::{persist::Persister, prelude::Signer};

use self::client::SyncerClient;
use self::model::DecryptedRecord;
use self::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData},
sync::ListChangesRequest,
RecordType, SyncState,
};

pub(crate) mod client;
pub(crate) mod model;

Expand All @@ -30,4 +38,113 @@ impl SyncService {
}
}

fn apply_record(&self, decrypted_record: &DecryptedRecord) -> Result<()> {
match decrypted_record.data.clone() {
SyncData::Chain(chain_data) => self.persister.insert_chain_swap(&chain_data.into())?,
SyncData::Send(send_data) => self.persister.insert_send_swap(&send_data.into())?,
SyncData::Receive(receive_data) => {
self.persister.insert_receive_swap(&receive_data.into())?
}
}
Ok(())
}

fn fetch_sync_data(&self, data_id: &str, record_type: RecordType) -> Result<SyncData> {
let data = match record_type {
RecordType::Receive => {
let receive_data: ReceiveSyncData = self
.persister
.fetch_receive_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Receive swap {data_id}"))?
.into();
SyncData::Receive(receive_data)
}
RecordType::Send => {
let send_data: SendSyncData = self
.persister
.fetch_send_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Send swap {data_id}"))?
.into();
SyncData::Send(send_data)
}
RecordType::Chain => {
let chain_data: ChainSyncData = self
.persister
.fetch_chain_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Chain swap {data_id}"))?
.into();
SyncData::Chain(chain_data)
}
};
Ok(data)
}

async fn pull(&self) -> Result<()> {
let latest_revision = self
.persister
.get_sync_settings()?
.latest_revision
.unwrap_or(0);

let req = ListChangesRequest::new(latest_revision, self.signer.clone())?;
let incoming_records = self.client.pull(req).await?.changes;
let latest_revision = incoming_records.last().map(|record| record.revision);

for new_record in incoming_records {
// Step 1: Check whether or not record is updatable (from its schema_version)
if new_record.is_major_update()? {
self.persister.set_incoming_record(&new_record)?;
continue;
}

// Step 2: Check whether we already have this record, and if the revision is newer
let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?;
if let Some(sync_state) = &maybe_sync_state {
if sync_state.record_revision > new_record.revision {
continue;
}
}

// Step 3: Decrypt the incoming record
let mut decrypted_record = new_record.decrypt(self.signer.clone())?;

// Step 4: Merge with outgoing records, if present
let maybe_outgoing_details = self
.persister
.get_sync_outgoing_details(&decrypted_record.id)?;

if let Some(outgoing_details) = maybe_outgoing_details {
if let Some(updated_fields) = outgoing_details.updated_fields {
let local_data = self.fetch_sync_data(
&decrypted_record.data.id(),
outgoing_details.record_type,
)?;
decrypted_record.data.merge(&local_data, updated_fields)?;
}
}

// Step 5: Apply the changes
self.apply_record(&decrypted_record)?;

// Step 6: Update sync state
self.persister.set_sync_state(SyncState {
data_id: decrypted_record.data.id().to_string(),
record_id: decrypted_record.id,
record_revision: decrypted_record.revision,
is_local: maybe_sync_state
.map(|state| state.is_local)
.unwrap_or(false),
})?;
}

// Step 7: Update local tip
if let Some(latest_revision) = latest_revision {
self.persister.set_sync_settings(HashMap::from([
("remote_url", self.remote_url.clone()),
("latest_revision", latest_revision.to_string()),
]))?;
}

Ok(())
}
}
137 changes: 136 additions & 1 deletion lib/core/src/sync/model/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap};
use crate::prelude::{ChainSwap, Direction, PaymentState, ReceiveSwap, SendSwap};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ChainSyncData {
Expand All @@ -21,6 +21,46 @@ pub(crate) struct ChainSyncData {
pub(crate) claim_address: Option<String>,
}

impl ChainSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<String>) {
for field in updated_fields {
match field.as_str() {
"description" => clone_if_changed(&mut self.description, &other.description),
"claim_address" => clone_if_changed(&mut self.claim_address, &other.claim_address),
"accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf,
_ => continue,
}
}
}
}

impl Into<ChainSwap> for ChainSyncData {
fn into(self) -> ChainSwap {
ChainSwap {
id: self.swap_id,
preimage: self.preimage,
direction: self.direction,
create_response_json: self.create_response_json,
claim_private_key: self.claim_private_key,
refund_private_key: self.refund_private_key,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
claim_fees_sat: self.claim_fees_sat,
claim_address: self.claim_address,
lockup_address: self.lockup_address,
timeout_block_height: self.timeout_block_height,
accept_zero_conf: self.accept_zero_conf,
description: self.description,
server_lockup_tx_id: None,
user_lockup_tx_id: None,
claim_tx_id: None,
refund_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<ChainSwap> for ChainSyncData {
fn from(value: ChainSwap) -> Self {
Self {
Expand Down Expand Up @@ -57,6 +97,39 @@ pub(crate) struct SendSyncData {
pub(crate) description: Option<String>,
}

impl SendSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<String>) {
for field in updated_fields {
match field.as_str() {
"preimage" => clone_if_changed(&mut self.preimage, &other.preimage),
"payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash),
"description" => clone_if_changed(&mut self.description, &other.description),
_ => continue,
}
}
}
}

impl Into<SendSwap> for SendSyncData {
fn into(self) -> SendSwap {
SendSwap {
id: self.swap_id,
preimage: self.preimage,
create_response_json: self.create_response_json,
invoice: self.invoice,
payment_hash: self.payment_hash,
description: self.description,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
refund_private_key: self.refund_private_key,
lockup_tx_id: None,
refund_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<SendSwap> for SendSyncData {
fn from(value: SendSwap) -> Self {
Self {
Expand Down Expand Up @@ -91,6 +164,42 @@ pub(crate) struct ReceiveSyncData {
pub(crate) description: Option<String>,
}

impl ReceiveSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<String>) {
for field in updated_fields {
match field.as_str() {
"payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash),
"description" => clone_if_changed(&mut self.description, &other.description),
_ => continue,
}
}
}
}

impl Into<ReceiveSwap> for ReceiveSyncData {
fn into(self) -> ReceiveSwap {
ReceiveSwap {
id: self.swap_id,
preimage: self.preimage,
create_response_json: self.create_response_json,
claim_private_key: self.claim_private_key,
invoice: self.invoice,
payment_hash: self.payment_hash,
description: self.description,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
claim_fees_sat: self.claim_fees_sat,
mrh_address: self.mrh_address,
mrh_script_pubkey: self.mrh_script_pubkey,
mrh_tx_id: None,
claim_tx_id: None,
lockup_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<ReceiveSwap> for ReceiveSyncData {
fn from(value: ReceiveSwap) -> Self {
Self {
Expand Down Expand Up @@ -131,4 +240,30 @@ impl SyncData {
pub(crate) fn to_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}

pub(crate) fn merge(
&mut self,
other: &Self,
updated_fields: Vec<String>,
) -> anyhow::Result<()> {
match (self, other) {
(SyncData::Chain(ref mut base), SyncData::Chain(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Send(ref mut base), SyncData::Send(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Receive(ref mut base), SyncData::Receive(other)) => {
base.merge(other, updated_fields)
}
_ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")),
};
Ok(())
}
}

fn clone_if_changed<T: Clone>(s: &mut Option<T>, other: &Option<T>) {
if other.is_some() {
s.clone_from(other)
}
}
Loading

0 comments on commit 9a43ba8

Please sign in to comment.