Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 17, 2023
1 parent eca6915 commit 8f9de78
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,48 +256,55 @@ where
if inner.io.is_closed() {
log::trace!("io has been closed, stop dispatcher");
inner.st = IoDispatcherState::Stop;
continue;
}

// decode incoming bytes stream
match inner.io.poll_recv_decode(this.codec, cx) {
Ok(decoded) => {
inner.update_timer(&decoded);
if let Some(el) = decoded.item {
DispatchItem::Item(el)
} else {
return Poll::Pending;
}
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
inner.st = IoDispatcherState::Stop;
continue;
if let Poll::Ready(IoStatusUpdate::PeerGone(err)) =
inner.io.poll_status_update(cx)
{
DispatchItem::Disconnect(err)
} else {
DispatchItem::Disconnect(None)
}
Err(RecvError::KeepAlive) => {
log::trace!("keep-alive error, stopping dispatcher");
inner.st = IoDispatcherState::Stop;
if inner.flags.contains(Flags::READ_TIMEOUT) {
DispatchItem::ReadTimeout
} else {
DispatchItem::KeepAliveTimeout
} else {
// decode incoming bytes stream
match inner.io.poll_recv_decode(this.codec, cx) {
Ok(decoded) => {
inner.update_timer(&decoded);
if let Some(el) = decoded.item {
DispatchItem::Item(el)
} else {
return Poll::Pending;
}
}
}
Err(RecvError::WriteBackpressure) => {
if let Err(err) = ready!(inner.io.poll_flush(cx, false)) {
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
inner.st = IoDispatcherState::Stop;
DispatchItem::Disconnect(Some(err))
} else {
continue;
}
}
Err(RecvError::Decoder(err)) => {
inner.st = IoDispatcherState::Stop;
DispatchItem::DecoderError(err)
}
Err(RecvError::PeerGone(err)) => {
inner.st = IoDispatcherState::Stop;
DispatchItem::Disconnect(err)
Err(RecvError::KeepAlive) => {
log::trace!("keep-alive error, stopping dispatcher");
inner.st = IoDispatcherState::Stop;
if inner.flags.contains(Flags::READ_TIMEOUT) {
DispatchItem::ReadTimeout
} else {
DispatchItem::KeepAliveTimeout
}
}
Err(RecvError::WriteBackpressure) => {
if let Err(err) = ready!(inner.io.poll_flush(cx, false))
{
inner.st = IoDispatcherState::Stop;
DispatchItem::Disconnect(Some(err))
} else {
continue;
}
}
Err(RecvError::Decoder(err)) => {
inner.st = IoDispatcherState::Stop;
DispatchItem::DecoderError(err)
}
Err(RecvError::PeerGone(err)) => {
inner.st = IoDispatcherState::Stop;
DispatchItem::Disconnect(err)
}
}
}
}
Expand Down

0 comments on commit 8f9de78

Please sign in to comment.