Skip to content

Commit

Permalink
feat: prevent double claiming on status stream handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 28, 2024
1 parent 49a78eb commit c6414d3
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 32 deletions.
49 changes: 49 additions & 0 deletions lib/core/src/chain_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ impl ChainSwapHandler {
.fetch_chain_swap_by_id(id)?
.ok_or(anyhow!("No ongoing Chain Swap found for ID {id}"))?;

if let Some(sync_state) = self.persister.get_sync_state_by_data_id(&swap.id)? {
if !sync_state.is_local {
let status = &update.status;
let swap_state = ChainSwapStates::from_str(status)
.map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?;

match swap_state {
ChainSwapStates::TransactionServerMempool
| ChainSwapStates::TransactionServerConfirmed => {
self.ensure_single_claim(&swap).await?;
}
_ => {}
}
}
}

match swap.direction {
Direction::Incoming => self.on_new_incoming_status(&swap, update).await,
Direction::Outgoing => self.on_new_outgoing_status(&swap, update).await,
Expand Down Expand Up @@ -692,6 +708,39 @@ impl ChainSwapHandler {
Ok(())
}

async fn ensure_single_claim(&self, swap: &ChainSwap) -> Result<()> {
let swap_script = swap.get_claim_swap_script()?;
let history = match swap_script {
SwapScriptV2::Bitcoin(script) => {
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
let swap_script_pk = script
.to_address(self.config.network.into())
.map_err(|err| {
anyhow!("Could not retrieve taproot address from script: {err:?}")
})?
.script_pubkey();
bitcoin_chain_service.get_script_history(&swap_script_pk)?
}
SwapScriptV2::Liquid(script) => {
let liquid_chain_service = self.liquid_chain_service.lock().await;
let swap_script_pk = script
.to_address(self.config.network.into())
.map_err(|err| {
anyhow!("Could not retrieve taproot address from script: {err:?}")
})?
.script_pubkey();
liquid_chain_service
.get_script_history(&swap_script_pk)
.await?
}
};
if history.len() > 1 {
return Err(anyhow!("Claim transaction has already been broadcasted"));
}

Ok(())
}

async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
let swap = self
.persister
Expand Down
57 changes: 41 additions & 16 deletions lib/core/src/receive_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,34 @@ impl ReceiveSwapHandler {
/// Handles status updates from Boltz for Receive swaps
pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> {
let id = &update.id;
let swap_state = &update.status;
let status = &update.status;
let swap_state = RevSwapStates::from_str(status)
.map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?;
let receive_swap = self
.persister
.fetch_receive_swap_by_id(id)?
.ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?;

info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");

match RevSwapStates::from_str(swap_state) {
Ok(
RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded,
) => {
if let Some(sync_state) = self.persister.get_sync_state_by_data_id(&receive_swap.id)? {
if !sync_state.is_local {
match swap_state {
// If the swap is not local (pulled from real-time sync) we want to ensure
// only one claim is being broadcast
RevSwapStates::TransactionMempool | RevSwapStates::TransactionConfirmed => {
self.ensure_single_claim(&receive_swap).await?;
}
_ => {}
}
}
}

match swap_state {
RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded => {
match receive_swap.mrh_tx_id {
Some(mrh_tx_id) => {
warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
Expand All @@ -90,7 +103,7 @@ impl ReceiveSwapHandler {
}
// The lockup tx is in the mempool and we accept 0-conf => try to claim
// Execute 0-conf preconditions check
Ok(RevSwapStates::TransactionMempool) => {
RevSwapStates::TransactionMempool => {
let Some(transaction) = update.transaction.clone() else {
return Err(anyhow!("Unexpected payload from Boltz status stream"));
};
Expand Down Expand Up @@ -173,7 +186,7 @@ impl ReceiveSwapHandler {

Ok(())
}
Ok(RevSwapStates::TransactionConfirmed) => {
RevSwapStates::TransactionConfirmed => {
let Some(transaction) = update.transaction.clone() else {
return Err(anyhow!("Unexpected payload from Boltz status stream"));
};
Expand Down Expand Up @@ -219,14 +232,10 @@ impl ReceiveSwapHandler {
Ok(())
}

Ok(_) => {
debug!("Unhandled state for Receive Swap {id}: {swap_state}");
_ => {
debug!("Unhandled state for Receive Swap {id}: {swap_state:?}");
Ok(())
}

_ => Err(anyhow!(
"Invalid RevSwapState for Receive Swap {id}: {swap_state}"
)),
}
}

Expand Down Expand Up @@ -274,6 +283,22 @@ impl ReceiveSwapHandler {
Ok(())
}

async fn ensure_single_claim(&self, swap: &ReceiveSwap) -> Result<()> {
let liquid_chain_service = self.liquid_chain_service.lock().await;
let swap_script_pk = swap
.get_swap_script()?
.to_address(self.config.network.into())
.map_err(|err| anyhow!("Could not retrieve taproot address from script: {err:?}"))?
.script_pubkey();
let history = liquid_chain_service
.get_script_history(&swap_script_pk)
.await?;
if history.len() > 1 {
return Err(anyhow!("Claim transaction has already been broadcasted"));
}
Ok(())
}

async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
let swap = self
.persister
Expand Down
28 changes: 12 additions & 16 deletions lib/core/src/send_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ impl SendSwapHandler {
/// Handles status updates from Boltz for Send swaps
pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> {
let id = &update.id;
let swap_state = &update.status;
let status = &update.status;
let swap_state = SubSwapStates::from_str(status)
.map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?;
let swap = self
.persister
.fetch_send_swap_by_id(id)?
Expand All @@ -70,16 +72,16 @@ impl SendSwapHandler {
info!("Handling Send Swap transition to {swap_state:?} for swap {id}");

// See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
match SubSwapStates::from_str(swap_state) {
match swap_state {
// Boltz has locked the HTLC
Ok(SubSwapStates::InvoiceSet) => {
SubSwapStates::InvoiceSet => {
warn!("Received `invoice.set` state for Send Swap {id}");
Ok(())
}

// Boltz has detected the lockup in the mempool, we can speed up
// the claim by doing so cooperatively
Ok(SubSwapStates::TransactionClaimPending) => {
SubSwapStates::TransactionClaimPending => {
self.cooperate_claim(&swap).await.map_err(|e| {
error!("Could not cooperate Send Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
Expand All @@ -89,7 +91,7 @@ impl SendSwapHandler {
}

// Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
Ok(SubSwapStates::TransactionClaimed) => {
SubSwapStates::TransactionClaimed => {
debug!("Send Swap {id} has been claimed");

match swap.preimage {
Expand Down Expand Up @@ -118,11 +120,9 @@ impl SendSwapHandler {
// 2. The swap has expired (>24h)
// 3. Lockup failed (we sent too little funds)
// We initiate a cooperative refund, and then fallback to a regular one
Ok(
SubSwapStates::TransactionLockupFailed
| SubSwapStates::InvoiceFailedToPay
| SubSwapStates::SwapExpired,
) => {
SubSwapStates::TransactionLockupFailed
| SubSwapStates::InvoiceFailedToPay
| SubSwapStates::SwapExpired => {
match swap.lockup_tx_id {
Some(_) => match swap.refund_tx_id {
Some(refund_tx_id) => warn!(
Expand Down Expand Up @@ -161,14 +161,10 @@ impl SendSwapHandler {
Ok(())
}

Ok(_) => {
debug!("Unhandled state for Send Swap {id}: {swap_state}");
_ => {
debug!("Unhandled state for Send Swap {id}: {swap_state:?}");
Ok(())
}

_ => Err(anyhow!(
"Invalid SubSwapState for Send Swap {id}: {swap_state}"
)),
}
}

Expand Down

0 comments on commit c6414d3

Please sign in to comment.