Skip to content

Commit

Permalink
Return .broker/currentClient:subscriptions as Map
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Aug 10, 2024
1 parent e645a1a commit 47e4d6b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
25 changes: 19 additions & 6 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{debug, error, info, log, warn};
use crate::config::{AccessConfig, BrokerConfig, Password};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId, Tag};
use shvproto::{List, MetaMap, RpcValue, rpcvalue};
use shvproto::{Map, MetaMap, RpcValue, rpcvalue};
use shvrpc::rpc::{Glob, ShvRI, SubscriptionParam};
use crate::shvnode::{AppNode, BrokerAccessMountsNode, BrokerAccessRolesNode, BrokerAccessUsersNode, BrokerCurrentClientNode, BrokerNode, DIR_APP, DIR_BROKER, DIR_BROKER_ACCESS_MOUNTS, DIR_BROKER_ACCESS_ROLES, DIR_BROKER_ACCESS_USERS, DIR_BROKER_CURRENT_CLIENT, find_longest_prefix, METH_DIR, METH_SUBSCRIBE, process_local_dir_ls, ShvNode};
use shvrpc::util::{join_path, sha1_hash, split_glob_on_match};
Expand Down Expand Up @@ -384,7 +384,7 @@ impl BrokerState {
}
}
fn peer_to_info(client_id: PeerId, peer: &Peer) -> rpcvalue::Map {
let subs: List = peer.subscriptions.iter().map(|subs| subs.param.to_rpcvalue()).collect();
let subs = Self::subscriptions_to_map(&peer.subscriptions);
let (device_id, mount_point) = if let PeerKind::Device { mount_point, device_id, .. } = &peer.peer_kind {
(device_id.clone().unwrap_or_default(), mount_point.clone())
} else {
Expand Down Expand Up @@ -412,23 +412,36 @@ impl BrokerState {
}
None
}
pub(crate) fn subscriptions(&self, client_id: PeerId) -> shvrpc::Result<List> {
fn subscriptions_to_map(subscriptions: &[Subscription]) -> Map {
subscriptions.iter().map(|subscr| {
if subscr.param.ttl == 0 {
let key = subscr.glob.as_str().to_string();
(key, 0.into())
} else {
let key = subscr.glob.as_str().to_string();
let ttl = Instant::now() + Duration::from_secs(subscr.param.ttl as u64) - subscr.subscribed;
(key, (ttl.as_secs() as i64).into())
}
}).collect()
}
pub(crate) fn subscriptions(&self, client_id: PeerId) -> shvrpc::Result<Map> {
let peer = self.peers.get(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
let subs: List = peer.subscriptions.iter().map(|subs| subs.param.to_rpcvalue()).collect();
Ok(subs)
Ok(Self::subscriptions_to_map(&peer.subscriptions))
}
pub(crate) fn subscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
if let Some(sub) = peer.subscriptions.iter_mut().find(|sub| sub.param.ri == subpar.ri) {
log!(target: "Subscr", Level::Debug, "Changing subscription TTL for client id: {} - {:?}", client_id, subpar);
sub.param.ttl = subpar.ttl;
Ok(false)
} else {
log!(target: "Subscr", Level::Debug, "Adding subscription for client id: {} - {:?}", client_id, subpar);
peer.subscriptions.push(Subscription::new(subpar)?);
Ok(true)
}
}
pub(crate) fn unsubscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
log!(target: "Subscr", Level::Debug, "Remove subscription for client id: {} - {:?}", client_id, subpar);
log!(target: "Subscr", Level::Debug, "Removing subscription for client id: {} - {:?}", client_id, subpar);
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
let cnt = peer.subscriptions.len();
peer.subscriptions.retain(|subscr| subscr.param.ri != subpar.ri);
Expand Down
2 changes: 1 addition & 1 deletion src/shvnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl ShvNode for BrokerNode {
const META_METH_INFO: MetaMethod = MetaMethod { name: METH_INFO, flags: Flag::None as u32, access: AccessLevel::Browse, param: "Int", result: "ClientInfo", signals: &[], description: "" };
const META_METH_SUBSCRIBE: MetaMethod = MetaMethod { name: METH_SUBSCRIBE, flags: Flag::None as u32, access: AccessLevel::Browse, param: "SubscribeParams", result: "void", signals: &[], description: "" };
const META_METH_UNSUBSCRIBE: MetaMethod = MetaMethod { name: METH_UNSUBSCRIBE, flags: Flag::None as u32, access: AccessLevel::Browse, param: "SubscribeParams", result: "void", signals: &[], description: "" };
const META_METH_SUBSCRIPTIONS: MetaMethod = MetaMethod { name: METH_SUBSCRIPTIONS, flags: Flag::None as u32, access: AccessLevel::Browse, param: "void", result: "List", signals: &[], description: "" };
const META_METH_SUBSCRIPTIONS: MetaMethod = MetaMethod { name: METH_SUBSCRIPTIONS, flags: Flag::None as u32, access: AccessLevel::Browse, param: "void", result: "Map", signals: &[], description: "" };

pub(crate) struct BrokerCurrentClientNode {}
impl BrokerCurrentClientNode {
Expand Down
12 changes: 6 additions & 6 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::brokerimpl::BrokerImpl;
use async_std::channel::{Receiver, Sender};
use async_std::{channel, task};
use rusqlite::Connection;
use shvproto::{List, RpcValue};
use shvproto::{RpcValue};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use shvrpc::rpc::{ShvRI, SubscriptionParam};
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn test_broker_loop() {
assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2));
assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("test/device"));
assert_eq!(m.get("userName").unwrap(), &RpcValue::from(user));
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new()));
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new()));

// subscriptions
let subs = SubscriptionParam { ri: ShvRI::try_from("shv/**:*").unwrap(), ttl: 0 };
Expand All @@ -103,15 +103,15 @@ async fn test_broker_loop() {
assert!(!result.as_bool());
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let subs = resp.as_map().get("subscriptions").unwrap();
let subs_list = subs.as_list();
assert_eq!(subs_list.len(), 1);
let subs_map = subs.as_map();
assert_eq!(subs_map.len(), 1);
}
{
call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let subs = resp.as_map().get("subscriptions").unwrap();
let subs_list = subs.as_list();
assert_eq!(subs_list.len(), 0);
let subs_map = subs.as_map();
assert_eq!(subs_map.len(), 0);
}

// access/mounts
Expand Down

0 comments on commit 47e4d6b

Please sign in to comment.