Skip to content

Commit

Permalink
Test querying missing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 15, 2025
1 parent 2a326ca commit 3900f23
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 26 deletions.
79 changes: 61 additions & 18 deletions src/ckb/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,58 @@ pub enum CellStatus {
Consumed,
}

// The problem of channel announcement is that each nodes will query the block timestamp
// and use it as the channel announcement timestamp.
// Guaranteeing the block timestamp is the same across all nodes is important
// because if a node A has a greater channel announcement timestamp than node B, then when
// A tries to get broadcast messages after this channel announcement timestamp, B will return
// the channel announcement. But for A, it is not a later broadcast message. This process will
// cause an infinite loop.
// So here we create an static lock which is shared across all nodes, and we use this lock to
// guarantee that the block timestamp is the same across all nodes.
pub static BLOCK_TIMESTAMP_CONTEXT: OnceCell<TokioRwLock<BlockTimestampContext>> = OnceCell::new();

#[derive(Default)]
pub struct BlockTimestampContext {
pub timestamps: HashMap<H256, u64>,
// If non empty, we will use this
pub next_timestamp: Option<u64>,
}

pub fn get_block_timestamp(block: H256) -> u64 {
let context = BLOCK_TIMESTAMP_CONTEXT.get_or_init(|| TokioRwLock::new(Default::default()));
let mut context = context
.try_write()
.expect("get block timestamp context lock");
context.get_block_timestamp(block)
}

pub fn set_next_block_timestamp(next_timestamp: u64) {
let context = BLOCK_TIMESTAMP_CONTEXT.get_or_init(|| TokioRwLock::new(Default::default()));
let mut context = context
.try_write()
.expect("get block timestamp context lock");
context.set_next_block_timestamp(next_timestamp);
}

impl BlockTimestampContext {
fn get_block_timestamp(&mut self, block: H256) -> u64 {
if let Some(timestamp) = self.timestamps.get(&block) {
return *timestamp;
}
let timestamp = self
.next_timestamp
.take()
.unwrap_or(now_timestamp_as_millis_u64());
self.timestamps.insert(block, timestamp);
return timestamp;
}

fn set_next_block_timestamp(&mut self, next_timestamp: u64) {
self.next_timestamp = Some(next_timestamp);
}
}

pub static MOCK_CONTEXT: Lazy<RwLock<MockContext>> = Lazy::new(|| RwLock::new(MockContext::new()));

pub struct MockContext {
Expand Down Expand Up @@ -492,24 +544,7 @@ impl Actor for MockChainActor {
};
}
GetBlockTimestamp(request, rpc_reply_port) => {
// The problem of channel announcement is that each nodes will query the block timestamp
// and use it as the channel announcement timestamp.
// Guaranteeing the block timestamp is the same across all nodes is important
// because if a node A has a greater channel announcement timestamp than node B, then when
// A tries to get broadcast messages after this channel announcement timestamp, B will return
// the channel announcement. But for A, it is not a later broadcast message. This process will
// cause an infinite loop.
// So here we create an static lock which is shared across all nodes, and we use this lock to
// guarantee that the block timestamp is the same across all nodes.
static BLOCK_TIMESTAMP: OnceCell<TokioRwLock<HashMap<H256, u64>>> = OnceCell::new();
BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new()));
let timestamp = *BLOCK_TIMESTAMP
.get()
.unwrap()
.write()
.await
.entry(request.block_hash())
.or_insert(now_timestamp_as_millis_u64());
let timestamp = get_block_timestamp(request.block_hash());

let _ = rpc_reply_port.send(Ok(Some(timestamp)));
}
Expand Down Expand Up @@ -587,3 +622,11 @@ pub async fn get_tx_from_hash(
.map(|tx| Transaction::from(tx.inner).into_view())
.ok_or(anyhow!("tx not found in trace tx response"))
}

