Skip to content

Commit

Permalink
feat: add last_derivation_index to sync service
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Dec 5, 2024
1 parent 01e67ee commit 083c85b
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 10 deletions.
27 changes: 23 additions & 4 deletions lib/core/src/persist/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use anyhow::Result;
use rusqlite::{Transaction, TransactionBehavior};
use std::str::FromStr;

use crate::sync::model::{data::LAST_DERIVATION_INDEX_DATA_ID, RecordType};

use super::Persister;

const KEY_SWAPPER_PROXY_URL: &str = "swapper_proxy_url";
const KEY_IS_FIRST_SYNC_COMPLETE: &str = "is_first_sync_complete";
const KEY_WEBHOOK_URL: &str = "webhook_url";
// TODO: The `last_derivation_index` needs to be synced
const KEY_LAST_DERIVATION_INDEX: &str = "last_derivation_index";
pub(crate) const KEY_LAST_DERIVATION_INDEX: &str = "last_derivation_index";

impl Persister {
fn get_cached_item_inner(tx: &Transaction, key: &str) -> Result<Option<String>> {
Expand All @@ -20,7 +21,11 @@ impl Persister {
Ok(res.ok())
}

fn update_cached_item_inner(tx: &Transaction, key: &str, value: String) -> Result<()> {
pub(crate) fn update_cached_item_inner(
tx: &Transaction,
key: &str,
value: String,
) -> Result<()> {
tx.execute(
"INSERT OR REPLACE INTO cached_items (key, value) VALUES (?1,?2)",
(key, value),
Expand Down Expand Up @@ -92,7 +97,21 @@ impl Persister {
}

pub fn set_last_derivation_index(&self, index: u32) -> Result<()> {
self.update_cached_item(KEY_LAST_DERIVATION_INDEX, index.to_string())
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

Self::update_cached_item_inner(&tx, KEY_LAST_DERIVATION_INDEX, index.to_string())?;
self.commit_outgoing(
&tx,
LAST_DERIVATION_INDEX_DATA_ID,
RecordType::LastDerivationIndex,
// insert a mock updated field so that merging with incoming data works as expected
Some(vec![LAST_DERIVATION_INDEX_DATA_ID.to_string()]),
)?;
tx.commit()?;
self.sync_trigger.try_send(())?;

Ok(())
}

pub fn get_last_derivation_index(&self) -> Result<Option<u32>> {
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod address;
mod backup;
mod cache;
pub(crate) mod cache;
pub(crate) mod chain;
mod migrations;
pub(crate) mod receive;
Expand Down
42 changes: 40 additions & 2 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use rusqlite::{
named_params, Connection, OptionalExtension, Row, Statement, Transaction, TransactionBehavior,
};

use super::{PaymentState, Persister};
use super::{cache::KEY_LAST_DERIVATION_INDEX, PaymentState, Persister};
use crate::{
sync::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
data::{ChainSyncData, ReceiveSyncData, SendSyncData, LAST_DERIVATION_INDEX_DATA_ID},
sync::Record,
RecordType, SyncOutgoingChanges, SyncSettings, SyncState,
},
Expand Down Expand Up @@ -702,4 +702,42 @@ impl Persister {

Ok(())
}

pub(crate) fn commit_incoming_address_index(
&self,
new_address_index: u32,
sync_state: SyncState,
last_commit_time: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_commit_time) = last_commit_time {
Self::check_commit_update(
&tx,
&Record::get_id_from_record_type(
RecordType::LastDerivationIndex,
LAST_DERIVATION_INDEX_DATA_ID,
),
last_commit_time,
)?;
}

Self::update_cached_item_inner(
&tx,
KEY_LAST_DERIVATION_INDEX,
new_address_index.to_string(),
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}
}
103 changes: 100 additions & 3 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use tokio::sync::{watch, Mutex};

use crate::sync::model::sync::{Record, SetRecordRequest, SetRecordStatus};
use crate::utils;
use crate::{persist::Persister, prelude::Signer};
use crate::{
persist::{cache::KEY_LAST_DERIVATION_INDEX, Persister},
prelude::Signer,
};

use self::client::SyncerClient;
use self::model::{
Expand Down Expand Up @@ -127,6 +130,9 @@ impl SyncService {
is_update,
last_commit_time,
)?,
SyncData::LastDerivationIndex(new_address_index) => self
.persister
.commit_incoming_address_index(new_address_index, sync_state, last_commit_time)?,
}
Ok(())
}
Expand Down Expand Up @@ -157,6 +163,12 @@ impl SyncService {
.into();
SyncData::Chain(chain_data)
}
RecordType::LastDerivationIndex => SyncData::LastDerivationIndex(
self.persister
.get_cached_item(KEY_LAST_DERIVATION_INDEX)?
.ok_or(anyhow!("Could not find last derivation index"))?
.parse()?,
),
};
Ok(data)
}
Expand Down Expand Up @@ -351,9 +363,9 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

use crate::{
persist::Persister,
persist::{cache::KEY_LAST_DERIVATION_INDEX, Persister},
prelude::{Direction, PaymentState, Signer},
sync::model::SyncState,
sync::model::{data::LAST_DERIVATION_INDEX_DATA_ID, SyncState},
test_utils::{
chain_swap::new_chain_swap,
persist::{create_persister, new_receive_swap, new_send_swap},
Expand Down Expand Up @@ -623,4 +635,89 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_last_derivation_index_update() -> Result<()> {
create_persister!(persister);
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()));

let (incoming_tx, outgoing_records, sync_service) =
new_sync_service(persister.clone(), signer.clone())?;

// Check pull
assert_eq!(persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?, None);

let new_last_derivation_index = 10;
let data = SyncData::LastDerivationIndex(new_last_derivation_index);
incoming_tx
.send(Record::new(data, 0, signer.clone())?)
.await?;

sync_service.pull().await?;

assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_last_derivation_index.to_string())
);

// Check push
let new_last_derivation_index = 20;
persister.set_last_derivation_index(new_last_derivation_index)?;

sync_service.push().await?;

let outgoing = outgoing_records.lock().await;
let record = get_outgoing_record(
persister.clone(),
&outgoing,
LAST_DERIVATION_INDEX_DATA_ID,
RecordType::LastDerivationIndex,
)?;
let decrypted_record = record.clone().decrypt(signer.clone())?;
match decrypted_record.data {
SyncData::LastDerivationIndex(last_derivation_index) => {
assert_eq!(last_derivation_index, new_last_derivation_index);
}
_ => {
return Err(anyhow::anyhow!("Unexpected sync data type received."));
}
}

// Check pull with merge
let new_local_last_derivation_index = 30;
persister.set_last_derivation_index(new_local_last_derivation_index)?;

let new_remote_last_derivation_index = 25;
let data = SyncData::LastDerivationIndex(new_remote_last_derivation_index);
incoming_tx
.send(Record::new(data, 0, signer.clone())?)
.await?;

sync_service.pull().await?;

// Newer one is persisted (local > remote)
assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_local_last_derivation_index.to_string())
);

