Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jdx committed Dec 7, 2024
1 parent 47c3320 commit 6c43e60
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions src/ipc/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::ipc::{deserialize, fs_name, serialize, IpcMessage};
use crate::{env, Result};
use eyre::{bail, eyre};
use interprocess::local_socket::tokio::SendHalf;
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use interprocess::local_socket::traits::tokio::Listener;
use interprocess::local_socket::traits::tokio::Stream;
use interprocess::local_socket::ListenerOptions;
Expand Down Expand Up @@ -47,6 +47,15 @@ impl IpcServer {
send.write_all(&msg).await?;
Ok(())
}

async fn read_message(recv: &mut BufReader<RecvHalf>) -> Result<Option<IpcMessage>> {
let mut bytes = Vec::new();
recv.read_until(0, &mut bytes).await?;
if bytes.is_empty() {
return Ok(None);
}
Ok(Some(deserialize(&bytes)?))
}

pub async fn read(&mut self) -> Result<IpcMessage> {
self.rx
Expand All @@ -57,25 +66,24 @@ impl IpcServer {

async fn listen(listener: &interprocess::local_socket::tokio::Listener, tx: tokio::sync::mpsc::Sender<IpcMessage>) -> Result<()> {
let stream = listener.accept().await?;
trace!("Client accepted");
let (recv, mut send) = stream.split();
let mut recv = BufReader::new(recv);
let mut bytes = Vec::new();
recv.read_until(0, &mut bytes).await?;
match deserialize(&bytes)? {
IpcMessage::Connect(id) => {
match Self::read_message(&mut recv).await? {
Some(IpcMessage::Connect(id)) => {
debug!("Client connected: {}", id);
Self::send(&mut send, IpcMessage::Response("Hello from server!".into())).await?;
tokio::spawn(async move {
loop {
let mut bytes = Vec::new();
if recv.read_until(0, &mut bytes).await.is_err() {
break;
}
if bytes.is_empty() {
break;
}
let msg = match deserialize(&bytes) {
Ok(msg) => msg,
let msg = match Self::read_message(&mut recv).await {
Ok(Some(msg)) => {
trace!("Received message: {:?}", msg);
msg
},
Ok(None) => {
trace!("Client disconnected: {}", id);
break;
}
Err(err) => {
error!("Failed to deserialize message: {:?}", err);
continue;
Expand All @@ -87,8 +95,11 @@ impl IpcServer {
}
});
},
msg => {
bail!("Unexpected message: {:?}", msg);
Some(msg) => {
bail!("Unexpected message: {:?}", msg);
},
None => {
bail!("No message");
},
};
Ok(())
Expand Down

0 comments on commit 6c43e60

Please sign in to comment.