diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ab6e9dbc9..8873597f8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -53,6 +53,41 @@ jobs: cd ../cli cargo clippy -- -D warnings + build: + name: Cargo Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: Swatinem/rust-cache@v2 + with: + workspaces: | + lib -> target + cache-on-failure: "true" + + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + version: "27.2" + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - run: cargo build + working-directory: lib + + - name: Check git status + env: + GIT_PAGER: cat + run: | + status=$(git status --porcelain) + if [[ -n "$status" ]]; then + echo "Git status has changes" + echo "$status" + git diff + exit 1 + else + echo "No changes in git status" + fi + tests: name: Test sdk-core runs-on: ubuntu-latest @@ -341,4 +376,4 @@ jobs: rm Cargo.lock cargo update --package secp256k1-zkp - cargo clippy -- -D warnings \ No newline at end of file + cargo clippy -- -D warnings diff --git a/lib/Cargo.lock b/lib/Cargo.lock index 28dfd6c8a..fdd5f15ef 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -376,7 +376,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -392,7 +392,34 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 0.1.2", - "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -414,6 +441,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -693,6 +740,7 @@ dependencies = [ "lwk_wollet", "openssl", "paste", + "prost 0.13.3", "reqwest 0.11.20", "rusqlite", "rusqlite_migration", @@ -709,6 +757,8 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", + "tonic 0.12.3", + "tonic-build 0.12.3", "url", "uuid", "x509-parser", @@ -1750,6 +1800,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1787,6 +1838,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1815,7 +1879,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -2559,6 +2623,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prettyplease" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" +dependencies = [ + "proc-macro2", + "syn 2.0.77", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2599,7 +2673,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive 0.13.3", ] [[package]] @@ -2615,15 +2699,36 @@ dependencies = [ "log", "multimap", "petgraph", - "prettyplease", - "prost", - "prost-types", + "prettyplease 0.1.25", + "prost 0.11.9", + "prost-types 0.11.9", "regex", "syn 1.0.109", "tempfile", "which", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease 0.2.22", + "prost 0.13.3", + "prost-types 0.13.3", + "regex", + "syn 2.0.77", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -2637,13 +2742,35 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "prost-types" version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", +] + +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost 0.13.3", ] [[package]] @@ -3054,6 +3181,7 @@ version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ + "log", "once_cell", "ring 0.17.8", "rustls-pki-types", @@ -3193,7 +3321,7 @@ dependencies = [ "lightning 0.0.118", "lightning-invoice 0.26.0", "log", - "prost", + "prost 0.11.9", "querystring", "regex", "reqwest 0.11.20", @@ -3202,8 +3330,8 @@ dependencies = [ "strum_macros", "thiserror", "tokio", - "tonic", - "tonic-build", + "tonic 0.8.3", + "tonic-build 0.8.4", "url", "urlencoding", ] @@ -3854,7 +3982,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.13.1", "bytes", "futures-core", @@ -3863,18 +3991,18 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.11.9", + "prost-derive 0.11.9", "rustls-native-certs", "rustls-pemfile 1.0.4", "tokio", "tokio-rustls 0.23.4", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -3882,19 +4010,65 @@ dependencies = [ "webpki-roots 0.22.6", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.3", + "rustls-pemfile 2.1.3", + "socket2", + "tokio", + "tokio-rustls 0.26.0", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" dependencies = [ - "prettyplease", + "prettyplease 0.1.25", "proc-macro2", - "prost-build", + "prost-build 0.11.9", "quote", "syn 1.0.109", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease 0.2.22", + "proc-macro2", + "prost-build 0.13.3", + "prost-types 0.13.3", + "quote", + "syn 2.0.77", +] + [[package]] name = "tower" version = "0.4.13" @@ -3915,6 +4089,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index 9a9fb5add..13a8cebde 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -49,6 +49,9 @@ electrum-client = { version = "0.19.0" } zbase32 = "0.1.2" x509-parser = { version = "0.16.0" } tempfile = "3" +tonic = { version = "0.12.3", features = ["tls"] } +prost = "0.13.3" +uuid = { version = "1.8.0", features = ["v4"] } [dev-dependencies] lazy_static = "1.5.0" @@ -59,6 +62,7 @@ uuid = { version = "1.8.0", features = ["v4"] } [build-dependencies] anyhow = { version = "1.0.79", features = ["backtrace"] } glob = "0.3.1" +tonic-build = "0.12.3" # Pin these versions to fix iOS build issues [target.'cfg(target_os = "ios")'.build-dependencies] diff --git a/lib/core/build.rs b/lib/core/build.rs index a5be78907..37bc80270 100644 --- a/lib/core/build.rs +++ b/lib/core/build.rs @@ -32,7 +32,16 @@ fn setup_x86_64_android_workaround() { } } +fn compile_protos() -> Result<()> { + tonic_build::configure() + .build_server(false) + .out_dir("./src/sync/model") + .compile_protos(&["src/sync/proto/sync.proto"], &["src/sync/proto"])?; + Ok(()) +} + fn main() -> Result<()> { setup_x86_64_android_workaround(); + compile_protos()?; Ok(()) } diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index 5190ad22b..d55036f8f 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -182,6 +182,7 @@ pub mod sdk; pub(crate) mod send_swap; pub(crate) mod signer; pub(crate) mod swapper; +pub(crate) mod sync; pub(crate) mod test_utils; pub(crate) mod utils; pub(crate) mod wallet; diff --git a/lib/core/src/persist/migrations.rs b/lib/core/src/persist/migrations.rs index 279d987f4..072d4b25e 100644 --- a/lib/core/src/persist/migrations.rs +++ b/lib/core/src/persist/migrations.rs @@ -183,5 +183,28 @@ pub(crate) fn current_migrations() -> Vec<&'static str> { DROP TABLE old_chain_swaps; ", + "CREATE TABLE IF NOT EXISTS sync_state( + data_id TEXT NOT NULL PRIMARY KEY, + record_id TEXT NOT NULL, + record_revision INTEGER NOT NULL, + is_local INTEGER NOT NULL DEFAULT 1 + ) STRICT;", + "CREATE TABLE IF NOT EXISTS sync_settings( + key TEXT NOT NULL PRIMARY KEY, + value TEXT NOT NULL + ) STRICT;", + "CREATE TABLE IF NOT EXISTS sync_outgoing( + record_id TEXT NOT NULL PRIMARY KEY, + data_id TEXT NOT NULL UNIQUE, + record_type INTEGER NOT NULL, + commit_time INTEGER NOT NULL, + updated_fields_json TEXT + ) STRICT;", + "CREATE TABLE IF NOT EXISTS sync_incoming( + record_id TEXT NOT NULL PRIMARY KEY, + revision INTEGER NOT NULL UNIQUE, + schema_version TEXT NOT NULL, + data BLOB NOT NULL + ) STRICT;", ] } diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 5f3967b37..297fefd99 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod chain; mod migrations; pub(crate) mod receive; pub(crate) mod send; +pub(crate) mod sync; use std::collections::HashSet; use std::{fs::create_dir_all, path::PathBuf, str::FromStr}; diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs new file mode 100644 index 000000000..956a9c162 --- /dev/null +++ b/lib/core/src/persist/sync.rs @@ -0,0 +1,651 @@ +use std::collections::HashMap; + +use anyhow::Result; +use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior}; + +use super::Persister; +use crate::{ + sync::model::{sync::Record, RecordType, SyncOutgoingChanges, SyncSettings, SyncState}, + utils, +}; + +impl Persister { + fn select_sync_state_query(where_clauses: Vec) -> String { + let mut where_clause_str = String::new(); + if !where_clauses.is_empty() { + where_clause_str = String::from("WHERE "); + where_clause_str.push_str(where_clauses.join(" AND ").as_str()); + } + + format!( + " + SELECT + data_id, + record_id, + record_revision, + is_local + FROM sync_state + {where_clause_str} + " + ) + } + + fn sql_row_to_sync_state(row: &Row) -> rusqlite::Result { + Ok(SyncState { + data_id: row.get(0)?, + record_id: row.get(1)?, + record_revision: row.get(2)?, + is_local: row.get(3)?, + }) + } + + pub(crate) fn get_sync_state_by_record_id(&self, record_id: &str) -> Result> { + let con = self.get_connection()?; + let query = Self::select_sync_state_query(vec!["record_id = ?1".to_string()]); + let sync_state = con + .query_row(&query, [record_id], Self::sql_row_to_sync_state) + .optional()?; + Ok(sync_state) + } + + pub(crate) fn get_sync_state_by_data_id(&self, data_id: &str) -> Result> { + let con = self.get_connection()?; + let query = Self::select_sync_state_query(vec!["data_id = ?1".to_string()]); + let sync_state = con + .query_row(&query, [data_id], Self::sql_row_to_sync_state) + .optional()?; + Ok(sync_state) + } + + fn set_sync_state_stmt(con: &Connection) -> rusqlite::Result { + con.prepare( + " + INSERT OR REPLACE INTO sync_state(data_id, record_id, record_revision, is_local) + VALUES (:data_id, :record_id, :record_revision, :is_local) + ", + ) + } + + pub(crate) fn set_sync_state(&self, sync_state: SyncState) -> Result<()> { + let con = self.get_connection()?; + + Self::set_sync_state_stmt(&con)?.execute(named_params! { + ":data_id": &sync_state.data_id, + ":record_id": &sync_state.record_id, + ":record_revision": &sync_state.record_revision, + ":is_local": &sync_state.is_local, + })?; + + Ok(()) + } + + pub(crate) fn get_sync_settings(&self) -> Result { + let con = self.get_connection()?; + + let settings: HashMap = con + .prepare("SELECT key, value FROM sync_settings")? + .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? + .map(|e| e.unwrap()) + .collect(); + + let latest_revision = match settings.get("latest_revision") { + Some(revision) => Some(revision.parse()?), + None => None, + }; + + let sync_settings = SyncSettings { + remote_url: settings.get("remote_url").cloned(), + latest_revision, + }; + + Ok(sync_settings) + } + + pub(crate) fn set_sync_settings(&self, map: HashMap<&'static str, String>) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + for (key, value) in map { + tx.execute( + "INSERT OR REPLACE INTO sync_settings(key, value) VALUES(:key, :value)", + named_params! { + ":key": key, + ":value": value, + }, + )?; + } + + tx.commit()?; + + Ok(()) + } + + pub(crate) fn get_incoming_records(&self) -> Result> { + let con = self.get_connection()?; + + let mut stmt = con.prepare( + " + SELECT + record_id, + revision, + schema_version, + data + FROM sync_incoming + ", + )?; + let records = stmt + .query_map([], |row| { + Ok(Record { + id: row.get(0)?, + revision: row.get(1)?, + schema_version: row.get(2)?, + data: row.get(3)?, + }) + })? + .map(|i| i.unwrap()) + .collect(); + + Ok(records) + } + + pub(crate) fn set_incoming_records(&self, records: &[Record]) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + for record in records { + tx.execute( + " + INSERT OR REPLACE INTO sync_incoming(record_id, revision, schema_version, data) + VALUES(:record_id, :revision, :schema_version, :data) + ", + named_params! { + ":record_id": record.id, + ":revision": record.revision, + ":schema_version": record.schema_version, + ":data": record.data, + }, + )?; + } + + tx.commit()?; + + Ok(()) + } + + pub(crate) fn remove_incoming_records(&self, record_ids: Vec) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + for record_id in record_ids { + tx.execute( + "DELETE FROM sync_incoming WHERE record_id = :record_id", + named_params! { + ":record_id": record_id + }, + )?; + } + + tx.commit()?; + + Ok(()) + } + + pub(crate) fn commit_outgoing( + con: &Connection, + data_id: &str, + record_type: RecordType, + updated_fields: Option>, + ) -> Result<()> { + let record_id = Record::get_id_from_record_type(record_type, data_id); + let updated_fields = updated_fields + .map(|fields| { + let fields = fields + .iter() + .map(|field| format!("'$[#]', '{field}'")) + .collect::>() + .join(","); + format!("json_insert( + COALESCE((SELECT updated_fields_json FROM sync_outgoing WHERE record_id = :record_id), '[]'), + {fields} + )") + }) + .unwrap_or("NULL".to_string()); + con.execute(&format!(" + INSERT OR REPLACE INTO sync_outgoing(record_id, data_id, record_type, commit_time, updated_fields_json) + VALUES( + :record_id, + :data_id, + :record_type, + :commit_time, + {updated_fields} + ) + "), + named_params! { + ":record_id": record_id, + ":data_id": data_id, + ":record_type": record_type, + ":commit_time": utils::now(), + }, + )?; + + Ok(()) + } + + fn select_sync_outgoing_changes_query(where_clauses: Vec) -> String { + let mut where_clause_str = String::new(); + if !where_clauses.is_empty() { + where_clause_str = String::from("WHERE "); + where_clause_str.push_str(where_clauses.join(" AND ").as_str()); + } + + format!( + " + SELECT + record_id, + data_id, + record_type, + commit_time, + updated_fields_json + FROM sync_outgoing + {where_clause_str} + " + ) + } + + fn sql_row_to_sync_outgoing_changes(row: &Row) -> Result { + let record_id = row.get(0)?; + let data_id = row.get(1)?; + let record_type = row.get(2)?; + let commit_time = row.get(3)?; + let updated_fields = match row.get::<_, Option>(4)? { + Some(fields) => Some(serde_json::from_str(&fields)?), + None => None, + }; + + Ok(SyncOutgoingChanges { + record_id, + data_id, + record_type, + commit_time, + updated_fields, + }) + } + + pub(crate) fn get_sync_outgoing_changes(&self) -> Result> { + let con = self.get_connection()?; + + let query = Self::select_sync_outgoing_changes_query(vec![]); + let mut stmt = con.prepare(&query)?; + let mut rows = stmt.query([])?; + + let mut outgoing_changes = vec![]; + while let Some(row) = rows.next()? { + let detail = Self::sql_row_to_sync_outgoing_changes(row)?; + outgoing_changes.push(detail); + } + + Ok(outgoing_changes) + } + + pub(crate) fn get_sync_outgoing_changes_by_id( + &self, + record_id: &str, + ) -> Result> { + let con = self.get_connection()?; + let query = + Self::select_sync_outgoing_changes_query(vec!["record_id = :record_id".to_string()]); + let mut stmt = con.prepare(&query)?; + let mut rows = stmt.query(named_params! { + ":record_id": record_id, + })?; + + if let Some(row) = rows.next()? { + return Ok(Some(Self::sql_row_to_sync_outgoing_changes(row)?)); + } + + Ok(None) + } + + pub(crate) fn remove_sync_outgoing_changes(&self, record_ids: Vec) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + for record_id in record_ids { + tx.execute( + "DELETE FROM sync_outgoing WHERE record_id = :record_id", + named_params! { + ":record_id": record_id + }, + )?; + } + + tx.commit()?; + + Ok(()) + } + + fn check_commit_update(con: &Connection, record_id: &str, last_commit_time: u32) -> Result<()> { + let query = + Self::select_sync_outgoing_changes_query(vec!["record_id = :record_id".to_string()]); + let mut stmt = con.prepare(&query)?; + let mut rows = stmt.query(named_params! { + ":record_id": record_id, + })?; + + if let Some(row) = rows.next()? { + let sync_outgoing_changes = Self::sql_row_to_sync_outgoing_changes(row)?; + + if sync_outgoing_changes.commit_time > last_commit_time { + return Err(anyhow::anyhow!("Record has been updated while pulling")); + } + } + + Ok(()) + } + + pub(crate) fn commit_incoming_receive_swap( + &self, + data: &ReceiveSyncData, + sync_state: SyncState, + is_update: bool, + last_commit_time: Option, + ) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + if let Some(last_commit_time) = last_commit_time { + Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?; + } + + let params = named_params! { + ":id": &data.swap_id, + ":invoice": &data.invoice, + ":preimage": &data.preimage, + ":create_response_json": &data.create_response_json, + ":claim_fees_sat": &data.claim_fees_sat, + ":claim_private_key": &data.claim_private_key, + ":payer_amount_sat": &data.payer_amount_sat, + ":receiver_amount_sat": &data.receiver_amount_sat, + ":mrh_address": &data.mrh_address, + ":created_at": &data.created_at, + ":payment_hash": &data.payment_hash, + ":description": &data.description, + }; + match is_update { + true => { + tx.execute( + " + UPDATE receive_swaps + SET + invoice = :invoice, + preimage = :preimage, + create_response_json = :create_response_json, + claim_fees_sat = :claim_fees_sat, + claim_private_key = :claim_private_key, + payer_amount_sat = :payer_amount_sat, + receiver_amount_sat = :receiver_amount_sat, + mrh_address = :mrh_address, + created_at = :created_at, + payment_hash = :payment_hash, + description = :description + WHERE id = :id", + params, + )?; + } + false => { + tx.execute( + " + INSERT INTO receive_swaps( + id, + invoice, + preimage, + create_response_json, + claim_fees_sat, + claim_private_key, + payer_amount_sat, + receiver_amount_sat, + mrh_address, + created_at, + payment_hash, + description, + state + ) + VALUES( + :id, + :invoice, + :preimage, + :create_response_json, + :claim_fees_sat, + :claim_private_key, + :payer_amount_sat, + :receiver_amount_sat, + :mrh_address, + :created_at, + :payment_hash, + :description, + :state + )", + [params, &[(":state", &PaymentState::Created)]] + .concat() + .as_slice(), + )?; + } + } + + Self::set_sync_state_stmt(&tx)?.execute(named_params! { + ":data_id": &sync_state.data_id, + ":record_id": &sync_state.record_id, + ":record_revision": &sync_state.record_revision, + ":is_local": &sync_state.is_local, + })?; + + tx.commit()?; + + Ok(()) + } + + pub(crate) fn commit_incoming_send_swap( + &self, + data: &SendSyncData, + sync_state: SyncState, + is_update: bool, + last_commit_time: Option, + ) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + if let Some(last_commit_time) = last_commit_time { + Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?; + } + + let params = named_params! { + ":id": &data.swap_id, + ":invoice": &data.invoice, + ":create_response_json": &data.create_response_json, + ":refund_private_key": &data.refund_private_key, + ":payer_amount_sat": &data.payer_amount_sat, + ":receiver_amount_sat": &data.receiver_amount_sat, + ":created_at": &data.created_at, + ":preimage": &data.preimage, + ":payment_hash": &data.payment_hash, + ":description": &data.description, + }; + match is_update { + true => { + tx.execute( + " + UPDATE send_swaps + SET + invoice = :invoice, + create_response_json = :create_response_json, + refund_private_key = :refund_private_key, + payer_amount_sat = :payer_amount_sat, + receiver_amount_sat = :receiver_amount_sat, + created_at = :created_at, + preimage = :preimage, + payment_hash = :payment_hash, + description = :description + WHERE id = :id", + params, + )?; + } + false => { + tx.execute( + " + INSERT INTO send_swaps( + id, + invoice, + create_response_json, + refund_private_key, + payer_amount_sat, + receiver_amount_sat, + created_at, + preimage, + payment_hash, + description, + state + ) + VALUES( + :id, + :invoice, + :create_response_json, + :refund_private_key, + :payer_amount_sat, + :receiver_amount_sat, + :created_at, + :preimage, + :payment_hash, + :description, + :state + )", + [params, &[(":state", &PaymentState::Created)]] + .concat() + .as_slice(), + )?; + } + } + + Self::set_sync_state_stmt(&tx)?.execute(named_params! { + ":data_id": &sync_state.data_id, + ":record_id": &sync_state.record_id, + ":record_revision": &sync_state.record_revision, + ":is_local": &sync_state.is_local, + })?; + + tx.commit()?; + + Ok(()) + } + + pub(crate) fn commit_incoming_chain_swap( + &self, + data: &ChainSyncData, + sync_state: SyncState, + is_update: bool, + last_commit_time: Option, + ) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + if let Some(last_commit_time) = last_commit_time { + Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?; + } + + let params = named_params! { + ":id": &data.swap_id, + ":preimage": &data.preimage, + ":create_response_json": &data.create_response_json, + ":direction": &data.direction, + ":lockup_address": &data.lockup_address, + ":claim_fees_sat": &data.claim_fees_sat, + ":claim_private_key": &data.claim_private_key, + ":refund_private_key": &data.refund_private_key, + ":timeout_block_height": &data.timeout_block_height, + ":payer_amount_sat": &data.payer_amount_sat, + ":receiver_amount_sat": &data.receiver_amount_sat, + ":accept_zero_conf": &data.accept_zero_conf, + ":created_at": &data.created_at, + ":description": &data.description, + }; + match is_update { + true => { + tx.execute( + " + UPDATE chain_swaps + SET + preimage = :preimage, + create_response_json = :create_response_json, + direction = :direction, + lockup_address = :lockup_address, + claim_fees_sat = :claim_fees_sat, + claim_private_key = :claim_private_key, + refund_private_key = :refund_private_key, + timeout_block_height = :timeout_block_height, + payer_amount_sat = :payer_amount_sat, + receiver_amount_sat = :receiver_amount_sat, + accept_zero_conf = :accept_zero_conf, + created_at = :created_at, + description = :description, + server_lockup_tx_id = :server_lockup_tx_id + WHERE id = :id", + params, + )?; + } + false => { + tx.execute( + " + INSERT INTO chain_swaps( + id, + preimage, + create_response_json, + direction, + lockup_address, + claim_fees_sat, + claim_private_key, + refund_private_key, + timeout_block_height, + payer_amount_sat, + receiver_amount_sat, + accept_zero_conf, + created_at, + description, + server_lockup_tx_id, + state + ) + VALUES( + :id, + :preimage, + :create_response_json, + :direction, + :lockup_address, + :claim_fees_sat, + :claim_private_key, + :refund_private_key, + :timeout_block_height, + :payer_amount_sat, + :receiver_amount_sat, + :accept_zero_conf, + :created_at, + :description, + :server_lockup_tx_id, + :state + )", + [params, &[(":state", &PaymentState::Created)]] + .concat() + .as_slice(), + )?; + } + } + + Self::set_sync_state_stmt(&tx)?.execute(named_params! { + ":data_id": &sync_state.data_id, + ":record_id": &sync_state.record_id, + ":record_revision": &sync_state.record_revision, + ":is_local": &sync_state.is_local, + })?; + + tx.commit()?; + + Ok(()) + } +} diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs new file mode 100644 index 000000000..d9a52514a --- /dev/null +++ b/lib/core/src/sync/mod.rs @@ -0,0 +1 @@ +pub(crate) mod model; diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs new file mode 100644 index 000000000..fac48b461 --- /dev/null +++ b/lib/core/src/sync/model/mod.rs @@ -0,0 +1,53 @@ +use rusqlite::{ + types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef}, + ToSql, +}; + +pub(crate) mod sync; + +#[derive(Copy, Clone)] +pub(crate) enum RecordType { + Receive = 0, + Send = 1, + Chain = 2, +} + +impl ToSql for RecordType { + fn to_sql(&self) -> rusqlite::Result> { + Ok(rusqlite::types::ToSqlOutput::from(*self as i8)) + } +} + +impl FromSql for RecordType { + fn column_result(value: ValueRef<'_>) -> FromSqlResult { + match value { + ValueRef::Integer(i) => match i as u8 { + 0 => Ok(Self::Receive), + 1 => Ok(Self::Send), + 2 => Ok(Self::Chain), + _ => Err(FromSqlError::OutOfRange(i)), + }, + _ => Err(FromSqlError::InvalidType), + } + } +} + +pub(crate) struct SyncState { + pub(crate) data_id: String, + pub(crate) record_id: String, + pub(crate) record_revision: u64, + pub(crate) is_local: bool, +} + +pub(crate) struct SyncSettings { + pub(crate) remote_url: Option, + pub(crate) latest_revision: Option, +} + +pub(crate) struct SyncOutgoingChanges { + pub(crate) record_id: String, + pub(crate) data_id: String, + pub(crate) record_type: RecordType, + pub(crate) commit_time: u32, + pub(crate) updated_fields: Option>, +} diff --git a/lib/core/src/sync/model/sync.rs b/lib/core/src/sync/model/sync.rs new file mode 100644 index 000000000..aee7f261c --- /dev/null +++ b/lib/core/src/sync/model/sync.rs @@ -0,0 +1,228 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Record { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(uint64, tag = "2")] + pub revision: u64, + #[prost(string, tag = "3")] + pub schema_version: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "4")] + pub data: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetRecordRequest { + #[prost(message, optional, tag = "1")] + pub record: ::core::option::Option, + #[prost(uint32, tag = "2")] + pub request_time: u32, + #[prost(string, tag = "3")] + pub signature: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct SetRecordReply { + #[prost(enumeration = "SetRecordStatus", tag = "1")] + pub status: i32, + #[prost(uint64, tag = "2")] + pub new_revision: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListChangesRequest { + #[prost(uint64, tag = "1")] + pub since_revision: u64, + #[prost(uint32, tag = "2")] + pub request_time: u32, + #[prost(string, tag = "3")] + pub signature: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListChangesReply { + #[prost(message, repeated, tag = "1")] + pub changes: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TrackChangesRequest { + #[prost(uint32, tag = "1")] + pub request_time: u32, + #[prost(string, tag = "2")] + pub signature: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum SetRecordStatus { + Success = 0, + Conflict = 1, +} +impl SetRecordStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Success => "SUCCESS", + Self::Conflict => "CONFLICT", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SUCCESS" => Some(Self::Success), + "CONFLICT" => Some(Self::Conflict), + _ => None, + } + } +} +/// Generated client implementations. +pub mod syncer_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SyncerClient { + inner: tonic::client::Grpc, + } + impl SyncerClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SyncerClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SyncerClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SyncerClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn set_record( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/sync.Syncer/SetRecord"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("sync.Syncer", "SetRecord")); + self.inner.unary(req, path, codec).await + } + pub async fn list_changes( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/sync.Syncer/ListChanges"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("sync.Syncer", "ListChanges")); + self.inner.unary(req, path, codec).await + } + pub async fn track_changes( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/sync.Syncer/TrackChanges"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("sync.Syncer", "TrackChanges")); + self.inner.server_streaming(req, path, codec).await + } + } +} diff --git a/lib/core/src/sync/proto/sync.proto b/lib/core/src/sync/proto/sync.proto new file mode 100644 index 000000000..51e345f3a --- /dev/null +++ b/lib/core/src/sync/proto/sync.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +option go_package = "github.com/breez/data-sync/proto"; +package sync; + +service Syncer { + rpc SetRecord(SetRecordRequest) returns (SetRecordReply) {} + rpc ListChanges(ListChangesRequest) returns (ListChangesReply) {} + rpc TrackChanges(TrackChangesRequest) returns (stream Record); +} + +message Record { + string id = 1; + uint64 revision = 2; + string schema_version = 3; + bytes data = 4; +} + +message SetRecordRequest { + Record record = 1; + uint32 request_time = 2; + string signature = 3; +} +enum SetRecordStatus { + SUCCESS = 0; + CONFLICT = 1; +} +message SetRecordReply { + SetRecordStatus status = 1; + uint64 new_revision = 2; +} + +message ListChangesRequest { + uint64 since_revision = 1; + uint32 request_time = 2; + string signature = 3; +} +message ListChangesReply { repeated Record changes = 1; } + +message TrackChangesRequest { + uint32 request_time = 1; + string signature = 2; +}