Skip to content

Commit

Permalink
mining-device-sv1: channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 28, 2025
1 parent e601511 commit 58230c2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
11 changes: 6 additions & 5 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions roles/test-utils/mining-device-sv1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ num-bigint = "0.4.3"
num-traits = "0.2.15"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
tokio = "^1.38.0"
31 changes: 17 additions & 14 deletions roles/test-utils/mining-device-sv1/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use async_std::net::TcpStream;
use std::{convert::TryInto, net::SocketAddr, ops::Div};

use async_channel::{bounded, Receiver, Sender};
use async_std::{io::BufReader, prelude::*, task};
use num_bigint::BigUint;
use num_traits::FromPrimitive;
Expand Down Expand Up @@ -33,9 +31,9 @@ pub struct Client {
sented_authorize_request: Vec<(u64, String)>, // (id, user_name)
authorized: Vec<String>,
/// Receives incoming messages from the SV1 Upstream node.
receiver_incoming: Receiver<String>,
sender_incoming: tokio::sync::broadcast::Sender<String>,
/// Sends outgoing messages to the SV1 Upstream node.
sender_outgoing: Sender<String>,
sender_outgoing: tokio::sync::mpsc::Sender<String>,
/// Representation of the Mining Devices
miner: Arc<Mutex<Miner>>,
}
Expand Down Expand Up @@ -74,14 +72,19 @@ impl Client {

// `sender_incoming` listens on socket for incoming messages from the Upstream and sends
// messages to the `receiver_incoming` to be parsed and handled by the `Client`
let (sender_incoming, receiver_incoming) = bounded(10);
// broadcast need to be used as cloning is taking place.
// let (sender_incoming, receiver_incoming) = bounded(10);
let (sender_incoming, _) = tokio::sync::broadcast::channel(10);
// `sender_outgoing` sends the message parsed by the `Client` to the `receiver_outgoing`
// which writes the messages to the socket to the Upstream
let (sender_outgoing, receiver_outgoing) = bounded(10);
// mpsc can be used
// let (sender_outgoing, receiver_outgoing) = bounded(10);
let (sender_outgoing, mut receiver_outgoing) = tokio::sync::mpsc::channel(10);
// `sender_share` sends job share results to the `receiver_share` where the job share
// results are formated into a "mining.submit" messages that is then sent to the
// Upstream via `sender_outgoing`
let (sender_share, receiver_share) = bounded(10);
// mpsc can be used
let (sender_share,mut receiver_share) = tokio::sync::mpsc::channel(10);

// Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0.
let miner = Arc::new(Mutex::new(Miner::new(0)));
Expand All @@ -100,12 +103,13 @@ impl Client {

// Reads messages sent by the Upstream from the socket to be passed to the
// `receiver_incoming`
let sender_incoming_clone = sender_incoming.clone();
task::spawn(async move {
let mut messages = BufReader::new(&*reader).lines();
while let Some(message) = messages.next().await {
match message {
Ok(msg) => {
if let Err(e) = sender_incoming.send(msg).await {
if let Err(e) = sender_incoming_clone.send(msg) {
error!("Failed to send message to receiver_incoming: {:?}", e);
break; // Exit the loop if sending fails
}
Expand Down Expand Up @@ -142,7 +146,7 @@ impl Client {
status: ClientStatus::Init,
sented_authorize_request: vec![],
authorized: vec![],
receiver_incoming,
sender_incoming,
sender_outgoing,
miner,
}));
Expand Down Expand Up @@ -181,9 +185,8 @@ impl Client {
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
let cloned = client.clone();
task::spawn(async move {
let recv = receiver_share.clone();
loop {
let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap();
let (nonce, job_id, _version, ntime) = receiver_share.recv().await.unwrap();
if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed {
continue;
}
Expand All @@ -205,13 +208,13 @@ impl Client {
sender_outgoing_clone.send(message).await.unwrap();
}
});
let recv_incoming = client.safe_lock(|c| c.receiver_incoming.clone()).unwrap();
let mut recv_incoming = client.safe_lock(|c| c.sender_incoming.clone()).unwrap().subscribe();

loop {
match client.clone().safe_lock(|c| c.status).unwrap() {
ClientStatus::Init => panic!("impossible state"),
ClientStatus::Configured => {
let incoming = recv_incoming.clone().recv().await.unwrap();
let incoming = recv_incoming.recv().await.unwrap();
Self::parse_message(client.clone(), Ok(incoming)).await;
}
ClientStatus::Subscribed => {
Expand Down Expand Up @@ -253,7 +256,7 @@ impl Client {
}

/// Send SV1 messages to the receiver_outgoing which writes to the socket (aka Upstream node)
async fn send_message(sender: Sender<String>, msg: json_rpc::Message) {
async fn send_message(sender: tokio::sync::mpsc::Sender<String>, msg: json_rpc::Message) {
let msg = format!("{}\n", serde_json::to_string(&msg).unwrap());
info!(" - Send: {}", &msg);
sender.send(msg).await.unwrap();
Expand Down

0 comments on commit 58230c2

Please sign in to comment.