Skip to content

Commit

Permalink
Merge pull request #54 from getAlby/chore/update-poll-mempool-entries…
Browse files Browse the repository at this point in the history
…-interatively

fix: For bitcoin RPC backend - poll mempool entries interatively
  • Loading branch information
rolznz authored Nov 26, 2024
2 parents f3b3743 + a15ec85 commit 3ba4e8a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 38 deletions.
117 changes: 80 additions & 37 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,33 @@ impl BitcoindRpcClient {
}
}

pub(crate) async fn get_raw_mempool(&self) -> std::io::Result<Vec<RawMempoolEntry>> {
let verbose_flag_json = serde_json::json!(true);
pub(crate) async fn get_raw_mempool(&self) -> std::io::Result<Vec<Txid>> {
let verbose_flag_json = serde_json::json!(false);
self.rpc_client
.call_method::<GetRawMempoolResponse>("getrawmempool", &vec![verbose_flag_json])
.await
.map(|resp| resp.0)
}

pub(crate) async fn get_mempool_entry(&self, txid: &Txid) -> std::io::Result<MempoolEntry> {
let txid_hex = bitcoin::consensus::encode::serialize_hex(txid);
let txid_json = serde_json::json!(txid_hex);
self.rpc_client
.call_method::<GetMempoolEntryResponse>("getmempoolentry", &vec![txid_json])
.await
.map(|resp| MempoolEntry { txid: txid.clone(), height: resp.height, time: resp.time })
}

pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
let mempool_txids = self.get_raw_mempool().await?;
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());
for txid in &mempool_txids {
let entry = self.get_mempool_entry(txid).await?;
mempool_entries.push(entry);
}
Ok(mempool_entries)
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
///
/// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
Expand All @@ -132,7 +151,7 @@ impl BitcoindRpcClient {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mut latest_time = prev_mempool_time;

let mempool_entries = self.get_raw_mempool().await?;
let mempool_entries = self.get_mempool_entries().await?;
let mut txs_to_emit = Vec::new();

for entry in mempool_entries {
Expand Down Expand Up @@ -254,58 +273,82 @@ impl TryInto<GetRawTransactionResponse> for JsonResponse {
}
}

pub struct GetRawMempoolResponse(Vec<RawMempoolEntry>);
pub struct GetRawMempoolResponse(Vec<Txid>);

impl TryInto<GetRawMempoolResponse> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<GetRawMempoolResponse> {
let mut mempool_transactions = Vec::new();
let res = self.0.as_object().ok_or(std::io::Error::new(
let res = self.0.as_array().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
))?;

for (k, v) in res {
let txid = match bitcoin::consensus::encode::deserialize_hex(k) {
Ok(txid) => txid,
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
};

let time = match v["time"].as_u64() {
Some(time) => time,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
};
let mut mempool_transactions = Vec::with_capacity(res.len());

let height = match v["height"].as_u64().and_then(|h| h.try_into().ok()) {
Some(height) => height,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
for hex in res {
let txid = if let Some(hex_str) = hex.as_str() {
match bitcoin::consensus::encode::deserialize_hex(hex_str) {
Ok(txid) => txid,
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
},
}
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getrawmempool response",
));
};
let entry = RawMempoolEntry { txid, time, height };

mempool_transactions.push(entry);
mempool_transactions.push(txid);
}

Ok(GetRawMempoolResponse(mempool_transactions))
}
}

pub struct GetMempoolEntryResponse {
time: u64,
height: u32,
}

impl TryInto<GetMempoolEntryResponse> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<GetMempoolEntryResponse> {
let res = self.0.as_object().ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
))?;

let time = match res["time"].as_u64() {
Some(time) => time,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
));
},
};

let height = match res["height"].as_u64().and_then(|h| h.try_into().ok()) {
Some(height) => height,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to parse getmempoolentry response",
));
},
};

Ok(GetMempoolEntryResponse { time, height })
}
}

#[derive(Debug, Clone)]
pub(crate) struct RawMempoolEntry {
pub(crate) struct MempoolEntry {
/// The transaction id
txid: Txid,
/// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
Expand Down
2 changes: 1 addition & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap
// The default Esplora client timeout we're using.
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;

const CHAIN_POLLING_INTERVAL_SECS: u64 = 1;
const CHAIN_POLLING_INTERVAL_SECS: u64 = 2;

pub(crate) enum WalletSyncStatus {
Completed,
Expand Down

0 comments on commit 3ba4e8a

Please sign in to comment.