Skip to content

Commit

Permalink
feat: add pull method
Browse files Browse the repository at this point in the history
Uses the `pull_changes` method to grab the remote records, merges them
with outgoing sync data (if necessary) and persists them
  • Loading branch information
hydra-yse committed Nov 13, 2024
1 parent 730d715 commit ee0a2ba
Show file tree
Hide file tree
Showing 4 changed files with 463 additions and 18 deletions.
202 changes: 184 additions & 18 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::collections::HashMap;

use anyhow::Result;
use rusqlite::{named_params, Row, TransactionBehavior};
use rusqlite::{named_params, Connection, Row, Statement, TransactionBehavior};

use super::Persister;
use crate::sync::model::{sync::Record, SyncOutgoingDetails, SyncSettings, SyncState};
use crate::sync::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
sync::Record,
RecordType, SyncOutgoingDetails, SyncSettings, SyncState,
};

impl Persister {
fn select_sync_state_query(where_clauses: Vec<String>) -> String {
Expand Down Expand Up @@ -53,21 +57,24 @@ impl Persister {
Ok(sync_state)
}

pub(crate) fn set_sync_state(&self, sync_state: SyncState) -> Result<()> {
let con = self.get_connection()?;

con.execute(
fn set_sync_state_stmt(con: &Connection) -> Result<Statement, rusqlite::Error> {
con.prepare(
"
INSERT OR REPLACE INTO sync_state(data_id, record_id, record_revision, is_local)
VALUES (:data_id, :record_id, :record_revision, :is_local)
",
named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
},
)?;
)
}

pub(crate) fn set_sync_state(&self, sync_state: SyncState) -> Result<()> {
let con = self.get_connection()?;

Self::set_sync_state_stmt(&con)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

Ok(())
}
Expand Down Expand Up @@ -209,14 +216,13 @@ impl Persister {
None => None,
};

return Ok(match row.get(0)? {
Some(record_type) => Some(SyncOutgoingDetails {
return Ok(row.get::<_, Option<RecordType>>(0)?.map(|record_type| {
SyncOutgoingDetails {
record_id: record_id.to_string(),
record_type,
updated_fields,
}),
None => None,
});
}
}));
}

Ok(None)
Expand Down Expand Up @@ -257,4 +263,164 @@ impl Persister {

Ok(())
}

pub(crate) fn commit_receive_swap(
&self,
data: &ReceiveSyncData,
sync_state: SyncState,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

tx.execute(
"
UPDATE receive_swaps
SET
invoice = :invoice,
preimage = :preimage,
create_response_json = :create_response_json,
claim_fees_sat = :claim_fees_sat,
claim_private_key = :claim_private_key,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
mrh_address = :mrh_address,
mrh_script_pubkey = :mrh_script_pubkey,
created_at = :created_at,
payment_hash = :payment_hash,
description = :description
WHERE id = :id",
named_params! {
":id": &data.swap_id,
":invoice": &data.invoice,
":preimage": &data.preimage,
":create_response_json": &data.create_response_json,
":claim_fees_sat": &data.claim_fees_sat,
":claim_private_key": &data.claim_private_key,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":mrh_address": &data.mrh_address,
":mrh_script_pubkey": &data.mrh_script_pubkey,
":created_at": &data.created_at,
":payment_hash": &data.payment_hash,
":description": &data.description,
},
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}

pub(crate) fn commit_send_swap(
&self,
data: &SendSyncData,
sync_state: SyncState,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

tx.execute(
"
UPDATE send_swaps
SET
invoice = :invoice,
create_response_json = :create_response_json,
refund_private_key = :refund_private_key,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
created_at = :created_at,
preimage = :preimage,
payment_hash = :payment_hash,
description = :description
WHERE id = :id",
named_params! {
":id": &data.swap_id,
":invoice": &data.invoice,
":create_response_json": &data.create_response_json,
":refund_private_key": &data.refund_private_key,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":created_at": &data.created_at,
":preimage": &data.preimage,
":payment_hash": &data.payment_hash,
":description": &data.description,
},
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}

pub(crate) fn commit_chain_swap(
&self,
data: &ChainSyncData,
sync_state: SyncState,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

tx.execute(
"
UPDATE send_swaps
SET
preimage = :preimage,
create_response_json = :create_response_json,
direction = :direction,
lockup_address = :lockup_address,
claim_fees_sat = :claim_fees_sat,
claim_private_key = :claim_private_key,
refund_private_key = :refund_private_key,
timeout_block_height = :timeout_block_height,
payer_amount_sat = :payer_amount_sat,
receiver_amount_sat = :receiver_amount_sat,
accept_zero_conf = :accept_zero_conf,
created_at = :created_at,
description = :description,
claim_address = :claim_address
WHERE id = :id",
named_params! {
":id": &data.swap_id,
":preimage": &data.preimage,
":create_response_json": &data.create_response_json,
":direction": &data.direction,
":lockup_address": &data.lockup_address,
":claim_fees_sat": &data.claim_fees_sat,
":claim_private_key": &data.claim_private_key,
":refund_private_key": &data.refund_private_key,
":timeout_block_height": &data.timeout_block_height,
":payer_amount_sat": &data.payer_amount_sat,
":receiver_amount_sat": &data.receiver_amount_sat,
":accept_zero_conf": &data.accept_zero_conf,
":created_at": &data.created_at,
":description": &data.description,
":claim_address": &data.claim_address,
},
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}
}
Loading

0 comments on commit ee0a2ba

Please sign in to comment.