Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool scan fix #299

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bin/nomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use nomic::app::IbcDest;
use nomic::app::InnerApp;
use nomic::app::Nom;
use nomic::bitcoin::adapter::Adapter;
use nomic::bitcoin::deposit_index::DepositIndex;
use nomic::bitcoin::matches_bitcoin_network;
use nomic::bitcoin::signatory::SignatorySet;
use nomic::bitcoin::Nbtc;
Expand All @@ -39,9 +40,11 @@ use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tendermint_rpc::Client as _;
use tokio::sync::Mutex;

const BANNER: &str = r#"
███╗ ██╗ ██████╗ ███╗ ███╗ ██╗ ██████╗
Expand Down
2 changes: 1 addition & 1 deletion src/bitcoin/deposit_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct DepositInfo {

type ReceiverIndex = HashMap<String, HashMap<Address, HashMap<(Txid, u32), Deposit>>>;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct DepositIndex {
pub receiver_index: ReceiverIndex,
}
Expand Down
175 changes: 90 additions & 85 deletions src/bitcoin/relayer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(async_closure)]

Check failure on line 1 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

crate-level attribute should be in the root module

error: crate-level attribute should be in the root module --> src/bitcoin/relayer.rs:1:1 | 1 | #![feature(async_closure)] | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `-D unused-attributes` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_attributes)]`
use super::signatory::Signatory;
use super::SignatorySet;
use super::SIGSET_THRESHOLD;
Expand All @@ -10,6 +11,7 @@
use crate::orga::encoding::Encode;
use crate::utils::time_now;
use bitcoin::consensus::{Decodable, Encodable};
use bitcoin::TxOut;

Check failure on line 14 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `bitcoin::TxOut`

error: unused import: `bitcoin::TxOut` --> src/bitcoin/relayer.rs:14:5 | 14 | use bitcoin::TxOut; | ^^^^^^^^^^^^^^ | = note: `-D unused-imports` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_imports)]`
use bitcoin::Txid;
use bitcoin::{hashes::Hash, Block, BlockHash, Transaction};
use bitcoincore_rpc_async::{json::GetBlockHeaderResult, Client as BitcoinRpcClient, RpcApi};
Expand All @@ -22,7 +24,7 @@
use std::future::Future;
use std::sync::Arc;
use tokio::join;
use tokio::sync::mpsc::Receiver;

Check failure on line 27 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `tokio::sync::mpsc::Receiver`

error: unused import: `tokio::sync::mpsc::Receiver` --> src/bitcoin/relayer.rs:27:5 | 27 | use tokio::sync::mpsc::Receiver; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
use warp::reject;
use warp::reply::Json;
Expand All @@ -45,7 +47,7 @@
btc_client: Arc<RwLock<BitcoinRpcClient>>,
app_client_addr: String,

scripts: Option<WatchedScriptStore>,
scripts: Arc<Mutex<Option<WatchedScriptStore>>>,
deposit_buffer: Option<u64>,
}

Expand All @@ -54,7 +56,7 @@
Relayer {
btc_client: Arc::new(RwLock::new(btc_client)),
app_client_addr,
scripts: None,
scripts: Arc::new(Mutex::new(None)),
deposit_buffer: None,
}
}
Expand Down Expand Up @@ -120,34 +122,49 @@
) -> Result<()> {
info!("Starting deposit relay...");

let index = Arc::new(Mutex::new(DepositIndex::new()));
let scripts = WatchedScriptStore::open(store_path, &self.app_client_addr).await?;
self.scripts = Some(scripts);
self.scripts = Arc::new(Mutex::new(Some(scripts)));

self.deposit_buffer = Some(deposit_buffer);

let index = Arc::new(Mutex::new(DepositIndex::new()));
let (server, mut recv) = self.create_address_server(index.clone())?;
let server = self.create_address_server(index.clone())?;

let deposit_relay = async {
loop {
if let Err(e) = self.relay_deposits(&mut recv, index.clone()).await {
if let Err(e) = self.relay_deposits(index.clone()).await {
error!("Deposit relay error: {}", e);
}

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
};

join!(server, deposit_relay);
let mut seen_mempool_txids = HashSet::new();

let mempool_relay = async {
loop {
if let Err(e) = self
.scan_for_mempool_deposits(index.clone(), &mut seen_mempool_txids)
.await
{
if !e.to_string().contains("No completed checkpoints yet") {
error!("Mempool deposit relay error: {}", e);
}
}

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
};

join!(server, deposit_relay, mempool_relay);
Ok(())
}

fn create_address_server(
&self,
index: Arc<Mutex<DepositIndex>>,
) -> Result<(impl Future<Output = ()>, Receiver<(Dest, u32)>)> {
let (send, recv) = tokio::sync::mpsc::channel(1024);

) -> Result<impl Future<Output = ()>> {
let sigsets = Arc::new(Mutex::new(BTreeMap::new()));

// TODO: pass into closures more cleanly
Expand All @@ -159,20 +176,23 @@
None => return Err(Error::Relayer("Deposit buffer not set".to_string())),
};

let watched_script_store = self.scripts.clone();
// TODO: configurable listen address
use bytes::Bytes;
use warp::Filter;
let bcast_route = warp::post()
.and(warp::path("address"))
.and(warp::query::<DepositAddress>())
.and(warp::filters::body::bytes())
.map(move |query: DepositAddress, body| (query, send.clone(), sigsets.clone(), body))
.map(move |query: DepositAddress, body| {
(query, sigsets.clone(), body, watched_script_store.clone())
})
.and_then(
move |(query, send, sigsets, body): (
move |(query, sigsets, body, watched_script_store): (
DepositAddress,
tokio::sync::mpsc::Sender<_>,
Arc<Mutex<BTreeMap<_, _>>>,
Bytes,
Arc<Mutex<_>>,
)| {
async move {
let dest = Dest::decode(body.to_vec().as_slice())
Expand Down Expand Up @@ -220,21 +240,28 @@
dest,
sigset.create_time,
query.sigset_index,
send,
watched_script_store,
sigset.clone(),
))
}
},
)
.and_then(
move |(dest, create_time, sigset_index, send): (
move |(dest, create_time, sigset_index, watched_script_store, sigset): (
Dest,
u64,
u32,
tokio::sync::mpsc::Sender<_>,
Arc<Mutex<Option<WatchedScriptStore>>>,
SignatorySet,
)| {
async move {
debug!("Received deposit commitment: {:?}, {}", dest, sigset_index);
send.send((dest, sigset_index)).await.unwrap();
let mut script_guard = watched_script_store.lock().await;
script_guard
.as_mut()
.unwrap()
.insert(dest, &sigset)
.map_err(|e| warp::reject::custom(Error::from(e)))?;

Check failure on line 264 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `error::Error`

error: useless conversion to the same type: `error::Error` --> src/bitcoin/relayer.rs:264:63 | 264 | ... .map_err(|e| warp::reject::custom(Error::from(e)))?; | ^^^^^^^^^^^^^^ help: consider removing `Error::from()`: `e` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `-D clippy::useless-conversion` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::useless_conversion)]`
let max_deposit_age = app_client(app_client_addr)
.query(|app| Ok(app.bitcoin.config.max_deposit_age))
.await
Expand Down Expand Up @@ -329,23 +356,15 @@
),
)
.run(([0, 0, 0, 0], 8999));
Ok((server, recv))
Ok(server)
}

