From 8f19d12b8ce53995d8dca0086072613ed08a31d0 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 2 Aug 2024 20:01:41 +0700 Subject: [PATCH] Differenetiate the different DroppedConnection --- .../core/src/bc_protocol/connection/bcconn.rs | 2 +- .../src/bc_protocol/connection/discovery.rs | 2 +- .../src/bc_protocol/connection/udpsource.rs | 27 ++++++---------- crates/core/src/bc_protocol/errors.rs | 32 +++++++++++++++++++ crates/core/src/bc_protocol/stream.rs | 6 ++-- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/crates/core/src/bc_protocol/connection/bcconn.rs b/crates/core/src/bc_protocol/connection/bcconn.rs index 04a7b0be..06a5d059 100644 --- a/crates/core/src/bc_protocol/connection/bcconn.rs +++ b/crates/core/src/bc_protocol/connection/bcconn.rs @@ -396,7 +396,7 @@ impl Poller { }; } PollCommand::Disconnect => { - return Err(Error::DroppedConnection); + return Err(Error::ConnectionShutdown); } } } diff --git a/crates/core/src/bc_protocol/connection/discovery.rs b/crates/core/src/bc_protocol/connection/discovery.rs index e6b5d170..7fb70fc1 100644 --- a/crates/core/src/bc_protocol/connection/discovery.rs +++ b/crates/core/src/bc_protocol/connection/discovery.rs @@ -300,7 +300,7 @@ impl Discoverer { } } } - Err(Error::DroppedConnection) + Err(Error::DiscoveryIgnored) } => {v}, v = async { // Send every inter for ever or until channel is no longer viable diff --git a/crates/core/src/bc_protocol/connection/udpsource.rs b/crates/core/src/bc_protocol/connection/udpsource.rs index 7194af21..fa12bebf 100644 --- a/crates/core/src/bc_protocol/connection/udpsource.rs +++ b/crates/core/src/bc_protocol/connection/udpsource.rs @@ -350,19 +350,18 @@ impl UdpPayloadInner { loop { break tokio::select!{ _ = recv_timeout.as_mut() => { - log::trace!("DroppedConnection: Timeout"); - Err(Error::DroppedConnection) + Err(Error::BcUdpTimeout) } packet = inner.next() => { log::trace!("Cam->App"); - let packet = packet.ok_or(Error::DroppedConnection)??; + let packet = packet.ok_or(Error::BcUdpDropReciver)??; recv_timeout.as_mut().reset(Instant::now() + Duration::from_secs(TIME_OUT)); - // let packet = socket_rx.next().await.ok_or(Error::DroppedConnection)??; - socket_out_tx.try_send(packet).map_err(|_| Error::DroppedConnection)?; + // let packet = socket_rx.next().await.ok_or(Error::BcUdpDropReciver)??; + socket_out_tx.try_send(packet).map_err(|_| Error::BcUdpDropReciver)?; continue; }, packet = socket_in_rx.next() => { - let packet = packet.ok_or(Error::DroppedConnection)?; + let packet = packet.ok_or(Error::BcUdpDropSender)?; match tokio::time::timeout(tokio::time::Duration::from_millis(250), inner.send((packet, thread_camera_addr))).await { Ok(written) => { written?; @@ -372,8 +371,8 @@ impl UdpPayloadInner { // Socket is (maybe) broken // Seems to happen with network reconnects like over // a lossy cellular network - let stream = Arc::new(tokio::time::timeout(tokio::time::Duration::from_millis(250), connect_try_port(inner.inner.get_ref().local_addr()?.port())).await.map_err(|_| Error::DroppedConnection)??); - inner = tokio::time::timeout(tokio::time::Duration::from_millis(250), BcUdpSource::new_from_socket(stream, inner.addr)).await.map_err(|_| Error::DroppedConnection)??; + let stream = Arc::new(tokio::time::timeout(tokio::time::Duration::from_millis(250), connect_try_port(inner.inner.get_ref().local_addr()?.port())).await.map_err(|_| Error::BcUdpReconnectTimeout)??); + inner = tokio::time::timeout(tokio::time::Duration::from_millis(250), BcUdpSource::new_from_socket(stream, inner.addr)).await.map_err(|_| Error::BcUdpReconnectTimeout)??; // Inform the camera that we are the same client // @@ -492,10 +491,7 @@ impl UdpPayloadInner { log::trace!("App->Camera"); // Incomming from application // Outgoing on socket - if v.is_none() { - log::trace!("DroppedConnection: self.thread_sink.next(): {:?}", v); - } - let item = v.ok_or(Error::DroppedConnection)?; + let item = v.ok_or(Error::BcUdpDropSender)?; for chunk in item.chunks(MTU - UDPDATA_HEADER_SIZE) { let udp_data = UdpData { @@ -513,10 +509,7 @@ impl UdpPayloadInner { log::trace!("Camera->App"); // Incomming from socket // Outgoing to application - if v.is_none() { - log::trace!("DroppedConnection: self.socket_out.next()"); - } - let (item, addr) = v.ok_or(Error::DroppedConnection)?; + let (item, addr) = v.ok_or(Error::BcUdpDropReciver)?; if addr == camera_addr { match item { BcUdp::Discovery(UdpDiscovery{ @@ -658,7 +651,7 @@ impl UdpPayloadSource { if payload_inner.thread_stream.is_closed() { log::trace!("payload_inner.thread_stream.is_closed"); payload_inner.thread_sink.close(); - return Err(Error::DroppedConnection); + return Err(Error::BcUdpPayloadDroppedInner); } log::trace!("Calling inner"); let res = payload_inner.run().await; diff --git a/crates/core/src/bc_protocol/errors.rs b/crates/core/src/bc_protocol/errors.rs index 64fc30da..d0e6db3f 100644 --- a/crates/core/src/bc_protocol/errors.rs +++ b/crates/core/src/bc_protocol/errors.rs @@ -72,6 +72,38 @@ pub enum Error { #[error("Dropped connection (Broadcast TryRecv)")] BroadcastDroppedConnectionTry(#[from] tokio::sync::broadcast::error::TryRecvError), + /// Raised when a stream thread has finished + #[error("End of Stream")] + StreamFinished, + + /// Raised when a connection requests shutdown + #[error("Connection shuting down")] + ConnectionShutdown, + + /// Raised when a discovery attempt fails to get a reply + #[error("No reply to discovery packet")] + DiscoveryIgnored, + + /// Raised when there is no reply to a UDP packet + #[error("BcUDP packet timeout")] + BcUdpTimeout, + + /// Raised when a BcUdp incomming connection is dropped + #[error("BcUDP reciver dropped")] + BcUdpDropReciver, + + /// Raised when a BcUdp outgoing connection is dropped + #[error("BcUDP sender dropped")] + BcUdpDropSender, + + /// Raised when a BcUdp outgoing connection is dropped + #[error("BcUDPPayload inner protocol was dropped")] + BcUdpPayloadDroppedInner, + + /// Raised when BcUdp sender fails to reconnect in time + #[error("BcUDP reconnect timeout")] + BcUdpReconnectTimeout, + /// Raised when a connection is dropped during a TryRecv event #[error("Send Error")] TokioBcSendError, diff --git a/crates/core/src/bc_protocol/stream.rs b/crates/core/src/bc_protocol/stream.rs index 240a7598..a372bfb9 100644 --- a/crates/core/src/bc_protocol/stream.rs +++ b/crates/core/src/bc_protocol/stream.rs @@ -51,17 +51,17 @@ impl StreamData { if handle.is_finished() { self.abort_handle.cancel(); handle.await??; - return Err(Error::DroppedConnection); + return Err(Error::StreamFinished); } } else { self.abort_handle.cancel(); - return Err(Error::DroppedConnection); + return Err(Error::StreamFinished); } match self.rx.recv().await { Some(data) => Ok(data), None => { self.abort_handle.cancel(); - Err(Error::DroppedConnection) + Err(Error::StreamFinished) } } }