#[test]
fn test_set_and_get_block_timestamp() {
let now = now_timestamp_as_millis_u64();
set_next_block_timestamp(now);
let timestamp = get_block_timestamp(H256::default());
assert_eq!(timestamp, now);
}
10 changes: 10 additions & 0 deletions src/fiber/tests/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,14 @@ async fn test_saving_channel_update_after_saving_channel_announcement() {
42,
42,
42,
None,
),
channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
42,
42,
42,
None,
),
] {
context.save_message(BroadcastMessage::ChannelUpdate(channel_update.clone()));
Expand All @@ -300,12 +302,14 @@ async fn test_saving_channel_update_before_saving_channel_announcement() {
42,
42,
42,
None,
),
channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
42,
42,
42,
None,
),
] {
context.save_message(BroadcastMessage::ChannelUpdate(channel_update.clone()));
Expand Down Expand Up @@ -357,12 +361,14 @@ async fn test_saving_invalid_channel_update() {
42,
42,
42,
None,
),
channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
42,
42,
42,
None,
),
] {
channel_update.signature = Some(create_invalid_ecdsa_signature());
Expand Down Expand Up @@ -398,12 +404,14 @@ async fn test_saving_channel_update_independency() {
42,
42,
42,
None,
),
channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
42,
42,
42,
None,
),
] {
if channel_update.is_update_of_node_1() && node1_has_invalid_signature {
Expand Down Expand Up @@ -487,12 +495,14 @@ async fn test_saving_channel_update_with_invalid_channel_announcement() {
42,
42,
42,
None,
),
channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
42,
42,
42,
None,
),
] {
context.save_message(BroadcastMessage::ChannelUpdate(channel_update.clone()));
Expand Down
89 changes: 83 additions & 6 deletions src/fiber/tests/network.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use super::test_utils::{init_tracing, NetworkNode};
use crate::{
ckb::tests::test_utils::set_next_block_timestamp,
fiber::{
channel::ShutdownInfo,
config::DEFAULT_TLC_EXPIRY_DELTA,
graph::ChannelUpdateInfo,
network::{NetworkActorStateStore, SendPaymentCommand, SendPaymentData},
tests::test_utils::NetworkNodeConfigBuilder,
types::{
BroadcastMessage, ChannelAnnouncement, ChannelUpdateChannelFlags, NodeAnnouncement,
Privkey, Pubkey,
BroadcastMessage, BroadcastMessageWithTimestamp, ChannelAnnouncement,
ChannelUpdateChannelFlags, NodeAnnouncement, Privkey, Pubkey,
},
NetworkActorCommand, NetworkActorEvent, NetworkActorMessage,
},
Expand Down Expand Up @@ -272,8 +273,13 @@ async fn test_node1_node2_channel_update() {
.expect("send message to network actor");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let channel_update_of_node1 =
channel_context.create_channel_update_of_node1(ChannelUpdateChannelFlags::empty(), 1, 1, 1);
let channel_update_of_node1 = channel_context.create_channel_update_of_node1(
ChannelUpdateChannelFlags::empty(),
1,
1,
1,
None,
);
node.network_actor
.send_message(NetworkActorMessage::Event(
NetworkActorEvent::GossipMessage(
Expand All @@ -291,8 +297,13 @@ async fn test_node1_node2_channel_update() {
Some(ChannelUpdateInfo::from(&channel_update_of_node1))
);

let channel_update_of_node2 =
channel_context.create_channel_update_of_node2(ChannelUpdateChannelFlags::empty(), 2, 2, 2);
let channel_update_of_node2 = channel_context.create_channel_update_of_node2(
ChannelUpdateChannelFlags::empty(),
2,
2,
2,
None,
);
node.network_actor
.send_message(NetworkActorMessage::Event(
NetworkActorEvent::GossipMessage(
Expand Down Expand Up @@ -342,6 +353,7 @@ async fn test_channel_update_version() {
i.into(),
i.into(),
i.into(),
None,
))
}
let [channel_update_1, channel_update_2, channel_update_3] =
Expand Down Expand Up @@ -398,6 +410,71 @@ async fn test_channel_update_version() {
);
}

#[tokio::test]
async fn test_query_missing_broadcast_message() {
let channel_context = ChannelTestContext::gen();
let funding_tx = channel_context.funding_tx.clone();
let out_point = channel_context.channel_outpoint().clone();
let channel_announcement = channel_context.channel_announcement.clone();
// A timestamp so large that the other node will unlikely try to send GetBroadcastMessages
// with timestamp smaller than this one.
let long_long_time_ago = now_timestamp_as_millis_u64() - 30 * 24 * 3600 * 100;

let mut node1 = NetworkNode::new().await;
set_next_block_timestamp(long_long_time_ago);
node1.submit_tx(funding_tx.clone()).await;
node1
.network_actor
.send_message(NetworkActorMessage::Event(
NetworkActorEvent::GossipMessage(
get_test_peer_id(),
BroadcastMessage::ChannelAnnouncement(channel_announcement)
.create_broadcast_messages_filter_result(),
),
))
.expect("send message to network actor");
let channel_update = channel_context.create_channel_update_of_node1(
ChannelUpdateChannelFlags::empty(),
1,
1,
1,
Some(long_long_time_ago + 10),
);
node1
.network_actor
.send_message(NetworkActorMessage::Event(
NetworkActorEvent::GossipMessage(
get_test_peer_id(),
BroadcastMessage::ChannelUpdate(channel_update.clone())
.create_broadcast_messages_filter_result(),
),
))
.expect("send message to network actor");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let node1_channel_info = node1.get_network_graph_channel(&out_point).await.unwrap();
assert_ne!(node1_channel_info.update_of_node1, None);

let mut node2 = NetworkNode::new().await;
node1.connect_to(&node2).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Verify that node2 still does not have channel info after active syncing done.
let node2_channel_info = node2.get_network_graph_channel(&out_point).await;
assert_eq!(node2_channel_info, None);

node1
.network_actor
.send_message(NetworkActorMessage::Command(
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update.clone()),
]),
))
.expect("send message to network actor");
node2.submit_tx(funding_tx.clone()).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let node2_channel_info = node2.get_network_graph_channel(&out_point).await.unwrap();
assert_eq!(node1_channel_info, node2_channel_info);
}

#[tokio::test]
async fn test_sync_node_announcement_version() {
init_tracing();
Expand Down
8 changes: 6 additions & 2 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ impl ChannelTestContext {
tlc_expiry_delta: u64,
tlc_minimum_value: u128,
tlc_fee_proportional_millionths: u128,
timestamp: Option<u64>,
) -> ChannelUpdate {
let timestamp = timestamp.unwrap_or(now_timestamp_as_millis_u64());
let mut unsigned_channel_update = ChannelUpdate::new_unsigned(
self.channel_announcement.channel_outpoint.clone(),
now_timestamp_as_millis_u64(),
timestamp,
ChannelUpdateMessageFlags::UPDATE_OF_NODE1,
channel_flags,
tlc_expiry_delta,
Expand All @@ -158,10 +160,12 @@ impl ChannelTestContext {
tlc_expiry_delta: u64,
tlc_minimum_value: u128,
tlc_fee_proportional_millionths: u128,
timestamp: Option<u64>,
) -> ChannelUpdate {
let timestamp = timestamp.unwrap_or(now_timestamp_as_millis_u64());
let mut unsigned_channel_update = ChannelUpdate::new_unsigned(
self.channel_announcement.channel_outpoint.clone(),
now_timestamp_as_millis_u64(),
timestamp,
ChannelUpdateMessageFlags::UPDATE_OF_NODE2,
channel_flags,
tlc_expiry_delta,
Expand Down

0 comments on commit 3900f23

Please sign in to comment.