Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add block_headers and blocks streams full APIs (w/ checkpoint and reorg handling) #10

Merged
merged 18 commits into from
Aug 11, 2022
Merged
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
67e5f0a
feature: implement an initial mempool.space websocket client, and lib…
oleonardolima Jul 7, 2022
7816922
feature: add unit and integration tests to block events from #8
oleonardolima Jul 7, 2022
2b4f04a
feat+test: add initial http client and tests
oleonardolima Jun 27, 2022
c6f3505
wip(feat): add initial approach for subscribing to blocks from a star…
oleonardolima Jun 27, 2022
71778c0
wip(feat): fix fn and use a tuple as return type for prev and new blo…
oleonardolima Jun 27, 2022
1a4bf12
wip(refactor): initial updates for fns refactor and architecture change
oleonardolima Jun 29, 2022
220be90
wip(feat+refactor): add fn to process candidate BlockHeaders, handle …
oleonardolima Jul 1, 2022
0ce755d
wip(refactor+docs): extract fns to cache struct, and add documentatio…
oleonardolima Jul 5, 2022
3902f51
wip(fixes+tests): fixes, improve error handling and add new integrati…
oleonardolima Jul 6, 2022
d199590
wip(fix+test): fix common ancestor and fork branch bug, add new reorg…
oleonardolima Jul 6, 2022
d6b282f
fix: disconnected and connected block events emitting order for reorg…
oleonardolima Jul 6, 2022
3c8191b
chore: simplify cli usage and output
oleonardolima Jul 6, 2022
2ad93bc
chore: update docs and readme examples, use u32 instead of u64
oleonardolima Jul 6, 2022
f8ce1ef
fix: remaining cargo.toml conflicts from other branches
oleonardolima Jul 7, 2022
2b9e908
feat: add full block stream api, better error handling
oleonardolima Jul 26, 2022
4148fc1
refactor: do not use features expect two base_url instead, and improv…
oleonardolima Aug 4, 2022
fa4775f
chore: add and update CHANGELOG.md file
oleonardolima Aug 9, 2022
c52c1b4
fix(test): docs and integration tests
oleonardolima Aug 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip(fix+test): fix common ancestor and fork branch bug, add new reorg…
… tests
oleonardolima committed Jul 7, 2022
commit d199590196b7539035bf0adcc7e6adaca49b85db
11 changes: 6 additions & 5 deletions src/bin.rs
Original file line number Diff line number Diff line change
@@ -76,11 +76,12 @@ async fn main() -> anyhow::Result<()> {
// async fetch the data stream through the lib
let block_events = block_events::subscribe_to_blocks(
base_url.as_str(),
Some((
102,
BlockHash::from_str("3d3ab0efd0f8f0eb047d9e77f7ad7c6b6791c896bd8a21437da555670f799e08")
.unwrap(),
)),
// Some((
// 102,
// BlockHash::from_str("3d3ab0efd0f8f0eb047d9e77f7ad7c6b6791c896bd8a21437da555670f799e08")
// .unwrap(),
// )),
None,
)
.await?;

37 changes: 23 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -85,12 +85,13 @@ impl BlockHeadersCache {
let mut common_ancestor = branch_candidate;
let mut fork_branch: VecDeque<BlockExtended> = VecDeque::new();
while !self.active_headers.contains_key(&common_ancestor.id) {
log::debug!("{:?}", common_ancestor);
fork_branch.push_back(common_ancestor);
common_ancestor = http_client
._get_block(common_ancestor.prev_blockhash)
.await?;
}
log::debug!("[common_ancestor] {:?}", common_ancestor);
log::debug!("[fork_branch] {:?}", fork_branch);
Ok((common_ancestor, fork_branch))
}

@@ -108,8 +109,10 @@ impl BlockHeadersCache {
all_disconnected.push_back(disconnected_header);
self.stale_headers
.insert(disconnected_hash, disconnected_header);
self.tip = block.id;
self.tip = disconnected_header.prev_blockhash;
}
log::info!("[all_disconnected] {:?}", all_disconnected);
log::info!("[self.tip] {:?}", self.tip);
Ok(all_disconnected)
}

@@ -119,13 +122,15 @@ impl BlockHeadersCache {
pub fn apply_fork_chain(
&mut self,
mut fork_branch: VecDeque<BlockExtended>,
) -> anyhow::Result<BlockHash> {
) -> anyhow::Result<(BlockHash, VecDeque<BlockExtended>)> {
let mut connected = VecDeque::new();
while !fork_branch.is_empty() {
let block = fork_branch.pop_front().unwrap();
connected.push_back(block);
self.active_headers.insert(block.id, block);
self.tip = block.id;
}
Ok(self.tip)
Ok((self.tip, connected))
}
}

