Skip to content

Commit

Permalink
Differenetiate the different DroppedConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
QuantumEntangledAndy committed Aug 2, 2024
1 parent b0e009b commit 8f19d12
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 22 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/bc_protocol/connection/bcconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl Poller {
};
}
PollCommand::Disconnect => {
return Err(Error::DroppedConnection);
return Err(Error::ConnectionShutdown);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/bc_protocol/connection/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl Discoverer {
}
}
}
Err(Error::DroppedConnection)
Err(Error::DiscoveryIgnored)
} => {v},
v = async {
// Send every inter for ever or until channel is no longer viable
Expand Down
27 changes: 10 additions & 17 deletions crates/core/src/bc_protocol/connection/udpsource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,19 +350,18 @@ impl UdpPayloadInner {
loop {
break tokio::select!{
_ = recv_timeout.as_mut() => {
log::trace!("DroppedConnection: Timeout");
Err(Error::DroppedConnection)
Err(Error::BcUdpTimeout)
}
packet = inner.next() => {
log::trace!("Cam->App");
let packet = packet.ok_or(Error::DroppedConnection)??;
let packet = packet.ok_or(Error::BcUdpDropReciver)??;
recv_timeout.as_mut().reset(Instant::now() + Duration::from_secs(TIME_OUT));
// let packet = socket_rx.next().await.ok_or(Error::DroppedConnection)??;
socket_out_tx.try_send(packet).map_err(|_| Error::DroppedConnection)?;
// let packet = socket_rx.next().await.ok_or(Error::BcUdpDropReciver)??;
socket_out_tx.try_send(packet).map_err(|_| Error::BcUdpDropReciver)?;
continue;
},
packet = socket_in_rx.next() => {
let packet = packet.ok_or(Error::DroppedConnection)?;
let packet = packet.ok_or(Error::BcUdpDropSender)?;
match tokio::time::timeout(tokio::time::Duration::from_millis(250), inner.send((packet, thread_camera_addr))).await {
Ok(written) => {
written?;
Expand All @@ -372,8 +371,8 @@ impl UdpPayloadInner {
// Socket is (maybe) broken
// Seems to happen with network reconnects like over
// a lossy cellular network
let stream = Arc::new(tokio::time::timeout(tokio::time::Duration::from_millis(250), connect_try_port(inner.inner.get_ref().local_addr()?.port())).await.map_err(|_| Error::DroppedConnection)??);
inner = tokio::time::timeout(tokio::time::Duration::from_millis(250), BcUdpSource::new_from_socket(stream, inner.addr)).await.map_err(|_| Error::DroppedConnection)??;
let stream = Arc::new(tokio::time::timeout(tokio::time::Duration::from_millis(250), connect_try_port(inner.inner.get_ref().local_addr()?.port())).await.map_err(|_| Error::BcUdpReconnectTimeout)??);
inner = tokio::time::timeout(tokio::time::Duration::from_millis(250), BcUdpSource::new_from_socket(stream, inner.addr)).await.map_err(|_| Error::BcUdpReconnectTimeout)??;

// Inform the camera that we are the same client
//
Expand Down Expand Up @@ -492,10 +491,7 @@ impl UdpPayloadInner {
log::trace!("App->Camera");
// Incomming from application
// Outgoing on socket
if v.is_none() {
log::trace!("DroppedConnection: self.thread_sink.next(): {:?}", v);
}
let item = v.ok_or(Error::DroppedConnection)?;
let item = v.ok_or(Error::BcUdpDropSender)?;

for chunk in item.chunks(MTU - UDPDATA_HEADER_SIZE) {
let udp_data = UdpData {
Expand All @@ -513,10 +509,7 @@ impl UdpPayloadInner {
log::trace!("Camera->App");
// Incomming from socket
// Outgoing to application
if v.is_none() {
log::trace!("DroppedConnection: self.socket_out.next()");
}
let (item, addr) = v.ok_or(Error::DroppedConnection)?;
let (item, addr) = v.ok_or(Error::BcUdpDropReciver)?;
if addr == camera_addr {
match item {
BcUdp::Discovery(UdpDiscovery{
Expand Down Expand Up @@ -658,7 +651,7 @@ impl UdpPayloadSource {
if payload_inner.thread_stream.is_closed() {
log::trace!("payload_inner.thread_stream.is_closed");
payload_inner.thread_sink.close();
return Err(Error::DroppedConnection);
return Err(Error::BcUdpPayloadDroppedInner);
}
log::trace!("Calling inner");
let res = payload_inner.run().await;
Expand Down
32 changes: 32 additions & 0 deletions crates/core/src/bc_protocol/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,38 @@ pub enum Error {
#[error("Dropped connection (Broadcast TryRecv)")]
BroadcastDroppedConnectionTry(#[from] tokio::sync::broadcast::error::TryRecvError),

/// Raised when a stream thread has finished
#[error("End of Stream")]
StreamFinished,

/// Raised when a connection requests shutdown
#[error("Connection shuting down")]
ConnectionShutdown,

/// Raised when a discovery attempt fails to get a reply
#[error("No reply to discovery packet")]
DiscoveryIgnored,

/// Raised when there is no reply to a UDP packet
#[error("BcUDP packet timeout")]
BcUdpTimeout,

/// Raised when a BcUdp incomming connection is dropped
#[error("BcUDP reciver dropped")]
BcUdpDropReciver,

/// Raised when a BcUdp outgoing connection is dropped
#[error("BcUDP sender dropped")]
BcUdpDropSender,

/// Raised when a BcUdp outgoing connection is dropped
#[error("BcUDPPayload inner protocol was dropped")]
BcUdpPayloadDroppedInner,

/// Raised when BcUdp sender fails to reconnect in time
#[error("BcUDP reconnect timeout")]
BcUdpReconnectTimeout,

/// Raised when a connection is dropped during a TryRecv event
#[error("Send Error")]
TokioBcSendError,
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/bc_protocol/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ impl StreamData {
if handle.is_finished() {
self.abort_handle.cancel();
handle.await??;
return Err(Error::DroppedConnection);
return Err(Error::StreamFinished);
}
} else {
self.abort_handle.cancel();
return Err(Error::DroppedConnection);
return Err(Error::StreamFinished);
}
match self.rx.recv().await {
Some(data) => Ok(data),
None => {
self.abort_handle.cancel();
Err(Error::DroppedConnection)
Err(Error::StreamFinished)
}
}
}
Expand Down

0 comments on commit 8f19d12

Please sign in to comment.