let new_local_last_derivation_index = 35;
persister.set_last_derivation_index(new_local_last_derivation_index)?;

let new_remote_last_derivation_index = 40;
let data = SyncData::LastDerivationIndex(new_remote_last_derivation_index);
incoming_tx
.send(Record::new(data, 2, signer.clone())?)
.await?;

sync_service.pull().await?;

// Newer one is persisted (remote > local)
assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_remote_last_derivation_index.to_string())
);

Ok(())
}
}
10 changes: 10 additions & 0 deletions lib/core/src/sync/model/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};

use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap};

pub(crate) const LAST_DERIVATION_INDEX_DATA_ID: &str = "last-derivation-index";

#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ChainSyncData {
pub(crate) swap_id: String,
Expand Down Expand Up @@ -135,6 +137,7 @@ pub(crate) enum SyncData {
Chain(ChainSyncData),
Send(SendSyncData),
Receive(ReceiveSyncData),
LastDerivationIndex(u32),
}

impl SyncData {
Expand All @@ -143,6 +146,7 @@ impl SyncData {
SyncData::Chain(chain_data) => &chain_data.swap_id,
SyncData::Send(send_data) => &send_data.swap_id,
SyncData::Receive(receive_data) => &receive_data.swap_id,
SyncData::LastDerivationIndex(_) => LAST_DERIVATION_INDEX_DATA_ID,
}
}

Expand All @@ -161,6 +165,12 @@ impl SyncData {
(SyncData::Receive(ref mut _base), SyncData::Receive(_other)) => {
log::warn!("Attempting to merge for unnecessary type SyncData::Receive");
}
(
SyncData::LastDerivationIndex(our_index),
SyncData::LastDerivationIndex(their_index),
) => {
*our_index = std::cmp::max(*their_index, *our_index);
}
_ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")),
};
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub(crate) enum RecordType {
Receive = 0,
Send = 1,
Chain = 2,
LastDerivationIndex = 3,
}

impl ToSql for RecordType {
Expand All @@ -41,6 +42,7 @@ impl FromSql for RecordType {
0 => Ok(Self::Receive),
1 => Ok(Self::Send),
2 => Ok(Self::Chain),
3 => Ok(Self::LastDerivationIndex),
_ => Err(FromSqlError::OutOfRange(i)),
},
_ => Err(FromSqlError::InvalidType),
Expand Down Expand Up @@ -105,6 +107,7 @@ impl Record {
SyncData::Chain(_) => "chain-swap",
SyncData::Send(_) => "send-swap",
SyncData::Receive(_) => "receive-swap",
SyncData::LastDerivationIndex(_) => "derivation-index",
}
.to_string();
Self::id(prefix, data.id())
Expand All @@ -115,6 +118,7 @@ impl Record {
RecordType::Chain => "chain-swap",
RecordType::Send => "send-swap",
RecordType::Receive => "receive-swap",
RecordType::LastDerivationIndex => "derivation-index",
}
.to_string();
Self::id(prefix, data_id)
Expand Down

0 comments on commit 083c85b

Please sign in to comment.