async fn relay_deposits(
&mut self,
recv: &mut Receiver<(Dest, u32)>,
index: Arc<Mutex<DepositIndex>>,
) -> Result<!> {
async fn relay_deposits(&self, index: Arc<Mutex<DepositIndex>>) -> Result<!> {
let mut prev_tip = None;
let mut seen_mempool_txids = HashSet::new();
let mut seen_mempool_txids: HashSet<Txid> = HashSet::new();

Check failure on line 364 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

variable does not need to be mutable

error: variable does not need to be mutable --> src/bitcoin/relayer.rs:364:13 | 364 | let mut seen_mempool_txids: HashSet<Txid> = HashSet::new(); | ----^^^^^^^^^^^^^^^^^^ | | | help: remove this `mut` | = note: `-D unused-mut` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_mut)]`

Check failure on line 364 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `seen_mempool_txids`

error: unused variable: `seen_mempool_txids` --> src/bitcoin/relayer.rs:364:17 | 364 | let mut seen_mempool_txids: HashSet<Txid> = HashSet::new(); | ^^^^^^^^^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_seen_mempool_txids` | = note: `-D unused-variables` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_variables)]`

loop {
self.scan_for_mempool_deposits(index.clone(), &mut seen_mempool_txids)
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

self.insert_announced_addrs(recv).await?;
self.remove_expired().await?;

let tip = self.sidechain_block_hash().await?;
let prev = prev_tip.unwrap_or(tip);
Expand All @@ -369,7 +388,7 @@
}

async fn scan_for_deposits(
&mut self,
&self,
num_blocks: usize,
index: Arc<Mutex<DepositIndex>>,
) -> Result<BlockHash> {
Expand All @@ -384,7 +403,7 @@

for (i, block) in blocks.into_iter().enumerate().rev() {
let height = (base_height - i) as u32;
for (tx, matches) in self.relevant_txs(&block) {
for (tx, matches) in self.relevant_txs(&block).await? {
for output in matches {
if let Err(err) = self
.maybe_relay_deposit(tx, height, &block.block_hash(), output, index.clone())
Expand Down Expand Up @@ -421,12 +440,12 @@
let mut script_bytes = vec![];
output.script_pubkey.consensus_encode(&mut script_bytes)?;
let script = ::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice())?;

if self.scripts.is_none() {
let script_guard = self.scripts.lock().await;
if script_guard.is_none() {
return Ok(());
}

if let Some((dest, _)) = self.scripts.as_ref().unwrap().scripts.get(&script) {
if let Some((dest, _)) = script_guard.as_ref().unwrap().scripts.get(&script) {
let bitcoin_address = bitcoin::Address::from_script(
&output.script_pubkey.clone(),
super::NETWORK,
Expand Down Expand Up @@ -577,7 +596,7 @@
info!("Starting recovery tx relay...");

let scripts = WatchedScriptStore::open(store_path, &self.app_client_addr).await?;
self.scripts = Some(scripts);
self.scripts = Arc::new(Mutex::new(Some(scripts)));

loop {
if let Err(e) = self.relay_recovery_txs().await {
Expand Down Expand Up @@ -687,9 +706,9 @@
if err.to_string().contains("No completed checkpoints yet") {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
continue;
} else {
return Err(err.into());
}

return Err(err.into());
}
}
};
Expand Down Expand Up @@ -769,27 +788,12 @@
Ok(None)
}

async fn insert_announced_addrs(&mut self, recv: &mut Receiver<(Dest, u32)>) -> Result<()> {
while let Ok((addr, sigset_index)) = recv.try_recv() {
let sigset_res = app_client(&self.app_client_addr)
.query(|app| Ok(app.bitcoin.checkpoints.get(sigset_index)?.sigset.clone()))
.await;
let sigset = match sigset_res {
Ok(sigset) => sigset,
Err(err) => {
error!("{}", err);
continue;
}
};

self.scripts.as_mut().unwrap().insert(addr, &sigset)?;
}

async fn remove_expired(&self) -> Result<()> {
let max_age = app_client(&self.app_client_addr)
.query(|app| Ok(app.bitcoin.checkpoints.config.max_age))
.await?;

self.scripts
let mut script_guard = self.scripts.lock().await;
script_guard
.as_mut()
.unwrap()
.scripts
Expand Down Expand Up @@ -817,43 +821,44 @@
Ok(blocks)
}

pub fn relevant_txs<'a>(
pub async fn relevant_txs<'a>(
&'a self,
block: &'a Block,
) -> impl Iterator<Item = (&'a Transaction, impl Iterator<Item = OutputMatch> + 'a)> + 'a {
block
.txdata
.iter()
.map(move |tx| (tx, self.relevant_outputs(tx)))
) -> Result<impl Iterator<Item = (&'a Transaction, impl Iterator<Item = OutputMatch> + 'a)> + 'a>
{
let mut txs = Vec::new();
for tx in block.txdata.iter() {
txs.push((tx, self.relevant_outputs(tx).await?));
}

Ok(txs.into_iter())
}

pub fn relevant_outputs<'a>(
pub async fn relevant_outputs<'a>(
&'a self,
tx: &'a Transaction,
) -> impl Iterator<Item = OutputMatch> + 'a {
tx.output
.iter()
.enumerate()
.filter_map(move |(vout, output)| {
let mut script_bytes = vec![];
output
.script_pubkey
.consensus_encode(&mut script_bytes)
.unwrap();
let script =
::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice()).unwrap();
) -> Result<impl Iterator<Item = OutputMatch> + 'a> {
let mut matches = Vec::new();
for (vout, output) in tx.output.iter().enumerate() {
let mut script_bytes = vec![];
let encode: usize = output

Check failure on line 844 in src/bitcoin/relayer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `encode`

error: unused variable: `encode` --> src/bitcoin/relayer.rs:844:17 | 844 | let encode: usize = output | ^^^^^^ help: if this is intentional, prefix it with an underscore: `_encode`
.script_pubkey
.consensus_encode(&mut script_bytes)
.unwrap();
let script = ::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice()).unwrap();

let script_guard = self.scripts.lock().await;
if let Some((dest, sigset_index)) = script_guard.as_ref().unwrap().scripts.get(&script)
{
matches.push(OutputMatch {
sigset_index,
vout: vout as u32,
dest,
});
}
}

self.scripts
.as_ref()
.unwrap()
.scripts
.get(&script)
.map(|(dest, sigset_index)| OutputMatch {
sigset_index,
vout: vout as u32,
dest,
})
})
Ok(matches.into_iter())
}

async fn maybe_relay_deposit(
Expand Down
Loading
Loading