From 03cc5463a12a8e3a6e9f562a7a18ea5f1dd59aa0 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 22 Jul 2024 17:37:34 -0500 Subject: [PATCH 1/2] FIX: Fix support for multi-frame in `AsyncDynRead` --- CHANGELOG.md | 6 ++++++ rust/dbn/src/decode.rs | 35 +++++++++++++++++++++++++++++++- rust/dbn/src/decode/dbn/async.rs | 16 ++++----------- rust/dbn/src/decode/zstd.rs | 12 +++++++++++ 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e54e04..f9d5539 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.19.2 - TBD + +### Bug fixes +- Fixed issue where `AsyncDynReader` would only decode the first frame of multi-frame + Zstandard files + ## 0.19.1 - 2024-07-16 ### Bug fixes diff --git a/rust/dbn/src/decode.rs b/rust/dbn/src/decode.rs index 5f7ccad..6946719 100644 --- a/rust/dbn/src/decode.rs +++ b/rust/dbn/src/decode.rs @@ -570,6 +570,8 @@ mod r#async { use crate::enums::Compression; + use super::zstd::zstd_decoder; + /// A type for runtime polymorphism on compressed and uncompressed input. /// The async version of [`DynReader`](super::DynReader). pub struct DynReader(DynReaderImpl) @@ -629,7 +631,7 @@ mod r#async { .await .map_err(|e| crate::Error::io(e, "creating buffer to infer encoding"))?; Ok(if super::zstd::starts_with_prefix(first_bytes) { - Self(DynReaderImpl::ZStd(ZstdDecoder::new(reader))) + Self(DynReaderImpl::ZStd(zstd_decoder(reader))) } else { Self(DynReaderImpl::Uncompressed(reader)) }) @@ -693,4 +695,35 @@ mod r#async { } } } + + #[cfg(test)] + mod tests { + use crate::{ + compat::InstrumentDefMsgV1, + decode::{tests::TEST_DATA_PATH, AsyncDbnRecordDecoder}, + VersionUpgradePolicy, + }; + + use super::*; + + #[tokio::test] + async fn test_decode_multiframe_zst() { + let mut decoder = AsyncDbnRecordDecoder::with_version( + DynReader::from_file(&format!( + "{TEST_DATA_PATH}/multi-frame.definition.v1.dbn.frag.zst" + )) + .await + .unwrap(), + 1, + VersionUpgradePolicy::AsIs, + false, + ) + .unwrap(); + let mut count = 0; + while let Some(_rec) = decoder.decode::().await.unwrap() { + count += 1; + } + assert_eq!(count, 8); + } + } } diff --git a/rust/dbn/src/decode/dbn/async.rs b/rust/dbn/src/decode/dbn/async.rs index 273b8a4..e9e42f0 100644 --- a/rust/dbn/src/decode/dbn/async.rs +++ b/rust/dbn/src/decode/dbn/async.rs @@ -8,21 +8,13 @@ use tokio::{ use crate::{ compat, - decode::{r#async::ZSTD_FILE_BUFFER_CAPACITY, FromLittleEndianSlice, VersionUpgradePolicy}, + decode::{ + r#async::ZSTD_FILE_BUFFER_CAPACITY, zstd::zstd_decoder, FromLittleEndianSlice, + VersionUpgradePolicy, + }, HasRType, Metadata, Record, RecordHeader, RecordRef, Result, DBN_VERSION, METADATA_FIXED_LEN, }; -/// Helper to always set multiple members. -fn zstd_decoder(reader: R) -> ZstdDecoder -where - R: io::AsyncBufReadExt + Unpin, -{ - let mut zstd_decoder = ZstdDecoder::new(reader); - // explicitly enable decoding multiple frames - zstd_decoder.multiple_members(true); - zstd_decoder -} - /// An async decoder for Databento Binary Encoding (DBN), both metadata and records. pub struct Decoder where diff --git a/rust/dbn/src/decode/zstd.rs b/rust/dbn/src/decode/zstd.rs index 7d99132..b2fb397 100644 --- a/rust/dbn/src/decode/zstd.rs +++ b/rust/dbn/src/decode/zstd.rs @@ -15,6 +15,18 @@ pub fn starts_with_prefix(bytes: &[u8]) -> bool { ZSTD_MAGIC_NUMBER == magic } +/// Helper to always set multiple members. +#[cfg(feature = "async")] +pub(crate) fn zstd_decoder(reader: R) -> async_compression::tokio::bufread::ZstdDecoder +where + R: tokio::io::AsyncBufReadExt + Unpin, +{ + let mut zstd_decoder = async_compression::tokio::bufread::ZstdDecoder::new(reader); + // explicitly enable decoding multiple frames + zstd_decoder.multiple_members(true); + zstd_decoder +} + #[cfg(test)] mod tests { use std::{fs::File, io::Read}; From f62a74f86b1373b1d2635c7b724b85aa2d3ca72d Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 23 Jul 2024 08:34:45 -0500 Subject: [PATCH 2/2] VER: Release 0.19.2 --- Cargo.lock | 10 +++++----- Cargo.toml | 2 +- python/pyproject.toml | 4 ++-- rust/dbn-cli/Cargo.toml | 2 +- rust/dbn/Cargo.toml | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e54f10..fd9eab9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,7 +256,7 @@ dependencies = [ [[package]] name = "databento-dbn" -version = "0.19.1" +version = "0.19.2" dependencies = [ "dbn", "pyo3", @@ -267,7 +267,7 @@ dependencies = [ [[package]] name = "dbn" -version = "0.19.1" +version = "0.19.2" dependencies = [ "async-compression", "csv", @@ -289,7 +289,7 @@ dependencies = [ [[package]] name = "dbn-c" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "cbindgen", @@ -299,7 +299,7 @@ dependencies = [ [[package]] name = "dbn-cli" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "assert_cmd", @@ -314,7 +314,7 @@ dependencies = [ [[package]] name = "dbn-macros" -version = "0.19.1" +version = "0.19.2" dependencies = [ "csv", "dbn", diff --git a/Cargo.toml b/Cargo.toml index e3cceda..81bf2cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" [workspace.package] authors = ["Databento "] edition = "2021" -version = "0.19.1" +version = "0.19.2" documentation = "https://docs.databento.com" repository = "https://github.com/databento/dbn" license = "Apache-2.0" diff --git a/python/pyproject.toml b/python/pyproject.toml index 4786bb8..ff43497 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databento-dbn" -version = "0.19.1" +version = "0.19.2" description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)" authors = ["Databento "] license = "Apache-2.0" @@ -17,7 +17,7 @@ build-backend = "maturin" [project] name = "databento-dbn" -version = "0.19.1" +version = "0.19.2" authors = [ { name = "Databento", email = "support@databento.com" } ] diff --git a/rust/dbn-cli/Cargo.toml b/rust/dbn-cli/Cargo.toml index 40fbed5..93c5c4b 100644 --- a/rust/dbn-cli/Cargo.toml +++ b/rust/dbn-cli/Cargo.toml @@ -16,7 +16,7 @@ name = "dbn" path = "src/main.rs" [dependencies] -dbn = { path = "../dbn", version = "=0.19.1", default-features = false } +dbn = { path = "../dbn", version = "=0.19.2", default-features = false } anyhow = { workspace = true } clap = { version = "4.5", features = ["derive", "wrap_help"] } diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index 7564fda..2cd10a5 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"] trivial_copy = [] [dependencies] -dbn-macros = { version = "=0.19.1", path = "../dbn-macros" } +dbn-macros = { version = "=0.19.2", path = "../dbn-macros" } async-compression = { version = "0.4.11", features = ["tokio", "zstd"], optional = true } csv = { workspace = true }