Skip to content

Commit

Permalink
Merge pull request #7 from j4r0u53k/fix-clippy-errors-and-warnings
Browse files Browse the repository at this point in the history
Fix errors and warnings reported by clippy
  • Loading branch information
fvacek authored Mar 7, 2024
2 parents 1cd48d9 + 1899341 commit dcb8652
Show file tree
Hide file tree
Showing 26 changed files with 447 additions and 511 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
run: cargo build --verbose --all-features
- name: Run tests
run: cargo test --verbose --all-features -- --test-threads=1
4 changes: 2 additions & 2 deletions examples/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn main() -> shv::Result<()> {
let mut logger = SimpleLogger::new();
logger = logger.with_level(LevelFilter::Info);
if let Some(module_names) = &cli_opts.verbose {
for (module, level) in parse_log_verbosity(&module_names, module_path!()) {
for (module, level) in parse_log_verbosity(module_names, module_path!()) {
logger = logger.with_module_level(module, level);
}
}
Expand All @@ -71,7 +71,7 @@ fn main() -> shv::Result<()> {

let state = DeviceState {
node_app: AppNode{
app_name: "testdeviceapp".into(),
app_name: "testdeviceapp",
shv_version_major: 3,
shv_version_minor: 0,
},
Expand Down
6 changes: 3 additions & 3 deletions src/bin/shvbroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) fn main() -> shv::Result<()> {
} else {
Default::default()
};
let (access, create_editable_access_file) = loop {
let (access, create_editable_access_file) = 'access: {
let mut create_editable_access_file = false;
if let Some(data_dir) = &config.data_directory {
if config.editable_access {
Expand All @@ -58,7 +58,7 @@ pub(crate) fn main() -> shv::Result<()> {
info!("Loading access file {file_name}");
match AccessControl::from_file(&file_name) {
Ok(acc) => {
break (acc, false);
break 'access (acc, false);
}
Err(err) => {
error!("Cannot read access file: {file_name} - {err}");
Expand All @@ -69,7 +69,7 @@ pub(crate) fn main() -> shv::Result<()> {
}
}
}
break (config.access.clone(), create_editable_access_file);
break 'access (config.access.clone(), create_editable_access_file);
};
if create_editable_access_file {
let data_dir = &config.data_directory.clone().unwrap_or("/tmp/shvbroker/data".into());
Expand Down
6 changes: 3 additions & 3 deletions src/bin/shvcall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub(crate) fn main() -> Result {
let mut logger = SimpleLogger::new();
logger = logger.with_level(LevelFilter::Info);
if let Some(module_names) = &opts.verbose {
for (module, level) in parse_log_verbosity(&module_names, module_path!()) {
for (module, level) in parse_log_verbosity(module_names, module_path!()) {
logger = logger.with_module_level(module, level);
}
}
Expand Down Expand Up @@ -178,7 +178,7 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
format!("RES {}\n", res.to_cpon())
}
Err(err) => {
format!("ERR {}\n", err.to_string())
format!("ERR {}\n", err)
}
}
} else {
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
match parse_line(&line) {
Ok((path, method, param)) => {
let rqid =
send_request(&mut *frame_writer, &path, &method, &param)
send_request(&mut *frame_writer, path, method, param)
.await?;
loop {
let resp = frame_reader.receive_message().await?;
Expand Down
56 changes: 25 additions & 31 deletions src/broker/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Broker {
frame.set_shvpath(node_path);
frame.set_access(grant.as_str());
let device_peer_id = device.peer_id;
self.peers.get(&device_peer_id).ok_or_else(|| "client ID must exist")?.sender.send(BrokerToPeerMessage::SendFrame(frame)).await?;
self.peers.get(&device_peer_id).ok_or("client ID must exist")?.sender.send(BrokerToPeerMessage::SendFrame(frame)).await?;
}
Mount::Node(node) => {
let mut frame = frame;
Expand Down Expand Up @@ -136,12 +136,10 @@ impl Broker {
if let Some(mount_point) = self.client_id_to_mount_point(peer_id) {
let new_path = util::join_path(&mount_point, frame.shv_path().unwrap_or_default());
for (cli_id, peer) in self.peers.iter() {
if &peer_id != cli_id {
if peer.is_signal_subscribed(&new_path, frame.method().unwrap_or_default()) {
let mut frame = frame.clone();
frame.set_shvpath(&new_path);
peer.sender.send(BrokerToPeerMessage::SendFrame(frame)).await?;
}
if &peer_id != cli_id && peer.is_signal_subscribed(&new_path, frame.method().unwrap_or_default()) {
let mut frame = frame.clone();
frame.set_shvpath(&new_path);
peer.sender.send(BrokerToPeerMessage::SendFrame(frame)).await?;
}
}
}
Expand All @@ -159,7 +157,7 @@ impl Broker {
async fn process_pending_broker_rpc_call(&mut self, client_id: CliId, response_frame: RpcFrame) -> crate::Result<()> {
assert!(response_frame.is_response());
assert!(response_frame.caller_ids().is_empty());
let rqid = response_frame.request_id().ok_or_else(|| "Request ID must be set.")?;
let rqid = response_frame.request_id().ok_or("Request ID must be set.")?;
let mut pending_call_ix = None;
for (ix, pc) in self.pending_rpc_calls.iter().enumerate() {
if pc.request_id == rqid && pc.client_id == client_id {
Expand Down Expand Up @@ -261,15 +259,15 @@ impl Broker {
pub(crate) async fn mount_device(&mut self, client_id: i32, device_id: Option<String>, mount_point: Option<String>, subscribe_path: Option<SubscribePath>) -> crate::Result<()> {
let client_path = format!(".app/broker/client/{}", client_id);
self.mounts.insert(client_path, Mount::Peer(Device { peer_id: client_id }));
let mount_point = loop {
let mount_point = 'mount_point: {
if let Some(ref mount_point) = mount_point {
if mount_point.starts_with("test/") {
info!("Client id: {} mounted on path: '{}'", client_id, &mount_point);
break Some(mount_point.clone());
break 'mount_point Some(mount_point.clone());
}
}
if let Some(device_id) = device_id {
break match self.access.mounts.get(&device_id) {
match self.access.mounts.get(&device_id) {
None => {
warn!("Cannot find mount-point for device ID: {device_id}");
None
Expand All @@ -279,9 +277,10 @@ impl Broker {
info!("Client id: {}, device id: {} mounted on path: '{}'", client_id, device_id, &mount_point);
Some(mount_point)
}
};
}
} else {
None
}
break None;
};
if let Some(mount_point) = &mount_point {
if let Some(peer) = self.peers.get_mut(&client_id) {
Expand Down Expand Up @@ -348,11 +347,11 @@ impl Broker {
Ok(false)
}
let found_cmd = BrokerCommand::PropagateSubscriptions { client_id };
if let Some(true) = check_path(client_id, "/.app/broker/currentClient", &broker_command_sender).await.ok() {
if let Ok(true) = check_path(client_id, "/.app/broker/currentClient", &broker_command_sender).await {
let _ = broker_command_sender.send(found_cmd).await;
} else if let Some(true) = check_path(client_id, ".app/broker/currentClient", &broker_command_sender).await.ok() {
} else if let Ok(true) = check_path(client_id, ".app/broker/currentClient", &broker_command_sender).await{
let _ = broker_command_sender.send(found_cmd).await;
} else if let Some(true) = check_path(client_id,".broker/app", &broker_command_sender).await.ok() {
} else if let Ok(true) = check_path(client_id,".broker/app", &broker_command_sender).await {
let _ = broker_command_sender.send(found_cmd).await;
} else {
let cmd = BrokerCommand::SetSubscribeMethodPath {
Expand All @@ -369,7 +368,7 @@ impl Broker {
//let subscribe_path = peer.broker_subscribe_path()?;
let mount_point = peer.mount_point.clone().ok_or_else(|| format!("Mount point is missing, client ID: {client_id}"))?;
let mut subscribed = HashMap::new();
for (_id, peer) in &self.peers {
for peer in self.peers.values() {
//if id == &client_id { continue }
for subscr in &peer.subscriptions {
subscribed.insert(subscr.to_string(), (subscr.paths.as_str(), subscr.methods.as_str()));
Expand All @@ -383,7 +382,7 @@ impl Broker {
error!("Invalid pattern '{paths}'.")
}
}
let to_subscribe = to_subscribe.iter().map(|s| Subscription::from_str(s)).collect();
let to_subscribe = to_subscribe.iter().map(|s| Subscription::from_str_unchecked(s)).collect();
self.call_subscribe_async(client_id, to_subscribe)?;
Ok(())
}
Expand Down Expand Up @@ -466,13 +465,13 @@ impl Broker {
}
}
}
return Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user)));
Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user)))
}
PeerKind::ParentBroker => {
match frame.access() {
None => { return Err(RpcError::new(RpcErrorCode::PermissionDenied, "")) }
Some(access) => { return Ok(access.into()); }
};
None => Err(RpcError::new(RpcErrorCode::PermissionDenied, "")),
Some(access) => Ok(access.into()),
}
}
}
}
Expand All @@ -488,15 +487,10 @@ impl Broker {
)
}
fn client_info(&mut self, client_id: CliId) -> Option<rpcvalue::Map> {
match self.peers.get(&client_id) {
None => { None }
Some(peer) => {
Some(Broker::peer_to_info(client_id, peer))
}
}
self.peers.get(&client_id).map(|peer| Broker::peer_to_info(client_id, peer))
}
async fn disconnect_client(&mut self, client_id: CliId) -> crate::Result<()> {
let peer = self.peers.get(&client_id).ok_or_else(|| "Invalid client ID")?;
let peer = self.peers.get(&client_id).ok_or("Invalid client ID")?;
peer.sender.send(BrokerToPeerMessage::DisconnectByBroker).await?;
Ok(())
}
Expand All @@ -514,7 +508,7 @@ impl Broker {
log!(target: "Subscr", Level::Debug, "New subscription for client id: {} - {}", client_id, subscription);
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
peer.subscriptions.push(SubscriptionPattern::from_subscription(subscription)?);
self.propagate_subscription_to_matching_devices(&subscription)?;
self.propagate_subscription_to_matching_devices(subscription)?;
Ok(())
}
fn unsubscribe(&mut self, client_id: CliId, subscription: &Subscription) -> crate::Result<bool> {
Expand Down Expand Up @@ -593,7 +587,7 @@ impl Broker {
return Ok(())
}
node::METH_CLIENTS => {
let clients: rpcvalue::List = self.peers.iter().map(|(id, _)| RpcValue::from(*id)).collect();
let clients: rpcvalue::List = self.peers.keys().map(|id| RpcValue::from(*id)).collect();
ctx.command_sender.send(send_result_cmd(Ok(clients.into()))).await?;
return Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions src/broker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl BrokerConfig {
let file_path = Path::new(file_name);
if file_path.exists() {
info!("Loading config file {file_name}");
return match Self::from_file(&file_name) {
return match Self::from_file(file_name) {
Ok(cfg) => {
Ok(cfg)
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Default for BrokerConfig {
roles: vec![],
access: vec![
AccessRule { paths: "**".to_string(), methods: "".to_string(), grant: "su,dot-local".to_string() },
].into(),
],
}),
("client".to_string(), Role { roles: vec!["ping".to_string(), "subscribe".to_string(), "browse".to_string()], access: vec![] }),
("device".to_string(), Role { roles: vec!["client".to_string()], access: vec![] }),
Expand All @@ -137,7 +137,7 @@ impl Default for BrokerConfig {
//}),
("child-broker".to_string(), Role { roles: vec!["device".to_string()], access: vec![] }),
("tester".to_string(), Role {
roles: vec!["client".to_string()].into(),
roles: vec!["client".to_string()],
access: vec![
AccessRule { paths: "test/**".to_string(), methods: "".to_string(), grant: "cfg".to_string() },
],
Expand Down Expand Up @@ -169,4 +169,4 @@ impl Default for BrokerConfig {
},
}
}
}
}
25 changes: 13 additions & 12 deletions src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<BrokerComman
break;
} else {
debug!("Client ID: {client_id}, login credentials.");
frame_writer.send_error(resp_meta, &format!("Invalid login credentials.")).await?;
frame_writer.send_error(resp_meta, "Invalid login credentials.").await?;
continue;
}
}
Expand Down Expand Up @@ -164,15 +164,15 @@ pub(crate) async fn parent_broker_peer_loop_with_reconnect(client_id: i32, confi
if url.scheme() != "tcp" {
return Err(format!("Scheme {} is not supported yet.", url.scheme()).into());
}
let reconnect_interval: std::time::Duration = loop {
let reconnect_interval: std::time::Duration = 'interval: {
if let Some(time_str) = &config.client.reconnect_interval {
if let Ok(interval) = duration_str::parse(time_str) {
break interval
break 'interval interval;
}
}
const DEFAULT_RECONNECT_INTERVAL_SEC: u64 = 10;
info!("Parent broker connection reconnect interval is not set explicitly, default value {DEFAULT_RECONNECT_INTERVAL_SEC} will be used.");
break std::time::Duration::from_secs(DEFAULT_RECONNECT_INTERVAL_SEC)
std::time::Duration::from_secs(DEFAULT_RECONNECT_INTERVAL_SEC)
};
info!("Reconnect interval set to: {:?}", reconnect_interval);
loop {
Expand All @@ -192,8 +192,8 @@ pub(crate) async fn parent_broker_peer_loop_with_reconnect(client_id: i32, confi
fn cut_prefix(shv_path: &str, prefix: &str) -> Option<String> {
if shv_path.starts_with(prefix) && (shv_path.len() == prefix.len() || shv_path[prefix.len() ..].starts_with('/')) {
let shv_path = &shv_path[prefix.len() ..];
if shv_path.starts_with('/') {
Some(shv_path[1 ..].to_string())
if let Some(stripped_path) = shv_path.strip_prefix('/') {
Some(stripped_path.to_string())
} else {
Some(shv_path.to_string())
}
Expand Down Expand Up @@ -224,8 +224,8 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
let login_params = LoginParams{
user,
password,
mount_point: (&config.client.mount.clone().unwrap_or_default()).to_owned(),
device_id: (&config.client.device_id.clone().unwrap_or_default()).to_owned(),
mount_point: config.client.mount.clone().unwrap_or_default().to_owned(),
device_id: config.client.device_id.clone().unwrap_or_default().to_owned(),
heartbeat_interval,
..Default::default()
};
Expand All @@ -251,7 +251,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
if frame.is_request() {
fn is_dot_local_granted(frame: &RpcFrame) -> bool {
let access = frame.access().unwrap_or_default();
access.split(',').find(|s| *s == DOT_LOCAL_GRANT).is_some()
access.split(',').any(|s| s == DOT_LOCAL_GRANT)
}
fn is_dot_local_request(frame: &RpcFrame) -> bool {
let shv_path = frame.shv_path().unwrap_or_default();
Expand All @@ -261,13 +261,14 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
false
}
let shv_path = frame.shv_path().unwrap_or_default().to_owned();
let shv_path = if shv_path.starts_with("/") {
let shv_path = if let Some(stripped_path) = shv_path.strip_prefix('/') {
// parent broker can send requests with absolute path
// to call subscribe(), rejectNotSubscribe() etc.
shv_path[1 ..].to_string()
stripped_path.to_string()
} else if is_dot_local_request(&frame) {
let shv_path = &shv_path[DOT_LOCAL_DIR.len() ..];
let shv_path = if shv_path.starts_with('/') { &shv_path[1 ..] } else { shv_path };
let shv_path = if let Some(stripped_path) = shv_path.strip_prefix('/') {
stripped_path } else { shv_path };
shv_path.to_string()
} else {
if shv_path.is_empty() && is_dot_local_granted(&frame) {
Expand Down
Loading

0 comments on commit dcb8652

Please sign in to comment.