Skip to content

Commit

Permalink
refactor!: Beautify stream implementation
Browse files Browse the repository at this point in the history
Co-authored-by: Clément DOUIN <[email protected]>
  • Loading branch information
jakoschiko and soywod committed Nov 13, 2024
1 parent e2eb299 commit de9d490
Showing 1 changed file with 18 additions and 25 deletions.
43 changes: 18 additions & 25 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{
collections::VecDeque,
convert::Infallible,
future::{poll_fn, Future},
future::poll_fn,
io::IoSlice,
pin::pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};

use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use futures_util::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
FutureExt,
};
#[cfg(debug_assertions)]
use imap_codec::imap_types::utils::escape_byte_string;
use thiserror::Error;
Expand Down Expand Up @@ -85,27 +87,25 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {

// Progress the stream
poll_fn(|cx| {
let result = if self.write_buffer.needs_write() {
let bytes = if self.write_buffer.needs_write() {
// We read and write the stream simultaneously because otherwise
// a deadlock between client and server might occur if both sides
// would only read or only write. We achieve this by polling both
// operations before blocking.
match self.write_buffer.poll_write(&mut self.stream, cx) {
Poll::Ready(result) => return Poll::Ready(result),
Poll::Pending => self.read_buffer.poll_read(&mut self.stream, cx)?,
Poll::Pending => {
ready!(self.read_buffer.poll_read(&mut self.stream, cx)?)
}
}
} else {
// Nothing to write, just read
self.read_buffer.poll_read(&mut self.stream, cx)?
ready!(self.read_buffer.poll_read(&mut self.stream, cx)?)
};

if let Poll::Ready(bytes) = result {
// Provide input bytes to the client/server and try again
state.enqueue_input(bytes);
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
// Provide input bytes to the client/server and try again
state.enqueue_input(bytes);
Poll::Ready(Ok(()))
})
.await?;
};
Expand Down Expand Up @@ -152,10 +152,7 @@ impl ReadBuffer {
cx: &mut Context<'_>,
) -> Poll<Result<&[u8], ReadWriteError>> {
// Constructing this future is cheap
let read_future = pin!(stream.read(&mut self.bytes));
let Poll::Ready(byte_count) = read_future.poll(cx)? else {
return Poll::Pending;
};
let byte_count = ready!(stream.read(&mut self.bytes).poll_unpin(cx)?);

#[cfg(debug_assertions)]
trace!(
Expand Down Expand Up @@ -205,20 +202,16 @@ impl WriteBuffer {
stream: &mut S,
cx: &mut Context<'_>,
) -> Poll<Result<(), ReadWriteError>> {
while !self.bytes.is_empty() {
while self.needs_write() {
let write_slices = &self.write_slices();

// Constructing this future is cheap
let write_future = pin!(stream.write_vectored(write_slices));
let Poll::Ready(byte_count) = write_future.poll(cx)? else {
return Poll::Pending;
};
let byte_count = ready!(stream.write_vectored(write_slices).poll_unpin(cx)?);

#[cfg(debug_assertions)]
trace!(
data = escape_byte_string(
self
.bytes
self.bytes
.iter()
.copied()
.take(byte_count)
Expand Down

0 comments on commit de9d490

Please sign in to comment.