Skip to content

Commit

Permalink
FIX: Align sync and async on partial records
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed Nov 20, 2024
1 parent 1ad1982 commit d7d3155
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.24.0 - TBD

### Breaking changes
- Changed async DBN decoding to return `Ok(None)` when an incomplete record remains in
the stream. This matches the existing behavior of sync DBN decoding

## 0.23.1 - 2024-11-12

### Enhancements
Expand Down
24 changes: 13 additions & 11 deletions rust/dbn/src/decode/dbn/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,7 @@ where
}))
};
} else if self.state == DecoderState::Eof {
// there should be no remaining bytes in the buffer after reaching EOF
// and yielding all complete records
return if self.read_buf.remaining() == 0 {
Ok(None)
} else {
Err(crate::Error::decode(format!(
"unexpected partial record remaining in stream: {} bytes",
self.read_buf.remaining()
)))
};
return Ok(None);
} else {
self.state = DecoderState::Read;
}
Expand Down Expand Up @@ -834,13 +825,24 @@ mod tests {
);
}

#[tokio::test]
async fn test_decode_partial_record() {
let buf = vec![6u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
assert!(buf[0] as usize * RecordHeader::LENGTH_MULTIPLIER > buf.len());

let mut target = RecordDecoder::new(buf.as_slice());
let res = target.decode_ref().await;
dbg!(&res);
assert!(matches!(res, Ok(None)));
}

#[tokio::test]
async fn test_decode_record_length_longer_than_buffer() {
let rec = ErrorMsg::new(1680703198000000000, "Test", true);
let mut target = RecordDecoder::new(&rec.as_ref()[..rec.record_size() - 1]);
let res = target.decode_ref().await;
dbg!(&res);
assert!(matches!(res, Err(Error::Decode(msg)) if msg.starts_with("unexpected")));
assert!(matches!(res, Ok(None)));
}

#[tokio::test]
Expand Down
11 changes: 11 additions & 0 deletions rust/dbn/src/decode/dbn/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,17 @@ mod tests {
);
}

#[test]
fn test_decode_partial_record() {
let buf = vec![6u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
assert!(buf[0] as usize * RecordHeader::LENGTH_MULTIPLIER > buf.len());

let mut target = RecordDecoder::new(buf.as_slice());
let res = target.decode_ref();
dbg!(&res);
assert!(matches!(res, Ok(None)));
}

#[test]
fn test_decode_record_length_less_than_header() {
let buf = vec![3u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
Expand Down

0 comments on commit d7d3155

Please sign in to comment.