Skip to content

Commit

Permalink
Abstract dlq inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-sherman committed Dec 1, 2024
1 parent 17d6ac8 commit 1b1e74b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 12 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages-rs/drainpipe-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ edition = "2021"
[dependencies]
anyhow = "1.0.93"
bincode = "1.3.3"
jetstream = { path = "../jetstream" }
log = "0.4.22"
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0.133"
sled = "0.34.7"
21 changes: 16 additions & 5 deletions packages-rs/drainpipe-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,27 @@ impl Store {
.transpose()
}

pub fn record_dead_letter(
&self,
commit_json: String,
error_message: String,
) -> anyhow::Result<()> {
fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> {
let key = self.db.generate_id()?.to_string();
self.dead_letter_tree.insert(
key.clone(),
bincode::serialize(&DeadLetter::new(key, commit_json, error_message))?,
)?;
Ok(())
}

pub fn record_dead_letter_commit(
&self,
commit: &jetstream::event::CommitEvent,
error_message: String,
) -> anyhow::Result<()> {
self.record_dead_letter(serde_json::to_string(commit)?, error_message)
}

pub fn record_dead_letter_jetstream_error(
&self,
error: &jetstream::error::JetstreamEventError,
) -> anyhow::Result<()> {
self.record_dead_letter("null".into(), error.to_string())
}
}
7 changes: 2 additions & 5 deletions packages-rs/drainpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ async fn main() -> anyhow::Result<()> {

send_frontpage_commit(&config, commit).await.or_else(|e| {
log::error!("Error processing commit: {:?}", e);
store.record_dead_letter(
serde_json::to_string(commit)?,
e.to_string(),
)
store.record_dead_letter_commit(&commit, e.to_string())
})?
}

Expand All @@ -90,7 +87,7 @@ async fn main() -> anyhow::Result<()> {
}

Ok(Err(e)) => {
store.record_dead_letter("null".into(), e.to_string())?;
store.record_dead_letter_jetstream_error(&e)?;
log::error!(
"Error receiving event (possible junk event structure?): {:?}",
e
Expand Down

0 comments on commit 1b1e74b

Please sign in to comment.