Skip to content

Commit

Permalink
Update ResultVec
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Sep 3, 2021
1 parent 81d865c commit 8bc5a2b
Show file tree
Hide file tree
Showing 23 changed files with 153 additions and 176 deletions.
19 changes: 9 additions & 10 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub enum Reply {
/// circuit breaker events, guaranteed delivery events, etc.
///
/// A response is an event generated from the sink delivery.
pub(crate) type ResultVec = Result<Option<Vec<Reply>>>;
pub(crate) type ResultVec = Result<Vec<Reply>>;

/// a sink for events -> sent to the outside world
#[async_trait::async_trait]
Expand Down Expand Up @@ -200,14 +200,13 @@ where
input: &str,
event: Event,
) -> Result<()> {
if let Some(mut replies) = self.sink.on_event(input, codec, codec_map, event).await? {
for reply in replies.drain(..) {
match reply {
Reply::Insight(e) => handle_insight(e, self.pipelines.values()).await?,
Reply::Response(port, event) => {
if let Some(pipelines) = self.dest_pipelines.get_mut(&port) {
handle_response(event, pipelines.iter()).await?;
}
let mut replies = self.sink.on_event(input, codec, codec_map, event).await?;
for reply in replies.drain(..) {
match reply {
Reply::Insight(e) => handle_insight(e, self.pipelines.values()).await?,
Reply::Response(port, event) => {
if let Some(pipelines) = self.dest_pipelines.get_mut(&port) {
handle_response(event, pipelines.iter()).await?;
}
}
}
Expand Down Expand Up @@ -245,7 +244,7 @@ where

async fn on_signal(&mut self, signal: Event) -> Option<Event> {
let replies = match self.sink.on_signal(signal).await {
Ok(results) => results?,
Ok(results) => results,
Err(e) => {
if let Some(sink_url) = &self.sink_url {
error!("[Sink::{}] Error processing signal: {}", sink_url, e);
Expand Down
4 changes: 2 additions & 2 deletions src/sink/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Sink for Amqp {
}
}
}
Ok(None)
Ok(Vec::new())
}
fn default_codec(&self) -> &str {
"json"
Expand All @@ -251,7 +251,7 @@ impl Sink for Amqp {

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
//self.drain_fatal_errors()?;
Ok(None)
Ok(Vec::new())
}
fn is_active(&self) -> bool {
true
Expand Down
6 changes: 3 additions & 3 deletions src/sink/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// This is OK, Blackhole is benchmark only
#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]

use crate::sink::prelude::*;
use halfbrown::HashMap;
use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
use hdrhistogram::Histogram;
Expand All @@ -32,7 +33,6 @@ use std::io::{self, stdout, Read, Write};
use std::process;
use std::result;
use std::str;
use crate::sink::prelude::*;

#[derive(Deserialize, Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -141,14 +141,14 @@ impl Sink for Blackhole {
self.delivered.record(delta_ns)?;
}
}
Ok(None)
Ok(Vec::new())
}
fn default_codec(&self) -> &str {
"null"
}

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None)
Ok(Vec::new())
}

fn is_active(&self) -> bool {
Expand Down
105 changes: 43 additions & 62 deletions src/sink/cb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ impl Sink for Cb {
}
}
}
Ok(Some(res))
Ok(res)
}

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
// TODO: add signal reaction via config
Ok(None)
Ok(Vec::new())
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -169,20 +169,17 @@ mod tests {
let c: &mut dyn Codec = codec.borrow_mut();
let res = cb.on_event(in_, c, &codec_map, event).await?;

assert!(res.is_some(), "got nothing back");
if let Some(replies) = res {
assert_eq!(1, replies.len());
if let Some(Reply::Insight(insight)) = replies.first() {
assert_eq!(CbAction::Ack, insight.cb);
assert_eq!(id, insight.id);
assert_eq!(op_meta, insight.op_meta);
} else {
assert!(
false,
"expected to get anm insight back. Got {:?}",
replies.first()
);
}
assert_eq!(1, res.len());
if let Some(Reply::Insight(insight)) = res.first() {
assert_eq!(CbAction::Ack, insight.cb);
assert_eq!(id, insight.id);
assert_eq!(op_meta, insight.op_meta);
} else {
assert!(
false,
"expected to get anm insight back. Got {:?}",
res.first()
);
}

// meta takes precedence
Expand All @@ -199,20 +196,18 @@ mod tests {
};
let c: &mut dyn Codec = codec.borrow_mut();
let res = cb.on_event(in_, c, &codec_map, event).await?;
assert!(res.is_some(), "got nothing back");
if let Some(replies) = res {
assert_eq!(1, replies.len());
if let Some(Reply::Insight(insight)) = replies.first() {
assert_eq!(CbAction::Fail, insight.cb);
assert_eq!(id, insight.id);
assert_eq!(op_meta, insight.op_meta);
} else {
assert!(
false,
"expected to get anm insight back. Got {:?}",
replies.first()
);
}

assert_eq!(1, res.len());
if let Some(Reply::Insight(insight)) = res.first() {
assert_eq!(CbAction::Fail, insight.cb);
assert_eq!(id, insight.id);
assert_eq!(op_meta, insight.op_meta);
} else {
assert!(
false,
"expected to get anm insight back. Got {:?}",
res.first()
);
}

// array data - second ack/fail will be ignored, just one from ack/fail or open/close (trigger/restore) is returned
Expand All @@ -229,24 +224,17 @@ mod tests {

let c: &mut dyn Codec = codec.borrow_mut();
let res = cb.on_event(in_, c, &codec_map, event).await?;
assert!(res.is_some(), "got nothing back");
if let Some(replies) = res {
assert_eq!(2, replies.len());
match replies.as_slice() {
[Reply::Insight(insight1), Reply::Insight(insight2)] => {
assert_eq!(CbAction::Ack, insight1.cb);
assert_eq!(id, insight1.id);
assert_eq!(op_meta, insight1.op_meta);
assert_eq!(2, res.len());
match res.as_slice() {
[Reply::Insight(insight1), Reply::Insight(insight2)] => {
assert_eq!(CbAction::Ack, insight1.cb);
assert_eq!(id, insight1.id);
assert_eq!(op_meta, insight1.op_meta);

assert_eq!(CbAction::Open, insight2.cb);
assert_eq!(op_meta, insight2.op_meta);
}
_ => assert!(
false,
"expected to get two insights back. Got {:?}",
replies
),
assert_eq!(CbAction::Open, insight2.cb);
assert_eq!(op_meta, insight2.op_meta);
}
_ => assert!(false, "expected to get two insights back. Got {:?}", res),
}

// array data - second ack/fail will be ignored, just one from ack/fail or open/close (trigger/restore) is returned
Expand All @@ -262,24 +250,17 @@ mod tests {
};
let c: &mut dyn Codec = codec.borrow_mut();
let res = cb.on_event(in_, c, &codec_map, event).await?;
assert!(res.is_some(), "got nothing back");
if let Some(replies) = res {
assert_eq!(2, replies.len());
match replies.as_slice() {
[Reply::Insight(insight1), Reply::Insight(insight2)] => {
assert_eq!(CbAction::Fail, insight1.cb);
assert_eq!(id, insight1.id);
assert_eq!(op_meta, insight1.op_meta);
assert_eq!(2, res.len());
match res.as_slice() {
[Reply::Insight(insight1), Reply::Insight(insight2)] => {
assert_eq!(CbAction::Fail, insight1.cb);
assert_eq!(id, insight1.id);
assert_eq!(op_meta, insight1.op_meta);

assert_eq!(CbAction::Close, insight2.cb);
assert_eq!(op_meta, insight2.op_meta);
}
_ => assert!(
false,
"expected to get two insights back. Got {:?}",
replies
),
assert_eq!(CbAction::Close, insight2.cb);
assert_eq!(op_meta, insight2.op_meta);
}
_ => assert!(false, "expected to get two insights back. Got {:?}", res),
}

cb.terminate().await;
Expand Down
6 changes: 3 additions & 3 deletions src/sink/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
#![cfg(not(tarpaulin_include))]

use crate::sink::prelude::*;
use async_std::io;
use halfbrown::HashMap;
use std::time::{Duration, Instant};
use crate::sink::prelude::*;

#[derive(Debug, Clone)]
struct DebugBucket {
Expand Down Expand Up @@ -91,7 +91,7 @@ impl Sink for Debug {
}
self.cnt += 1;
}
Ok(None)
Ok(Vec::new())
}

fn default_codec(&self) -> &str {
Expand All @@ -113,7 +113,7 @@ impl Sink for Debug {
}

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None)
Ok(Vec::new())
}
fn is_active(&self) -> bool {
true
Expand Down
4 changes: 2 additions & 2 deletions src/sink/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ impl Sink for Dns {
}
}
}
Ok(Some(res))
Ok(res)
}

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None)
Ok(Vec::new())
}

async fn init(
Expand Down
10 changes: 6 additions & 4 deletions src/sink/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,15 +540,16 @@ impl Sink for Elastic {
"[Sink::{}] Received empty event. Won't send it to ES",
self.sink_url
);
Ok(Some(if event.transactional {
Ok(if event.transactional {
vec![Reply::Insight(event.insight_ack())]
} else {
vec![]
}))
})
} else {
// we have either one event or a batched one with > 1 event
self.maybe_enque(event).await;
Ok(None) // insights are sent via reply_channel directly
// insights are sent via reply_channel directly
Ok(Vec::new())
}
}

Expand Down Expand Up @@ -595,7 +596,8 @@ impl Sink for Elastic {
}

async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None) // insights are sent via reply_channel directly
// insights are sent via reply_channel directly
Ok(Vec::new())
}
fn is_active(&self) -> bool {
true
Expand Down
6 changes: 3 additions & 3 deletions src/sink/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#![cfg(not(tarpaulin_include))]

use crate::sink::prelude::*;
use halfbrown::HashMap;
use std::time::Duration;
use crate::sink::prelude::*;

pub struct Exit {}

Expand Down Expand Up @@ -55,7 +55,7 @@ impl Sink for Exit {
return Err("Unexpected event received in exit offramp".into());
}
}
Ok(None)
Ok(Vec::new())
}
fn default_codec(&self) -> &str {
"json"
Expand All @@ -74,7 +74,7 @@ impl Sink for Exit {
Ok(())
}
async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None)
Ok(Vec::new())
}
fn is_active(&self) -> bool {
true
Expand Down
4 changes: 2 additions & 2 deletions src/sink/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Sink for File {
}
file.flush().await?;
}
Ok(Some(vec![sink::Reply::Insight(event.insight_ack())]))
Ok(vec![sink::Reply::Insight(event.insight_ack())])
}
fn default_codec(&self) -> &str {
"json"
Expand All @@ -110,7 +110,7 @@ impl Sink for File {
Ok(())
}
async fn on_signal(&mut self, _signal: Event) -> ResultVec {
Ok(None)
Ok(Vec::new())
}
fn is_active(&self) -> bool {
true
Expand Down
8 changes: 4 additions & 4 deletions src/sink/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl Sink for GoogleCloudStorage {
}

self.is_down = false;
return Ok(Some(vec![qos::ack(&mut event)]));
return Ok(vec![qos::ack(&mut event)]);
}

fn default_codec(&self) -> &str {
Expand Down Expand Up @@ -304,10 +304,10 @@ impl Sink for GoogleCloudStorage {
// This means the port is connectable
info!("Google Cloud Storage - sink remote endpoint - recovered and contactable");
self.is_down = false;
return Ok(Some(vec![qos::open(&mut signal)]));
Ok(vec![qos::open(&mut signal)])
} else {
Ok(Vec::new())
}

Ok(None)
}

fn is_active(&self) -> bool {
Expand Down
Loading

0 comments on commit 8bc5a2b

Please sign in to comment.