From 306de3b3414609a68db10ac5656709d8a091dc0d Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Mon, 27 May 2024 14:49:07 +0200 Subject: [PATCH] Mini cleanup Signed-off-by: Heinz N. Gies --- tremor-connectors/src/source.rs | 12 ++++++------ tremor-script/src/srs.rs | 18 +++++++----------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/tremor-connectors/src/source.rs b/tremor-connectors/src/source.rs index edde26abbc..acfe50a7ac 100644 --- a/tremor-connectors/src/source.rs +++ b/tremor-connectors/src/source.rs @@ -1254,9 +1254,9 @@ async fn build_events( ) .await; let (port, payload) = match line_value { - Ok(decoded) => (port.unwrap_or(&OUT).clone(), decoded), - Err(None) => continue, - Err(Some(e)) => ( + Ok(Some(decoded)) => (port.unwrap_or(&OUT).clone(), decoded), + Ok(None) => continue, + Err(e) => ( ERR, make_error(alias, &e.into(), stream_state.stream_id, pull_id, meta), ), @@ -1315,9 +1315,9 @@ async fn build_last_events( ) .await; let (port, payload) = match line_value { - Ok(decoded) => (port.unwrap_or(&OUT).clone(), decoded), - Err(None) => continue, - Err(Some(e)) => ( + Ok(Some(decoded)) => (port.unwrap_or(&OUT).clone(), decoded), + Ok(None) => continue, + Err(e) => ( ERR, make_error(alias, &e.into(), stream_state.stream_id, pull_id, meta), ), diff --git a/tremor-script/src/srs.rs b/tremor-script/src/srs.rs index 415cb37c46..cc7a3f4bd0 100644 --- a/tremor-script/src/srs.rs +++ b/tremor-script/src/srs.rs @@ -139,22 +139,18 @@ impl EventPayload { meta: Value<'static>, ingest_ns: &mut u64, codec: &mut Box, - ) -> std::result::Result> { + ) -> std::result::Result, crate::errors::Error> { let mut raw = Pin::new(raw); // We are keeping the Vector pinnd and as part of the same struct, and the decoded data // can't leak this function aside of inside this struct. // ALLOW: See above explenation let r = unsafe { mem::transmute::<&mut [u8], &'static mut [u8]>(raw.as_mut().get_mut()) }; - let res = codec.decode(r, *ingest_ns, meta).await; - match res { - Ok(None) => Err(None), - Err(e) => Err(Some(e.into())), - Ok(Some((decoded, meta))) => { - let data = ValueAndMeta::from_parts(decoded, meta); - let raw = vec![Arc::new(raw)]; - Ok(Self { raw, data }) - } - } + let res = codec.decode(r, *ingest_ns, meta).await?; + Ok(res.map(|(decoded, meta)| { + let data = ValueAndMeta::from_parts(decoded, meta); + let raw = vec![Arc::new(raw)]; + Self { raw, data } + })) } /// Named after the original rental struct for easy rewriting.