Skip to content

Commit

Permalink
Append chunks from the event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Lol3rrr committed Apr 10, 2024
1 parent 57c5657 commit 2a8f8b4
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ axum = { version = "0.6" }
reqwest = { version = "0.11", features = ["rustls-tls"], default_features = false }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "std", "json"] }
jwt = { version = "0.16" }
jwt = { version = "0.16" }
bytes = "1.6"
10 changes: 9 additions & 1 deletion src/eventstream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{future::Future, sync::Arc, time::Duration};

use bytes::BytesMut;
use reqwest::Url;
use serde::Deserialize;

Expand All @@ -22,6 +23,8 @@ impl EventStream {
async fn listen(mut self, notify: Arc<tokio::sync::Notify>) {
let req_url = self.base_url.join("v1/event/stream").expect("");

let mut pending = BytesMut::new();

loop {
let mut specific_url = req_url.clone();
specific_url.set_query(Some(&format!("index={}", self.index)));
Expand All @@ -41,15 +44,20 @@ impl EventStream {
continue;
}

let event: EventResponse = match serde_json::from_slice(&chunk) {
pending.extend(chunk.as_ref());

let event: EventResponse = match serde_json::from_slice(&pending) {
Ok(e) => e,
Err(err) => {
tracing::error!("Parsing Event: {:?}", err);
tracing::error!("Chunk: {:?}", chunk);
tracing::error!("Pending: {:?}", pending);
continue;
}
};

pending.clear();

tracing::debug!("Event: {:#?}", event);

notify.notify_waiters();
Expand Down

0 comments on commit 2a8f8b4

Please sign in to comment.