From 1b1e74bbcaaefe69b0cfcee1b56492b8144ec1a8 Mon Sep 17 00:00:00 2001 From: Tom Sherman Date: Sun, 1 Dec 2024 12:13:57 +0000 Subject: [PATCH] Abstract dlq inputs --- Cargo.lock | 6 ++++-- packages-rs/drainpipe-store/Cargo.toml | 2 ++ packages-rs/drainpipe-store/src/lib.rs | 21 ++++++++++++++++----- packages-rs/drainpipe/src/main.rs | 7 ++----- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0c0bf4..969dc6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,8 +461,10 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "jetstream", "log", "serde", + "serde_json", "sled", ] @@ -1678,9 +1680,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", diff --git a/packages-rs/drainpipe-store/Cargo.toml b/packages-rs/drainpipe-store/Cargo.toml index f204e14..fdb5a07 100644 --- a/packages-rs/drainpipe-store/Cargo.toml +++ b/packages-rs/drainpipe-store/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] anyhow = "1.0.93" bincode = "1.3.3" +jetstream = { path = "../jetstream" } log = "0.4.22" serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0.133" sled = "0.34.7" diff --git a/packages-rs/drainpipe-store/src/lib.rs b/packages-rs/drainpipe-store/src/lib.rs index 92871b6..37a8fb7 100644 --- a/packages-rs/drainpipe-store/src/lib.rs +++ b/packages-rs/drainpipe-store/src/lib.rs @@ -94,11 +94,7 @@ impl Store { .transpose() } - pub fn record_dead_letter( - &self, - commit_json: String, - error_message: String, - ) -> anyhow::Result<()> { + fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> { let key = self.db.generate_id()?.to_string(); self.dead_letter_tree.insert( key.clone(), @@ -106,4 +102,19 @@ impl Store { )?; Ok(()) } + + pub fn record_dead_letter_commit( + &self, + commit: &jetstream::event::CommitEvent, + error_message: String, + ) -> anyhow::Result<()> { + self.record_dead_letter(serde_json::to_string(commit)?, error_message) + } + + pub fn record_dead_letter_jetstream_error( + &self, + error: &jetstream::error::JetstreamEventError, + ) -> anyhow::Result<()> { + self.record_dead_letter("null".into(), error.to_string()) + } } diff --git a/packages-rs/drainpipe/src/main.rs b/packages-rs/drainpipe/src/main.rs index 8b88f72..bd55a87 100644 --- a/packages-rs/drainpipe/src/main.rs +++ b/packages-rs/drainpipe/src/main.rs @@ -75,10 +75,7 @@ async fn main() -> anyhow::Result<()> { send_frontpage_commit(&config, commit).await.or_else(|e| { log::error!("Error processing commit: {:?}", e); - store.record_dead_letter( - serde_json::to_string(commit)?, - e.to_string(), - ) + store.record_dead_letter_commit(&commit, e.to_string()) })? } @@ -90,7 +87,7 @@ async fn main() -> anyhow::Result<()> { } Ok(Err(e)) => { - store.record_dead_letter("null".into(), e.to_string())?; + store.record_dead_letter_jetstream_error(&e)?; log::error!( "Error receiving event (possible junk event structure?): {:?}", e