Skip to content

Commit

Permalink
Subscribe returns bool value according to spec 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jul 18, 2024
1 parent cecaaeb commit 1ce70d0
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 29 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvbroker"
version = "3.0.0"
version = "3.0.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down Expand Up @@ -28,11 +28,11 @@ crc = "3.2.1"
#getrandom = { version = "0.2", features = ["js"] }

# For local development
#[patch."https://github.com/silicon-heaven/libshvproto-rs"]
#shvproto = { path = "../libshvproto-rs" }
#
#[patch."https://github.com/silicon-heaven/libshvrpc-rs"]
#shvrpc = { path = "../libshvrpc-rs" }
[patch."https://github.com/silicon-heaven/libshvproto-rs"]
shvproto = { path = "../libshvproto-rs" }

[patch."https://github.com/silicon-heaven/libshvrpc-rs"]
shvrpc = { path = "../libshvrpc-rs" }

[[bin]]
name = "shvbroker"
Expand Down
7 changes: 0 additions & 7 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,6 @@ impl ParsedAccessRule {
}
}

//fn find_mount_mut<'a, 'b>(mounts: &'a mut BTreeMap<String, Mount>, shv_path: &'b str) -> Option<(&'a mut Mount, &'b str)> {
// if let Some((mount_dir, node_dir)) = find_longest_prefix(mounts, shv_path) {
// Some((mounts.get_mut(mount_dir).unwrap(), node_dir))
// } else {
// None
// }
//}
pub(crate) struct PendingRpcCall {
pub(crate) client_id: CliId,
pub(crate) request_id: RqId,
Expand Down
11 changes: 7 additions & 4 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,15 @@ impl BrokerImpl {
}
None
}
async fn subscribe(&mut self, client_id: CliId, subscription: &Subscription) -> shvrpc::Result<()> {
async fn subscribe(&mut self, client_id: CliId, subscription: &Subscription) -> shvrpc::Result<bool> {
log!(target: "Subscr", Level::Debug, "New subscription for client id: {} - {}", client_id, subscription);
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
if peer.subscriptions.iter().find(|sp| *sp == subscription).is_some() {
return Ok(false);
}
peer.subscriptions.push(SubscriptionPattern::from_subscription(subscription)?);
self.propagate_subscription_to_matching_devices(subscription)?;
Ok(())
Ok(true)
}
fn unsubscribe(&mut self, client_id: CliId, subscription: &Subscription) -> shvrpc::Result<bool> {
log!(target: "Subscr", Level::Debug, "Remove subscription for client id: {} - {}", client_id, subscription);
Expand Down Expand Up @@ -616,8 +619,8 @@ impl BrokerImpl {
match ctx.method.name {
node::METH_SUBSCRIBE => {
let subscription = Subscription::from_rpcvalue(rq.param().unwrap_or_default());
self.subscribe(ctx.peer_id, &subscription).await?;
ctx.command_sender.send(send_result_cmd(Ok(RpcValue::null()))).await?;
let subs_added = self.subscribe(ctx.peer_id, &subscription).await?;
ctx.command_sender.send(send_result_cmd(Ok(subs_added.into()))).await?;
return Ok(())
}
node::METH_UNSUBSCRIBE => {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ mod brokerimpl;
mod node;
mod peer;

#[cfg(test)]
mod test;

mod spawn {
use log::error;
use std::future::Future;
Expand Down
23 changes: 12 additions & 11 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use async_std::channel::{Receiver, Sender};
use async_std::{channel, task};
use async_std::channel::{Receiver};
use crate::broker::{BrokerToPeerMessage, PeerKind, BrokerCommand, Sender, SubscribePath};
use crate::rpcmessage::CliId;
use crate::{List, RpcMessage, RpcMessageMetaTags, RpcValue};
use crate::broker::brokerimpl::BrokerImpl;
use crate::broker::config::BrokerConfig;
use crate::broker::node::{METH_SUBSCRIBE, METH_UNSUBSCRIBE};
use crate::rpcframe::RpcFrame;
use shvproto::{List, Map, RpcValue};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use shvrpc::rpcmessage::CliId;
use crate::broker::{BrokerToPeerMessage, PeerKind, BrokerCommand, SubscribePath};
use crate::brokerimpl::BrokerImpl;
use crate::config::BrokerConfig;
use crate::node::{METH_SUBSCRIBE, METH_UNSUBSCRIBE};


struct CallCtx<'a> {
Expand All @@ -15,8 +16,8 @@ struct CallCtx<'a> {
client_id: CliId,
}

async fn call(path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>) -> RpcValue {
let msg = RpcMessage::new_request(path, method, param);
async fn call(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>) -> RpcValue {
let msg = RpcMessage::new_request(shv_path, method, param);
let frame = RpcFrame::from_rpcmessage(&msg).expect("valid message");
println!("request: {}", frame.to_rpcmesage().unwrap());
ctx.writer.send(BrokerCommand::FrameReceived { client_id: ctx.client_id, frame }).await.unwrap();
Expand Down Expand Up @@ -97,7 +98,7 @@ async fn test_broker_loop() {
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new()));

// subscriptions
let subs_param = crate::Map::from([("paths".to_string(), RpcValue::from("shv/**"))]);
let subs_param = Map::from([("paths".to_string(), RpcValue::from("shv/**"))]);
{
call(".app/broker/currentClient", METH_SUBSCRIBE, Some(RpcValue::from(subs_param.clone())), &call_ctx).await;
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
Expand Down
2 changes: 1 addition & 1 deletion tests/test_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn test_broker() -> shvrpc::Result<()> {
let values = shv_call_many(calls, Some(port))?;
for v in values.iter() { println!("\t{}", v); }
let expected: Vec<String> = vec![
"RES null".into(), // response to subscribe
"RES true".into(), // response to subscribe
format!("SIG {property_path}:chng 42"), // SIG chng
"RES null".into(), // response to SET
"RES true".into(), // response to unsubscribe
Expand Down

0 comments on commit 1ce70d0

Please sign in to comment.