Skip to content

Commit

Permalink
feat(chain): Introduce tx_graph::Update WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
LagginTimes committed Aug 19, 2024
1 parent e0822d7 commit 6818d79
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 151 deletions.
11 changes: 6 additions & 5 deletions crates/chain/src/spk_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::{
alloc::{boxed::Box, collections::VecDeque, vec::Vec},
collections::BTreeMap,
local_chain::CheckPoint,
ConfirmationBlockTime, Indexed, TxGraph,
tx_graph::Update,
ConfirmationBlockTime, Indexed,
};
use bitcoin::{OutPoint, Script, ScriptBuf, Txid};

Expand Down Expand Up @@ -345,8 +346,8 @@ impl<I> SyncRequest<I> {
#[must_use]
#[derive(Debug)]
pub struct SyncResult<A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`TxGraph`].
pub graph_update: TxGraph<A>,
/// The update to apply to the receiving [`Update`].
pub graph_update: Update<A>,
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
pub chain_update: Option<CheckPoint>,
}
Expand Down Expand Up @@ -497,8 +498,8 @@ impl<K: Ord + Clone> FullScanRequest<K> {
#[derive(Debug)]
pub struct FullScanResult<K, A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
pub graph_update: TxGraph<A>,
/// The update to apply to the receiving [`TxGraph`].
pub graph_update: Update<A>,
/// The update to apply to the receiving [`Update`].
pub chain_update: Option<CheckPoint>,
/// Last active indices for the corresponding keychains (`K`).
pub last_active_indices: BTreeMap<K, u32>,
Expand Down
185 changes: 155 additions & 30 deletions crates/chain/src/tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@
//! [`insert_txout`]: TxGraph::insert_txout

use crate::{
collections::*, Anchor, Balance, BlockId, ChainOracle, ChainPosition, FullTxOut, Merge,
collections::*, Anchor, Balance, BlockId, ChainOracle, ChainPosition, ConfirmationBlockTime,
FullTxOut, Merge,
};
use alloc::collections::vec_deque::VecDeque;
use alloc::sync::Arc;
Expand Down Expand Up @@ -640,15 +641,15 @@ impl<A: Clone + Ord> TxGraph<A> {
///
/// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that
/// exist in `update` but not in `self`).
pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A> {
let changeset = self.determine_changeset(update);
pub fn apply_update(&mut self, update: impl Into<Update<A>>) -> ChangeSet<A> {
let changeset = self.determine_changeset(update.into());
self.apply_changeset(changeset.clone());
changeset
}

/// Determines the [`ChangeSet`] between `self` and an empty [`TxGraph`].
pub fn initial_changeset(&self) -> ChangeSet<A> {
Self::default().determine_changeset(self.clone())
Self::default().determine_changeset(Update::from(self.clone()))
}

/// Applies [`ChangeSet`] to [`TxGraph`].
Expand Down Expand Up @@ -715,36 +716,28 @@ impl<A: Clone + Ord> TxGraph<A> {
///
/// The [`ChangeSet`] would be the set difference between `update` and `self` (transactions that
/// exist in `update` but not in `self`).
pub(crate) fn determine_changeset(&self, update: TxGraph<A>) -> ChangeSet<A> {
pub(crate) fn determine_changeset(&self, update: Update<A>) -> ChangeSet<A> {
let mut changeset = ChangeSet::<A>::default();

for (&txid, (update_tx_node, _)) in &update.txs {
match (self.txs.get(&txid), update_tx_node) {
(None, TxNodeInternal::Whole(update_tx)) => {
changeset.txs.insert(update_tx.clone());
for (txid, tx) in update.whole_txs {
match self.txs.get(&txid) {
None | Some((TxNodeInternal::Partial(_), _)) => {
changeset.txs.insert(tx);
}
(None, TxNodeInternal::Partial(update_txos)) => {
changeset.txouts.extend(
update_txos
.iter()
.map(|(&vout, txo)| (OutPoint::new(txid, vout), txo.clone())),
);
}
(Some((TxNodeInternal::Whole(_), _)), _) => {}
(Some((TxNodeInternal::Partial(_), _)), TxNodeInternal::Whole(update_tx)) => {
changeset.txs.insert(update_tx.clone());
}
(
Some((TxNodeInternal::Partial(txos), _)),
TxNodeInternal::Partial(update_txos),
) => {
changeset.txouts.extend(
update_txos
.iter()
.filter(|(vout, _)| !txos.contains_key(*vout))
.map(|(&vout, txo)| (OutPoint::new(txid, vout), txo.clone())),
);
Some((TxNodeInternal::Whole(old_tx), _)) if *old_tx != tx => {
// Update the `tx` in graph if does not match up with the tx from `update`.
changeset.txs.insert(tx);
}
_ => {}
}
}

for (op, txout) in update.partial_txs {
if matches!(
self.txs.get(&op.txid),
None | Some((TxNodeInternal::Partial(_), _))
) {
changeset.txouts.insert(op, txout);
}
}

Expand All @@ -761,6 +754,138 @@ impl<A: Clone + Ord> TxGraph<A> {
}
}

/// An update to [`TxGraph`].
#[derive(Clone, Debug)]
pub struct Update<A = ()> {
whole_txs: HashMap<Txid, Arc<Transaction>>,
partial_txs: HashMap<OutPoint, TxOut>,
last_seen: HashMap<Txid, u64>,
anchors: BTreeSet<(A, Txid)>,
}

impl<A> Default for Update<A> {
fn default() -> Self {
Update {
whole_txs: Default::default(),
partial_txs: Default::default(),
last_seen: Default::default(),
anchors: Default::default(),
}
}
}

impl<A> Update<A> {
/// Iterate over all full transactions in the graph.
pub fn whole_txs(&self) -> impl Iterator<Item = (Txid, Arc<Transaction>)> {
self.whole_txs.clone().into_iter()
}

/// Get a transaction by txid. This only returns `Some` for full transactions.
pub fn get_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
self.whole_txs.get(&txid).cloned()
}

/// Inserts the given transaction into [`Update`].
pub fn insert_tx<T: Into<Arc<Transaction>>>(&mut self, tx: T) {
let tx = tx.into();
let txid = tx.compute_txid();

// Remove any floating txouts with the full transaction's txid to enforce invariance.
self.partial_txs.retain(|op, _| op.txid != txid);

self.whole_txs.insert(txid, tx);
}

/// Inserts the given [`TxOut`] at [`OutPoint`] into [`Update`].
///
/// Inserting floating txouts are useful for determining fee/feerate of transactions we care
/// about.
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) {
self.partial_txs.insert(outpoint, txout);
}

/// Inserts the given `seen_at` for `txid` into [`Update`].
pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) {
self.last_seen.insert(txid, seen_at);
}
}

impl<A: Anchor> Update<A> {
/// Get all transaction anchors known by [`Update`].
pub fn all_anchors(&self) -> &BTreeSet<(A, Txid)> {
&self.anchors
}

/// Returns the [`Update`] as a `TxGraph` with `ConfirmationBlockTime` anchors.
pub fn into_tx_graph(self) -> TxGraph<ConfirmationBlockTime> {
let mut txs = HashMap::new();
let mut conf_anchors = BTreeSet::new();

for (txid, tx) in self.whole_txs {
txs.insert(txid, (TxNodeInternal::Whole(tx), BTreeSet::new()));
}
for (op, txout) in self.partial_txs {
txs.insert(
op.txid,
(
TxNodeInternal::Partial([(op.vout, txout)].into()),
BTreeSet::new(),
),
);
}
for (anchor, txid) in self.anchors {
conf_anchors.insert((
ConfirmationBlockTime {
block_id: anchor.anchor_block(),
confirmation_time: anchor.confirmation_height_upper_bound() as u64,
},
txid,
));
}

TxGraph {
txs,
spends: BTreeMap::new(),
anchors: conf_anchors,
last_seen: self.last_seen,
empty_outspends: HashSet::new(),
}
}
}

impl<A: Ord> Update<A> {
/// Inserts the given `anchor` into [`Update`].
pub fn insert_anchor(&mut self, txid: Txid, anchor: A) {
self.anchors.insert((anchor, txid));
}

/// Extends this [`Update`] with another so that `self` becomes the union of the two sets of
/// [`Update`]s.
pub fn extend(&mut self, update: Update<A>) {
self.whole_txs.extend(update.whole_txs);
self.partial_txs.extend(update.partial_txs);
self.last_seen.extend(update.last_seen);
self.anchors.extend(update.anchors);
}
}

impl<A> From<TxGraph<A>> for Update<A> {
fn from(graph: TxGraph<A>) -> Self {
Update {
whole_txs: graph
.full_txs()
.map(|value| (value.txid, value.tx))
.collect::<HashMap<_, _>>(),
partial_txs: graph
.floating_txouts()
.map(|(op, txout)| (op, txout.clone()))
.collect::<HashMap<_, _>>(),
last_seen: graph.last_seen,
anchors: graph.anchors,
}
}
}

impl<A: Anchor> TxGraph<A> {
/// Get the position of the transaction in `chain` with tip `chain_tip`.
///
Expand Down
37 changes: 17 additions & 20 deletions crates/electrum/src/bdk_electrum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use bdk_chain::{
collections::{BTreeMap, HashMap},
local_chain::CheckPoint,
spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
tx_graph::TxGraph,
tx_graph::Update,
Anchor, BlockId, ConfirmationBlockTime,
};
use electrum_client::{ElectrumApi, Error, HeaderNotification};
Expand Down Expand Up @@ -39,11 +39,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

/// Inserts transactions into the transaction cache so that the client will not fetch these
/// transactions.
pub fn populate_tx_cache<A>(&self, tx_graph: impl AsRef<TxGraph<A>>) {
let txs = tx_graph
.as_ref()
.full_txs()
.map(|tx_node| (tx_node.txid, tx_node.tx));
pub fn populate_tx_cache<A>(&self, update: impl Into<Update<A>>) {
let txs = update.into().whole_txs();

let mut tx_cache = self.tx_cache.lock().unwrap();
for (txid, tx) in txs {
Expand Down Expand Up @@ -138,7 +135,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
None => None,
};

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut graph_update = Update::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::default();
for keychain in request.keychains() {
let spks = request.iter_spks(keychain.clone());
Expand Down Expand Up @@ -205,7 +202,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
None => None,
};

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut graph_update = Update::<ConfirmationBlockTime>::default();
self.populate_with_spks(
&mut graph_update,
request
Expand Down Expand Up @@ -245,7 +242,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
/// also included.
fn populate_with_spks(
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
graph_update: &mut Update<ConfirmationBlockTime>,
mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
stop_gap: usize,
batch_size: usize,
Expand Down Expand Up @@ -278,7 +275,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
}

for tx_res in spk_history {
let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?;
}
}
Expand All @@ -291,7 +288,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
/// included. Anchors of the aforementioned transactions are included.
fn populate_with_outpoints(
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
graph_update: &mut Update<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
) -> Result<(), Error> {
for outpoint in outpoints {
Expand All @@ -314,7 +311,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

if !has_residing && res.tx_hash == op_txid {
has_residing = true;
let _ = graph_update.insert_tx(Arc::clone(&op_tx));
graph_update.insert_tx(Arc::clone(&op_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
}

Expand All @@ -328,7 +325,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
if !has_spending {
continue;
}
let _ = graph_update.insert_tx(Arc::clone(&res_tx));
graph_update.insert_tx(Arc::clone(&res_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
}
}
Expand All @@ -339,7 +336,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
/// Populate the `graph_update` with transactions/anchors of the provided `txids`.
fn populate_with_txids(
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
graph_update: &mut Update<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
) -> Result<(), Error> {
for txid in txids {
Expand All @@ -366,7 +363,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
self.validate_merkle_for_anchor(graph_update, txid, r.height)?;
}

let _ = graph_update.insert_tx(tx);
graph_update.insert_tx(tx);
}
Ok(())
}
Expand All @@ -375,7 +372,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
// An anchor is inserted if the transaction is validated to be in a confirmed block.
fn validate_merkle_for_anchor(
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
graph_update: &mut Update<ConfirmationBlockTime>,
txid: Txid,
confirmation_height: i32,
) -> Result<(), Error> {
Expand All @@ -402,7 +399,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
}

if is_confirmed_tx {
let _ = graph_update.insert_anchor(
graph_update.insert_anchor(
txid,
ConfirmationBlockTime {
confirmation_time: header.time as u64,
Expand All @@ -421,17 +418,17 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
// which we do not have by default. This data is needed to calculate the transaction fee.
fn fetch_prev_txout(
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
graph_update: &mut Update<ConfirmationBlockTime>,
) -> Result<(), Error> {
let full_txs: Vec<Arc<Transaction>> =
graph_update.full_txs().map(|tx_node| tx_node.tx).collect();
graph_update.whole_txs().map(|(_txid, tx)| tx).collect();
for tx in full_txs {
for vin in &tx.input {
let outpoint = vin.previous_output;
let vout = outpoint.vout;
let prev_tx = self.fetch_tx(outpoint.txid)?;
let txout = prev_tx.output[vout as usize].clone();
let _ = graph_update.insert_txout(outpoint, txout);
graph_update.insert_txout(outpoint, txout);
}
}
Ok(())
Expand Down
Loading

0 comments on commit 6818d79

Please sign in to comment.