Skip to content

Commit

Permalink
Mini cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser authored and darach committed May 27, 2024
1 parent fa79839 commit 306de3b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
12 changes: 6 additions & 6 deletions tremor-connectors/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,9 +1254,9 @@ async fn build_events(
)
.await;
let (port, payload) = match line_value {
Ok(decoded) => (port.unwrap_or(&OUT).clone(), decoded),
Err(None) => continue,
Err(Some(e)) => (
Ok(Some(decoded)) => (port.unwrap_or(&OUT).clone(), decoded),
Ok(None) => continue,
Err(e) => (
ERR,
make_error(alias, &e.into(), stream_state.stream_id, pull_id, meta),
),
Expand Down Expand Up @@ -1315,9 +1315,9 @@ async fn build_last_events(
)
.await;
let (port, payload) = match line_value {
Ok(decoded) => (port.unwrap_or(&OUT).clone(), decoded),
Err(None) => continue,
Err(Some(e)) => (
Ok(Some(decoded)) => (port.unwrap_or(&OUT).clone(), decoded),
Ok(None) => continue,
Err(e) => (
ERR,
make_error(alias, &e.into(), stream_state.stream_id, pull_id, meta),
),
Expand Down
18 changes: 7 additions & 11 deletions tremor-script/src/srs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,18 @@ impl EventPayload {
meta: Value<'static>,
ingest_ns: &mut u64,
codec: &mut Box<dyn Codec>,
) -> std::result::Result<Self, Option<crate::errors::Error>> {
) -> std::result::Result<Option<Self>, crate::errors::Error> {
let mut raw = Pin::new(raw);
// We are keeping the Vector pinnd and as part of the same struct, and the decoded data
// can't leak this function aside of inside this struct.
// ALLOW: See above explenation
let r = unsafe { mem::transmute::<&mut [u8], &'static mut [u8]>(raw.as_mut().get_mut()) };
let res = codec.decode(r, *ingest_ns, meta).await;
match res {
Ok(None) => Err(None),
Err(e) => Err(Some(e.into())),
Ok(Some((decoded, meta))) => {
let data = ValueAndMeta::from_parts(decoded, meta);
let raw = vec![Arc::new(raw)];
Ok(Self { raw, data })
}
}
let res = codec.decode(r, *ingest_ns, meta).await?;
Ok(res.map(|(decoded, meta)| {
let data = ValueAndMeta::from_parts(decoded, meta);
let raw = vec![Arc::new(raw)];
Self { raw, data }
}))
}

/// Named after the original rental struct for easy rewriting.
Expand Down

0 comments on commit 306de3b

Please sign in to comment.