Skip to content

Commit

Permalink
Add BlockAddedHeader notification (kaspanet#416)
Browse files Browse the repository at this point in the history
BlockAddedHeader notification is the slim version of
BlockAdded notifaction, but without tx data
  • Loading branch information
gvbgduh committed Mar 11, 2024
1 parent e9cf674 commit 36aa8c2
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 4 deletions.
21 changes: 21 additions & 0 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum Notification {

#[display(fmt = "NewBlockTemplate notification")]
NewBlockTemplate(NewBlockTemplateNotification),

#[display(fmt = "BlockAddedHeader notification: block hash {}", "_0.block.header.hash")]
BlockAddedHeader(BlockAddedHeaderNotification),
}
}

Expand Down Expand Up @@ -78,6 +81,13 @@ impl NotificationTrait for Notification {
Some(self.clone())
}

fn apply_block_added_header_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn event_type(&self) -> EventType {
self.into()
}
Expand Down Expand Up @@ -172,3 +182,14 @@ pub struct PruningPointUtxoSetOverrideNotification {}

#[derive(Debug, Clone)]
pub struct NewBlockTemplateNotification {}

#[derive(Debug, Clone)]
pub struct BlockAddedHeaderNotification {
pub block: Block,
}

impl BlockAddedHeaderNotification {
pub fn new(block: Block) -> Self {
Self { block: Block::from_header_arc(block.header) }
}
}
7 changes: 7 additions & 0 deletions indexes/core/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ impl NotificationTrait for Notification {
}
}

fn apply_block_added_header_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn event_type(&self) -> EventType {
self.into()
}
Expand Down
4 changes: 4 additions & 0 deletions notify/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ mod tests {
fn event_type(&self) -> EventType {
unimplemented!()
}

fn apply_block_added_header_subscription(&self, _: &OverallSubscription) -> Option<Self> {
unimplemented!()
}
}

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion notify/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ pub enum EventType {
VirtualDaaScoreChanged,
PruningPointUtxoSetOverride,
NewBlockTemplate,
BlockAddedHeader,
}
}

pub const EVENT_COUNT: usize = 9;
pub const EVENT_COUNT: usize = 10;

/// Generic array with [`EventType`] strongly-typed index
#[derive(Default, Clone, Copy, Debug)]
Expand Down
27 changes: 27 additions & 0 deletions notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub trait Notification: Clone + Debug + Display + Send + Sync + 'static {
}
}

fn apply_block_added_header_subscription(&self, subscription: &OverallSubscription) -> Option<Self>;

fn event_type(&self) -> EventType;
}

Expand Down Expand Up @@ -93,6 +95,11 @@ pub mod test_helpers {
pub addresses: Arc<Vec<Address>>,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BlockAddedHeaderNotification {
pub data: u64,
}

full_featured! {
#[derive(Clone, Debug, Display, PartialEq, Eq)]
pub enum TestNotification {
Expand All @@ -102,6 +109,8 @@ pub mod test_helpers {
VirtualChainChanged(VirtualChainChangedNotification),
#[display(fmt = "UtxosChanged #{}", "_0.data")]
UtxosChanged(UtxosChangedNotification),
#[display(fmt = "BlockAddedHeader #{}", "_0.data")]
BlockAddedHeader(BlockAddedHeaderNotification),
}
}

Expand Down Expand Up @@ -154,6 +163,13 @@ pub mod test_helpers {
}
}

fn apply_block_added_header_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn event_type(&self) -> EventType {
self.into()
}
Expand Down Expand Up @@ -191,12 +207,22 @@ pub mod test_helpers {
&mut self.data
}
}
impl Data for BlockAddedHeaderNotification {
fn data(&self) -> u64 {
self.data
}

fn data_mut(&mut self) -> &mut u64 {
&mut self.data
}
}
impl Data for TestNotification {
fn data(&self) -> u64 {
match self {
TestNotification::BlockAdded(n) => n.data(),
TestNotification::VirtualChainChanged(n) => n.data(),
TestNotification::UtxosChanged(n) => n.data(),
TestNotification::BlockAddedHeader(n) => n.data(),
}
}

Expand All @@ -205,6 +231,7 @@ pub mod test_helpers {
TestNotification::BlockAdded(n) => n.data_mut(),
TestNotification::VirtualChainChanged(n) => n.data_mut(),
TestNotification::UtxosChanged(n) => n.data_mut(),
TestNotification::BlockAddedHeader(n) => n.data_mut(),
}
}
}
Expand Down
64 changes: 62 additions & 2 deletions notify/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ pub mod test_helpers {
address::test_helpers::get_3_addresses,
connection::ChannelConnection,
notification::test_helpers::{
BlockAddedNotification, Data, TestNotification, UtxosChangedNotification, VirtualChainChangedNotification,
BlockAddedNotification, Data, TestNotification, UtxosChangedNotification, VirtualChainChangedNotification, BlockAddedHeaderNotification,
},
scope::{BlockAddedScope, UtxosChangedScope, VirtualChainChangedScope},
scope::{BlockAddedScope, UtxosChangedScope, VirtualChainChangedScope, BlockAddedHeaderScope},
subscriber::test_helpers::SubscriptionMessage,
};
use async_channel::Sender;
Expand Down Expand Up @@ -681,6 +681,59 @@ pub mod test_helpers {
])
}

