Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: uid-mux #28

Merged
merged 10 commits into from
May 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix deadlock
sinui0 committed May 15, 2024
commit cce24c7bf9dc35f0c0469426b41e25ddbbf4a516
56 changes: 44 additions & 12 deletions uid-mux/src/future.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,10 @@ use std::{
use futures::{ready, AsyncRead, AsyncWrite, Future};
use tokio::sync::oneshot;

use crate::InternalId;
use crate::{
log::{error, trace},
InternalId,
};

const BUF: usize = 32;

@@ -17,6 +20,12 @@ struct Inner<Io> {
id: [u8; BUF],
}

impl<Io> Inner<Io> {
fn is_done(&self) -> bool {
self.count == 32
}
}

#[derive(Debug)]
enum State<Io> {
Pending(Inner<Io>),
@@ -60,9 +69,12 @@ where
pin!(&mut state.io).poll_read(cx, &mut state.id[state.count as usize..])?
{
state.count += read as u8;
if state.count == 32 {
return Poll::Ready(Ok((state.id.into(), state.io)));
if state.is_done() {
let id = InternalId(state.id);
trace!("read id: {}", id);
return Poll::Ready(Ok((id, state.io)));
} else if read == 0 {
error!("remote closed before sending id");
return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()));
}
}
@@ -99,16 +111,29 @@ where
panic!("poll after completion");
};

while let Poll::Ready(sent) =
pin!(&mut state.io).poll_write(cx, &state.id[state.count as usize..])?
{
state.count += sent as u8;
if state.count == 32 {
// If we haven't finished sending the id, keep sending it.
if !state.is_done() {
while let Poll::Ready(sent) =
pin!(&mut state.io).poll_write(cx, &state.id[state.count as usize..])?
{
state.count += sent as u8;
if state.is_done() {
break;
}
}
}

// If we've finished sending, flush the write buffer. If flushing
// succeeds then we can return Ready, otherwise we need to keep
// trying.
if state.is_done() {
if pin!(&mut state.io).poll_flush(cx)?.is_ready() {
return Poll::Ready(Ok(state.io));
}
}

self.0 = State::Pending(state);

Poll::Pending
}
}
@@ -153,19 +178,26 @@ where
#[cfg(test)]
mod tests {
use super::*;

use tokio::io::duplex;
use tokio_util::compat::TokioAsyncReadCompatExt as _;

#[test]
fn test_id_future_multiple() {
let id = InternalId([42u8; 32]);
fn test_id_future() {
tracing_subscriber::fmt::init();
let id_0 = InternalId([42u8; 32]);

// send 1 byte at a time
let (io_0, io_1) = duplex(1);

futures::executor::block_on(async {
futures::try_join!(WriteId::new(io_0.compat(), id), ReadId::new(io_1.compat()))
.unwrap();
let (_, (id_1, _)) = futures::try_join!(
WriteId::new(io_0.compat(), id_0),
ReadId::new(io_1.compat())
)
.unwrap();

assert_eq!(id_0, id_1);
});
}
}
6 changes: 0 additions & 6 deletions uid-mux/src/lib.rs
Original file line number Diff line number Diff line change
@@ -40,12 +40,6 @@ impl AsRef<[u8]> for InternalId {
}
}

impl From<[u8; 32]> for InternalId {
fn from(id: [u8; 32]) -> Self {
Self(id)
}
}

/// A multiplexer that opens streams with unique ids.
#[async_trait]
pub trait UidMux<Id> {
Loading