From 47e4d6bcbd8b4acb22ce9cad057c683e17cabae0 Mon Sep 17 00:00:00 2001 From: Fanda Vacek Date: Sat, 10 Aug 2024 18:40:08 +0200 Subject: [PATCH] Return .broker/currentClient:subscriptions as Map --- src/brokerimpl.rs | 25 +++++++++++++++++++------ src/shvnode.rs | 2 +- src/test.rs | 12 ++++++------ 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index 4e0f992..ec3b57b 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -10,7 +10,7 @@ use log::{debug, error, info, log, warn}; use crate::config::{AccessConfig, BrokerConfig, Password}; use shvrpc::rpcframe::RpcFrame; use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId, Tag}; -use shvproto::{List, MetaMap, RpcValue, rpcvalue}; +use shvproto::{Map, MetaMap, RpcValue, rpcvalue}; use shvrpc::rpc::{Glob, ShvRI, SubscriptionParam}; use crate::shvnode::{AppNode, BrokerAccessMountsNode, BrokerAccessRolesNode, BrokerAccessUsersNode, BrokerCurrentClientNode, BrokerNode, DIR_APP, DIR_BROKER, DIR_BROKER_ACCESS_MOUNTS, DIR_BROKER_ACCESS_ROLES, DIR_BROKER_ACCESS_USERS, DIR_BROKER_CURRENT_CLIENT, find_longest_prefix, METH_DIR, METH_SUBSCRIBE, process_local_dir_ls, ShvNode}; use shvrpc::util::{join_path, sha1_hash, split_glob_on_match}; @@ -384,7 +384,7 @@ impl BrokerState { } } fn peer_to_info(client_id: PeerId, peer: &Peer) -> rpcvalue::Map { - let subs: List = peer.subscriptions.iter().map(|subs| subs.param.to_rpcvalue()).collect(); + let subs = Self::subscriptions_to_map(&peer.subscriptions); let (device_id, mount_point) = if let PeerKind::Device { mount_point, device_id, .. } = &peer.peer_kind { (device_id.clone().unwrap_or_default(), mount_point.clone()) } else { @@ -412,23 +412,36 @@ impl BrokerState { } None } - pub(crate) fn subscriptions(&self, client_id: PeerId) -> shvrpc::Result { + fn subscriptions_to_map(subscriptions: &[Subscription]) -> Map { + subscriptions.iter().map(|subscr| { + if subscr.param.ttl == 0 { + let key = subscr.glob.as_str().to_string(); + (key, 0.into()) + } else { + let key = subscr.glob.as_str().to_string(); + let ttl = Instant::now() + Duration::from_secs(subscr.param.ttl as u64) - subscr.subscribed; + (key, (ttl.as_secs() as i64).into()) + } + }).collect() + } + pub(crate) fn subscriptions(&self, client_id: PeerId) -> shvrpc::Result { let peer = self.peers.get(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?; - let subs: List = peer.subscriptions.iter().map(|subs| subs.param.to_rpcvalue()).collect(); - Ok(subs) + Ok(Self::subscriptions_to_map(&peer.subscriptions)) } pub(crate) fn subscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result { let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?; if let Some(sub) = peer.subscriptions.iter_mut().find(|sub| sub.param.ri == subpar.ri) { + log!(target: "Subscr", Level::Debug, "Changing subscription TTL for client id: {} - {:?}", client_id, subpar); sub.param.ttl = subpar.ttl; Ok(false) } else { + log!(target: "Subscr", Level::Debug, "Adding subscription for client id: {} - {:?}", client_id, subpar); peer.subscriptions.push(Subscription::new(subpar)?); Ok(true) } } pub(crate) fn unsubscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result { - log!(target: "Subscr", Level::Debug, "Remove subscription for client id: {} - {:?}", client_id, subpar); + log!(target: "Subscr", Level::Debug, "Removing subscription for client id: {} - {:?}", client_id, subpar); let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?; let cnt = peer.subscriptions.len(); peer.subscriptions.retain(|subscr| subscr.param.ri != subpar.ri); diff --git a/src/shvnode.rs b/src/shvnode.rs index 87ce933..3bf70eb 100644 --- a/src/shvnode.rs +++ b/src/shvnode.rs @@ -482,7 +482,7 @@ impl ShvNode for BrokerNode { const META_METH_INFO: MetaMethod = MetaMethod { name: METH_INFO, flags: Flag::None as u32, access: AccessLevel::Browse, param: "Int", result: "ClientInfo", signals: &[], description: "" }; const META_METH_SUBSCRIBE: MetaMethod = MetaMethod { name: METH_SUBSCRIBE, flags: Flag::None as u32, access: AccessLevel::Browse, param: "SubscribeParams", result: "void", signals: &[], description: "" }; const META_METH_UNSUBSCRIBE: MetaMethod = MetaMethod { name: METH_UNSUBSCRIBE, flags: Flag::None as u32, access: AccessLevel::Browse, param: "SubscribeParams", result: "void", signals: &[], description: "" }; -const META_METH_SUBSCRIPTIONS: MetaMethod = MetaMethod { name: METH_SUBSCRIPTIONS, flags: Flag::None as u32, access: AccessLevel::Browse, param: "void", result: "List", signals: &[], description: "" }; +const META_METH_SUBSCRIPTIONS: MetaMethod = MetaMethod { name: METH_SUBSCRIPTIONS, flags: Flag::None as u32, access: AccessLevel::Browse, param: "void", result: "Map", signals: &[], description: "" }; pub(crate) struct BrokerCurrentClientNode {} impl BrokerCurrentClientNode { diff --git a/src/test.rs b/src/test.rs index 2f4d6d4..406a943 100644 --- a/src/test.rs +++ b/src/test.rs @@ -2,7 +2,7 @@ use crate::brokerimpl::BrokerImpl; use async_std::channel::{Receiver, Sender}; use async_std::{channel, task}; use rusqlite::Connection; -use shvproto::{List, RpcValue}; +use shvproto::{RpcValue}; use shvrpc::rpcframe::RpcFrame; use shvrpc::{RpcMessage, RpcMessageMetaTags}; use shvrpc::rpc::{ShvRI, SubscriptionParam}; @@ -90,7 +90,7 @@ async fn test_broker_loop() { assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2)); assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("test/device")); assert_eq!(m.get("userName").unwrap(), &RpcValue::from(user)); - assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new())); + assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new())); // subscriptions let subs = SubscriptionParam { ri: ShvRI::try_from("shv/**:*").unwrap(), ttl: 0 }; @@ -103,15 +103,15 @@ async fn test_broker_loop() { assert!(!result.as_bool()); let resp = call(".broker/currentClient", "info", None, &call_ctx).await; let subs = resp.as_map().get("subscriptions").unwrap(); - let subs_list = subs.as_list(); - assert_eq!(subs_list.len(), 1); + let subs_map = subs.as_map(); + assert_eq!(subs_map.len(), 1); } { call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await; let resp = call(".broker/currentClient", "info", None, &call_ctx).await; let subs = resp.as_map().get("subscriptions").unwrap(); - let subs_list = subs.as_list(); - assert_eq!(subs_list.len(), 0); + let subs_map = subs.as_map(); + assert_eq!(subs_map.len(), 0); } // access/mounts