Skip to content

Commit

Permalink
WIP: Use tokio-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
wiktor-k committed Feb 5, 2024
1 parent 349a62e commit 76bab91
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ bytes = { version = "1.1", optional = true }
#futures = { version = "0.1.25", optional = true }
futures = { version = "0.3.21", optional = true }
log = { version = "0.4.16", optional = true }
tokio = { version = "1", optional = true, features = ["rt", "net"] }
tokio = { version = "1", optional = true, features = ["rt", "net", "rt-multi-thread"] }
tokio-util = { version = "0.7", optional = true, features = ["codec"] }
tokio-stream = { version = "0.1.14", optional = true, features = ["net"] }
#tokio-uds = { version = "0.2.5", optional = true }

[features]
default = ["agent"]
agent = ["log", "tokio", "tokio-util", "bytes", "futures"]
agent = ["log", "tokio", "tokio-util", "bytes", "futures", "tokio-stream"]

[[example]]
name = "key_storage"
Expand Down
18 changes: 10 additions & 8 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ impl Encoder<Message> for MessageCodec {

fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = to_bytes(&to_bytes(&item)?)?;
dst.put(bytes);
dst.put(&bytes[..]);
Ok(())
}
}

macro_rules! handle_clients {
($self:ident, $socket:ident) => {{
($self:ident, $wrapper:ident, $socket:ident) => {{
use futures::FutureExt;
use futures::TryFutureExt;
info!("Listening; socket = {:?}", $socket);
let arc_self = Arc::new($self);
$socket
.incoming()
tokio_stream::wrappers::$wrapper::new($socket)
.map_err(|e| error!("Failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let socket = socket.unwrap(); //FIXME
let (write, read) = Framed::new(socket, MessageCodec).split();
let arc_self = arc_self.clone();
let connection = write
Expand All @@ -95,13 +95,13 @@ pub trait Agent: 'static + Sync + Send + Sized {
&self,
message: Message,
) -> Box<dyn Future<Output = Result<Message, Self::Error>> + Send + Sync> {
Box::new(self.handle(message))
Box::new(async { self.handle(message) })
}

#[allow(clippy::unit_arg)]
fn run_listener(self, socket: UnixListener) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let res = rt.block_on(handle_clients!(self, socket));
let res = rt.block_on(handle_clients!(self, UnixListenerStream, socket));
Ok(res)
}

Expand All @@ -111,9 +111,11 @@ pub trait Agent: 'static + Sync + Send + Sized {

#[allow(clippy::unit_arg)]
fn run_tcp(self, addr: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
let socket = TcpListener::bind(&addr.parse::<SocketAddr>()?)?;
let mut rt = tokio::runtime::Runtime::new().unwrap();
let res = rt.block_on(handle_clients!(self, socket));
let res = rt.block_on(async {
let socket = TcpListener::bind(&addr.parse::<SocketAddr>()?).await?;
handle_clients!(self, TcpListenerStream, socket)
});
Ok(res)
}
}

0 comments on commit 76bab91

Please sign in to comment.