diff --git a/Cargo.lock b/Cargo.lock index af9d925..2cb6f3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,7 +291,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" dependencies = [ - "sct", + "sct 0.6.1", ] [[package]] @@ -668,11 +668,11 @@ dependencies = [ "futures-util", "hyper", "log", - "rustls", + "rustls 0.19.1", "rustls-native-certs", "tokio", - "tokio-rustls", - "webpki", + "tokio-rustls 0.22.0", + "webpki 0.21.4", ] [[package]] @@ -770,9 +770,9 @@ dependencies = [ [[package]] name = "libunftp" -version = "0.18.1" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b1b787fe6c72fcc8e4193685f1c41bcfdb821354f2b6142b5e6a06ba27ed18" +checksum = "dfc17ea706d8085decd7cc7dfdc2ab1f64df57fc89f78ee08750feb4ee6dc77d" dependencies = [ "async-trait", "bitflags", @@ -784,14 +784,15 @@ dependencies = [ "lazy_static", "md-5", "moka", - "prometheus", + "prometheus 0.13.0", "proxy-protocol", - "rustls", + "rustls 0.20.1", + "rustls-pemfile", "slog", "slog-stdlog", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.1", "tokio-util", "tracing", "tracing-attributes", @@ -1204,6 +1205,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f64969ffd5dd8f39bd57a68ac53c163a095ed9d0fb707146da1b27025a3504" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror", +] + [[package]] name = "protobuf" version = "2.23.0" @@ -1428,8 +1443,20 @@ dependencies = [ "base64", "log", "ring", - "sct", - "webpki", + "sct 0.6.1", + "webpki 0.21.4", +] + +[[package]] +name = "rustls" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dac4581f0fc0e0efd529d069e8189ec7b90b8e7680e21beb35141bdc45f36040" +dependencies = [ + "log", + "ring", + "sct 0.7.0", + "webpki 0.22.0", ] [[package]] @@ -1439,11 +1466,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" dependencies = [ "openssl-probe", - "rustls", + "rustls 0.19.1", "schannel", "security-framework", ] +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "rustversion" version = "1.0.5" @@ -1500,6 +1536,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "seahash" version = "4.1.0" @@ -1848,9 +1894,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.12.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" +checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144" dependencies = [ "autocfg", "bytes", @@ -1868,9 +1914,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.2.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e" dependencies = [ "proc-macro2", "quote", @@ -1883,9 +1929,20 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ - "rustls", + "rustls 0.19.1", + "tokio", + "webpki 0.21.4", +] + +[[package]] +name = "tokio-rustls" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4baa378e417d780beff82bf54ceb0d195193ea6a00c14e22359e7f39456b5689" +dependencies = [ + "rustls 0.20.1", "tokio", - "webpki", + "webpki 0.22.0", ] [[package]] @@ -1901,9 +1958,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.8" +version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ "bytes", "futures-core", @@ -1931,9 +1988,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.28" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -1943,9 +2000,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.16" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" dependencies = [ "proc-macro2", "quote", @@ -1954,9 +2011,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" dependencies = [ "lazy_static", ] @@ -1995,7 +2052,7 @@ dependencies = [ "lazy_static", "libunftp", "pretty_assertions", - "prometheus", + "prometheus 0.12.0", "serde", "serde_json", "slog", @@ -2307,6 +2364,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2369,7 +2436,7 @@ dependencies = [ "hyper-rustls", "log", "percent-encoding", - "rustls", + "rustls 0.19.1", "seahash", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index d1684ea..d64e07d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ authors = [ "Steven Meunier ", "Rob klein Gunnewiek " ] -edition = "2018" +edition = "2021" description = "When you need to FTP, but don't want to. An async, cloud orientated FTP(S) server built on libunftp" repository = "https://github.com/bolcom/unFTP" homepage = "https://github.com/bolcom/unFTP" @@ -36,7 +36,7 @@ http = "0.2.5" hyper = { version = "0.14.13", features = ["server", "http1"] } hyper-rustls = "^0.22" lazy_static = "1.4.0" -libunftp = "0.18.1" +libunftp = "0.18.2" prometheus = { version = "0.12.0", features = ["process"] } serde = { version = "1.0.130", features = ["derive"] } serde_json = { version = "1.0.68" } diff --git a/src/http.rs b/src/http.rs index 7f485ba..07f0c1f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -17,7 +17,13 @@ const PATH_HEALTH: &str = "/health"; const PATH_READINESS: &str = "/ready"; // starts an HTTP server and exports Prometheus metrics. -pub async fn start(log: &Logger, bind_addr: &str, ftp_addr: SocketAddr) -> Result<(), String> { +pub async fn start( + log: &Logger, + bind_addr: &str, + ftp_addr: SocketAddr, + mut shutdown: tokio::sync::broadcast::Receiver<()>, + done: tokio::sync::mpsc::Sender<()>, +) -> Result<(), String> { let http_addr: SocketAddr = bind_addr .parse() .map_err(|e| format!("unable to parse HTTP address {}: {}", bind_addr, e))?; @@ -32,7 +38,12 @@ pub async fn start(log: &Logger, bind_addr: &str, ftp_addr: SocketAddr) -> Resul } }); - let http_server = hyper::Server::bind(&http_addr).serve(make_svc); + let http_server = hyper::Server::bind(&http_addr) + .serve(make_svc) + .with_graceful_shutdown(async { + shutdown.recv().await.ok(); + info!(log, "Shutting down HTTP server"); + }); info!(log, "Starting HTTP service."; "address" => &http_addr); info!(log, "Exposing {} service home.", app::NAME; "path" => PATH_HOME); @@ -43,6 +54,9 @@ pub async fn start(log: &Logger, bind_addr: &str, ftp_addr: SocketAddr) -> Resul if let Err(e) = http_server.await { error!(log, "HTTP server error: {}", e) } + + info!(log, "HTTP shutdown OK"); + drop(done); Ok(()) } diff --git a/src/main.rs b/src/main.rs index ac4fbfb..8d90e5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,7 @@ use libunftp::{ }; use slog::*; use std::process::Command; +use std::time::Duration; use std::{ env, fs, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}, @@ -244,7 +245,13 @@ fn gcs_storage_backend( } // starts the FTP server as a Tokio task. -fn start_ftp(log: &Logger, root_log: &Logger, m: &clap::ArgMatches) -> Result<(), String> { +fn start_ftp( + log: &Logger, + root_log: &Logger, + m: &clap::ArgMatches, + shutdown: tokio::sync::broadcast::Receiver<()>, + done: tokio::sync::mpsc::Sender<()>, +) -> Result<(), String> { let event_dispatcher = notify::create_event_dispatcher(Arc::new(log.new(o!("module" => "storage"))), m)?; match m.value_of(args::STORAGE_BACKEND_TYPE) { @@ -254,6 +261,8 @@ fn start_ftp(log: &Logger, root_log: &Logger, m: &clap::ArgMatches) -> Result<() m, fs_storage_backend(root_log, m, event_dispatcher.clone()), event_dispatcher.clone(), + shutdown, + done, ), Some("gcs") => start_ftp_with_storage( log, @@ -261,6 +270,8 @@ fn start_ftp(log: &Logger, root_log: &Logger, m: &clap::ArgMatches) -> Result<() m, gcs_storage_backend(root_log, m, event_dispatcher.clone())?, event_dispatcher.clone(), + shutdown, + done, ), Some(x) => Err(format!("unknown storage back-end type {}", x)), } @@ -306,6 +317,8 @@ fn start_ftp_with_storage( arg_matches: &ArgMatches, storage_backend: Box S) + Send + Sync>, event_dispatcher: Arc>, + mut shutdown: tokio::sync::broadcast::Receiver<()>, + done: tokio::sync::mpsc::Sender<()>, ) -> Result<(), String> where S: StorageBackend + Send + Sync + 'static, @@ -367,6 +380,7 @@ where &host_name, ); + let l = log.clone(); let mut server = Server::with_authenticator(storage_backend, Arc::new(authenticator)) .greeting("Welcome to unFTP") .passive_ports(start_port..end_port) @@ -374,6 +388,11 @@ where .logger(root_log.new(o!("lib" => "libunftp"))) .passive_host(passive_host) .sitemd5(md5_setting) + .shutdown_indicator(async move { + shutdown.recv().await.ok(); + info!(l, "Shutting down FTP server"); + libunftp::options::Shutdown::new().grace_period(Duration::from_secs(11)) + }) .metrics(); // Setup proxy protocol mode. @@ -465,7 +484,14 @@ where } }; - tokio::spawn(server.listen(addr)); + let log = log.clone(); + tokio::spawn(async move { + if let Err(e) = server.listen(addr).await { + error!(log, "FTP server error: {:?}", e) + } + info!(log, "FTP exiting"); + drop(done) + }); tokio::spawn(async move { event_dispatcher @@ -502,24 +528,44 @@ async fn listen_for_signals() -> Result { } async fn main_task(arg_matches: ArgMatches<'_>, log: &Logger, root_log: &Logger) -> Result { + let (shutdown_sender, http_receiver) = tokio::sync::broadcast::channel(1); + let (http_done_sender, mut shutdown_done_received) = tokio::sync::mpsc::channel(1); + let ftp_done_sender = http_done_sender.clone(); + let ftp_addr: SocketAddr = arg_matches .value_of(args::BIND_ADDRESS) .unwrap() .parse() .map_err(|_| "could not parse FTP address")?; + if let Some(addr) = arg_matches.value_of(args::HTTP_BIND_ADDRESS) { let addr = String::from(addr); let log = log.clone(); tokio::spawn(async move { - if let Err(e) = http::start(&log, &*addr, ftp_addr).await { + if let Err(e) = http::start(&log, &*addr, ftp_addr, http_receiver, http_done_sender).await { error!(log, "HTTP Server error: {}", e) } }); } - start_ftp(log, root_log, &arg_matches)?; + start_ftp( + log, + root_log, + &arg_matches, + shutdown_sender.subscribe(), + ftp_done_sender, + )?; + + let signal = listen_for_signals().await?; + info!(log, "Received signal {}, shutting down...", signal.0); + + drop(shutdown_sender); + + // When every sender has gone out of scope, the recv call + // will return with an error. We ignore the error. + let _ = shutdown_done_received.recv().await; - listen_for_signals().await + Ok(signal) } fn run(arg_matches: ArgMatches) -> Result<(), String> { @@ -547,8 +593,8 @@ fn run(arg_matches: ArgMatches) -> Result<(), String> { ); let runtime = Runtime::new().map_err(|e| format!("could not construct runtime: {}", e))?; - let ExitSignal(signal) = runtime.block_on(main_task(arg_matches, &log, &root_logger))?; - info!(log, "Received signal {}, shutting down...", signal); + let _ = runtime.block_on(main_task(arg_matches, &log, &root_logger))?; + info!(log, "Exiting..."); Ok(()) }