Skip to content

Commit

Permalink
Recover onchain data for monitored swaps during sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dangeross committed Nov 15, 2024
1 parent 1f66c83 commit 0b838eb
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 405 deletions.
94 changes: 28 additions & 66 deletions lib/core/src/chain_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;
use boltz_client::{
boltz::{self},
swaps::boltz::{ChainSwapStates, CreateChainResponse, SwapUpdateTxDetails},
Address, ElementsLockTime, LockTime, Secp256k1, Serialize, ToHex,
ElementsLockTime, LockTime, Secp256k1, Serialize, ToHex,
};
use futures_util::TryFutureExt;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -55,7 +55,10 @@ impl BlockListener for ChainSwapHandler {
if let Err(e) = self.rescan_incoming_user_lockup_txs(height, false).await {
error!("Error rescanning incoming user txs: {e:?}");
}
if let Err(e) = self.rescan_outgoing_claim_txs(height).await {
if let Err(e) = self
.rescan_server_lockup_txs(Direction::Outgoing, height)
.await
{
error!("Error rescanning outgoing server txs: {e:?}");
}
}
Expand All @@ -64,7 +67,10 @@ impl BlockListener for ChainSwapHandler {
if let Err(e) = self.check_outgoing_refunds(height).await {
warn!("Error checking outgoing refunds: {e:?}");
}
if let Err(e) = self.rescan_incoming_server_lockup_txs(height).await {
if let Err(e) = self
.rescan_server_lockup_txs(Direction::Incoming, height)
.await
{
error!("Error rescanning incoming server txs: {e:?}");
}
}
Expand Down Expand Up @@ -207,104 +213,60 @@ impl ChainSwapHandler {
Ok(())
}

pub(crate) async fn rescan_incoming_server_lockup_txs(&self, height: u32) -> Result<()> {
pub(crate) async fn rescan_server_lockup_txs(
&self,
direction: Direction,
height: u32,
) -> Result<()> {
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
.into_iter()
.filter(|s| {
s.direction == Direction::Incoming && s.state == Pending && s.claim_tx_id.is_none()
})
.filter(|s| s.direction == direction && s.state == Pending && s.claim_tx_id.is_none())
.collect();
info!(
"Rescanning {} incoming Chain Swap(s) server lockup txs at height {}",
"Rescanning {} {:?} Chain Swap(s) server lockup txs at height {}",
chain_swaps.len(),
direction,
height
);
for swap in chain_swaps {
if let Err(e) = self
.rescan_incoming_chain_swap_server_lockup_tx(&swap)
.await
{
if let Err(e) = self.rescan_chain_swap_server_lockup_tx(&swap).await {
error!(
"Error rescanning server lockup of incoming Chain Swap {}: {e:?}",
swap.id
"Error rescanning server lockup of {:?} Chain Swap {}: {e:?}",
direction, swap.id,
);
}
}
Ok(())
}

async fn rescan_incoming_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> {
async fn rescan_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> {
let Some(tx_id) = swap.server_lockup_tx_id.clone() else {
// Skip the rescan if there is no server_lockup_tx_id yet
return Ok(());
};
let swap_id = &swap.id;
let swap_script = swap.get_claim_swap_script()?;
let script_history = self.fetch_liquid_script_history(&swap_script).await?;
let script_history = match swap.direction {
Direction::Incoming => self.fetch_liquid_script_history(&swap_script).await,
Direction::Outgoing => self.fetch_bitcoin_script_history(&swap_script).await,
}?;
let tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&tx_id))
.ok_or(anyhow!(
"Server lockup tx for incoming Chain Swap {swap_id} was not found, txid={tx_id}"
"Server lockup tx for Chain Swap {swap_id} was not found, txid={tx_id}"
))?;
if tx_history.height > 0 {
info!("Incoming Chain Swap {swap_id} server lockup tx is confirmed");
info!("Chain Swap {swap_id} server lockup tx is confirmed");
self.claim(swap_id)
.await
.map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?;
}
Ok(())
}

pub(crate) async fn rescan_outgoing_claim_txs(&self, height: u32) -> Result<()> {
let chain_swaps: Vec<ChainSwap> = self
.persister
.list_chain_swaps()?
.into_iter()
.filter(|s| {
s.direction == Direction::Outgoing && s.state == Pending && s.claim_tx_id.is_some()
})
.collect();
info!(
"Rescanning {} outgoing Chain Swap(s) claim txs at height {}",
chain_swaps.len(),
height
);
for swap in chain_swaps {
if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await {
error!("Error rescanning outgoing Chain Swap {}: {e:?}", swap.id);
}
}
Ok(())
}

async fn rescan_outgoing_chain_swap_claim_tx(&self, swap: &ChainSwap) -> Result<()> {
if let Some(claim_address) = &swap.claim_address {
let address = Address::from_str(claim_address)?;
let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?;
let script_pubkey = address.assume_checked().script_pubkey();
let script_history = self
.bitcoin_chain_service
.lock()
.await
.get_script_history(script_pubkey.as_script())?;
let claim_tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&claim_tx_id) && h.height > 0);
if claim_tx_history.is_some() {
info!(
"Outgoing Chain Swap {} claim tx is confirmed. Setting the swap to Complete",
swap.id
);
self.update_swap_info(&swap.id, Complete, None, None, None, None)
.await?;
}
}
Ok(())
}

async fn on_new_incoming_status(&self, swap: &ChainSwap, update: &boltz::Update) -> Result<()> {
let id = &update.id;
let status = &update.status;
Expand Down Expand Up @@ -704,7 +666,7 @@ impl ChainSwapHandler {
claim_tx_id: Option<&str>,
refund_tx_id: Option<&str>,
) -> Result<(), PaymentError> {
info!("Transitioning Chain swap {swap_id} to {to_state:?} (server_lockup_tx_id = {:?}, user_lockup_tx_id = {:?}, claim_tx_id = {:?}), refund_tx_id = {:?})", server_lockup_tx_id, user_lockup_tx_id, claim_tx_id, refund_tx_id);
info!("Transitioning Chain swap {swap_id} to {to_state:?} (server_lockup_tx_id = {:?}, user_lockup_tx_id = {:?}, claim_tx_id = {:?}, refund_tx_id = {:?})", server_lockup_tx_id, user_lockup_tx_id, claim_tx_id, refund_tx_id);

let swap: ChainSwap = self
.persister
Expand Down
60 changes: 12 additions & 48 deletions lib/core/src/persist/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use anyhow::Result;
use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse};
use rusqlite::{named_params, params, Connection, Row};
Expand Down Expand Up @@ -184,63 +182,29 @@ impl Persister {

pub(crate) fn list_chain_swaps_by_state(
&self,
con: &Connection,
states: Vec<PaymentState>,
) -> Result<Vec<ChainSwap>> {
let con = self.get_connection()?;
let where_clause = vec![get_where_clause_state_in(&states)];
self.list_chain_swaps_where(con, where_clause)
self.list_chain_swaps_where(&con, where_clause)
}

pub(crate) fn list_ongoing_chain_swaps(&self, con: &Connection) -> Result<Vec<ChainSwap>> {
self.list_chain_swaps_by_state(con, vec![PaymentState::Created, PaymentState::Pending])
pub(crate) fn list_ongoing_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
let con = self.get_connection()?;
let where_clause = vec![get_where_clause_state_in(&[
PaymentState::Created,
PaymentState::Pending,
])];

self.list_chain_swaps_where(&con, where_clause)
}

pub(crate) fn list_pending_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
let con: Connection = self.get_connection()?;
self.list_chain_swaps_by_state(
&con,
vec![PaymentState::Pending, PaymentState::RefundPending],
)
self.list_chain_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending])
}

pub(crate) fn list_refundable_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
let con: Connection = self.get_connection()?;
self.list_chain_swaps_by_state(&con, vec![PaymentState::Refundable])
}

/// Pending Chain swaps, indexed by refund tx id
pub(crate) fn list_pending_chain_swaps_by_refund_tx_id(
&self,
) -> Result<HashMap<String, ChainSwap>> {
let res: HashMap<String, ChainSwap> = self
.list_pending_chain_swaps()?
.iter()
.filter_map(|pending_chain_swap| {
pending_chain_swap
.refund_tx_id
.as_ref()
.map(|refund_tx_id| (refund_tx_id.clone(), pending_chain_swap.clone()))
})
.collect();
Ok(res)
}

/// This only returns the swaps that have a claim tx, skipping the pending ones that are being refunded.
pub(crate) fn list_pending_chain_swaps_by_claim_tx_id(
&self,
) -> Result<HashMap<String, ChainSwap>> {
let con: Connection = self.get_connection()?;
let res: HashMap<String, ChainSwap> = self
.list_chain_swaps_by_state(&con, vec![PaymentState::Pending])?
.iter()
.filter_map(|pending_chain_swap| {
pending_chain_swap
.claim_tx_id
.as_ref()
.map(|claim_tx_id| (claim_tx_id.clone(), pending_chain_swap.clone()))
})
.collect();
Ok(res)
self.list_chain_swaps_by_state(vec![PaymentState::Refundable])
}

