Skip to content

Commit

Permalink
VER: Release 0.18.2
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen authored Jun 18, 2024
2 parents a4ef81e + 59d064b commit 7eee1ee
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 27 deletions.
24 changes: 19 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## 0.18.2 - 2024-06-18

### Enhancements
- Added new `shutdown` method to async encoders to more easily ensure the end
of output is written and I/O cleaned up. Previously this required a call to
`.get_mut().shutdown().await`
- Changed `AsyncDynWriter` and `AsyncDbnEncoder::with_zstd` to use a zstd checksum like
the sync equivalents
- Added new publisher values for `XNAS.BASIC` and `XNAS.NLS`

### Bug fixes
- Fixed bug where DBN metadata would still be upgraded after passing `AsIs` to
`DbnDecoder::set_upgrade_policy` and `AsyncDbnDecoder::set_upgrade_policy`

## 0.18.1 - 2024-06-04

### Enhancements
Expand All @@ -8,7 +22,7 @@
- Added new off-market publisher values for `IFEU.IMPACT` and `NDEX.IMPACT`

### Bug fixes
- Fix descriptions for `FINN` and `FINY` publishers.
- Fixed descriptions for `FINN` and `FINY` publishers

## 0.18.0 - 2024-05-21

Expand Down Expand Up @@ -451,7 +465,7 @@
- Added new `OHLCV_EOD` rtype for future daily OHLCV schema based on the trading
session
- Added new `SType::Nasdaq` and `SType::Cms` to support querying US equities datasets
using either convention, regardless of the original convention of the dataset.
using either convention, regardless of the original convention of the dataset
- Relaxed `pyo3`, `tokio`, and `zstd` dependency version requirements
- Added `FIXED_PRICE_SCALE` constant to `databento_dbn` Python package
- Added generated field metadata for each record type to aid in pandas DataFrame
Expand All @@ -466,15 +480,15 @@
## 0.8.0 - 2023-07-19
### Enhancements
- Switched from `anyhow::Error` to custom `dbn::Error` for all public fallible functions
and methods. This should make it easier to disambiguate between error types.
and methods. This should make it easier to disambiguate between error types
- `EncodeDbn::encode_record` and `EncodeDbn::record_record_ref` no longer treat a
`BrokenPipe` error differently
- Added `AsyncDbnDecoder`
- Added `pretty::Px` and `pretty::Ts` newtypes to expose price and timestamp formatting
logic outside of CSV and JSON encoding
- Added interning for Python strings
- Added `rtype` to encoded JSON and CSV to aid differeniating between different record types.
This is particularly important when working with live data.
This is particularly important when working with live data
- Added `pretty_` Python attributes for DBN price fields
- Added `pretty_` Python attributes for DBN UTC timestamp fields

Expand All @@ -491,7 +505,7 @@
- Updated `InstrumentDefMsg` serialization order to serialize `raw_symbol`,
`security_update_action`, and `instrument_class` earlier given their importance
- Removed `bool` return value from `EncodeDbn::encode_record` and
`EncodeDbn::record_record_ref`. These now return `dbn::Result<()>`.
`EncodeDbn::record_record_ref`. These now return `dbn::Result<()>`