pub fn block_added_header_test_steps(listener_id: ListenerId) -> Vec<Step> {
fn m(command: Command) -> Option<Mutation> {
Some(Mutation { command, scope: Scope::BlockAddedHeader(BlockAddedHeaderScope {}) })
}
let s = |command: Command| -> Option<SubscriptionMessage> {
Some(SubscriptionMessage { listener_id, mutation: Mutation { command, scope: Scope::BlockAddedHeader(BlockAddedHeaderScope {}) } })
};
fn n() -> TestNotification {
TestNotification::BlockAddedHeader(BlockAddedHeaderNotification::default())
}
fn e() -> Option<TestNotification> {
Some(TestNotification::BlockAddedHeader(BlockAddedHeaderNotification::default()))
}

set_steps_data(vec![
Step {
name: "do nothing",
mutations: vec![],
expected_subscriptions: vec![],
notification: n(),
expected_notifications: vec![None, None],
},
Step {
name: "L0 on",
mutations: vec![m(Command::Start), None],
expected_subscriptions: vec![s(Command::Start), None],
notification: n(),
expected_notifications: vec![e(), None],
},
Step {
name: "L0 & L1 on",
mutations: vec![None, m(Command::Start)],
expected_subscriptions: vec![None, None],
notification: n(),
expected_notifications: vec![e(), e()],
},
Step {
name: "L1 on",
mutations: vec![m(Command::Stop), None],
expected_subscriptions: vec![None, None],
notification: n(),
expected_notifications: vec![None, e()],
},
Step {
name: "all off",
mutations: vec![None, m(Command::Stop)],
expected_subscriptions: vec![None, s(Command::Stop)],
notification: n(),
expected_notifications: vec![None, None],
},
])
}

fn set_steps_data(mut steps: Vec<Step>) -> Vec<Step> {
// Prepare the notification data markers for the test
for (idx, step) in steps.iter_mut().enumerate() {
Expand Down Expand Up @@ -858,4 +911,11 @@ mod tests {
let test = Test::new("UtxosChanged broadcast", 3, utxos_changed_test_steps(SUBSCRIPTION_MANAGER_ID));
test.run().await;
}

#[tokio::test]
async fn test_block_added_header() {
kaspa_core::log::try_init_logger("trace,kaspa_notify=trace");
let test = Test::new("BlockAddedHeader broadcast", 3, block_added_header_test_steps(SUBSCRIPTION_MANAGER_ID));
test.run().await;
}
}
4 changes: 4 additions & 0 deletions notify/src/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum Scope {
VirtualDaaScoreChanged,
PruningPointUtxoSetOverride,
NewBlockTemplate,
BlockAddedHeader,
}
}

Expand Down Expand Up @@ -113,3 +114,6 @@ pub struct PruningPointUtxoSetOverrideScope {}

#[derive(Clone, Display, Debug, Default, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
pub struct NewBlockTemplateScope {}

#[derive(Clone, Display, Debug, Default, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
pub struct BlockAddedHeaderScope {}
11 changes: 11 additions & 0 deletions rpc/core/src/api/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub enum Notification {

#[display(fmt = "NewBlockTemplate notification")]
NewBlockTemplate(NewBlockTemplateNotification),

#[display(fmt = "BlockAddedHeader notification: block hash {}", "_0.block.header.hash")]
BlockAddedHeader(BlockAddedHeaderNotification),
}
}

Expand All @@ -59,6 +62,7 @@ impl Notification {
Notification::VirtualDaaScoreChanged(v) => to_value(&v),
Notification::SinkBlueScoreChanged(v) => to_value(&v),
Notification::VirtualChainChanged(v) => to_value(&v),
Notification::BlockAddedHeader(v) => to_value(&v),
}
}
}
Expand Down Expand Up @@ -99,6 +103,13 @@ impl NotificationTrait for Notification {
}
}

