Skip to content

Commit

Permalink
feat: add sync_incoming
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 14, 2024
1 parent ac6d316 commit 3e95eff
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,74 @@ impl Persister {
Ok(())
}

pub(crate) fn get_incoming_records(&self) -> Result<Vec<Record>> {
let con = self.get_connection()?;

let mut stmt = con.prepare(
"
SELECT
record_id,
revision,
schema_version,
data
FROM sync_incoming
",
)?;
let records = stmt
.query_map([], |row| {
Ok(Record {
id: row.get(0)?,
revision: row.get(1)?,
schema_version: row.get(2)?,
data: row.get(3)?,
})
})?
.map(|i| i.unwrap())
.collect();

Ok(records)
}

pub(crate) fn set_incoming_records(&self, records: &[Record]) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

for record in records {
tx.execute(
"
INSERT OR REPLACE INTO sync_incoming(record_id, revision, schema_version, data)
VALUES(:record_id, :revision, :schema_version, :data)
",
named_params! {
":record_id": record.id,
":revision": record.revision,
":schema_version": record.schema_version,
":data": record.data,
},
)?;
}

tx.commit()?;

Ok(())
}

pub(crate) fn remove_incoming_records(&self, record_ids: Vec<String>) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

for record_id in record_ids {
tx.execute(
"DELETE FROM sync_incoming WHERE record_id = :record_id",
named_params! {
":record_id": record_id
},
)?;
}

tx.commit()?;

Ok(())
}

}

0 comments on commit 3e95eff

Please sign in to comment.