diff --git a/Cargo.lock b/Cargo.lock
index 917a0da6c..db17f490d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "addr2line"
@@ -1890,11 +1890,10 @@ dependencies = [
 
 [[package]]
 name = "built"
-version = "0.6.1"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b99c4cdc7b2c2364182331055623bdf45254fcb679fea565c40c3c11c101889a"
+checksum = "c360505aed52b7ec96a3636c3f039d99103c37d1d9b4f7a8c743d3ea9ffcd03b"
 dependencies = [
- "cargo-lock",
  "git2",
 ]
 
@@ -2009,18 +2008,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "cargo-lock"
-version = "9.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e11c675378efb449ed3ce8de78d75d0d80542fc98487c26aba28eb3b82feac72"
-dependencies = [
- "semver 1.0.23",
- "serde",
- "toml",
- "url",
-]
-
 [[package]]
 name = "cargo-platform"
 version = "0.1.8"
@@ -2113,6 +2100,7 @@ dependencies = [
  "multibase 0.9.1",
  "multihash-codetable",
  "serde",
+ "shutdown",
  "sqlx",
  "tokio",
  "tracing",
@@ -2144,6 +2132,7 @@ dependencies = [
  "serde",
  "serde_ipld_dagcbor",
  "serde_json",
+ "shutdown",
  "swagger",
  "test-log",
  "tikv-jemalloc-ctl",
@@ -2330,11 +2319,13 @@ dependencies = [
  "http 1.1.0",
  "mockall",
  "object_store",
+ "shutdown",
  "test-log",
  "tokio",
  "tokio-stream",
  "tonic 0.12.3",
  "tracing",
+ "tracing-subscriber",
 ]
 
 [[package]]
@@ -2438,7 +2429,6 @@ name = "ceramic-metadata"
 version = "0.47.3"
 dependencies = [
  "built",
- "project-root",
  "serde",
 ]
 
@@ -2510,6 +2500,7 @@ dependencies = [
  "prometheus-client",
  "recon",
  "serde_ipld_dagcbor",
+ "shutdown",
  "signal-hook",
  "signal-hook-tokio",
  "swagger",
@@ -4890,11 +4881,11 @@ dependencies = [
 
 [[package]]
 name = "git2"
-version = "0.17.2"
+version = "0.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b989d6a7ca95a362cf2cfc5ad688b3a467be1f87e480b8dad07fee8c79b0044"
+checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724"
 dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.6.0",
  "libc",
  "libgit2-sys",
  "log",
@@ -5318,7 +5309,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.4.10",
+ "socket2 0.5.7",
  "tokio",
  "tower-service",
  "tracing",
@@ -6414,9 +6405,9 @@ checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5"
 
 [[package]]
 name = "libgit2-sys"
-version = "0.15.2+1.6.4"
+version = "0.17.0+1.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a80df2e11fb4a61f4ba2ab42dbe7f74468da143f1a75c74e11dee7c813f694fa"
+checksum = "10472326a8a6477c3c20a64547b0059e4b0d086869eee31e6d7da728a8eb7224"
 dependencies = [
  "cc",
  "libc",
@@ -8813,12 +8804,6 @@ dependencies = [
  "unicode-ident",
 ]
 
-[[package]]
-name = "project-root"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df"
-
 [[package]]
 name = "prometheus-client"
 version = "0.22.3"
@@ -8900,7 +8885,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
 dependencies = [
  "bytes 1.7.2",
  "heck 0.5.0",
- "itertools 0.11.0",
+ "itertools 0.13.0",
  "log",
  "multimap",
  "once_cell",
@@ -8946,7 +8931,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
 dependencies = [
  "anyhow",
- "itertools 0.11.0",
+ "itertools 0.13.0",
  "proc-macro2",
  "quote",
  "syn 2.0.90",
@@ -10111,15 +10096,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "serde_spanned"
-version = "0.6.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1"
-dependencies = [
- "serde",
-]
-
 [[package]]
 name = "serde_tokenstream"
 version = "0.2.2"
@@ -10278,6 +10254,14 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
 
+[[package]]
+name = "shutdown"
+version = "0.47.3"
+dependencies = [
+ "futures",
+ "tokio",
+]
+
 [[package]]
 name = "signal-hook"
 version = "0.3.17"
@@ -11792,26 +11776,11 @@ dependencies = [
  "tokio",
 ]
 
-[[package]]
-name = "toml"
-version = "0.7.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257"
-dependencies = [
- "serde",
- "serde_spanned",
- "toml_datetime",
- "toml_edit 0.19.15",
-]
-
 [[package]]
 name = "toml_datetime"
 version = "0.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
-dependencies = [
- "serde",
-]
 
 [[package]]
 name = "toml_edit"
@@ -11820,8 +11789,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
 dependencies = [
  "indexmap 2.5.0",
- "serde",
- "serde_spanned",
  "toml_datetime",
  "winnow 0.5.40",
 ]
@@ -12571,7 +12538,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 1d5864af5..ba3eb6f78 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ members = [
     "peer-svc",
     "pipeline",
     "recon",
+    "shutdown",
     "sql",
     "validation",
     "beetle/iroh-bitswap",
@@ -182,6 +183,7 @@ serde_qs = "0.10.1"
 serde_with = "2.1"
 sha2 = { version = "0.10", default-features = false }
 sha3 = "0.10"
+shutdown = { path = "./shutdown/" }
 smallvec = "1.10"
 # pragma optimize hangs forver on 0.8, possibly due to libsqlite-sys upgrade
 sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
diff --git a/anchor-service/Cargo.toml b/anchor-service/Cargo.toml
index b86861569..5199c5f48 100644
--- a/anchor-service/Cargo.toml
+++ b/anchor-service/Cargo.toml
@@ -29,3 +29,6 @@ chrono.workspace = true
 
 [features]
 test-network = []
+
+[dev-dependencies]
+shutdown.workspace = true
diff --git a/anchor-service/src/anchor_batch.rs b/anchor-service/src/anchor_batch.rs
index 142726317..283a958b9 100644
--- a/anchor-service/src/anchor_batch.rs
+++ b/anchor-service/src/anchor_batch.rs
@@ -72,7 +72,7 @@ impl AnchorService {
     /// - Store the TimeEvents using the AnchorClient
     ///
     /// This function will run indefinitely, or until the process is shutdown.
-    pub async fn run(&mut self, shutdown_signal: impl Future<Output = ()>) {
+    pub async fn run(mut self, shutdown_signal: impl Future<Output = ()>) {
         let shutdown_signal = shutdown_signal.fuse();
         pin_mut!(shutdown_signal);
 
@@ -235,7 +235,8 @@ mod tests {
     use ceramic_core::NodeKey;
     use ceramic_sql::sqlite::SqlitePool;
     use expect_test::expect_file;
-    use tokio::{sync::broadcast, time::sleep};
+    use shutdown::Shutdown;
+    use tokio::time::sleep;
 
     use super::AnchorService;
     use crate::{MockAnchorEventService, MockCas};
@@ -248,7 +249,7 @@ mod tests {
         let node_id = NodeKey::random().id();
         let anchor_interval = Duration::from_millis(5);
         let anchor_batch_size = 1000000;
-        let mut anchor_service = AnchorService::new(
+        let anchor_service = AnchorService::new(
             tx_manager,
             event_service.clone(),
             pool,
@@ -256,20 +257,14 @@ mod tests {
             anchor_interval,
             anchor_batch_size,
         );
-        let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
-        tokio::spawn(async move {
-            anchor_service
-                .run(async move {
-                    let _ = shutdown_signal.recv().await;
-                })
-                .await
-        });
+        let shutdown = Shutdown::new();
+        tokio::spawn(anchor_service.run(shutdown.wait_fut()));
         while event_service.events.lock().unwrap().is_empty() {
             sleep(Duration::from_millis(1)).await;
         }
         expect_file!["./test-data/test_anchor_service_run.txt"]
             .assert_debug_eq(&event_service.events.lock().unwrap());
-        shutdown_signal_tx.send(()).unwrap();
+        shutdown.shutdown();
     }
 
     #[tokio::test]
@@ -280,7 +275,7 @@ mod tests {
         let node_id = NodeKey::random().id();
         let anchor_interval = Duration::from_millis(5);
         let anchor_batch_size = 1000000;
-        let mut anchor_service = AnchorService::new(
+        let anchor_service = AnchorService::new(
             tx_manager,
             event_service.clone(),
             pool,
@@ -288,20 +283,13 @@ mod tests {
             anchor_interval,
             anchor_batch_size,
         );
-        let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
-        // let mut shutdown_signal = shutdown_signal_rx.resubscribe();
-        tokio::spawn(async move {
-            anchor_service
-                .run(async move {
-                    let _ = shutdown_signal.recv().await;
-                })
-                .await
-        });
+        let shutdown = Shutdown::new();
+        tokio::spawn(anchor_service.run(shutdown.wait_fut()));
         while event_service.events.lock().unwrap().is_empty() {
             sleep(Duration::from_millis(1)).await;
         }
         expect_file!["./test-data/test_anchor_service_run_1.txt"]
             .assert_debug_eq(&event_service.events.lock().unwrap());
-        shutdown_signal_tx.send(()).unwrap();
+        shutdown.shutdown();
     }
 }
diff --git a/api/Cargo.toml b/api/Cargo.toml
index d95b690d8..c84837ff5 100644
--- a/api/Cargo.toml
+++ b/api/Cargo.toml
@@ -26,6 +26,7 @@ recon.workspace = true
 serde.workspace = true
 serde_ipld_dagcbor.workspace = true
 serde_json.workspace = true
+shutdown.workspace = true
 swagger.workspace = true
 tokio.workspace = true
 tracing.workspace = true
diff --git a/api/src/server.rs b/api/src/server.rs
index 1f5d4a734..53bf827ed 100644
--- a/api/src/server.rs
+++ b/api/src/server.rs
@@ -52,6 +52,7 @@ use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFuncti
 use futures::TryFutureExt;
 use multiaddr::Protocol;
 use recon::Key;
+use shutdown::Shutdown;
 use swagger::{ApiError, ByteArray};
 #[cfg(not(target_env = "msvc"))]
 use tikv_jemalloc_ctl::epoch;
@@ -401,7 +402,7 @@ where
         model: Arc<M>,
         p2p: P,
         pipeline: Option<SessionContext>,
-        shutdown_signal: broadcast::Receiver<()>,
+        shutdown_signal: Shutdown,
     ) -> Self {
         let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
         let event_store = model.clone();
@@ -433,7 +434,7 @@ where
         event_store: Arc<M>,
         mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
         node_id: NodeId,
-        mut shutdown_signal: broadcast::Receiver<()>,
+        shutdown_signal: Shutdown,
     ) -> tokio::task::JoinHandle<()> {
         tokio::spawn(async move {
             let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
@@ -455,7 +456,7 @@ where
                             events.extend(buf);
                         }
                     }
-                    _ = shutdown_signal.recv() => {
+                    _ = shutdown_signal.wait_fut() => {
                         tracing::debug!("Insert many task got shutdown signal");
                         shutdown = true;
                     }
diff --git a/api/src/tests.rs b/api/src/tests.rs
index f145a27b2..bf090a1ff 100644
--- a/api/src/tests.rs
+++ b/api/src/tests.rs
@@ -32,6 +32,7 @@ use mockall::{mock, predicate};
 use multiaddr::Multiaddr;
 use multibase::Base;
 use recon::Key;
+use shutdown::Shutdown;
 use test_log::test;
 use tokio::join;
 
@@ -202,8 +203,8 @@ where
     M: EventService + 'static,
     P: P2PService,
 {
-    let (_, rx) = tokio::sync::broadcast::channel(1);
-    Server::new(node_id, network, interest, model, p2p, pipeline, rx)
+    let shutdown = Shutdown::new();
+    Server::new(node_id, network, interest, model, p2p, pipeline, shutdown)
 }
 
 #[test(tokio::test)]
diff --git a/core/src/peer.rs b/core/src/peer.rs
index 24a0640e6..a300a6137 100644
--- a/core/src/peer.rs
+++ b/core/src/peer.rs
@@ -184,7 +184,7 @@ pub struct WithId<'a> {
     node_key: &'a NodeKey,
     expiration: u64,
 }
-impl<'a> BuilderState for WithId<'a> {}
+impl BuilderState for WithId<'_> {}
 
 /// Build state where the addresses are known.
 pub struct WithAddresses<'a> {
@@ -192,7 +192,7 @@ pub struct WithAddresses<'a> {
     expiration: u64,
     addresses: Vec<Multiaddr>,
 }
-impl<'a> BuilderState for WithAddresses<'a> {}
+impl BuilderState for WithAddresses<'_> {}
 
 impl Builder<Init> {
     /// Set the expiration to earliest possible value.
@@ -245,7 +245,7 @@ impl<'a> Builder<WithId<'a>> {
         }
     }
 }
-impl<'a> Builder<WithAddresses<'a>> {
+impl Builder<WithAddresses<'_>> {
     /// Finish the build producing a [`PeerKey`].
     pub fn build(self) -> PeerKey {
         let entry = PeerEntry::new(
diff --git a/core/src/signer.rs b/core/src/signer.rs
index c37a3a741..e16eae000 100644
--- a/core/src/signer.rs
+++ b/core/src/signer.rs
@@ -15,7 +15,7 @@ pub trait Signer {
     fn sign_jws(&self, payload: &str) -> anyhow::Result<String>;
 }
 
-impl<'a, S: Signer + Sync> Signer for &'a S {
+impl<S: Signer + Sync> Signer for &'_ S {
     fn algorithm(&self) -> Algorithm {
         (*self).algorithm()
     }
diff --git a/core/src/stream_id.rs b/core/src/stream_id.rs
index 06de2f368..bb56a5d30 100644
--- a/core/src/stream_id.rs
+++ b/core/src/stream_id.rs
@@ -226,7 +226,7 @@ impl<'de> Deserialize<'de> for StreamId {
 
 struct StreamIdVisitor;
 
-impl<'de> serde::de::Visitor<'de> for StreamIdVisitor {
+impl serde::de::Visitor<'_> for StreamIdVisitor {
     type Value = StreamId;
 
     fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
diff --git a/event/src/bytes.rs b/event/src/bytes.rs
index af019f464..f327588bb 100644
--- a/event/src/bytes.rs
+++ b/event/src/bytes.rs
@@ -44,7 +44,7 @@ impl<'de> Deserialize<'de> for Bytes {
 }
 struct BytesVisitor;
 
-impl<'de> Visitor<'de> for BytesVisitor {
+impl Visitor<'_> for BytesVisitor {
     type Value = Bytes;
 
     fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
diff --git a/event/src/lib.rs b/event/src/lib.rs
index a50a387f3..d6ea63d60 100644
--- a/event/src/lib.rs
+++ b/event/src/lib.rs
@@ -8,12 +8,14 @@ pub mod unvalidated;
 
 pub use ceramic_core::*;
 
+/// Shared testing logic with the crate.
 #[cfg(test)]
 pub mod tests {
     use ceramic_core::DidDocument;
 
     use crate::unvalidated::signed::JwkSigner;
 
+    /// Pretty print json
     pub fn to_pretty_json(json_data: &[u8]) -> String {
         let json: serde_json::Value = match serde_json::from_slice(json_data) {
             Ok(r) => r,
@@ -27,10 +29,12 @@ pub mod tests {
         serde_json::to_string_pretty(&json).unwrap()
     }
 
+    /// Serialize to pretty json
     pub fn serialize_to_pretty_json<T: serde::Serialize>(data: &T) -> String {
         serde_json::to_string_pretty(data).unwrap()
     }
 
+    /// Construct a signer with a hardcoded private key
     pub async fn signer() -> JwkSigner {
         JwkSigner::new(
             DidDocument::new("did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw"),
diff --git a/flight/Cargo.toml b/flight/Cargo.toml
index 46bb40902..30030598a 100644
--- a/flight/Cargo.toml
+++ b/flight/Cargo.toml
@@ -26,12 +26,14 @@ tracing.workspace = true
 ceramic-arrow-test.workspace = true
 ceramic-pipeline.workspace = true
 expect-test.workspace = true
-tokio = { workspace = true, features = ["macros", "rt"] }
-test-log.workspace = true
 http.workspace = true
-tokio-stream = { workspace = true, features = ["net"] }
 mockall.workspace = true
 object_store.workspace = true
+shutdown.workspace = true
+test-log.workspace = true
+tokio = { workspace = true, features = ["macros", "rt"] }
+tokio-stream = { workspace = true, features = ["net"] }
+tracing-subscriber.workspace = true
 
 [package.metadata.cargo-machete]
 ignored = [
diff --git a/metadata/Cargo.toml b/metadata/Cargo.toml
index eeda6c890..26b3b588c 100644
--- a/metadata/Cargo.toml
+++ b/metadata/Cargo.toml
@@ -12,5 +12,4 @@ publish = false
 serde.workspace = true
 
 [build-dependencies]
-built = { version = "0.6.0", features = ["git2"] }
-project-root = "0.2.2"
+built = { version = "0.7", features = ["git2"] }
diff --git a/metadata/build.rs b/metadata/build.rs
index 3cf7cd4a6..d8f91cb91 100644
--- a/metadata/build.rs
+++ b/metadata/build.rs
@@ -1,9 +1,3 @@
 fn main() {
-    let mut opts = built::Options::default();
-    opts.set_dependencies(true);
-
-    let src = project_root::get_project_root().unwrap();
-    let dst = std::path::Path::new(&std::env::var("OUT_DIR").unwrap()).join("built.rs");
-    built::write_built_file_with_opts(&opts, src.as_ref(), &dst)
-        .expect("Failed to acquire build-time information");
+    built::write_built_file().expect("Failed to acquire build-time information");
 }
diff --git a/one/Cargo.toml b/one/Cargo.toml
index c84f58acd..b9449cedb 100644
--- a/one/Cargo.toml
+++ b/one/Cargo.toml
@@ -51,6 +51,7 @@ object_store.workspace = true
 prometheus-client.workspace = true
 recon.workspace = true
 serde_ipld_dagcbor.workspace = true
+shutdown.workspace = true
 signal-hook = "0.3.17"
 signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
 swagger.workspace = true
diff --git a/one/src/daemon.rs b/one/src/daemon.rs
index f5ac7d9c7..8ce1fe400 100644
--- a/one/src/daemon.rs
+++ b/one/src/daemon.rs
@@ -20,11 +20,11 @@ use clap::Args;
 use object_store::aws::AmazonS3Builder;
 use object_store::local::LocalFileSystem;
 use recon::{Recon, ReconInterestProvider};
+use shutdown::{Shutdown, ShutdownSignal};
 use signal_hook::consts::signal::*;
 use signal_hook_tokio::Signals;
 use std::sync::Arc;
 use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
-use tokio::sync::broadcast;
 use tracing::{debug, error, info, warn};
 
 #[derive(Args, Debug)]
@@ -338,14 +338,14 @@ async fn get_eth_rpc_providers(
 
 fn spawn_database_optimizer(
     sqlite_pool: SqlitePool,
-    mut shutdown: tokio::sync::broadcast::Receiver<()>,
+    mut shutdown: ShutdownSignal,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
         let mut duration = std::time::Duration::from_secs(60 * 60 * 24); // once daily
         loop {
             // recreate interval in case it's been shortened due to error
             tokio::select! {
-                _ = shutdown.recv() => {
+                _ = &mut shutdown => {
                     break;
                 }
                 _ = tokio::time::sleep(duration) => {
@@ -408,11 +408,11 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     debug!(dir = %opts.p2p_key_dir.display(), "using p2p key directory");
 
     // Setup shutdown signal
-    let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
+    let shutdown = Shutdown::new();
     let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;
     let handle = signals.handle();
     debug!("starting signal handler task");
-    let signals_handle = tokio::spawn(handle_signals(signals, shutdown_signal_tx));
+    let signals_handle = tokio::spawn(handle_signals(signals, shutdown.clone()));
 
     // Construct sqlite_pool
     let sqlite_pool = opts
@@ -424,8 +424,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
         // spawn (and run) optimize right before we start using the database (e.g. ordering events)
         info!("running initial sqlite database optimize, this may take quite a while on large databases.");
         sqlite_pool.optimize(true).await?;
-        let ss = shutdown_signal.resubscribe();
-        Some(spawn_database_optimizer(sqlite_pool.clone(), ss))
+        Some(spawn_database_optimizer(
+            sqlite_pool.clone(),
+            shutdown.wait_fut(),
+        ))
     } else {
         None
     };
@@ -436,14 +438,11 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone()));
     let interest_svc = Arc::new(InterestService::new(sqlite_pool.clone()));
     let event_validation = opts.event_validation.unwrap_or(true);
-    let mut ss = shutdown_signal.resubscribe();
     let event_svc = Arc::new(
         EventService::try_new(
             sqlite_pool.clone(),
             ceramic_event_svc::UndeliveredEventReview::Process {
-                shutdown_signal: Box::new(async move {
-                    let _ = ss.recv().await;
-                }),
+                shutdown_signal: Box::new(shutdown.wait_fut()),
             },
             event_validation,
             rpc_providers,
@@ -636,14 +635,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
 
         // Start aggregator
         let aggregator_handle = if opts.aggregator.unwrap_or_default() {
-            let mut ss = shutdown_signal.resubscribe();
             let ctx = ctx.clone();
+            let s = shutdown.wait_fut();
             Some(tokio::spawn(async move {
-                if let Err(err) = ceramic_pipeline::aggregator::run(ctx, async move {
-                    let _ = ss.recv().await;
-                })
-                .await
-                {
+                if let Err(err) = ceramic_pipeline::aggregator::run(ctx, s).await {
                     error!(%err, "aggregator task failed");
                 }
             }))
@@ -651,14 +646,9 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
             None
         };
 
-        let mut ss = shutdown_signal.resubscribe();
         let pipeline_ctx = ctx.clone();
-        let flight_handle = tokio::spawn(async move {
-            ceramic_flight::server::run(ctx, addr, async move {
-                let _ = ss.recv().await;
-            })
-            .await
-        });
+        let flight_handle =
+            tokio::spawn(ceramic_flight::server::run(ctx, addr, shutdown.wait_fut()));
         (Some(pipeline_ctx), aggregator_handle, Some(flight_handle))
     } else {
         (None, None, None)
@@ -679,7 +669,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
                 Duration::from_secs(opts.anchor_poll_interval),
                 opts.anchor_poll_retry_count,
             );
-            let mut anchor_service = AnchorService::new(
+            let anchor_service = AnchorService::new(
                 Arc::new(remote_cas),
                 event_svc.clone(),
                 sqlite_pool.clone(),
@@ -688,14 +678,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
                 opts.anchor_batch_size,
             );
 
-            let mut shutdown_signal = shutdown_signal.resubscribe();
-            Some(tokio::spawn(async move {
-                anchor_service
-                    .run(async move {
-                        let _ = shutdown_signal.recv().await;
-                    })
-                    .await
-            }))
+            Some(tokio::spawn(anchor_service.run(shutdown.wait_fut())))
         } else {
             None
         };
@@ -708,7 +691,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
         Arc::new(model_svc),
         ipfs.client(),
         pipeline_ctx,
-        shutdown_signal.resubscribe(),
+        shutdown.clone(),
     );
     if opts.authentication {
         ceramic_server.with_authentication(true);
@@ -740,9 +723,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
     hyper::server::Server::try_bind(&opts.bind_address.parse()?)
         .map_err(|e| anyhow!("Failed to bind address: {}. {}", opts.bind_address, e))?
         .serve(service)
-        .with_graceful_shutdown(async move {
-            let _ = shutdown_signal.recv().await;
-        })
+        .with_graceful_shutdown(shutdown.wait_fut())
         .await?;
     debug!("api server finished, starting shutdown...");
 
diff --git a/one/src/lib.rs b/one/src/lib.rs
index a9ccaa0bd..dd68ece39 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -20,10 +20,11 @@ use multibase::Base;
 use multihash::Multihash;
 use multihash_codetable::Code;
 use multihash_derive::Hasher;
+use shutdown::Shutdown;
 use signal_hook_tokio::Signals;
 use std::str::FromStr;
 use std::{env, path::PathBuf};
-use tokio::{io::AsyncReadExt, sync::broadcast};
+use tokio::io::AsyncReadExt;
 use tracing::{debug, error, info, warn};
 
 #[derive(Parser, Debug)]
@@ -343,15 +344,13 @@ impl DBOpts {
     }
 }
 
-async fn handle_signals(mut signals: Signals, shutdown: broadcast::Sender<()>) {
+async fn handle_signals(mut signals: Signals, shutdown: Shutdown) {
     let mut shutdown = Some(shutdown);
     while let Some(signal) = signals.next().await {
         debug!(?signal, "signal received");
         if let Some(shutdown) = shutdown.take() {
             info!("sending shutdown message");
-            shutdown
-                .send(())
-                .expect("should be able to send shutdown message");
+            shutdown.shutdown();
         }
     }
 }
diff --git a/shutdown/Cargo.toml b/shutdown/Cargo.toml
new file mode 100644
index 000000000..fd84fcc77
--- /dev/null
+++ b/shutdown/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "shutdown"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[dependencies]
+tokio.workspace = true
+futures.workspace = true
diff --git a/shutdown/src/lib.rs b/shutdown/src/lib.rs
new file mode 100644
index 000000000..b5f6d9a7c
--- /dev/null
+++ b/shutdown/src/lib.rs
@@ -0,0 +1,39 @@
+use futures::future::BoxFuture;
+use tokio::sync::broadcast;
+
+/// A shutdown signal is a future that resolve to unit.
+pub type ShutdownSignal = BoxFuture<'static, ()>;
+
+/// Shutdown can be used to signal shutdown across many different tasks.
+/// Shutdown is cheaply clonable so it can be shared with as many tasks as needed.
+#[derive(Clone)]
+pub struct Shutdown {
+    tx: broadcast::Sender<()>,
+}
+
+impl Default for Shutdown {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Shutdown {
+    pub fn new() -> Self {
+        let (tx, _rx) = broadcast::channel(1);
+        Self { tx }
+    }
+    /// Signal that all listeners should shutdown.
+    /// Shutdown can be called from any clone.
+    pub fn shutdown(&self) {
+        let _ = self.tx.send(());
+    }
+    /// Construct a future that resolves when the shutdown signal is sent.
+    ///
+    /// The future is cancel safe.
+    pub fn wait_fut(&self) -> ShutdownSignal {
+        let mut sub = self.tx.subscribe();
+        Box::pin(async move {
+            let _ = sub.recv().await;
+        })
+    }
+}