Skip to content

Commit

Permalink
darkfid2: various json-rpc subscribers added
Browse files Browse the repository at this point in the history
  • Loading branch information
aggstam committed Aug 8, 2023
1 parent d3aa5ba commit a2fb71e
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 19 deletions.
29 changes: 24 additions & 5 deletions bin/darkfid2/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::collections::HashMap;

use async_std::{stream::StreamExt, sync::Arc};
use log::{error, info};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
Expand All @@ -26,7 +28,7 @@ use darkfi::{
blockchain::BlockInfo,
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
rpc::server::listen_and_serve,
rpc::{jsonrpc::MethodSubscriber, server::listen_and_serve},
util::time::TimeKeeper,
validator::{Validator, ValidatorConfig, ValidatorPtr},
Result,
Expand Down Expand Up @@ -99,19 +101,26 @@ struct Args {
verbose: u8,
}

/// Daemon structure
pub struct Darkfid {
/// Syncing P2P network pointer
sync_p2p: P2pPtr,
/// Optional consensus P2P network pointer
consensus_p2p: Option<P2pPtr>,
/// Validator(node) pointer
validator: ValidatorPtr,
/// A map of various subscribers exporting live info from the blockchain
subscribers: HashMap<&'static str, MethodSubscriber>,
}

impl Darkfid {
pub async fn new(
sync_p2p: P2pPtr,
consensus_p2p: Option<P2pPtr>,
validator: ValidatorPtr,
subscribers: HashMap<&'static str, MethodSubscriber>,
) -> Self {
Self { sync_p2p, consensus_p2p, validator }
Self { sync_p2p, consensus_p2p, validator, subscribers }
}
}

Expand Down Expand Up @@ -146,18 +155,28 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
// Initialize validator
let validator = Validator::new(&sled_db, config).await?;

// Here we initialize various subscribers that can export live blockchain/consensus data.
let mut subscribers = HashMap::new();
subscribers.insert("blocks", MethodSubscriber::new("blockchain.subscribe_blocks".into()));
subscribers.insert("txs", MethodSubscriber::new("blockchain.subscribe_txs".into()));
if args.consensus {
subscribers
.insert("proposals", MethodSubscriber::new("blockchain.subscribe_proposals".into()));
}

// Initialize syncing P2P network
let sync_p2p = spawn_sync_p2p(&args.sync_net.into(), &validator).await;
let sync_p2p = spawn_sync_p2p(&args.sync_net.into(), &validator, &subscribers).await;

// Initialize consensus P2P network
let consensus_p2p = if args.consensus {
Some(spawn_consensus_p2p(&args.consensus_net.into(), &validator).await)
Some(spawn_consensus_p2p(&args.consensus_net.into(), &validator, &subscribers).await)
} else {
None
};

// Initialize node
let darkfid = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator.clone()).await;
let darkfid =
Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator.clone(), subscribers).await;
let darkfid = Arc::new(darkfid);
info!(target: "darkfid", "Node initialized successfully!");

Expand Down
9 changes: 8 additions & 1 deletion bin/darkfid2/src/proto/protocol_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use darkfi::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
rpc::jsonrpc::MethodSubscriber,
validator::ValidatorPtr,
Result,
};
Expand All @@ -52,13 +53,15 @@ pub struct ProtocolBlock {
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
subscriber: MethodSubscriber,
}

impl ProtocolBlock {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: MethodSubscriber,
) -> Result<ProtocolBasePtr> {
debug!(
target: "validator::protocol_block::init",
Expand All @@ -75,6 +78,7 @@ impl ProtocolBlock {
validator,
p2p,
channel_address: channel.address().clone(),
subscriber,
}))
}