### Bug fixes
- Fixed handling of NUL byte when encoding DBN to CSV and JSON
Expand Down
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ resolver = "2"
[workspace.package]
authors = ["Databento <[email protected]>"]
edition = "2021"
version = "0.18.1"
version = "0.18.2"
documentation = "https://docs.databento.com"
repository = "https://github.com/databento/dbn"
license = "Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databento-dbn"
version = "0.18.1"
version = "0.18.2"
description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)"
authors = ["Databento <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -17,7 +17,7 @@ build-backend = "maturin"

[project]
name = "databento-dbn"
version = "0.18.1"
version = "0.18.2"
authors = [
{ name = "Databento", email = "[email protected]" }
]
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ name = "dbn"
path = "src/main.rs"

[dependencies]
dbn = { path = "../dbn", version = "=0.18.1", default-features = false }
dbn = { path = "../dbn", version = "=0.18.2", default-features = false }

anyhow = { workspace = true }
clap = { version = "4.5", features = ["derive", "wrap_help"] }
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"]
trivial_copy = []

[dependencies]
dbn-macros = { version = "=0.18.1", path = "../dbn-macros" }
dbn-macros = { version = "=0.18.2", path = "../dbn-macros" }

async-compression = { version = "0.4.11", features = ["tokio", "zstd"], optional = true }
csv = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion rust/dbn/src/decode/dbn/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ where

/// Sets the behavior for decoding DBN data of previous versions.
pub fn set_upgrade_policy(&mut self, upgrade_policy: VersionUpgradePolicy) {
self.metadata.upgrade(upgrade_policy);
self.metadata
.set_version(self.decoder.version, upgrade_policy);
self.decoder.set_upgrade_policy(upgrade_policy);
}

Expand Down
3 changes: 2 additions & 1 deletion rust/dbn/src/decode/dbn/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ where

/// Sets the behavior for decoding DBN data of previous versions.
pub fn set_upgrade_policy(&mut self, upgrade_policy: VersionUpgradePolicy) {
self.metadata.upgrade(upgrade_policy);
self.metadata
.set_version(self.decoder.version, upgrade_policy);
self.decoder.set_upgrade_policy(upgrade_policy);
}
}
Expand Down
11 changes: 11 additions & 0 deletions rust/dbn/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ fn zstd_encoder<'a, W: io::Write>(writer: W) -> Result<zstd::stream::AutoFinishE
Ok(zstd_encoder.auto_finish())
}

#[cfg(feature = "async")]
fn async_zstd_encoder<W: tokio::io::AsyncWriteExt + Unpin>(
writer: W,
) -> async_compression::tokio::write::ZstdEncoder<W> {
async_compression::tokio::write::ZstdEncoder::with_quality_and_params(
writer,
async_compression::Level::Precise(ZSTD_COMPRESSION_LEVEL),
&[async_compression::zstd::CParameter::checksum_flag(true)],
)
}

#[cfg(test)]
mod test_data {
use fallible_streaming_iterator::FallibleStreamingIterator;
Expand Down
56 changes: 52 additions & 4 deletions rust/dbn/src/encode/dbn/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use async_compression::tokio::write::ZstdEncoder;
use tokio::io;

use crate::{
encode::DbnEncodable, record_ref::RecordRef, Error, Metadata, Result, SymbolMapping,
DBN_VERSION, NULL_LIMIT, NULL_RECORD_COUNT, NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP,
encode::{async_zstd_encoder, DbnEncodable},
record_ref::RecordRef,
Error, Metadata, Result, SymbolMapping, DBN_VERSION, NULL_LIMIT, NULL_RECORD_COUNT,
NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP,
};

/// An async encoder for DBN streams.
Expand Down Expand Up @@ -84,6 +86,14 @@ where
pub async fn flush(&mut self) -> Result<()> {
self.record_encoder.flush().await
}

/// Initiates or attempts to shut down the inner writer.
///
/// # Errors
/// This function returns an error if the shut down did not complete successfully.
pub async fn shutdown(self) -> Result<()> {
self.record_encoder.shutdown().await
}
}

impl<W> Encoder<ZstdEncoder<W>>
Expand All @@ -103,7 +113,7 @@ where
/// metadata may have been partially written, but future calls will begin writing
/// the encoded metadata from the beginning.
pub async fn with_zstd(writer: W, metadata: &Metadata) -> Result<Self> {
Self::new(ZstdEncoder::new(writer), metadata).await
Self::new(async_zstd_encoder(writer), metadata).await
}
}

Expand Down Expand Up @@ -171,6 +181,17 @@ where
.map_err(|e| Error::io(e, "flushing output".to_owned()))
}

/// Initiates or attempts to shut down the inner writer.
///
/// # Errors
/// This function returns an error if the shut down did not complete successfully.
pub async fn shutdown(mut self) -> Result<()> {
self.writer
.shutdown()
.await
.map_err(|e| Error::io(e, "shutting down".to_owned()))
}

/// Returns a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
&self.writer
Expand Down Expand Up @@ -291,11 +312,38 @@ where
Ok(())
}