@@ -135,14 +140,17 @@ pub async fn subscribe_to_blocks(
checkpoint: Option<(u64, BlockHash)>,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockEvent>>>> {
let http_client = http::HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS);

let current_tip = match checkpoint {
Some((height, _)) => height - 1,
_ => http_client._get_height().await?,
};

let cache = BlockHeadersCache {
tip: http_client
._get_block_height(http_client._get_height().await?)
.await?,
tip: http_client._get_block_height(current_tip).await?,
active_headers: HashMap::new(),
stale_headers: HashMap::new(),
};
log::debug!("[BlockHeadersCache] {:?}", cache);

match checkpoint {
Some(checkpoint) => {
@@ -193,10 +201,16 @@ async fn process_candidates(
for disconnected in cache.rollback_active_chain(common_ancestor).await.unwrap().iter() {
yield BlockEvent::Disconnected((disconnected.height, disconnected.id));
}
// TODO: (@leonardo.lima) fix order of return

// iterate over forked chain candidates
// update [`Cache`] active_headers field with candidates
let _ = cache.apply_fork_chain(fork_chain);
let (_, connected) = cache.apply_fork_chain(fork_chain).unwrap();
for block in connected {
yield BlockEvent::Connected(BlockHeader::from(block.clone()));
}
// TODO: (@leonardo.lima fix order of return)

}
};
Ok(stream)
@@ -219,18 +233,13 @@ pub async fn fetch_blocks(
let mut tip = http_client._get_height().await?;
let mut height = ckpt_height;

log::debug!("tip: {}", tip);
log::debug!("ckpt: {:?}", checkpoint);

let mut interval = Instant::now(); // it should try to update the tip every 5 minutes.
let stream = stream! {
while height <= tip {
let hash = http_client._get_block_height(height).await.unwrap();
let block = http_client._get_block(hash).await.unwrap();

height += 1;
log::debug!("height: {}", height);
log::debug!("block: {:?}", block);

if interval.elapsed() >= Duration::from_secs(300) {
interval = Instant::now();
110 changes: 98 additions & 12 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -385,17 +385,6 @@ async fn test_block_events_stream_with_checkpoint() {
.await
.unwrap();

// std::thread::sleep(delay);
// // generate 5 new blocks through bitcoind rpc-client
// let mut generated_blocks = VecDeque::from(
// rpc_client
// .generate_to_address(3, &rpc_client.get_new_address(None, None).unwrap())
// .unwrap(),
// );

// insert the first blocks starting from checkpoint
// first_blocks.append(&mut generated_blocks);

// consume new blocks from block-events stream
pin_mut!(block_events);
while !first_blocks.is_empty() {
@@ -417,4 +406,101 @@ async fn test_block_events_stream_with_checkpoint() {

#[tokio::test]
#[serial]
async fn test_block_events_stream_with_reorg() {}
async fn test_block_events_stream_with_reorg() {
let _ = env_logger::try_init();
let delay = Duration::from_millis(5000);

let docker = clients::Cli::docker();
let client = MempoolTestClient::default();

let _mariadb = docker.run(client.mariadb_database);

std::thread::sleep(delay); // there is some delay between running the docker and the port being really available
let mempool = docker.run(client.mempool_backend);

// get block-events stream
let block_events = block_events::subscribe_to_blocks(
build_base_url(mempool.get_host_port_ipv4(mempool.get_host_port_ipv4(8999))).as_str(),
None,
)
.await
.unwrap();

// initiate bitcoind client
let rpc_client = &client.bitcoind.client;

// generate 5 new blocks through bitcoind rpc-client
let generated_blocks = VecDeque::from(
rpc_client
.generate_to_address(5, &rpc_client.get_new_address(None, None).unwrap())
.unwrap(),
);
let mut new_blocks = generated_blocks.clone();

// consume new blocks from block-events stream
pin_mut!(block_events);
while !new_blocks.is_empty() {
let block_hash = new_blocks.pop_front().unwrap();
let block_event = block_events.next().await.unwrap();

// should produce a BlockEvent::Connected result for each block event
assert!(matches!(block_event, BlockEvent::Connected { .. }));

// should parse the BlockEvent::Connected successfully
let connected_block = match block_event {
BlockEvent::Connected(block) => block,
_ => unreachable!("This test is supposed to have only connected blocks, please check why it's generating disconnected and/or errors at the moment."),
};
assert_eq!(block_hash.to_owned(), connected_block.block_hash());
}

// invalidate last 2 blocks
let mut invalidated_blocks = VecDeque::new();
for block in generated_blocks.range(3..) {
rpc_client.invalidate_block(block).unwrap();
invalidated_blocks.push_front(block);
}
log::debug!("invalidated_blocks {:?}", invalidated_blocks);

// generate 2 new blocks
let mut new_blocks = VecDeque::from(
rpc_client
.generate_to_address(3, &rpc_client.get_new_address(None, None).unwrap())
.unwrap(),
);
log::debug!("new_blocks {:?}", new_blocks);

// should disconnect invalidated blocks
while !invalidated_blocks.is_empty() {
log::info!("len {:?}", invalidated_blocks.len());
let invalidated = invalidated_blocks.pop_front().unwrap();
let block_event = block_events.next().await.unwrap();

log::info!("{:?}", block_event);
// should produce a BlockEvent::Connected result for each block event
assert!(matches!(block_event, BlockEvent::Disconnected(..)));

// should parse the BlockEvent::Connected successfully
let disconnected = match block_event {
BlockEvent::Disconnected((_, hash)) => hash,
_ => unreachable!("This test is supposed to have only connected blocks, please check why it's generating disconnected and/or errors at the moment."),
};
assert_eq!(invalidated.to_owned(), disconnected);
}

// should connect the new created blocks
while !new_blocks.is_empty() {
let new_block = new_blocks.pop_front().unwrap();
let block_event = block_events.next().await.unwrap();

// should produce a BlockEvent::Connected result for each block event
assert!(matches!(block_event, BlockEvent::Connected { .. }));

// should parse the BlockEvent::Connected successfully
let connected = match block_event {
BlockEvent::Connected(block) => block.block_hash(),
_ => unreachable!("This test is supposed to have only connected blocks, please check why it's generating disconnected and/or errors at the moment."),
};
assert_eq!(new_block.to_owned(), connected);
}
}