diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index ec3b57b..93aadde 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -414,13 +414,16 @@ impl BrokerState { } fn subscriptions_to_map(subscriptions: &[Subscription]) -> Map { subscriptions.iter().map(|subscr| { - if subscr.param.ttl == 0 { - let key = subscr.glob.as_str().to_string(); - (key, 0.into()) - } else { - let key = subscr.glob.as_str().to_string(); - let ttl = Instant::now() + Duration::from_secs(subscr.param.ttl as u64) - subscr.subscribed; - (key, (ttl.as_secs() as i64).into()) + match subscr.param.ttl { + None => { + let key = subscr.glob.as_str().to_string(); + (key, ().into()) + } + Some(ttl) => { + let key = subscr.glob.as_str().to_string(); + let ttl = Instant::now() + Duration::from_secs(ttl as u64) - subscr.subscribed; + (key, (ttl.as_secs() as i64).into()) + } } }).collect() } @@ -459,14 +462,17 @@ impl BrokerState { let now = Instant::now(); for peer in self.peers.values_mut() { peer.subscriptions.retain(|subscr| { - if subscr.param.ttl > 0 { - let expired = now - subscr.subscribed > Duration::from_secs(subscr.param.ttl as u64); - if expired { - log!(target: "Subscr", Level::Debug, "Subscription expired: {:?}", subscr.param); + match subscr.param.ttl { + None => { + true + } + Some(ttl) => { + let expired = now - subscr.subscribed > Duration::from_secs(ttl as u64); + if expired { + log!(target: "Subscr", Level::Debug, "Subscription expired: {:?}", subscr.param); + } + !expired } - !expired - } else { - true } }); } @@ -478,7 +484,7 @@ impl BrokerState { fwd_subs.insert(subscr.param.ri.clone()); } } - const DEFAULT_TTL: u32 = 10 * 60; + const DEFAULT_TTL: Option = Some(10 * 60); let mut to_forward: HashMap> = Default::default(); for (peer_id, peer) in &self.peers { if let PeerKind::Device { mount_point, subscribe_path: Some(SubscribePath::CanSubscribe(_)), .. } = &peer.peer_kind { @@ -498,16 +504,18 @@ impl BrokerState { } } for (peer_id, peer) in &mut self.peers { - if let Some(to_fwd) = to_forward.get_mut(peer_id) { - // remove fwd subscritions not found in to_fwd + if let Some(to_fwd_peer) = to_forward.get_mut(peer_id) { + // remove fwd subscritions not found in to_fwd_peer peer.forwarded_subscriptions.retain_mut(|subs| { - if to_fwd.contains(&subs.param.ri) { - to_fwd.remove(&subs.param.ri); + if to_fwd_peer.contains(&subs.param.ri) { + to_fwd_peer.remove(&subs.param.ri); if let Some(subscribed) = &subs.subscribed { - let expires = subscribed.add(Duration::from_secs(subs.param.ttl as u64 - 10)); - if expires < Instant::now() { - // subscriptions near to expiration, schedule subscribe RPC call - subs.subscribed = None; + if let Some(ttl) = subs.param.ttl { + let expires = subscribed.add(Duration::from_secs(ttl as u64 - 10)); + if expires < Instant::now() { + // subscriptions near to expiration, schedule subscribe RPC call + subs.subscribed = None; + } } } true @@ -516,7 +524,7 @@ impl BrokerState { } }); // add new fwd subscriptions - for ri in to_fwd.iter() { + for ri in to_fwd_peer.iter() { peer.forwarded_subscriptions.push(ForwardedSubscription { param: SubscriptionParam { ri: ri.clone(), ttl: DEFAULT_TTL }, subscribed: None, diff --git a/src/test.rs b/src/test.rs index 406a943..420df04 100644 --- a/src/test.rs +++ b/src/test.rs @@ -93,7 +93,8 @@ async fn test_broker_loop() { assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new())); // subscriptions - let subs = SubscriptionParam { ri: ShvRI::try_from("shv/**:*").unwrap(), ttl: 0 }; + let subs_ri = "shv/**:*"; + let subs = SubscriptionParam { ri: ShvRI::try_from(subs_ri).unwrap(), ttl: None }; { // subscribe let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await; @@ -101,10 +102,11 @@ async fn test_broker_loop() { // cannot subscribe the same twice let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await; assert!(!result.as_bool()); - let resp = call(".broker/currentClient", "info", None, &call_ctx).await; - let subs = resp.as_map().get("subscriptions").unwrap(); - let subs_map = subs.as_map(); + let resp = call(".broker/currentClient", "subscriptions", None, &call_ctx).await; + let subs_map = resp.as_map(); + // let s = format!("{:?}", subs_map); assert_eq!(subs_map.len(), 1); + assert_eq!(subs_map.first_key_value().unwrap().0, subs_ri); } { call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;