From 10b8c6ad8795b8f463438b044a54156ed45aea22 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 15 Jan 2025 18:19:49 +0100 Subject: [PATCH] Daemon: React to ctrl-c during connection setup The daemon tries to connect to the coordinator before it starts up. During that time, it didn't listen for ctrl-c signals yet. This commit fixes this limitation by checking for ctrl-c events during the event stream setup too. --- binaries/daemon/src/lib.rs | 114 +++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 48 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 5c4795a2a..e016f0c6b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -36,6 +36,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, net::SocketAddr, path::{Path, PathBuf}, + pin::pin, sync::Arc, time::{Duration, Instant}, }; @@ -101,53 +102,28 @@ impl Daemon { ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); - let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; - - // spawn inter daemon listen loop - let (events_tx, events_rx) = flume::bounded(10); - let listen_port = - inter_daemon::spawn_listener_loop(inter_daemon_addr, machine_id.clone(), events_tx) - .await?; - let daemon_events = events_rx.into_stream().map(|e| Timestamped { - inner: Event::Daemon(e.inner), - timestamp: e.timestamp, - }); + let mut ctrlc_events = set_up_ctrlc_handler(clock.clone())?; + let incoming_events = { + let incoming_events = set_up_event_stream( + coordinator_addr, + &machine_id, + inter_daemon_addr, + local_listen_port, + &clock, + ); - // connect to the coordinator - let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) - .await - .wrap_err("failed to connect to dora-coordinator")? - .map( - |Timestamped { - inner: event, - timestamp, - }| Timestamped { - inner: Event::Coordinator(event), - timestamp, - }, - ); - - // Spawn local listener loop - let (events_tx, events_rx) = flume::bounded(10); - let _listen_port = local_listener::spawn_listener_loop( - (LOCALHOST, local_listen_port).into(), - machine_id.clone(), - events_tx, - ) - .await?; - let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped { - inner: Event::DynamicNode(e.inner), - timestamp: e.timestamp, - }); + // finish early if ctrl-c is is pressed during event stream setup + let ctrl_c = pin!(ctrlc_events.recv()); + match futures::future::select(ctrl_c, pin!(incoming_events)).await { + future::Either::Left((_ctrl_c, _)) => { + tracing::info!("received ctrl-c signal -> stopping daemon"); + return Ok(()); + } + future::Either::Right((events, _)) => events?, + } + }; Self::run_general( - ( - coordinator_events, - ctrlc_events, - daemon_events, - dynamic_node_events, - ) - .merge(), + (ReceiverStream::new(ctrlc_events), incoming_events).merge(), Some(coordinator_addr), machine_id, None, @@ -180,7 +156,7 @@ impl Daemon { let clock = Arc::new(HLC::default()); - let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; + let ctrlc_events = ReceiverStream::new(set_up_ctrlc_handler(clock.clone())?); let exit_when_done = spawn_command .nodes @@ -1352,6 +1328,48 @@ impl Daemon { } } +async fn set_up_event_stream( + coordinator_addr: SocketAddr, + machine_id: &String, + inter_daemon_addr: SocketAddr, + local_listen_port: u16, + clock: &Arc, +) -> eyre::Result<(impl Stream> + Unpin)> { + let (events_tx, events_rx) = flume::bounded(10); + let listen_port = + inter_daemon::spawn_listener_loop(inter_daemon_addr, machine_id.clone(), events_tx).await?; + let daemon_events = events_rx.into_stream().map(|e| Timestamped { + inner: Event::Daemon(e.inner), + timestamp: e.timestamp, + }); + let coordinator_events = + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, clock) + .await + .wrap_err("failed to connect to dora-coordinator")? + .map( + |Timestamped { + inner: event, + timestamp, + }| Timestamped { + inner: Event::Coordinator(event), + timestamp, + }, + ); + let (events_tx, events_rx) = flume::bounded(10); + let _listen_port = local_listener::spawn_listener_loop( + (LOCALHOST, local_listen_port).into(), + machine_id.clone(), + events_tx, + ) + .await?; + let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped { + inner: Event::DynamicNode(e.inner), + timestamp: e.timestamp, + }); + let incoming = (coordinator_events, daemon_events, dynamic_node_events).merge(); + Ok(incoming) +} + async fn send_output_to_local_receivers( node_id: NodeId, output_id: DataId, @@ -1868,7 +1886,7 @@ fn send_with_timestamp( fn set_up_ctrlc_handler( clock: Arc, -) -> Result>, eyre::ErrReport> { +) -> eyre::Result>> { let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); let mut ctrlc_sent = 0; @@ -1895,7 +1913,7 @@ fn set_up_ctrlc_handler( }) .wrap_err("failed to set ctrl-c handler")?; - Ok(ReceiverStream::new(ctrlc_rx)) + Ok(ctrlc_rx) } #[derive(Debug, Default, Clone, PartialEq, Eq)]