Skip to content

Commit

Permalink
Add support for missing ttl in .broker/currentClient::subscribe param
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Aug 26, 2024
1 parent 47e4d6b commit 0e04520
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 28 deletions.
56 changes: 32 additions & 24 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,16 @@ impl BrokerState {
}
fn subscriptions_to_map(subscriptions: &[Subscription]) -> Map {
subscriptions.iter().map(|subscr| {
if subscr.param.ttl == 0 {
let key = subscr.glob.as_str().to_string();
(key, 0.into())
} else {
let key = subscr.glob.as_str().to_string();
let ttl = Instant::now() + Duration::from_secs(subscr.param.ttl as u64) - subscr.subscribed;
(key, (ttl.as_secs() as i64).into())
match subscr.param.ttl {
None => {
let key = subscr.glob.as_str().to_string();
(key, ().into())
}
Some(ttl) => {
let key = subscr.glob.as_str().to_string();
let ttl = Instant::now() + Duration::from_secs(ttl as u64) - subscr.subscribed;
(key, (ttl.as_secs() as i64).into())
}
}
}).collect()
}
Expand Down Expand Up @@ -459,14 +462,17 @@ impl BrokerState {
let now = Instant::now();
for peer in self.peers.values_mut() {
peer.subscriptions.retain(|subscr| {
if subscr.param.ttl > 0 {
let expired = now - subscr.subscribed > Duration::from_secs(subscr.param.ttl as u64);
if expired {
log!(target: "Subscr", Level::Debug, "Subscription expired: {:?}", subscr.param);
match subscr.param.ttl {
None => {
true
}
Some(ttl) => {
let expired = now - subscr.subscribed > Duration::from_secs(ttl as u64);
if expired {
log!(target: "Subscr", Level::Debug, "Subscription expired: {:?}", subscr.param);
}
!expired
}
!expired
} else {
true
}
});
}
Expand All @@ -478,7 +484,7 @@ impl BrokerState {
fwd_subs.insert(subscr.param.ri.clone());
}
}
const DEFAULT_TTL: u32 = 10 * 60;
const DEFAULT_TTL: Option<u32> = Some(10 * 60);
let mut to_forward: HashMap<PeerId, HashSet<ShvRI>> = Default::default();
for (peer_id, peer) in &self.peers {
if let PeerKind::Device { mount_point, subscribe_path: Some(SubscribePath::CanSubscribe(_)), .. } = &peer.peer_kind {
Expand All @@ -498,16 +504,18 @@ impl BrokerState {
}
}
for (peer_id, peer) in &mut self.peers {
if let Some(to_fwd) = to_forward.get_mut(peer_id) {
// remove fwd subscritions not found in to_fwd
if let Some(to_fwd_peer) = to_forward.get_mut(peer_id) {
// remove fwd subscritions not found in to_fwd_peer
peer.forwarded_subscriptions.retain_mut(|subs| {
if to_fwd.contains(&subs.param.ri) {
to_fwd.remove(&subs.param.ri);
if to_fwd_peer.contains(&subs.param.ri) {
to_fwd_peer.remove(&subs.param.ri);
if let Some(subscribed) = &subs.subscribed {
let expires = subscribed.add(Duration::from_secs(subs.param.ttl as u64 - 10));
if expires < Instant::now() {
// subscriptions near to expiration, schedule subscribe RPC call
subs.subscribed = None;
if let Some(ttl) = subs.param.ttl {
let expires = subscribed.add(Duration::from_secs(ttl as u64 - 10));
if expires < Instant::now() {
// subscriptions near to expiration, schedule subscribe RPC call
subs.subscribed = None;
}
}
}
true
Expand All @@ -516,7 +524,7 @@ impl BrokerState {
}
});
// add new fwd subscriptions
for ri in to_fwd.iter() {
for ri in to_fwd_peer.iter() {
peer.forwarded_subscriptions.push(ForwardedSubscription {
param: SubscriptionParam { ri: ri.clone(), ttl: DEFAULT_TTL },
subscribed: None,
Expand Down
10 changes: 6 additions & 4 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,20 @@ async fn test_broker_loop() {
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new()));

// subscriptions
let subs = SubscriptionParam { ri: ShvRI::try_from("shv/**:*").unwrap(), ttl: 0 };
let subs_ri = "shv/**:*";
let subs = SubscriptionParam { ri: ShvRI::try_from(subs_ri).unwrap(), ttl: None };
{
// subscribe
let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
assert!(result.as_bool());
// cannot subscribe the same twice
let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
assert!(!result.as_bool());
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let subs = resp.as_map().get("subscriptions").unwrap();
let subs_map = subs.as_map();
let resp = call(".broker/currentClient", "subscriptions", None, &call_ctx).await;
let subs_map = resp.as_map();
// let s = format!("{:?}", subs_map);
assert_eq!(subs_map.len(), 1);
assert_eq!(subs_map.first_key_value().unwrap().0, subs_ri);
}
{
call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
Expand Down

0 comments on commit 0e04520

Please sign in to comment.