/// Returns a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
&self.writer
}

/// Returns a mutable reference to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}

/// Flushes any buffered content to the true output.
///
/// # Errors
/// This function returns an error if it's unable to flush the underlying writer.
pub async fn flush(&mut self) -> Result<()> {
self.writer
.flush()
.await
.map_err(|e| Error::io(e, "flushing output".to_owned()))
}

/// Initiates or attempts to shut down the inner writer.
///
/// # Errors
/// This function returns an error if the shut down did not complete successfully.
pub async fn shutdown(mut self) -> Result<()> {
self.writer
.shutdown()
.await
.map_err(|e| Error::io(e, "shutting down".to_owned()))
}

/// Consumes the encoder returning the original writer.
pub fn into_inner(self) -> W {
self.writer
Expand Down Expand Up @@ -430,7 +478,7 @@ where
/// Creates a new [`MetadataEncoder`] that will Zstandard compress the DBN data
/// written to `writer`.
pub fn with_zstd(writer: W) -> Self {
Self::new(ZstdEncoder::new(writer))
Self::new(async_zstd_encoder(writer))
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/dbn/src/encode/dyn_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod r#async {
use async_compression::tokio::write::ZstdEncoder;
use tokio::io;

use crate::enums::Compression;
use crate::{encode::async_zstd_encoder, enums::Compression};

/// An object that allows for abstracting over compressed and uncompressed output.
pub struct DynWriter<W>(DynWriterImpl<W>)
Expand All @@ -118,7 +118,7 @@ mod r#async {
pub fn new(writer: W, compression: Compression) -> Self {
Self(match compression {
Compression::None => DynWriterImpl::Uncompressed(writer),
Compression::ZStd => DynWriterImpl::ZStd(ZstdEncoder::new(writer)),
Compression::ZStd => DynWriterImpl::ZStd(async_zstd_encoder(writer)),
})
}

Expand Down
11 changes: 11 additions & 0 deletions rust/dbn/src/encode/json/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ where
.await
.map_err(|e| Error::io(e, "flushing output"))
}

/// Initiates or attempts to shut down the inner writer.
///
/// # Errors
/// This function returns an error if the shut down did not complete successfully.
pub async fn shutdown(mut self) -> Result<()> {
self.writer
.shutdown()
.await
.map_err(|e| Error::io(e, "shutting down".to_owned()))
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions rust/dbn/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ impl From<InstrumentClass> for char {
impl InstrumentClass {
/// Returns `true` if the instrument class is a type of option.
///
/// NOTE: excludes [`Self::MixedSpread`], which *may* include options.
/// Note: excludes [`Self::MixedSpread`], which *may* include options.
pub fn is_option(&self) -> bool {
matches!(self, Self::Call | Self::Put | Self::OptionSpread)
}

/// Returns `true` if the instrument class is a type of future.
///
/// NOTE: excludes [`Self::MixedSpread`], which *may* include futures.
/// Note: excludes [`Self::MixedSpread`], which *may* include futures.
pub fn is_future(&self) -> bool {
matches!(self, Self::Future | Self::FutureSpread)
}
Expand Down
18 changes: 18 additions & 0 deletions rust/dbn/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ impl Metadata {
self.symbol_cstr_len = crate::SYMBOL_CSTR_LEN;
}
}

/// Allows upgrade policy to be configured from decoders after Metadata decoding.
/// Using [`upgrade()`] would leave metadata with the wrong `version` and
/// `symbol_cstr_len`.
pub(crate) fn set_version(&mut self, input_version: u8, upgrade_policy: VersionUpgradePolicy) {
if input_version < 2 {
match upgrade_policy {
VersionUpgradePolicy::AsIs => {
self.version = input_version;
self.symbol_cstr_len = crate::compat::SYMBOL_CSTR_LEN_V1;
}
VersionUpgradePolicy::Upgrade => {
self.version = crate::DBN_VERSION;
self.symbol_cstr_len = crate::SYMBOL_CSTR_LEN;
}
}
}
}
}

/// Helper for constructing [`Metadata`] structs with defaults.
Expand Down
Loading

0 comments on commit 7eee1ee

Please sign in to comment.