Skip to content

Commit

Permalink
Fixed some issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Lol3rrr committed Apr 11, 2024
1 parent ea22fec commit d539d5d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
39 changes: 23 additions & 16 deletions src/eventstream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{future::Future, sync::Arc, time::Duration};

use bytes::BytesMut;
use bytes::{Buf, BytesMut};
use reqwest::Url;
use serde::Deserialize;

Expand Down Expand Up @@ -40,29 +40,36 @@ impl EventStream {
_ => break,
};

if chunk.len() <= 2 {
continue;
}

pending.extend(chunk.as_ref());

let event: EventResponse = match serde_json::from_slice(&pending) {
let tmp = pending
.iter()
.enumerate()
.find(|(_, v)| **v == b'\n')
.map(|(i, _)| i);
let end_index = match tmp {
Some(i) => i,
None => continue,
};

let content = pending.split_to(end_index);
let _ = pending.split_to(1);

let event: EventResponse = match serde_json::from_slice(&content) {
Ok(e) => e,
Err(err) => {
tracing::error!("Parsing Event: {:?}", err);
// tracing::error!("Chunk: {:?}", chunk);
// tracing::error!("Pending: {:?}", pending);
continue;
}
};

pending.clear();

tracing::debug!("Event: {:#?}", event);

notify.notify_waiters();
if let Some(index) = event.index {
self.index = core::cmp::max(self.index, index);
}

self.index = event.index;
notify.notify_waiters();
}
} else {
tracing::error!("{:?}", resp);
Expand All @@ -82,15 +89,15 @@ impl EventStream {
#[derive(Debug, Deserialize)]
struct EventResponse {
#[serde(rename = "Events")]
events: Vec<Event>,
events: Option<Vec<Event>>,
#[serde(rename = "Index")]
index: usize,
index: Option<usize>,
}

#[derive(Debug, Deserialize)]
struct Event {
#[serde(rename = "FilterKeys")]
filter_keys: Vec<String>,
#[serde(rename = "FilterKeys", default)]
filter_keys: Option<Vec<String>>,
#[serde(rename = "Index")]
index: usize,
#[serde(rename = "Key")]
Expand Down
10 changes: 7 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ impl Client {
async fn check(&self) {
tracing::info!("Running Check");
tracing::info!("Loading Tasks...");
let raw_task_list = nomad::list_jobs(&self.client, &self.nomad_url)
.await
.unwrap();
let raw_task_list = match nomad::list_jobs(&self.client, &self.nomad_url).await {
Ok(t) => t,
Err(e) => {
tracing::error!("Loading List: {:?}", e);
return;
}
};

let tasks = {
let mut tmp = Vec::new();
Expand Down

0 comments on commit d539d5d

Please sign in to comment.