diff --git a/src/peer.rs b/src/peer.rs index a24e102..565e8dd 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -60,6 +60,7 @@ impl PeerProcessor { loop { tokio::select! { + Some(_) = tasks.next() => {}, recv_result = receiver.recv() => { match recv_result { Ok(peer) => { @@ -73,13 +74,19 @@ impl PeerProcessor { is_recheck, processing.clone(), )); + } else { + // Wait for a task to complete before processing more peers + tasks.next().await; + } + } + Err(_) => { + // Receiver is closed; if no tasks are left, break the loop + if tasks.is_empty() { + break; } } - Err(_) => break, // Sender has been dropped } }, - Some(_) = tasks.next() => {}, - else => break, } } } @@ -123,12 +130,22 @@ impl PeerProcessor { }) .collect::>(); + // Ensure resources close before completing stream.close(); - peer_conn.close().await?; + drop(stream); + + match peer_conn.close().await { + Ok(_) => (), + Err(e) => debug!("Error closing connection for peer {}: {:?}", peer, e), + } + drop(peer_conn); + Ok::<_, Box>(new_peers) }) .await; + processing.remove(&peer); + match result { Ok(Ok(new_peers)) => { for new_peer in new_peers { @@ -148,8 +165,6 @@ impl PeerProcessor { .await; } } - - processing.remove(&peer); } }