fn apply_block_added_header_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn event_type(&self) -> EventType {
self.into()
}
Expand Down
4 changes: 4 additions & 0 deletions rpc/core/src/api/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub enum RpcApiOps {
NotifyVirtualDaaScoreChanged,
NotifyVirtualChainChanged,
NotifySinkBlueScoreChanged,
NotifyBlockAddedHeader,

// ~
Subscribe,
Expand All @@ -114,6 +115,7 @@ pub enum RpcApiOps {
VirtualDaaScoreChangedNotification,
PruningPointUtxoSetOverrideNotification,
NewBlockTemplateNotification,
BlockAddedHeaderNotification,
}

impl RpcApiOps {
Expand All @@ -129,6 +131,7 @@ impl RpcApiOps {
| RpcApiOps::NotifyFinalityConflictResolved
| RpcApiOps::NotifySinkBlueScoreChanged
| RpcApiOps::NotifyVirtualDaaScoreChanged
| RpcApiOps::NotifyBlockAddedHeader
| RpcApiOps::Subscribe
| RpcApiOps::Unsubscribe
)
Expand All @@ -155,6 +158,7 @@ impl From<EventType> for RpcApiOps {
EventType::VirtualDaaScoreChanged => RpcApiOps::VirtualDaaScoreChangedNotification,
EventType::PruningPointUtxoSetOverride => RpcApiOps::PruningPointUtxoSetOverrideNotification,
EventType::NewBlockTemplate => RpcApiOps::NewBlockTemplateNotification,
EventType::BlockAddedHeader => RpcApiOps::BlockAddedHeaderNotification,
}
}
}
8 changes: 8 additions & 0 deletions rpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
convert::utxo::utxo_set_into_rpc, BlockAddedNotification, FinalityConflictNotification, FinalityConflictResolvedNotification,
NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptedTransactionIds,
SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
BlockAddedHeaderNotification
};
use kaspa_consensus_notify::notification as consensus_notify;
use kaspa_index_core::notification as index_notify;
Expand Down Expand Up @@ -29,6 +30,7 @@ impl From<&consensus_notify::Notification> for Notification {
consensus_notify::Notification::VirtualDaaScoreChanged(msg) => Notification::VirtualDaaScoreChanged(msg.into()),
consensus_notify::Notification::PruningPointUtxoSetOverride(msg) => Notification::PruningPointUtxoSetOverride(msg.into()),
consensus_notify::Notification::NewBlockTemplate(msg) => Notification::NewBlockTemplate(msg.into()),
consensus_notify::Notification::BlockAddedHeader(msg) => Notification::BlockAddedHeader(msg.into()),
}
}
}
Expand Down Expand Up @@ -110,6 +112,12 @@ impl From<&consensus_notify::NewBlockTemplateNotification> for NewBlockTemplateN
}
}

impl From<&consensus_notify::BlockAddedHeaderNotification> for BlockAddedHeaderNotification {
fn from(item: &consensus_notify::BlockAddedHeaderNotification) -> Self {
Self { block: Arc::new((&item.block).into()) }
}
}

// ----------------------------------------------------------------------------
// index to rpc_core
// ----------------------------------------------------------------------------
Expand Down
31 changes: 31 additions & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,34 @@ impl SubscribeResponse {
#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct UnsubscribeResponse {}

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// BlockAddedHeaderNotification

/// NotifyBlockAddedHeaderRequest registers this connection for blockAddedHeader notifications.
///
/// See: BlockAddedHeaderNotification
#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct NotifyBlockAddedHeaderRequest {
pub command: Command,
}
impl NotifyBlockAddedHeaderRequest {
pub fn new(command: Command) -> Self {
Self { command }
}
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct NotifyBlockAddedHeaderResponse {}

/// BlockAddedHeaderNotification is sent whenever a blocks has been added (NOT accepted)
/// into the DAG.
///
/// See: NotifyBlockAddedHeaderRequest
#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlockAddedHeaderNotification {
pub block: Arc<RpcBlock>,
}
3 changes: 2 additions & 1 deletion rpc/grpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ from!(item: &kaspa_rpc_core::Notification, Payload, {
Notification::VirtualDaaScoreChanged(ref notification) => Payload::VirtualDaaScoreChangedNotification(notification.into()),
Notification::PruningPointUtxoSetOverride(ref notification) => {
Payload::PruningPointUtxoSetOverrideNotification(notification.into())
}
},
Notification::BlockAddedHeader(ref notification) => Payload::BlockAddedHeaderNotification(notification.into()),
}
});

Expand Down

0 comments on commit 36aa8c2

Please sign in to comment.