pub(crate) fn update_chain_swap_accept_zero_conf(
Expand Down
62 changes: 57 additions & 5 deletions lib/core/src/persist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ mod migrations;
pub(crate) mod receive;
pub(crate) mod send;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::{fs::create_dir_all, path::PathBuf, str::FromStr};

use crate::error::PaymentError;
use crate::lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription};
use crate::model::*;
use crate::{get_invoice_description, utils};
use anyhow::{anyhow, Result};
use lwk_wollet::WalletTx;
use migrations::current_migrations;
use rusqlite::{params, params_from_iter, Connection, OptionalExtension, Row, ToSql};
use rusqlite_migration::{Migrations, M};
use sdk_common::bitcoin::hashes::hex::ToHex;

const DEFAULT_DB_FILENAME: &str = "storage.sql";

Expand Down Expand Up @@ -85,6 +87,35 @@ impl Persister {
}
}

pub(crate) fn insert_or_update_payment_with_wallet_tx(
&self,
tx: &WalletTx,
) -> Result<(), PaymentError> {
let tx_id = tx.txid.to_string();
let is_tx_confirmed = tx.height.is_some();
let amount_sat = tx.balance.values().sum::<i64>();
let maybe_script_pubkey = tx
.outputs
.iter()
.find(|output| output.is_some())
.and_then(|output| output.clone().map(|o| o.script_pubkey.to_hex()));
self.insert_or_update_payment(
PaymentTxData {
tx_id: tx_id.clone(),
timestamp: tx.timestamp,
amount_sat: amount_sat.unsigned_abs(),
fees_sat: tx.fee,
payment_type: match amount_sat >= 0 {
true => PaymentType::Receive,
false => PaymentType::Send,
},
is_confirmed: is_tx_confirmed,
},
maybe_script_pubkey,
None,
)
}

pub(crate) fn insert_or_update_payment(
&self,
ptx: PaymentTxData,
Expand Down Expand Up @@ -130,19 +161,18 @@ impl Persister {
}

pub(crate) fn list_ongoing_swaps(&self) -> Result<Vec<Swap>> {
let con = self.get_connection()?;
let ongoing_send_swaps: Vec<Swap> = self
.list_ongoing_send_swaps(&con)?
.list_ongoing_send_swaps()?
.into_iter()
.map(Swap::Send)
.collect();
let ongoing_receive_swaps: Vec<Swap> = self
.list_ongoing_receive_swaps(&con)?
.list_ongoing_receive_swaps()?
.into_iter()
.map(Swap::Receive)
.collect();
let ongoing_chain_swaps: Vec<Swap> = self
.list_ongoing_chain_swaps(&con)?
.list_ongoing_chain_swaps()?
.into_iter()
.map(Swap::Chain)
.collect();
Expand Down Expand Up @@ -460,6 +490,28 @@ impl Persister {
.collect();
Ok(payments)
}

pub fn get_payments_by_tx_id(
&self,
req: &ListPaymentsRequest,
) -> Result<HashMap<String, Payment>> {
let res: HashMap<String, Payment> = self
.get_payments(req)?
.into_iter()
.flat_map(|payment| {
// Index payments by both tx_id (lockup/claim) and refund_tx_id
let mut res = vec![];
if let Some(tx_id) = payment.tx_id.clone() {
res.push((tx_id, payment.clone()));
}
if let Some(refund_tx_id) = payment.get_refund_tx_id() {
res.push((refund_tx_id, payment));
}
res
})
.collect();
Ok(res)
}
}

fn filter_to_where_clause(req: &ListPaymentsRequest) -> (String, Vec<Box<dyn ToSql + '_>>) {
Expand Down
Loading

0 comments on commit 0b838eb

Please sign in to comment.