Skip to content

Commit

Permalink
Implement lsmod #10
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 5, 2024
1 parent c59f0b8 commit bbc66af
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
43 changes: 39 additions & 4 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,19 @@ impl BrokerState {
None
}
}
fn remove_peer(&mut self, peer_id: PeerId) -> shvrpc::Result<()> {
fn remove_peer(&mut self, peer_id: PeerId) -> shvrpc::Result<Option<String>> {
self.peers.remove(&peer_id);
self.mounts.retain(|_, v| if let Mount::Peer(id) = v { *id != peer_id } else { true });
Ok(())
let mut mount_point = None;
self.mounts.retain(|k, v| {
if let Mount::Peer(id) = v {
if *id == peer_id {
mount_point = Some(k.clone());
return false
}
}
true
});
Ok(mount_point)
}
fn set_subscribe_path(&mut self, peer_id: PeerId, subscribe_path: SubscribePath) -> shvrpc::Result<()> {
let peer = self.peers.get_mut(&peer_id).ok_or("Peer not found")?;
Expand Down Expand Up @@ -709,6 +718,12 @@ pub(crate) fn state_reader(state: &SharedBrokerState) -> RwLockReadGuard<BrokerS
pub(crate) fn state_writer(state: &SharedBrokerState) -> RwLockWriteGuard<BrokerState> {
state.write().unwrap()
}
fn split_mount_point(mount_point: &str) -> shvrpc::Result<(&str, &str)> {
let ix = mount_point.rfind('/').ok_or("Empty path ???")?;
let dir = &mount_point[ix + 1 ..];
let prefix = &mount_point[..ix];
Ok((prefix, dir))
}
impl BrokerImpl {
pub(crate) fn new(access: AccessConfig, sql_connection: Option<rusqlite::Connection>) -> Self {
let (command_sender, command_receiver) = unbounded();
Expand Down Expand Up @@ -952,11 +967,31 @@ impl BrokerImpl {
sender} => {
info!("New peer, id: {peer_id}.");
state_writer(&self.state).add_peer(peer_id, user, peer_kind, mount_point, device_id, sender)?;
let mount_point = state_reader(&self.state).mount_point(peer_id);
if let Some(mount_point) = mount_point {
let (shv_path, dir) = split_mount_point(&mount_point)?;
let command_sender = state_reader(&self.state).command_sender.clone();
command_sender.send(BrokerCommand::SendSignal {
shv_path: shv_path.to_string(),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(dir.to_string(), true.into())]).into(),
}).await?;
}
spawn_and_log_error(Self::on_device_mounted(self.state.clone(), peer_id));
}
BrokerCommand::PeerGone { peer_id } => {
info!("Peer gone, id: {peer_id}.");
state_writer(&self.state).remove_peer(peer_id)?;
let mount_point = state_writer(&self.state).remove_peer(peer_id)?;
if let Some(mount_point) = mount_point {
let (shv_path, dir) = split_mount_point(&mount_point)?;
self.command_sender.send(BrokerCommand::SendSignal {
shv_path: shv_path.to_string(),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(dir.to_string(), false.into())]).into(),
}).await?;
}
self.pending_rpc_calls.retain(|c| c.client_id != peer_id);
}
BrokerCommand::GetPassword { sender, user } => {
Expand Down
12 changes: 12 additions & 0 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ async fn test_broker_loop() {
device_id: None,
sender: peer_writer.clone() }).await.unwrap();

loop {
if let BrokerToPeerMessage::SendFrame(frame) = call_ctx.reader.recv().await.unwrap() {
if frame.method() == Some("lsmod") {
assert_eq!(frame.shv_path(), Some("test"));
assert_eq!(frame.source(), Some("ls"));
let msg = frame.to_rpcmesage().unwrap();
assert_eq!(msg.param().unwrap().as_map(), &Map::from([("device".to_string(), true.into())]));
break
}
}
}

let resp = call(".broker", "ls", Some("access".into()), &call_ctx).await.unwrap();
assert_eq!(resp, RpcValue::from(true));
let resp = call(".broker/access", "ls", None, &call_ctx).await.unwrap();
Expand Down
17 changes: 3 additions & 14 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ pub fn rpcmsg_from_output(output: Output) -> shvrpc::Result<RpcMessage> {
pub fn rpcvalue_from_output(output: Output) -> shvrpc::Result<RpcValue> {
let out = bytes_from_output(output)?;
let cpon = std::str::from_utf8(&out)?;
Ok(RpcValue::from_cpon(cpon)?)
let rv = RpcValue::from_cpon(cpon)?;
//println!("cpon: {}, rpc vla: {}", cpon, rv.to_cpon());
Ok(rv)
}
pub fn bytes_from_output(output: Output) -> shvrpc::Result<Vec<u8>> {
if !output.status.success() {
Expand All @@ -56,19 +58,6 @@ pub fn string_list_from_output(output: Output) -> shvrpc::Result<Vec<String>> {
}
Ok(values)
}
//pub fn value_list_from_output(output: Output) -> shv::Result<Vec<RpcValue>> {
// let mut values = List::new();
// let bytes = bytes_from_output(output)?;
// let mut buff: &[u8] = &bytes;
// let mut rd = CponReader::new(&mut buff);
// loop {
// match rd.read() {
// Ok(rv) => { values.push(rv) }
// Err(_) => { break; }
// }
// }
// Ok(values)
//}
pub fn result_from_output(output: Output) -> shvrpc::Result<RpcValue> {
let msg = rpcmsg_from_output(output)?;
let result = msg.result()?;
Expand Down

0 comments on commit bbc66af

Please sign in to comment.