Expand Down Expand Up @@ -118,7 +122,10 @@ impl ProtocolBlock {
let block_copy = (*block).clone();

match self.validator.write().await.append_block(&block_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await,
Ok(()) => {
self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await;
self.subscriber.notify(&block_copy).await;
}
Err(e) => {
debug!(
target: "validator::protocol_block::handle_receive_block",
Expand Down
9 changes: 8 additions & 1 deletion bin/darkfid2/src/proto/protocol_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use darkfi::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
rpc::jsonrpc::MethodSubscriber,
validator::{consensus::Proposal, ValidatorPtr},
Result,
};
Expand All @@ -45,13 +46,15 @@ pub struct ProtocolProposal {
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
subscriber: MethodSubscriber,
}

impl ProtocolProposal {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: MethodSubscriber,
) -> Result<ProtocolBasePtr> {
debug!(
target: "validator::protocol_proposal::init",
Expand All @@ -68,6 +71,7 @@ impl ProtocolProposal {
validator,
p2p,
channel_address: channel.address().clone(),
subscriber,
}))
}

Expand Down Expand Up @@ -108,7 +112,10 @@ impl ProtocolProposal {
let proposal_copy = (*proposal).clone();

match self.validator.write().await.consensus.append_proposal(&proposal_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await,
Ok(()) => {
self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
self.subscriber.notify(&proposal_copy).await;
}
Err(e) => {
debug!(
target: "validator::protocol_proposal::handle_receive_proposal",
Expand Down
9 changes: 8 additions & 1 deletion bin/darkfid2/src/proto/protocol_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use darkfi::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
rpc::jsonrpc::MethodSubscriber,
tx::Transaction,
validator::ValidatorPtr,
Result,
Expand All @@ -46,13 +47,15 @@ pub struct ProtocolTx {
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
subscriber: MethodSubscriber,
}

impl ProtocolTx {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: MethodSubscriber,
) -> Result<ProtocolBasePtr> {
debug!(
target: "validator::protocol_tx::init",
Expand All @@ -69,6 +72,7 @@ impl ProtocolTx {
validator,
p2p,
channel_address: channel.address().clone(),
subscriber,
}))
}

Expand Down Expand Up @@ -104,7 +108,10 @@ impl ProtocolTx {

// Nodes use unconfirmed_txs vector as seen_txs pool.
match self.validator.write().await.append_tx(&tx_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&tx_copy, &exclude_list).await,
Ok(()) => {
self.p2p.broadcast_with_exclude(&tx_copy, &exclude_list).await;
self.subscriber.notify(&tx_copy).await;
}
Err(e) => {
debug!(
target: "validator::protocol_tx::handle_receive_tx",
Expand Down
9 changes: 9 additions & 0 deletions bin/darkfid2/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ impl RequestHandler for Darkfid {
Some("blockchain.lookup_zkas") => {
return self.blockchain_lookup_zkas(req.id, params).await
}
Some("blockchain.subscribe_blocks") => {
return self.blockchain_subscribe_blocks(req.id, params).await
}
Some("blockchain.subscribe_txs") => {
return self.blockchain_subscribe_txs(req.id, params).await
}
Some("blockchain.subscribe_proposals") => {
return self.blockchain_subscribe_proposals(req.id, params).await
}

// ===================
// Transaction methods
Expand Down
59 changes: 58 additions & 1 deletion bin/darkfid2/src/rpc_blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use serde_json::{json, Value};
use darkfi::{
rpc::jsonrpc::{
ErrorCode::{InternalError, InvalidParams, ParseError},
JsonError, JsonResponse, JsonResult,
JsonError, JsonResponse, JsonResult, JsonSubscriber,
},
runtime::vm_runtime::SMART_CONTRACT_ZKAS_DB_NAME,
};
Expand Down Expand Up @@ -138,6 +138,63 @@ impl Darkfid {
JsonResponse::new(json!(last_slot.0), id).into()
}

// RPCAPI:
// Initializes a subscription to new incoming blocks.
// Once a subscription is established, `darkfid` will send JSON-RPC notifications of
// new incoming blocks to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "blockchain.subscribe_blocks", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "blockchain.subscribe_blocks", "params": [`blockinfo`]}
pub async fn blockchain_subscribe_blocks(&self, id: Value, params: &[Value]) -> JsonResult {
if !params.is_empty() {
return JsonError::new(InvalidParams, None, id).into()
}

let blocks_subscriber = self.subscribers.get("blocks").unwrap().clone();

JsonSubscriber::new(blocks_subscriber).into()
}

// RPCAPI:
// Initializes a subscription to new incoming transactions.
// Once a subscription is established, `darkfid` will send JSON-RPC notifications of
// new incoming transactions to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "blockchain.subscribe_txs", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "blockchain.subscribe_txs", "params": [`tx_hash`]}
pub async fn blockchain_subscribe_txs(&self, id: Value, params: &[Value]) -> JsonResult {
if !params.is_empty() {
return JsonError::new(InvalidParams, None, id).into()
}

let txs_subscriber = self.subscribers.get("txs").unwrap().clone();

JsonSubscriber::new(txs_subscriber).into()
}

// RPCAPI:
// Initializes a subscription to new incoming proposals, asuming node participates
// in consensus. Once a subscription is established, `darkfid` will send JSON-RPC
// notifications of new incoming proposals to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "blockchain.subscribe_proposals", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "blockchain.subscribe_proposals", "params": [`blockinfo`]}
pub async fn blockchain_subscribe_proposals(&self, id: Value, params: &[Value]) -> JsonResult {
if !params.is_empty() {
return JsonError::new(InvalidParams, None, id).into()
}

// Since proposals subscriber is only active if we participate to consensus,
// we have to check if it actually exists in the subscribers map.
let proposals_subscriber = self.subscribers.get("proposals");
if proposals_subscriber.is_none() {
error!(target: "darkfid::rpc::blockchain_subscribe_proposals", "Proposals subscriber not found");
return JsonError::new(InternalError, None, id).into()
}

JsonSubscriber::new(proposals_subscriber.unwrap().clone()).into()
}

// RPCAPI:
// Performs a lookup of zkas bincodes for a given contract ID and returns all of
// them, including their namespace.
Expand Down
6 changes: 6 additions & 0 deletions bin/darkfid2/src/task/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<SyncResponse>().await;
let block_response_sub = channel.subscribe_msg::<SyncResponse>().await?;
let notif_sub = node.subscribers.get("blocks").unwrap();

// TODO: make this parallel and use a head selection method,
// for example use a manual known head and only connect to nodes
Expand All @@ -67,6 +68,11 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
debug!(target: "darkfid::task::sync_task", "Processing received blocks");
node.validator.write().await.add_blocks(&response.blocks).await?;

// Notify subscriber
for block in &response.blocks {
notif_sub.notify(block).await;
}

let last_received = node.validator.read().await.blockchain.last()?;
info!(target: "darkfid::task::sync_task", "Last received block: {:?} - {:?}", last_received.0, last_received.1);

Expand Down
18 changes: 15 additions & 3 deletions bin/darkfid2/src/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::collections::HashMap;

use async_std::sync::Arc;
use darkfi::{
blockchain::{BlockInfo, Header},
net::Settings,
rpc::jsonrpc::MethodSubscriber,
util::time::TimeKeeper,
validator::{
consensus::{next_block_reward, pid::slot_pid_output},
Expand Down Expand Up @@ -207,6 +210,7 @@ impl Harness {
}
}

// Note: This function should mirror darkfid::main
pub async fn generate_node(
vks: &Vec<(Vec<u8>, String, Vec<u8>)>,
config: &ValidatorConfig,
Expand All @@ -220,13 +224,21 @@ pub async fn generate_node(

let validator = Validator::new(&sled_db, config.clone()).await?;

let sync_p2p = spawn_sync_p2p(&sync_settings, &validator).await;
let mut subscribers = HashMap::new();
subscribers.insert("blocks", MethodSubscriber::new("blockchain.subscribe_blocks".into()));
subscribers.insert("txs", MethodSubscriber::new("blockchain.subscribe_txs".into()));
if consensus_settings.is_some() {
subscribers
.insert("proposals", MethodSubscriber::new("blockchain.subscribe_proposals".into()));
}

let sync_p2p = spawn_sync_p2p(&sync_settings, &validator, &subscribers).await;
let consensus_p2p = if let Some(settings) = consensus_settings {
Some(spawn_consensus_p2p(settings, &validator).await)
Some(spawn_consensus_p2p(settings, &validator, &subscribers).await)
} else {
None
};
let node = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator).await;
let node = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator, subscribers).await;

sync_p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
Expand Down
Loading

0 comments on commit a2fb71e

Please sign in to comment.