diff --git a/Cargo.lock b/Cargo.lock index 3d863f051..6802cb7a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1367,13 +1367,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "g3-signal" -version = "0.3.0" -dependencies = [ - "tokio", -] - [[package]] name = "g3-slog-types" version = "0.1.0" @@ -1607,7 +1600,6 @@ dependencies = [ "g3-io-ext", "g3-openssl", "g3-runtime", - "g3-signal", "g3-socket", "g3-socks", "g3-statsd-client", @@ -1712,7 +1704,6 @@ dependencies = [ "g3-histogram", "g3-io-ext", "g3-openssl", - "g3-signal", "g3-slog-types", "g3-socket", "g3-statsd-client", @@ -1815,7 +1806,6 @@ dependencies = [ "g3-msgpack", "g3-openssl", "g3-resolver", - "g3-signal", "g3-slog-types", "g3-smtp-proto", "g3-socket", @@ -1936,7 +1926,6 @@ dependencies = [ "g3-histogram", "g3-io-ext", "g3-openssl", - "g3-signal", "g3-slog-types", "g3-socket", "g3-statsd-client", @@ -3349,14 +3338,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "test-int-signal" -version = "0.1.0" -dependencies = [ - "g3-signal", - "tokio", -] - [[package]] name = "test-resolver" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 619bb37cb..83e01c9fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ members = [ "lib/g3-daemon", "lib/g3-ctl", "lib/g3-socket", - "lib/g3-signal", "lib/g3-compat", "lib/g3-clap", "lib/g3-yaml", @@ -52,7 +51,6 @@ members = [ "g3keymess", "g3keymess/proto", "g3keymess/utils/ctl", - "demo/test-int-signal", "demo/test-tcp-relay", "demo/test-resolver", "demo/test-copy-yield", @@ -208,7 +206,6 @@ g3-msgpack = { version = "0.2", path = "lib/g3-msgpack" } g3-hickory-client = { version = "0.1", path = "lib/g3-hickory-client" } g3-resolver = { version = "0.5", path = "lib/g3-resolver" } g3-runtime = { version = "0.3", path = "lib/g3-runtime" } -g3-signal = { version = "0.3", path = "lib/g3-signal" } g3-socket = { version = "0.4", path = "lib/g3-socket" } g3-socks = { version = "0.1", path = "lib/g3-socks" } g3-openssl = { version = "0.1", path = "lib/g3-openssl" } diff --git a/demo/test-int-signal/Cargo.toml b/demo/test-int-signal/Cargo.toml deleted file mode 100644 index 5fac069aa..000000000 --- a/demo/test-int-signal/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "test-int-signal" -version = "0.1.0" -license.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } -g3-signal.workspace = true diff --git a/demo/test-int-signal/src/main.rs b/demo/test-int-signal/src/main.rs deleted file mode 100644 index 58c461a7f..000000000 --- a/demo/test-int-signal/src/main.rs +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use tokio::signal::unix::SignalKind; - -use g3_signal::{ActionSignal, SigResult}; - -fn do_at_quit(count: u32) -> SigResult { - match count { - 1 => { - println!("press 'Ctrl-C' again to quit"); - SigResult::Continue - } - _ => { - println!("quit"); - SigResult::Break - } - } -} - -#[tokio::main] -async fn main() { - let sig = ActionSignal::new(SignalKind::interrupt(), &do_at_quit).unwrap(); - println!("SIGINT registered, press 'Ctrl-C' to quit"); - sig.await; -} diff --git a/g3bench/Cargo.toml b/g3bench/Cargo.toml index 7e0a512e9..ff5bf6cd6 100644 --- a/g3bench/Cargo.toml +++ b/g3bench/Cargo.toml @@ -14,7 +14,7 @@ anyhow.workspace = true clap.workspace = true clap_complete.workspace = true indicatif = "0.17" -tokio = { workspace = true, features = ["rt", "net", "macros"] } +tokio = { workspace = true, features = ["rt", "net", "macros", "signal"] } http.workspace = true url.workspace = true h2.workspace = true @@ -39,7 +39,6 @@ governor = { workspace = true, features = ["std", "jitter"] } hickory-client.workspace = true hickory-proto.workspace = true g3-runtime.workspace = true -g3-signal.workspace = true g3-types = { workspace = true, features = ["openssl", "rustls"] } g3-clap.workspace = true g3-socket.workspace = true diff --git a/g3bench/src/target/mod.rs b/g3bench/src/target/mod.rs index dc5fd304c..cec5f60ab 100644 --- a/g3bench/src/target/mod.rs +++ b/g3bench/src/target/mod.rs @@ -22,11 +22,9 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use governor::RateLimiter; use hdrhistogram::Histogram; -use tokio::signal::unix::SignalKind; use tokio::sync::{mpsc, Barrier, Semaphore}; use tokio::time::{Instant, MissedTickBehavior}; -use g3_signal::{ActionSignal, SigResult}; use g3_statsd_client::StatsdClient; use super::ProcArgs; @@ -169,9 +167,13 @@ where fn notify_finish(&mut self) {} } -fn quit_at_sigint(_count: u32) -> SigResult { - stats::mark_force_quit(); - SigResult::Break +fn register_signal_handler() { + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + eprintln!("error when waiting Ctrl-C: {e}"); + } + stats::mark_force_quit(); + }); } async fn run(mut target: T, proc_args: &ProcArgs) -> anyhow::Result<()> @@ -188,10 +190,7 @@ where let progress_counter = progress.as_ref().map(|p| p.counter()); stats::init_global_state(proc_args.requests, proc_args.log_error_count); - tokio::spawn( - ActionSignal::new(SignalKind::interrupt(), &quit_at_sigint) - .map_err(|e| anyhow!("failed to set handler for SIGINT: {e:?}"))?, - ); + register_signal_handler(); let rate_limit = proc_args .rate_limit diff --git a/g3keymess/Cargo.toml b/g3keymess/Cargo.toml index 091a90461..07c5d0f6c 100644 --- a/g3keymess/Cargo.toml +++ b/g3keymess/Cargo.toml @@ -31,7 +31,6 @@ itoa.workspace = true arc-swap.workspace = true serde_json.workspace = true g3-daemon = { workspace = true, features = ["register"] } -g3-signal.workspace = true g3-yaml = { workspace = true, features = ["histogram"] } g3-types = { workspace = true, features = [] } g3-socket.workspace = true diff --git a/g3keymess/src/main.rs b/g3keymess/src/main.rs index 156e2d236..5f0ae7926 100644 --- a/g3keymess/src/main.rs +++ b/g3keymess/src/main.rs @@ -120,7 +120,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> { }); } - g3keymess::signal::setup_and_spawn().context("failed to setup signal handler")?; + g3keymess::signal::register().context("failed to setup signal handler")?; g3keymess::store::load_all() .await diff --git a/g3keymess/src/signal.rs b/g3keymess/src/signal.rs index e6f0a7e50..e2e090402 100644 --- a/g3keymess/src/signal.rs +++ b/g3keymess/src/signal.rs @@ -15,31 +15,12 @@ */ use log::{error, info, warn}; -use tokio::signal::unix::SignalKind; use tokio::sync::Mutex; -use g3_signal::{ActionSignal, SigResult}; +use g3_daemon::signal::AsyncSignalAction; static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(()); -fn do_quit(_: u32) -> SigResult { - info!("got quit signal"); - tokio::spawn(crate::control::UniqueController::abort_immediately()); - SigResult::Break -} - -fn go_offline(_: u32) -> SigResult { - info!("got offline signal"); - tokio::spawn(crate::control::DaemonController::abort()); - SigResult::Break -} - -fn call_reload(_: u32) -> SigResult { - info!("got reload signal"); - tokio::spawn(do_reload()); - SigResult::Continue -} - async fn do_reload() { let _guard = RELOAD_MUTEX.lock().await; info!("reloading config"); @@ -59,10 +40,33 @@ async fn do_reload() { info!("reload finished"); } -pub fn setup_and_spawn() -> anyhow::Result<()> { - tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?); - tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?); - Ok(()) +#[derive(Clone, Copy)] +struct QuitAction {} + +impl AsyncSignalAction for QuitAction { + async fn run(&self) { + crate::control::UniqueController::abort_immediately().await + } +} + +#[derive(Clone, Copy)] +struct OfflineAction {} + +impl AsyncSignalAction for OfflineAction { + async fn run(&self) { + crate::control::DaemonController::abort().await + } +} + +#[derive(Clone, Copy)] +struct ReloadAction {} + +impl AsyncSignalAction for ReloadAction { + async fn run(&self) { + do_reload().await + } +} + +pub fn register() -> anyhow::Result<()> { + g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {}) } diff --git a/g3proxy/Cargo.toml b/g3proxy/Cargo.toml index f53f19aa8..44b0b9037 100644 --- a/g3proxy/Cargo.toml +++ b/g3proxy/Cargo.toml @@ -63,7 +63,6 @@ pyo3 = { workspace = true, features = ["auto-initialize"], optional = true } g3-types = { workspace = true, features = ["auth-crypt", "rustls", "openssl", "acl-rule", "http", "route", "async-log"] } g3-socket.workspace = true g3-daemon.workspace = true -g3-signal.workspace = true g3-datetime.workspace = true g3-statsd-client.workspace = true g3-histogram.workspace = true diff --git a/g3proxy/src/main.rs b/g3proxy/src/main.rs index ed1fca9e0..a20639ea8 100644 --- a/g3proxy/src/main.rs +++ b/g3proxy/src/main.rs @@ -111,7 +111,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> { }); } - g3proxy::signal::setup_and_spawn().context("failed to setup signal handler")?; + g3proxy::signal::register().context("failed to setup signal handler")?; g3proxy::resolve::spawn_all() .await .context("failed to spawn all resolvers")?; diff --git a/g3proxy/src/signal.rs b/g3proxy/src/signal.rs index b4d9a7ecf..3f6a188a6 100644 --- a/g3proxy/src/signal.rs +++ b/g3proxy/src/signal.rs @@ -15,31 +15,12 @@ */ use log::{error, info, warn}; -use tokio::signal::unix::SignalKind; use tokio::sync::Mutex; -use g3_signal::{ActionSignal, SigResult}; +use g3_daemon::signal::AsyncSignalAction; static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(()); -fn do_quit(_: u32) -> SigResult { - info!("got quit signal"); - tokio::spawn(crate::control::UniqueController::abort_immediately()); - SigResult::Break -} - -fn go_offline(_: u32) -> SigResult { - info!("got offline signal"); - tokio::spawn(crate::control::DaemonController::abort()); - SigResult::Break -} - -fn call_reload(_: u32) -> SigResult { - info!("got reload signal"); - tokio::spawn(do_reload()); - SigResult::Continue -} - async fn do_reload() { let _guard = RELOAD_MUTEX.lock().await; info!("reloading config"); @@ -68,10 +49,33 @@ async fn do_reload() { info!("reload finished"); } -pub fn setup_and_spawn() -> anyhow::Result<()> { - tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?); - tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?); - Ok(()) +#[derive(Clone, Copy)] +struct QuitAction {} + +impl AsyncSignalAction for QuitAction { + async fn run(&self) { + crate::control::UniqueController::abort_immediately().await + } +} + +#[derive(Clone, Copy)] +struct OfflineAction {} + +impl AsyncSignalAction for OfflineAction { + async fn run(&self) { + crate::control::DaemonController::abort().await + } +} + +#[derive(Clone, Copy)] +struct ReloadAction {} + +impl AsyncSignalAction for ReloadAction { + async fn run(&self) { + do_reload().await + } +} + +pub fn register() -> anyhow::Result<()> { + g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {}) } diff --git a/g3tiles/Cargo.toml b/g3tiles/Cargo.toml index 7b32e8811..ac8402438 100644 --- a/g3tiles/Cargo.toml +++ b/g3tiles/Cargo.toml @@ -41,7 +41,6 @@ bitflags.workspace = true flume.workspace = true rustc-hash.workspace = true g3-daemon.workspace = true -g3-signal.workspace = true g3-yaml = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls", "histogram"] } g3-types = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] } g3-socket.workspace = true diff --git a/g3tiles/src/main.rs b/g3tiles/src/main.rs index ddb21a2fc..2207ab222 100644 --- a/g3tiles/src/main.rs +++ b/g3tiles/src/main.rs @@ -96,7 +96,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> { }); } - g3tiles::signal::setup_and_spawn().context("failed to setup signal handler")?; + g3tiles::signal::register().context("failed to setup signal handler")?; g3tiles::discover::load_all() .await diff --git a/g3tiles/src/signal.rs b/g3tiles/src/signal.rs index 6376906e6..bad5e9859 100644 --- a/g3tiles/src/signal.rs +++ b/g3tiles/src/signal.rs @@ -15,31 +15,12 @@ */ use log::{error, info, warn}; -use tokio::signal::unix::SignalKind; use tokio::sync::Mutex; -use g3_signal::{ActionSignal, SigResult}; +use g3_daemon::signal::AsyncSignalAction; static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(()); -fn do_quit(_: u32) -> SigResult { - info!("got quit signal"); - tokio::spawn(crate::control::UniqueController::abort_immediately()); - SigResult::Break -} - -fn go_offline(_: u32) -> SigResult { - info!("got offline signal"); - tokio::spawn(crate::control::DaemonController::abort()); - SigResult::Break -} - -fn call_reload(_: u32) -> SigResult { - info!("got reload signal"); - tokio::spawn(do_reload()); - SigResult::Continue -} - async fn do_reload() { let _guard = RELOAD_MUTEX.lock().await; info!("reloading config"); @@ -62,10 +43,33 @@ async fn do_reload() { info!("reload finished"); } -pub fn setup_and_spawn() -> anyhow::Result<()> { - tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?); - tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?); - tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?); - Ok(()) +#[derive(Clone, Copy)] +struct QuitAction {} + +impl AsyncSignalAction for QuitAction { + async fn run(&self) { + crate::control::UniqueController::abort_immediately().await + } +} + +#[derive(Clone, Copy)] +struct OfflineAction {} + +impl AsyncSignalAction for OfflineAction { + async fn run(&self) { + crate::control::DaemonController::abort().await + } +} + +#[derive(Clone, Copy)] +struct ReloadAction {} + +impl AsyncSignalAction for ReloadAction { + async fn run(&self) { + do_reload().await + } +} + +pub fn register() -> anyhow::Result<()> { + g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {}) } diff --git a/lib/g3-daemon/Cargo.toml b/lib/g3-daemon/Cargo.toml index ddae40994..0e3cb1491 100644 --- a/lib/g3-daemon/Cargo.toml +++ b/lib/g3-daemon/Cargo.toml @@ -27,7 +27,7 @@ rand.workspace = true fastrand.workspace = true uuid = { workspace = true, features = ["v1"] } chrono.workspace = true -tokio = { workspace = true, features = ["net", "io-util"] } +tokio = { workspace = true, features = ["net", "io-util", "signal"] } tokio-util = { workspace = true, features = ["compat"] } http = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } diff --git a/lib/g3-daemon/src/lib.rs b/lib/g3-daemon/src/lib.rs index d175a9343..da6d20796 100644 --- a/lib/g3-daemon/src/lib.rs +++ b/lib/g3-daemon/src/lib.rs @@ -22,6 +22,7 @@ pub mod metrics; pub mod opts; pub mod runtime; pub mod server; +pub mod signal; pub mod stat; #[cfg(unix)] diff --git a/lib/g3-daemon/src/signal.rs b/lib/g3-daemon/src/signal.rs new file mode 100644 index 000000000..b48c91df5 --- /dev/null +++ b/lib/g3-daemon/src/signal.rs @@ -0,0 +1,78 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::future::{poll_fn, Future}; + +use anyhow::anyhow; +use log::info; + +pub trait AsyncSignalAction: Copy { + fn run(&self) -> impl Future + Send; +} + +pub fn register( + do_quit: QUIT, + go_offline: OFFLINE, + call_reload: RELOAD, +) -> anyhow::Result<()> +where + QUIT: AsyncSignalAction + Send + 'static, + OFFLINE: AsyncSignalAction + Send + 'static, + RELOAD: AsyncSignalAction + Send + 'static, +{ + use tokio::signal::unix::{signal, SignalKind}; + + let mut quit_sig = signal(SignalKind::quit()) + .map_err(|e| anyhow!("failed to create SIGQUIT listener: {e}"))?; + tokio::spawn(async move { + if poll_fn(|cx| quit_sig.poll_recv(cx)).await.is_some() { + info!("got offline signal"); + do_quit.run().await; + } + }); + + let mut int_sig = signal(SignalKind::interrupt()) + .map_err(|e| anyhow!("failed to create SIGINT listener: {e}"))?; + tokio::spawn(async move { + if poll_fn(|cx| int_sig.poll_recv(cx)).await.is_some() { + info!("got offline signal"); + do_quit.run().await; + } + }); + + let mut term_sig = signal(SignalKind::terminate()) + .map_err(|e| anyhow!("failed to create SIGTERM listener: {e}"))?; + tokio::spawn(async move { + if poll_fn(|cx| term_sig.poll_recv(cx)).await.is_some() { + info!("got offline signal"); + go_offline.run().await; + } + }); + + let mut hup_sig = signal(SignalKind::hangup()) + .map_err(|e| anyhow!("failed to create SIGHUP listener: {e}"))?; + tokio::spawn(async move { + loop { + if poll_fn(|cx| hup_sig.poll_recv(cx)).await.is_none() { + break; + } + info!("got reload signal"); + call_reload.run().await; + } + }); + + Ok(()) +} diff --git a/lib/g3-signal/Cargo.toml b/lib/g3-signal/Cargo.toml deleted file mode 100644 index 886b74279..000000000 --- a/lib/g3-signal/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "g3-signal" -version = "0.3.0" -license.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -tokio = { workspace = true, features = ["signal"] } diff --git a/lib/g3-signal/src/lib.rs b/lib/g3-signal/src/lib.rs deleted file mode 100644 index 33fdf6dcb..000000000 --- a/lib/g3-signal/src/lib.rs +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod signal; - -pub use signal::{ActionSignal, SigResult}; diff --git a/lib/g3-signal/src/signal.rs b/lib/g3-signal/src/signal.rs deleted file mode 100644 index ecb5206ad..000000000 --- a/lib/g3-signal/src/signal.rs +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use tokio::signal::unix::{signal, Signal, SignalKind}; - -pub enum SigResult { - Continue, - Break, -} - -type SigAction = dyn Fn(u32) -> SigResult + Sync; - -pub struct ActionSignal<'a> { - signal: Signal, - count: u32, - action: &'a SigAction, -} - -impl<'a> ActionSignal<'a> { - pub fn new(signo: SignalKind, action: &'a SigAction) -> io::Result { - Ok(ActionSignal { - signal: signal(signo)?, - count: 0, - action, - }) - } -} - -impl<'a> Future for ActionSignal<'a> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.signal).poll_recv(cx) { - Poll::Ready(Some(_)) => { - self.count += 1; - match (self.action)(self.count) { - SigResult::Continue => continue, - SigResult::Break => return Poll::Ready(()), - } - } - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - } - } - } -}