Skip to content

Commit

Permalink
better async handling
Browse files Browse the repository at this point in the history
  • Loading branch information
denisu committed Nov 5, 2024
1 parent fabd25a commit 2ff7c50
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl PeerProcessor {

loop {
tokio::select! {
Some(_) = tasks.next() => {},
recv_result = receiver.recv() => {
match recv_result {
Ok(peer) => {
Expand All @@ -73,13 +74,19 @@ impl PeerProcessor {
is_recheck,
processing.clone(),
));
} else {
// Wait for a task to complete before processing more peers
tasks.next().await;
}
}
Err(_) => {
// Receiver is closed; if no tasks are left, break the loop
if tasks.is_empty() {
break;
}
}
Err(_) => break, // Sender has been dropped
}
},
Some(_) = tasks.next() => {},
else => break,
}
}
}
Expand Down Expand Up @@ -123,12 +130,22 @@ impl PeerProcessor {
})
.collect::<Vec<_>>();

// Ensure resources close before completing
stream.close();
peer_conn.close().await?;
drop(stream);

match peer_conn.close().await {
Ok(_) => (),
Err(e) => debug!("Error closing connection for peer {}: {:?}", peer, e),
}
drop(peer_conn);

Ok::<_, Box<dyn std::error::Error + Send + Sync>>(new_peers)
})
.await;

processing.remove(&peer);

match result {
Ok(Ok(new_peers)) => {
for new_peer in new_peers {
Expand All @@ -148,8 +165,6 @@ impl PeerProcessor {
.await;
}
}

processing.remove(&peer);
}
}

Expand Down

0 comments on commit 2ff7c50

Please sign in to comment.