Query missing broadcast messages from peers #428

Open · wants to merge 17 commits into base: develop
76 changes: 58 additions & 18 deletions src/ckb/tests/test_utils.rs
@@ -46,6 +46,55 @@ pub enum CellStatus {
Consumed,
}

// The problem with channel announcements is that each node will query the block timestamp
// and use it as the channel announcement timestamp.
// Guaranteeing that the block timestamp is the same across all nodes is important,
// because if node A has a greater channel announcement timestamp than node B, then when
// A tries to get broadcast messages after this channel announcement timestamp, B will return
// the channel announcement. But for A, it is not a later broadcast message. This process will
// cause an infinite loop.
// So here we create a static block timestamp context which is shared across all nodes,
// and we use this context to guarantee that the block timestamp is the same across all nodes.
pub static BLOCK_TIMESTAMP_CONTEXT: OnceCell<TokioRwLock<BlockTimestampContext>> = OnceCell::new();

#[derive(Default)]
pub struct BlockTimestampContext {
pub timestamps: HashMap<H256, u64>,
// If set, we will use this as the timestamp for the next block.
// This is normally used to mock an ancient block.
pub next_timestamp: Option<u64>,
}

pub async fn get_block_timestamp(block: H256) -> u64 {
let context = BLOCK_TIMESTAMP_CONTEXT.get_or_init(|| TokioRwLock::new(Default::default()));
let mut context = context.write().await;
context.get_block_timestamp(block)
}

pub async fn set_next_block_timestamp(next_timestamp: u64) {
let context = BLOCK_TIMESTAMP_CONTEXT.get_or_init(|| TokioRwLock::new(Default::default()));
let mut context = context.write().await;
context.set_next_block_timestamp(next_timestamp);
}

impl BlockTimestampContext {
fn get_block_timestamp(&mut self, block: H256) -> u64 {
if let Some(timestamp) = self.timestamps.get(&block) {
return *timestamp;
}
let timestamp = self
.next_timestamp
.take()
.unwrap_or(now_timestamp_as_millis_u64());
self.timestamps.insert(block, timestamp);
timestamp
}

fn set_next_block_timestamp(&mut self, next_timestamp: u64) {
self.next_timestamp = Some(next_timestamp);
}
}

pub static MOCK_CONTEXT: Lazy<RwLock<MockContext>> = Lazy::new(|| RwLock::new(MockContext::new()));

pub struct MockContext {
@@ -492,24 +541,7 @@ impl Actor for MockChainActor {
};
}
GetBlockTimestamp(request, rpc_reply_port) => {
// The problem with channel announcements is that each node will query the block timestamp
// and use it as the channel announcement timestamp.
// Guaranteeing that the block timestamp is the same across all nodes is important,
// because if node A has a greater channel announcement timestamp than node B, then when
// A tries to get broadcast messages after this channel announcement timestamp, B will return
// the channel announcement. But for A, it is not a later broadcast message. This process will
// cause an infinite loop.
// So here we create a static lock which is shared across all nodes, and we use this lock to
// guarantee that the block timestamp is the same across all nodes.
static BLOCK_TIMESTAMP: OnceCell<TokioRwLock<HashMap<H256, u64>>> = OnceCell::new();
BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new()));
let timestamp = *BLOCK_TIMESTAMP
.get()
.unwrap()
.write()
.await
.entry(request.block_hash())
.or_insert(now_timestamp_as_millis_u64());
let timestamp = get_block_timestamp(request.block_hash()).await;

let _ = rpc_reply_port.send(Ok(Some(timestamp)));
}
@@ -587,3 +619,11 @@ pub async fn get_tx_from_hash(
.map(|tx| Transaction::from(tx.inner).into_view())
.ok_or(anyhow!("tx not found in trace tx response"))
}

#[tokio::test]
async fn test_set_and_get_block_timestamp() {
let now = now_timestamp_as_millis_u64();
set_next_block_timestamp(now).await;
let timestamp = get_block_timestamp(H256::default()).await;
assert_eq!(timestamp, now);
}
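
For illustration (not part of this diff), here is a minimal sketch of how `set_next_block_timestamp` could be used to mock an ancient block, as the `next_timestamp` comment above suggests: the forced timestamp is consumed by the first lookup only. The test name and the concrete `H256` values are assumptions, and since `BLOCK_TIMESTAMP_CONTEXT` is a process-wide static, such a test should not run concurrently with other tests that set `next_timestamp`.

#[tokio::test]
async fn test_next_block_timestamp_is_consumed_once() {
    // Force an "ancient" timestamp for the next block lookup only.
    let ancient = 1_000u64;
    set_next_block_timestamp(ancient).await;

    // The first block queried consumes the forced timestamp.
    let first = get_block_timestamp(H256([1u8; 32])).await;
    assert_eq!(first, ancient);

    // A different block falls back to the current time and is cached for later lookups.
    let second = get_block_timestamp(H256([2u8; 32])).await;
    assert!(second >= ancient);
    assert_eq!(second, get_block_timestamp(H256([2u8; 32])).await);
}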
50 changes: 6 additions & 44 deletions src/fiber/channel.rs
@@ -25,13 +25,12 @@ use crate::{
},
serde_utils::{CompactSignatureAsBytes, EntityHex, PubNonceAsBytes},
types::{
AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessageQuery,
BroadcastMessageQueryFlags, ChannelAnnouncement, ChannelReady, ChannelUpdate,
ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage, FiberMessage,
Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket, Privkey, Pubkey,
ReestablishChannel, RemoveTlc, RemoveTlcFulfill, RemoveTlcReason, RevokeAndAck,
Shutdown, TlcErr, TlcErrPacket, TlcErrorCode, TxCollaborationMsg, TxComplete, TxUpdate,
NO_SHARED_SECRET,
AcceptChannel, AddTlc, AnnouncementSignatures, ChannelAnnouncement, ChannelReady,
ChannelUpdate, ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage,
FiberMessage, Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket,
Privkey, Pubkey, ReestablishChannel, RemoveTlc, RemoveTlcFulfill, RemoveTlcReason,
RevokeAndAck, Shutdown, TlcErr, TlcErrPacket, TlcErrorCode, TxCollaborationMsg,
TxComplete, TxUpdate, NO_SHARED_SECRET,
},
NetworkActorCommand, NetworkActorEvent, NetworkActorMessage, ASSUME_NETWORK_ACTOR_ALIVE,
},
@@ -5646,43 +5645,6 @@ impl ChannelActorState {
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);

// Note that there is a race condition here. The peer may not have finished
// generating the channel update message yet. In order to reliably query the
// peer for the channel update message, we may need to retry the query a few times.
let peer_id = self.get_remote_peer_id();
let queries = if self.local_is_node1() {
vec![
BroadcastMessageQuery {
channel_outpoint: self.must_get_funding_transaction_outpoint(),
flags: BroadcastMessageQueryFlags::ChannelUpdateOfNode2,
},
BroadcastMessageQuery {
channel_outpoint: self.must_get_funding_transaction_outpoint(),
flags: BroadcastMessageQueryFlags::NodeAnnouncementNode2,
},
]
} else {
vec![
BroadcastMessageQuery {
channel_outpoint: self.must_get_funding_transaction_outpoint(),
flags: BroadcastMessageQueryFlags::ChannelUpdateOfNode1,
},
BroadcastMessageQuery {
channel_outpoint: self.must_get_funding_transaction_outpoint(),
flags: BroadcastMessageQueryFlags::NodeAnnouncementNode1,
},
]
};
debug!(
"Querying for channel update and node announcement messages from {:?}",
&peer_id
);
network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::QueryBroadcastMessages(peer_id, queries),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
}
}
