From af90df4b02868879da21b7618ffca751691e6ead Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 11 Dec 2024 15:34:34 -0600 Subject: [PATCH] REF: Refactor DBN versioning --- CHANGELOG.md | 21 ++ python/python/databento_dbn/_lib.pyi | 16 +- python/python/databento_dbn/v1.py | 25 ++ python/python/databento_dbn/v2.py | 25 ++ python/src/dbn_decoder.rs | 7 +- python/src/transcoder.rs | 13 +- rust/dbn-cli/src/lib.rs | 2 +- rust/dbn/src/compat.rs | 328 +++++++-------------------- rust/dbn/src/decode/dbn/async.rs | 4 +- rust/dbn/src/decode/dbn/sync.rs | 13 +- rust/dbn/src/decode/dbz.rs | 2 +- rust/dbn/src/encode/dbn/async.rs | 8 +- rust/dbn/src/encode/dbn/sync.rs | 14 +- rust/dbn/src/encode/json/sync.rs | 5 +- rust/dbn/src/enums.rs | 30 +-- rust/dbn/src/lib.rs | 3 + rust/dbn/src/macros.rs | 10 +- rust/dbn/src/metadata.rs | 12 +- rust/dbn/src/record.rs | 2 +- rust/dbn/src/record/impl_default.rs | 105 --------- rust/dbn/src/record/methods.rs | 260 +-------------------- rust/dbn/src/symbol_map.rs | 6 +- rust/dbn/src/v1.rs | 35 +++ rust/dbn/src/v1/impl_default.rs | 111 +++++++++ rust/dbn/src/v1/methods.rs | 265 ++++++++++++++++++++++ rust/dbn/src/v2.rs | 182 +++++++++++++++ 26 files changed, 815 insertions(+), 689 deletions(-) create mode 100644 python/python/databento_dbn/v1.py create mode 100644 python/python/databento_dbn/v2.py create mode 100644 rust/dbn/src/v1.rs create mode 100644 rust/dbn/src/v1/impl_default.rs create mode 100644 rust/dbn/src/v1/methods.rs create mode 100644 rust/dbn/src/v2.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 44da036..5055c7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## 0.25.0 - TBD + +### Breaking changes +- Renamed `VersionUpgradePolicy::Upgrade` to `UpgradeToV2` in preparation for a future + DBN version 3 + +### Enhancements +- Added `v1` and `v2` modules to allow unambiguously referring to the record types for + a given DBN version regardless of whether the record type has changed. Previously + versioned aliases only existed for record types that have changed between DBN versions + - Added identical namespaces to `databento_dbn` Python package +- Changed `dataset()` method on `MetadataBuilder` to accept an `impl ToString` so now + `Dataset` and `&str` can be passed directly +- Added type aliases for `TBBOMsg`, `BBO1SMsg`, `BBO1MMsg`, `TCBBOMsg`, `CBBO1SMsg`, + `CBBO1MMsg` in Python + +### Deprecations +- Deprecated `dataset` module. The top-level `Dataset` enum and its `const` `as_str()` + method provide the same functionality for all datasets. This module will be removed in + a future version + ## 0.24.0 - 2024-12-10 ### Enhancements diff --git a/python/python/databento_dbn/_lib.pyi b/python/python/databento_dbn/_lib.pyi index 74cf886..452f80a 100644 --- a/python/python/databento_dbn/_lib.pyi +++ b/python/python/databento_dbn/_lib.pyi @@ -784,13 +784,15 @@ class VersionUpgradePolicy(Enum): AS_IS Decode data from previous versions as-is. - UPGRADE - Decode data from previous versions converting it to the latest version. + UPGRADE_TO_V2 + Decode and convert data from DBN versions prior to version 2 to that version. + Attempting to decode data from newer versions (when they're introduced) will + fail. """ AS_IS: int - UPGRADE: int + UPGRADE_TO_V2: int class Metadata(SupportsBytes): """ @@ -5324,3 +5326,11 @@ def update_encoded_metadata( When the file update fails. """ + +# Aliases +TBBOMsg = MBOMsg +BBO1SMsg = BBOMsg +BBO1MMsg = BBOMsg +TCBBOMsg = CMBP1Msg +CBBO1SMsg = CBBOMsg +CBBO1MMsg = CBBOMsg diff --git a/python/python/databento_dbn/v1.py b/python/python/databento_dbn/v1.py new file mode 100644 index 0000000..c99ac61 --- /dev/null +++ b/python/python/databento_dbn/v1.py @@ -0,0 +1,25 @@ +# ruff: noqa: F401, F811 +from ._lib import BBOMsg +from ._lib import CBBOMsg +from ._lib import CMBP1Msg +from ._lib import ErrorMsgV1 as ErrorMsg +from ._lib import ImbalanceMsg +from ._lib import InstrumentDefMsgV1 as InstrumentDefMsg +from ._lib import MBOMsg +from ._lib import MBP1Msg +from ._lib import MBP10Msg +from ._lib import OHLCVMsg +from ._lib import StatMsg +from ._lib import StatusMsg +from ._lib import SymbolMappingMsgV1 as SymbolMappingMsg +from ._lib import SystemMsgV1 as SystemMsg +from ._lib import TradeMsg + + +# Aliases +TBBOMsg = MBOMsg +BBO1SMsg = BBOMsg +BBO1MMsg = BBOMsg +TCBBOMsg = CMBP1Msg +CBBO1SMsg = CBBOMsg +CBBO1MMsg = CBBOMsg diff --git a/python/python/databento_dbn/v2.py b/python/python/databento_dbn/v2.py new file mode 100644 index 0000000..fdebcca --- /dev/null +++ b/python/python/databento_dbn/v2.py @@ -0,0 +1,25 @@ +# ruff: noqa: F401, F811 +from ._lib import BBOMsg +from ._lib import CBBOMsg +from ._lib import CMBP1Msg +from ._lib import ErrorMsg +from ._lib import ImbalanceMsg +from ._lib import InstrumentDefMsg +from ._lib import MBOMsg +from ._lib import MBP1Msg +from ._lib import MBP10Msg +from ._lib import OHLCVMsg +from ._lib import StatMsg +from ._lib import StatusMsg +from ._lib import SymbolMappingMsg +from ._lib import SystemMsg +from ._lib import TradeMsg + + +# Aliases +TBBOMsg = MBOMsg +BBO1SMsg = BBOMsg +BBO1MMsg = BBOMsg +TCBBOMsg = CMBP1Msg +CBBO1SMsg = CBBOMsg +CBBO1MMsg = CBBOMsg diff --git a/python/src/dbn_decoder.rs b/python/src/dbn_decoder.rs index 7865c36..4b728c5 100644 --- a/python/src/dbn_decoder.rs +++ b/python/src/dbn_decoder.rs @@ -129,11 +129,10 @@ impl DbnDecoder { #[cfg(test)] mod tests { use dbn::{ - datasets::XNAS_ITCH, encode::{dbn::Encoder, EncodeRecord}, enums::{rtype, SType, Schema}, record::{ErrorMsg, OhlcvMsg, RecordHeader}, - MetadataBuilder, DBN_VERSION, + Dataset, MetadataBuilder, DBN_VERSION, }; use pyo3::{py_run, types::PyString}; @@ -148,7 +147,7 @@ mod tests { let mut encoder = Encoder::new( buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Trades)) .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) @@ -191,7 +190,7 @@ mod tests { let mut encoder = Encoder::new( buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Ohlcv1S)) .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) diff --git a/python/src/transcoder.rs b/python/src/transcoder.rs index 38efefa..00a77a7 100644 --- a/python/src/transcoder.rs +++ b/python/src/transcoder.rs @@ -454,10 +454,9 @@ mod tests { use std::{io::Read, num::NonZeroU64}; use dbn::{ - datasets::XNAS_ITCH, encode::{DbnEncoder, EncodeRecord}, - rtype, ErrorMsg, MappingInterval, MetadataBuilder, OhlcvMsg, RecordHeader, SType, Schema, - SymbolMapping, SymbolMappingMsg, WithTsOut, DBN_VERSION, UNDEF_TIMESTAMP, + rtype, Dataset, ErrorMsg, MappingInterval, MetadataBuilder, OhlcvMsg, RecordHeader, SType, + Schema, SymbolMapping, SymbolMappingMsg, WithTsOut, DBN_VERSION, UNDEF_TIMESTAMP, }; use rstest::rstest; use time::macros::{date, datetime}; @@ -508,7 +507,7 @@ mod tests { let mut encoder = DbnEncoder::new( Vec::new(), &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Trades)) .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) @@ -579,7 +578,7 @@ mod tests { let mut encoder = DbnEncoder::new( buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Ohlcv1S)) .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) @@ -657,7 +656,7 @@ mod tests { let mut encoder = DbnEncoder::new( buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Ohlcv1S)) .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) @@ -783,7 +782,7 @@ mod tests { let mut encoder = DbnEncoder::new( buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(None) // Live: mixed schema .stype_in(Some(SType::RawSymbol)) .stype_out(SType::InstrumentId) diff --git a/rust/dbn-cli/src/lib.rs b/rust/dbn-cli/src/lib.rs index 80a9395..0837f98 100644 --- a/rust/dbn-cli/src/lib.rs +++ b/rust/dbn-cli/src/lib.rs @@ -204,7 +204,7 @@ impl Args { pub fn upgrade_policy(&self) -> VersionUpgradePolicy { if self.should_upgrade { - VersionUpgradePolicy::Upgrade + VersionUpgradePolicy::UpgradeToV2 } else { VersionUpgradePolicy::AsIs } diff --git a/rust/dbn/src/compat.rs b/rust/dbn/src/compat.rs index 74b1783..aadd5a8 100644 --- a/rust/dbn/src/compat.rs +++ b/rust/dbn/src/compat.rs @@ -1,17 +1,4 @@ //! Compatibility shims for different DBN versions. -use std::os::raw::c_char; - -use crate::{ - macros::{dbn_record, CsvSerialize, JsonSerialize}, - record::{transmute_header_bytes, transmute_record_bytes}, - rtype, HasRType, RecordHeader, RecordRef, SecurityUpdateAction, UserDefinedInstrument, - VersionUpgradePolicy, WithTsOut, -}; - -// Dummy derive macro to get around `cfg_attr` incompatibility of several -// of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780 -#[cfg(not(feature = "python"))] -use dbn_macros::MockPyo3; /// The length of symbol fields in DBN version 1 (prior version being phased out). pub const SYMBOL_CSTR_LEN_V1: usize = 22; @@ -32,12 +19,30 @@ pub use crate::record::InstrumentDefMsg as InstrumentDefMsgV2; pub use crate::record::SymbolMappingMsg as SymbolMappingMsgV2; pub use crate::record::SystemMsg as SystemMsgV2; +use std::os::raw::c_char; + +// Dummy derive macro to get around `cfg_attr` incompatibility of several +// of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780 +#[cfg(not(feature = "python"))] +use dbn_macros::MockPyo3; + +use crate::{ + macros::{dbn_record, CsvSerialize, JsonSerialize}, + record::{transmute_header_bytes, transmute_record_bytes}, + rtype, HasRType, RecordHeader, RecordRef, SecurityUpdateAction, UserDefinedInstrument, + VersionUpgradePolicy, WithTsOut, DBN_VERSION, +}; + /// Decodes bytes into a [`RecordRef`], optionally applying conversion from structs /// of a prior DBN version to the current DBN version, according to the `version` and /// `upgrade_policy`. /// +/// # Preconditions +/// This function assumes `version` is valid (not greater than [`DBN_VERSION`]). +/// /// # Panics -/// This function will panic if it's passed only a single partial record in `input`. +/// This function will panic if it's passed only a single partial record in `input` and +/// an upgrade is attempted. It will also panic if `version` is greater than [`DBN_VERSION`]. /// /// # Safety /// Assumes `input` contains a full record. @@ -48,31 +53,40 @@ pub unsafe fn decode_record_ref<'a>( compat_buffer: &'a mut [u8; crate::MAX_RECORD_LEN], input: &'a [u8], ) -> RecordRef<'a> { - if version == 1 && upgrade_policy == VersionUpgradePolicy::Upgrade { - let header = transmute_header_bytes(input).unwrap(); - match header.rtype { - rtype::INSTRUMENT_DEF => { - return upgrade_record::( - ts_out, - compat_buffer, - input, - ); - } - rtype::SYMBOL_MAPPING => { - return upgrade_record::( - ts_out, - compat_buffer, - input, - ); + match (version, upgrade_policy) { + (1, VersionUpgradePolicy::UpgradeToV2) => { + let header = transmute_header_bytes(input).unwrap(); + match header.rtype { + rtype::INSTRUMENT_DEF => { + return upgrade_record::( + ts_out, + compat_buffer, + input, + ); + } + rtype::SYMBOL_MAPPING => { + return upgrade_record::( + ts_out, + compat_buffer, + input, + ); + } + rtype::ERROR => { + return upgrade_record::(ts_out, compat_buffer, input); + } + rtype::SYSTEM => { + return upgrade_record::( + ts_out, + compat_buffer, + input, + ); + } + _ => (), } - rtype::ERROR => { - return upgrade_record::(ts_out, compat_buffer, input); - } - rtype::SYSTEM => { - return upgrade_record::(ts_out, compat_buffer, input); - } - _ => (), } + (2, VersionUpgradePolicy::UpgradeToV2) => {} + (..=DBN_VERSION, VersionUpgradePolicy::AsIs) => {} + _ => panic!("Unsupported version {version}"), } RecordRef::new(input) } @@ -97,8 +111,34 @@ where RecordRef::new(compat_buffer) } +/// A trait for symbol mapping records. +pub trait SymbolMappingRec: HasRType { + /// Returns the input symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. + fn stype_in_symbol(&self) -> crate::Result<&str>; + + /// Returns the output symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. + fn stype_out_symbol(&self) -> crate::Result<&str>; + + /// Parses the raw start of the mapping interval into a datetime. Returns `None` if + /// `start_ts` contains the sentinel for a null timestamp. + fn start_ts(&self) -> Option; + + /// Parses the raw end of the mapping interval into a datetime. Returns `None` if + /// `end_ts` contains the sentinel for a null timestamp. + fn end_ts(&self) -> Option; +} + +// Versioned records need to be defined here to work with cbindgen. + /// Definition of an instrument in DBN version 1. The record of the /// [`Definition`](crate::enums::Schema::Definition) schema. +// cbindgen:export_name=InstrumentDefMsgV1 #[repr(C)] #[derive(Clone, CsvSerialize, JsonSerialize, PartialEq, Eq, Hash)] #[cfg_attr(feature = "trivial_copy", derive(Copy))] @@ -433,216 +473,6 @@ pub struct SystemMsgV1 { pub msg: [c_char; 64], } -impl From<&InstrumentDefMsgV1> for InstrumentDefMsgV2 { - fn from(old: &InstrumentDefMsgV1) -> Self { - let mut res = Self { - // recalculate length - hd: RecordHeader::new::( - rtype::INSTRUMENT_DEF, - old.hd.publisher_id, - old.hd.instrument_id, - old.hd.ts_event, - ), - ts_recv: old.ts_recv, - min_price_increment: old.min_price_increment, - display_factor: old.display_factor, - expiration: old.expiration, - activation: old.activation, - high_limit_price: old.high_limit_price, - low_limit_price: old.low_limit_price, - max_price_variation: old.max_price_variation, - trading_reference_price: old.trading_reference_price, - unit_of_measure_qty: old.unit_of_measure_qty, - min_price_increment_amount: old.min_price_increment_amount, - price_ratio: old.price_ratio, - inst_attrib_value: old.inst_attrib_value, - underlying_id: old.underlying_id, - raw_instrument_id: old.raw_instrument_id, - market_depth_implied: old.market_depth_implied, - market_depth: old.market_depth, - market_segment_id: old.market_segment_id, - max_trade_vol: old.max_trade_vol, - min_lot_size: old.min_lot_size, - min_lot_size_block: old.min_lot_size_block, - min_lot_size_round_lot: old.min_lot_size_round_lot, - min_trade_vol: old.min_trade_vol, - contract_multiplier: old.contract_multiplier, - decay_quantity: old.decay_quantity, - original_contract_size: old.original_contract_size, - trading_reference_date: old.trading_reference_date, - appl_id: old.appl_id, - maturity_year: old.maturity_year, - decay_start_date: old.decay_start_date, - channel_id: old.channel_id, - currency: old.currency, - settl_currency: old.settl_currency, - secsubtype: old.secsubtype, - group: old.group, - exchange: old.exchange, - asset: old.asset, - cfi: old.cfi, - security_type: old.security_type, - unit_of_measure: old.unit_of_measure, - underlying: old.underlying, - strike_price_currency: old.strike_price_currency, - instrument_class: old.instrument_class, - strike_price: old.strike_price, - match_algorithm: old.match_algorithm, - md_security_trading_status: old.md_security_trading_status, - main_fraction: old.main_fraction, - price_display_format: old.price_display_format, - settl_price_type: old.settl_price_type, - sub_fraction: old.sub_fraction, - underlying_product: old.underlying_product, - security_update_action: old.security_update_action as c_char, - maturity_month: old.maturity_month, - maturity_day: old.maturity_day, - maturity_week: old.maturity_week, - user_defined_instrument: old.user_defined_instrument, - contract_multiplier_unit: old.contract_multiplier_unit, - flow_schedule_type: old.flow_schedule_type, - tick_rule: old.tick_rule, - ..Default::default() - }; - // Safety: SYMBOL_CSTR_LEN_V1 is less than SYMBOL_CSTR_LEN - unsafe { - std::ptr::copy_nonoverlapping( - old.raw_symbol.as_ptr(), - res.raw_symbol.as_mut_ptr(), - SYMBOL_CSTR_LEN_V1, - ); - } - res - } -} - -impl From<&ErrorMsgV1> for ErrorMsgV2 { - fn from(old: &ErrorMsgV1) -> Self { - let mut new = Self { - hd: RecordHeader::new::( - rtype::ERROR, - old.hd.publisher_id, - old.hd.instrument_id, - old.hd.ts_event, - ), - ..Default::default() - }; - // Safety: new `err` is longer than older - unsafe { - std::ptr::copy_nonoverlapping(old.err.as_ptr(), new.err.as_mut_ptr(), new.err.len()); - } - new - } -} - -impl From<&SymbolMappingMsgV1> for SymbolMappingMsgV2 { - fn from(old: &SymbolMappingMsgV1) -> Self { - let mut res = Self { - hd: RecordHeader::new::( - rtype::SYMBOL_MAPPING, - old.hd.publisher_id, - old.hd.instrument_id, - old.hd.ts_event, - ), - start_ts: old.start_ts, - end_ts: old.end_ts, - ..Default::default() - }; - // Safety: SYMBOL_CSTR_LEN_V1 is less than SYMBOL_CSTR_LEN - unsafe { - std::ptr::copy_nonoverlapping( - old.stype_in_symbol.as_ptr(), - res.stype_in_symbol.as_mut_ptr(), - SYMBOL_CSTR_LEN_V1, - ); - std::ptr::copy_nonoverlapping( - old.stype_out_symbol.as_ptr(), - res.stype_out_symbol.as_mut_ptr(), - SYMBOL_CSTR_LEN_V1, - ); - } - res - } -} - -impl From<&SystemMsgV1> for SystemMsgV2 { - fn from(old: &SystemMsgV1) -> Self { - let mut new = Self { - hd: RecordHeader::new::( - rtype::SYSTEM, - old.hd.publisher_id, - old.hd.instrument_id, - old.hd.ts_event, - ), - ..Default::default() - }; - // Safety: new `msg` is longer than older - unsafe { - std::ptr::copy_nonoverlapping(old.msg.as_ptr(), new.msg.as_mut_ptr(), new.msg.len()); - } - new - } -} - -/// A trait for symbol mapping records. -pub trait SymbolMappingRec: HasRType { - /// Returns the input symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. - fn stype_in_symbol(&self) -> crate::Result<&str>; - - /// Returns the output symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. - fn stype_out_symbol(&self) -> crate::Result<&str>; - - /// Parses the raw start of the mapping interval into a datetime. Returns `None` if - /// `start_ts` contains the sentinel for a null timestamp. - fn start_ts(&self) -> Option; - - /// Parses the raw end of the mapping interval into a datetime. Returns `None` if - /// `end_ts` contains the sentinel for a null timestamp. - fn end_ts(&self) -> Option; -} - -impl SymbolMappingRec for SymbolMappingMsgV1 { - fn stype_in_symbol(&self) -> crate::Result<&str> { - Self::stype_in_symbol(self) - } - - fn stype_out_symbol(&self) -> crate::Result<&str> { - Self::stype_out_symbol(self) - } - - fn start_ts(&self) -> Option { - Self::start_ts(self) - } - - fn end_ts(&self) -> Option { - Self::end_ts(self) - } -} - -impl SymbolMappingRec for SymbolMappingMsgV2 { - fn stype_in_symbol(&self) -> crate::Result<&str> { - Self::stype_in_symbol(self) - } - - fn stype_out_symbol(&self) -> crate::Result<&str> { - Self::stype_out_symbol(self) - } - - fn start_ts(&self) -> Option { - Self::start_ts(self) - } - - fn end_ts(&self) -> Option { - Self::end_ts(self) - } -} - #[cfg(test)] mod tests { use std::{ffi::c_char, mem}; @@ -709,7 +539,7 @@ mod tests { let res = unsafe { decode_record_ref( 1, - VersionUpgradePolicy::Upgrade, + VersionUpgradePolicy::UpgradeToV2, true, &mut compat_buffer, orig.as_ref(), @@ -740,7 +570,7 @@ mod tests { let res = unsafe { decode_record_ref( 1, - VersionUpgradePolicy::Upgrade, + VersionUpgradePolicy::UpgradeToV2, true, &mut compat_buffer, orig.as_ref(), diff --git a/rust/dbn/src/decode/dbn/async.rs b/rust/dbn/src/decode/dbn/async.rs index c2eedd8..b79d029 100644 --- a/rust/dbn/src/decode/dbn/async.rs +++ b/rust/dbn/src/decode/dbn/async.rs @@ -45,7 +45,7 @@ where decoder: RecordDecoder::with_version( reader, metadata.version, - VersionUpgradePolicy::Upgrade, + VersionUpgradePolicy::UpgradeToV2, metadata.ts_out, )?, metadata, @@ -867,7 +867,7 @@ mod tests { tokio::fs::File::open(format!("{TEST_DATA_PATH}/test_data.definition.v1.dbn")) .await .unwrap(), - VersionUpgradePolicy::Upgrade, + VersionUpgradePolicy::UpgradeToV2, ) .await?; assert_eq!(decoder.metadata().version, crate::DBN_VERSION); diff --git a/rust/dbn/src/decode/dbn/sync.rs b/rust/dbn/src/decode/dbn/sync.rs index ebefcda..409590e 100644 --- a/rust/dbn/src/decode/dbn/sync.rs +++ b/rust/dbn/src/decode/dbn/sync.rs @@ -681,14 +681,13 @@ mod tests { use super::*; use crate::{ compat::InstrumentDefMsgV1, - datasets::XNAS_ITCH, decode::{tests::TEST_DATA_PATH, DynReader}, encode::{ dbn::Encoder, DbnEncodable, DbnRecordEncoder, DynWriter, EncodeDbn, EncodeRecord, }, - rtype, Bbo1MMsg, Bbo1SMsg, Cbbo1SMsg, Cmbp1Msg, Compression, Error, ErrorMsg, ImbalanceMsg, - InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, MetadataBuilder, OhlcvMsg, RecordHeader, - Result, StatMsg, StatusMsg, TbboMsg, TradeMsg, WithTsOut, SYMBOL_CSTR_LEN, + rtype, Bbo1MMsg, Bbo1SMsg, Cbbo1SMsg, Cmbp1Msg, Compression, Dataset, Error, ErrorMsg, + ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, MetadataBuilder, OhlcvMsg, + RecordHeader, Result, StatMsg, StatusMsg, TbboMsg, TradeMsg, WithTsOut, SYMBOL_CSTR_LEN, }; #[test] @@ -1032,7 +1031,7 @@ mod tests { let mut encoder = Encoder::new( &mut buffer, &MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Mbo)) .start(0) .stype_in(Some(SType::InstrumentId)) @@ -1100,7 +1099,7 @@ mod tests { #[rstest] #[case::v1_as_is(InstrumentDefMsgV1::default(), VersionUpgradePolicy::AsIs)] - #[case::v1_upgrade(InstrumentDefMsg::default(), VersionUpgradePolicy::Upgrade)] + #[case::v1_upgrade(InstrumentDefMsg::default(), VersionUpgradePolicy::UpgradeToV2)] fn test_decode_multiframe_zst_from_v1( #[case] _r: R, #[case] upgrade_policy: VersionUpgradePolicy, @@ -1129,7 +1128,7 @@ mod tests { fn test_decode_upgrade() -> crate::Result<()> { let decoder = Decoder::with_upgrade_policy( File::open(format!("{TEST_DATA_PATH}/test_data.definition.v1.dbn")).unwrap(), - VersionUpgradePolicy::Upgrade, + VersionUpgradePolicy::UpgradeToV2, )?; assert_eq!(decoder.metadata().version, crate::DBN_VERSION); assert_eq!(decoder.metadata().symbol_cstr_len, crate::SYMBOL_CSTR_LEN); diff --git a/rust/dbn/src/decode/dbz.rs b/rust/dbn/src/decode/dbz.rs index 1ec3376..185fe93 100644 --- a/rust/dbn/src/decode/dbz.rs +++ b/rust/dbn/src/decode/dbz.rs @@ -70,7 +70,7 @@ impl Decoder { /// # Errors /// This function will return an error if it is unable to parse the metadata in `reader`. pub fn new(reader: R) -> crate::Result { - Self::with_upgrade_policy(reader, VersionUpgradePolicy::Upgrade) + Self::with_upgrade_policy(reader, VersionUpgradePolicy::default()) } /// Creates a new DBZ [`Decoder`] from `reader`. It will decode records from diff --git a/rust/dbn/src/encode/dbn/async.rs b/rust/dbn/src/encode/dbn/async.rs index 2b3b407..7d8ed6d 100644 --- a/rust/dbn/src/encode/dbn/async.rs +++ b/rust/dbn/src/encode/dbn/async.rs @@ -489,17 +489,15 @@ mod tests { use super::*; use crate::{ compat::version_symbol_cstr_len, - datasets::{GLBX_MDP3, XNAS_ITCH}, decode::{dbn::AsyncMetadataDecoder as MetadataDecoder, FromLittleEndianSlice}, - enums::{SType, Schema}, - MappingInterval, MetadataBuilder, + Dataset, MappingInterval, MetadataBuilder, SType, Schema, }; #[tokio::test] async fn test_encode_decode_metadata_identity() { let metadata = Metadata { version: crate::DBN_VERSION, - dataset: GLBX_MDP3.to_owned(), + dataset: Dataset::GlbxMdp3.to_string(), schema: Some(Schema::Mbp10), stype_in: Some(SType::RawSymbol), stype_out: SType::InstrumentId, @@ -619,7 +617,7 @@ mod tests { #[tokio::test] async fn test_encode_decode_nulls() { let metadata = MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch.to_string()) .schema(Some(Schema::Mbo)) .start(1697240529000000000) .stype_in(Some(SType::RawSymbol)) diff --git a/rust/dbn/src/encode/dbn/sync.rs b/rust/dbn/src/encode/dbn/sync.rs index 8fd92ed..feca253 100644 --- a/rust/dbn/src/encode/dbn/sync.rs +++ b/rust/dbn/src/encode/dbn/sync.rs @@ -437,17 +437,15 @@ mod tests { use super::*; use crate::{ compat::version_symbol_cstr_len, - datasets::{GLBX_MDP3, XNAS_ITCH}, decode::{dbn::MetadataDecoder, FromLittleEndianSlice}, - enums::{SType, Schema}, - MappingInterval, MetadataBuilder, + Dataset, MappingInterval, MetadataBuilder, SType, Schema, }; #[test] fn test_encode_decode_metadata_identity() { let metadata = Metadata { version: crate::DBN_VERSION, - dataset: GLBX_MDP3.to_owned(), + dataset: Dataset::GlbxMdp3.to_string(), schema: Some(Schema::Mbp10), stype_in: Some(SType::RawSymbol), stype_out: SType::InstrumentId, @@ -567,7 +565,7 @@ mod tests { fn test_update_encoded(#[case] version: u8) { let orig_metadata = Metadata { version, - dataset: GLBX_MDP3.to_owned(), + dataset: Dataset::GlbxMdp3.to_string(), schema: Some(Schema::Mbo), stype_in: Some(SType::Parent), stype_out: SType::RawSymbol, @@ -615,7 +613,7 @@ mod tests { fn test_encode_decode_nulls(#[case] version: u8) { let metadata = MetadataBuilder::new() .version(version) - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch) .schema(Some(Schema::Mbo)) .start(1697240529000000000) .stype_in(Some(SType::RawSymbol)) @@ -636,7 +634,7 @@ mod tests { fn test_metadata_min_encoded_size(#[case] version: u8) { let metadata = MetadataBuilder::new() .version(version) - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch) .schema(Some(Schema::Mbo)) .start(1697240529000000000) .stype_in(Some(SType::RawSymbol)) @@ -654,7 +652,7 @@ mod tests { #[rstest] fn test_metadata_calc_size_unconventional_length() { let mut metadata = MetadataBuilder::new() - .dataset(XNAS_ITCH.to_owned()) + .dataset(Dataset::XnasItch) .schema(Some(Schema::Mbo)) .start(1697240529000000000) .stype_in(Some(SType::RawSymbol)) diff --git a/rust/dbn/src/encode/json/sync.rs b/rust/dbn/src/encode/json/sync.rs index e30b27e..cc95a9d 100644 --- a/rust/dbn/src/encode/json/sync.rs +++ b/rust/dbn/src/encode/json/sync.rs @@ -207,7 +207,6 @@ mod tests { use super::*; use crate::{ compat::SYMBOL_CSTR_LEN_V1, - datasets::GLBX_MDP3, encode::test_data::{VecStream, BID_ASK, RECORD_HEADER}, enums::{ rtype, InstrumentClass, SType, Schema, SecurityUpdateAction, StatType, @@ -217,7 +216,7 @@ mod tests { str_to_c_chars, ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, RecordHeader, StatMsg, StatusMsg, TradeMsg, WithTsOut, }, - MappingInterval, RecordRef, SymbolMapping, FIXED_PRICE_SCALE, + Dataset, MappingInterval, RecordRef, SymbolMapping, FIXED_PRICE_SCALE, }; fn write_json_to_string( @@ -614,7 +613,7 @@ mod tests { fn test_metadata_write_json() { let metadata = Metadata { version: 1, - dataset: GLBX_MDP3.to_owned(), + dataset: Dataset::GlbxMdp3.to_string(), schema: Some(Schema::Ohlcv1H), start: 1662734705128748281, end: NonZeroU64::new(1662734720914876944), diff --git a/rust/dbn/src/enums.rs b/rust/dbn/src/enums.rs index 3515d91..fdc22c3 100644 --- a/rust/dbn/src/enums.rs +++ b/rust/dbn/src/enums.rs @@ -2,10 +2,7 @@ //! Enums used in Databento APIs. -use std::{ - fmt::{self, Display, Formatter}, - str::FromStr, -}; +use std::fmt::{self, Display, Formatter}; // Dummy derive macro to get around `cfg_attr` incompatibility of several // of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780 @@ -1139,7 +1136,7 @@ impl From> for TriState { } } -/// How to handle decoding DBN data from a prior version. +/// How to handle decoding DBN data from other versions. #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)] #[cfg_attr( feature = "python", @@ -1148,25 +1145,14 @@ impl From> for TriState { )] #[non_exhaustive] pub enum VersionUpgradePolicy { - /// Decode data from previous versions as-is. + /// Decode data from all supported versions (less than or equal to + /// [`DBN_VERSION`](crate::DBN_VERSION)) as-is. AsIs, - /// Decode data from previous versions converting it to the latest version. This - /// breaks zero-copy decoding for structs that need updating, but makes usage - /// simpler. + /// Decode and convert data from DBN versions prior to version 2 to that version. + /// Attempting to decode data from newer versions (when they're introduced) will + /// fail. #[default] - Upgrade, -} - -impl FromStr for VersionUpgradePolicy { - type Err = crate::Error; - - fn from_str(s: &str) -> Result { - match s { - "AsIs" => Ok(Self::AsIs), - "Upgrade" => Ok(Self::Upgrade), - _ => Err(crate::Error::conversion::(s)), - } - } + UpgradeToV2, } #[cfg(feature = "serde")] diff --git a/rust/dbn/src/lib.rs b/rust/dbn/src/lib.rs index 5b550f8..76c81bf 100644 --- a/rust/dbn/src/lib.rs +++ b/rust/dbn/src/lib.rs @@ -54,6 +54,8 @@ pub mod record; mod record_enum; pub mod record_ref; pub mod symbol_map; +pub mod v1; +pub mod v2; pub use crate::{ enums::{ @@ -104,6 +106,7 @@ pub const UNDEF_TIMESTAMP: u64 = u64::MAX; pub const MAX_RECORD_LEN: usize = std::mem::size_of::>(); /// Contains dataset code constants. +#[deprecated(since = "0.25.0", note = "Use the `Dataset` enum instead")] pub mod datasets { use crate::publishers::Dataset; diff --git a/rust/dbn/src/macros.rs b/rust/dbn/src/macros.rs index 9d2c854..2ae0661 100644 --- a/rust/dbn/src/macros.rs +++ b/rust/dbn/src/macros.rs @@ -29,31 +29,29 @@ macro_rules! rtype_dispatch_base { RType::Imbalance => $handler!(ImbalanceMsg), RType::Status => $handler!(StatusMsg), RType::InstrumentDef => { - // TODO(carter): remove temporary version handling if $rec_ref.record_size() < std::mem::size_of::() { - $handler!($crate::compat::InstrumentDefMsgV1) + $handler!($crate::v1::InstrumentDefMsg) } else { $handler!(InstrumentDefMsg) } } RType::SymbolMapping => { - // TODO(carter): remove temporary version handling if $rec_ref.record_size() < std::mem::size_of::() { - $handler!($crate::compat::SymbolMappingMsgV1) + $handler!($crate::v1::SymbolMappingMsg) } else { $handler!(SymbolMappingMsg) } } RType::Error => { if $rec_ref.record_size() < std::mem::size_of::() { - $handler!($crate::compat::ErrorMsgV1) + $handler!($crate::v1::ErrorMsg) } else { $handler!(ErrorMsg) } } RType::System => { if $rec_ref.record_size() < std::mem::size_of::() { - $handler!($crate::compat::SystemMsgV1) + $handler!($crate::v1::SystemMsg) } else { $handler!(SystemMsg) } diff --git a/rust/dbn/src/metadata.rs b/rust/dbn/src/metadata.rs index 295dfca..a5e34b3 100644 --- a/rust/dbn/src/metadata.rs +++ b/rust/dbn/src/metadata.rs @@ -120,7 +120,8 @@ impl Metadata { /// Upgrades the metadata according to `upgrade_policy` if necessary. pub fn upgrade(&mut self, upgrade_policy: VersionUpgradePolicy) { - if self.version < crate::DBN_VERSION && upgrade_policy == VersionUpgradePolicy::Upgrade { + if self.version < crate::DBN_VERSION && upgrade_policy == VersionUpgradePolicy::UpgradeToV2 + { self.version = crate::DBN_VERSION; self.symbol_cstr_len = crate::SYMBOL_CSTR_LEN; } @@ -136,7 +137,7 @@ impl Metadata { self.version = input_version; self.symbol_cstr_len = crate::compat::SYMBOL_CSTR_LEN_V1; } - VersionUpgradePolicy::Upgrade => { + VersionUpgradePolicy::UpgradeToV2 => { self.version = crate::DBN_VERSION; self.symbol_cstr_len = crate::SYMBOL_CSTR_LEN; } @@ -197,10 +198,13 @@ impl MetadataBuilder { } /// Sets [`dataset`](Metadata::dataset) and returns the builder. - pub fn dataset(self, dataset: String) -> MetadataBuilder { + pub fn dataset( + self, + dataset: impl ToString, + ) -> MetadataBuilder { MetadataBuilder { version: self.version, - dataset, + dataset: dataset.to_string(), schema: self.schema, start: self.start, end: self.end, diff --git a/rust/dbn/src/record.rs b/rust/dbn/src/record.rs index f73887b..828c386 100644 --- a/rust/dbn/src/record.rs +++ b/rust/dbn/src/record.rs @@ -1,5 +1,5 @@ //! Market data types for encoding different Databento [`Schema`](crate::enums::Schema)s -//! and conversion functions. +//! in the most recent DBN version, as well as conversion functions. pub(crate) mod conv; mod impl_default; diff --git a/rust/dbn/src/record/impl_default.rs b/rust/dbn/src/record/impl_default.rs index 4231edf..8304566 100644 --- a/rust/dbn/src/record/impl_default.rs +++ b/rust/dbn/src/record/impl_default.rs @@ -1,7 +1,6 @@ use std::ffi::c_char; use crate::{ - compat::{ErrorMsgV1, InstrumentDefMsgV1, SymbolMappingMsgV1, SystemMsgV1, SYMBOL_CSTR_LEN_V1}, Schema, StatusAction, StatusReason, TradingEvent, TriState, UNDEF_ORDER_SIZE, UNDEF_PRICE, UNDEF_STAT_QUANTITY, UNDEF_TIMESTAMP, }; @@ -270,79 +269,6 @@ impl Default for InstrumentDefMsg { } } -impl Default for InstrumentDefMsgV1 { - fn default() -> Self { - Self { - hd: RecordHeader::default::(rtype::INSTRUMENT_DEF), - ts_recv: UNDEF_TIMESTAMP, - min_price_increment: UNDEF_PRICE, - display_factor: UNDEF_PRICE, - expiration: UNDEF_TIMESTAMP, - activation: UNDEF_TIMESTAMP, - high_limit_price: UNDEF_PRICE, - low_limit_price: UNDEF_PRICE, - max_price_variation: UNDEF_PRICE, - trading_reference_price: UNDEF_PRICE, - unit_of_measure_qty: UNDEF_PRICE, - min_price_increment_amount: UNDEF_PRICE, - price_ratio: UNDEF_PRICE, - inst_attrib_value: i32::MAX, - underlying_id: 0, - raw_instrument_id: 0, - market_depth_implied: i32::MAX, - market_depth: i32::MAX, - market_segment_id: u32::MAX, - max_trade_vol: u32::MAX, - min_lot_size: i32::MAX, - min_lot_size_block: i32::MAX, - min_lot_size_round_lot: i32::MAX, - min_trade_vol: u32::MAX, - contract_multiplier: i32::MAX, - decay_quantity: i32::MAX, - original_contract_size: i32::MAX, - trading_reference_date: u16::MAX, - appl_id: i16::MAX, - maturity_year: u16::MAX, - decay_start_date: u16::MAX, - channel_id: u16::MAX, - currency: Default::default(), - settl_currency: Default::default(), - secsubtype: Default::default(), - raw_symbol: Default::default(), - group: Default::default(), - exchange: Default::default(), - asset: Default::default(), - cfi: Default::default(), - security_type: Default::default(), - unit_of_measure: Default::default(), - underlying: Default::default(), - strike_price_currency: Default::default(), - instrument_class: 0, - strike_price: UNDEF_PRICE, - match_algorithm: MatchAlgorithm::Undefined as c_char, - md_security_trading_status: u8::MAX, - main_fraction: u8::MAX, - price_display_format: u8::MAX, - settl_price_type: u8::MAX, - sub_fraction: u8::MAX, - underlying_product: u8::MAX, - security_update_action: SecurityUpdateAction::Add, - maturity_month: u8::MAX, - maturity_day: u8::MAX, - maturity_week: u8::MAX, - user_defined_instrument: UserDefinedInstrument::No, - contract_multiplier_unit: i8::MAX, - flow_schedule_type: i8::MAX, - tick_rule: u8::MAX, - _reserved2: Default::default(), - _reserved3: Default::default(), - _reserved4: Default::default(), - _reserved5: Default::default(), - _dummy: Default::default(), - } - } -} - impl Default for ImbalanceMsg { fn default() -> Self { Self { @@ -391,15 +317,6 @@ impl Default for StatMsg { } } -impl Default for ErrorMsgV1 { - fn default() -> Self { - Self { - hd: RecordHeader::default::(rtype::ERROR), - err: [0; 64], - } - } -} - impl Default for ErrorMsg { fn default() -> Self { Self { @@ -425,19 +342,6 @@ impl Default for SymbolMappingMsg { } } -impl Default for SymbolMappingMsgV1 { - fn default() -> Self { - Self { - hd: RecordHeader::default::(rtype::SYMBOL_MAPPING), - stype_in_symbol: [0; SYMBOL_CSTR_LEN_V1], - stype_out_symbol: [0; SYMBOL_CSTR_LEN_V1], - _dummy: Default::default(), - start_ts: UNDEF_TIMESTAMP, - end_ts: UNDEF_TIMESTAMP, - } - } -} - impl Default for SystemMsg { fn default() -> Self { Self { @@ -447,12 +351,3 @@ impl Default for SystemMsg { } } } - -impl Default for SystemMsgV1 { - fn default() -> Self { - Self { - hd: RecordHeader::default::(rtype::SYSTEM), - msg: [0; 64], - } - } -} diff --git a/rust/dbn/src/record/methods.rs b/rust/dbn/src/record/methods.rs index a8707a2..ede69bc 100644 --- a/rust/dbn/src/record/methods.rs +++ b/rust/dbn/src/record/methods.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use num_enum::TryFromPrimitive; use crate::{ - compat::{ErrorMsgV1, InstrumentDefMsgV1, SymbolMappingMsgV1, SystemMsgV1}, enums::{StatusAction, StatusReason}, pretty::px_to_f64, SType, TradingEvent, TriState, @@ -648,146 +647,6 @@ impl InstrumentDefMsg { } } -impl InstrumentDefMsgV1 { - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - ts_to_dt(self.ts_recv) - } - - /// Parses the raw last eligible trade time into a datetime. Returns `None` if - /// `expiration` contains the sentinel for a null timestamp. - pub fn expiration(&self) -> Option { - ts_to_dt(self.expiration) - } - - /// Parses the raw time of instrument action into a datetime. Returns `None` if - /// `activation` contains the sentinel for a null timestamp. - pub fn activation(&self) -> Option { - ts_to_dt(self.activation) - } - - /// Returns currency used for price fields as a `&str`. - /// - /// # Errors - /// This function returns an error if `currency` contains invalid UTF-8. - pub fn currency(&self) -> Result<&str> { - c_chars_to_str(&self.currency) - } - - /// Returns currency used for settlement as a `&str`. - /// - /// # Errors - /// This function returns an error if `settl_currency` contains invalid UTF-8. - pub fn settl_currency(&self) -> Result<&str> { - c_chars_to_str(&self.settl_currency) - } - - /// Returns the strategy type of the spread as a `&str`. - /// - /// # Errors - /// This function returns an error if `secsubtype` contains invalid UTF-8. - pub fn secsubtype(&self) -> Result<&str> { - c_chars_to_str(&self.secsubtype) - } - - /// Returns the instrument raw symbol assigned by the publisher as a `&str`. - /// - /// # Errors - /// This function returns an error if `raw_symbol` contains invalid UTF-8. - pub fn raw_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.raw_symbol) - } - - /// Returns exchange used to identify the instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `exchange` contains invalid UTF-8. - pub fn exchange(&self) -> Result<&str> { - c_chars_to_str(&self.exchange) - } - - /// Returns the underlying asset code (product code) of the instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `asset` contains invalid UTF-8. - pub fn asset(&self) -> Result<&str> { - c_chars_to_str(&self.asset) - } - - /// Returns the ISO standard instrument categorization code as a `&str`. - /// - /// # Errors - /// This function returns an error if `cfi` contains invalid UTF-8. - pub fn cfi(&self) -> Result<&str> { - c_chars_to_str(&self.cfi) - } - - /// Returns the type of the strument, e.g. FUT for future or future spread as - /// a `&str`. - /// - /// # Errors - /// This function returns an error if `security_type` contains invalid UTF-8. - pub fn security_type(&self) -> Result<&str> { - c_chars_to_str(&self.security_type) - } - - /// Returns the unit of measure for the instrument's original contract size, e.g. - /// USD or LBS, as a `&str`. - /// - /// # Errors - /// This function returns an error if `unit_of_measure` contains invalid UTF-8. - pub fn unit_of_measure(&self) -> Result<&str> { - c_chars_to_str(&self.unit_of_measure) - } - - /// Returns the symbol of the first underlying instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `underlying` contains invalid UTF-8. - pub fn underlying(&self) -> Result<&str> { - c_chars_to_str(&self.underlying) - } - - /// Returns the currency of [`strike_price`](Self::strike_price) as a `&str`. - /// - /// # Errors - /// This function returns an error if `strike_price_currency` contains invalid UTF-8. - pub fn strike_price_currency(&self) -> Result<&str> { - c_chars_to_str(&self.strike_price_currency) - } - - /// Returns the security group code of the instrumnet as a `&str`. - /// - /// # Errors - /// This function returns an error if `group` contains invalid UTF-8. - pub fn group(&self) -> Result<&str> { - c_chars_to_str(&self.group) - } - - /// Tries to convert the raw classification of the instrument to an enum. - /// - /// # Errors - /// This function returns an error if the `instrument_class` field does not - /// contain a valid [`InstrumentClass`]. - pub fn instrument_class(&self) -> Result { - InstrumentClass::try_from(self.instrument_class as u8).map_err(|_| { - Error::conversion::(format!("{:#04X}", self.instrument_class as u8)) - }) - } - - /// Tries to convert the raw matching algorithm used for the instrument to an enum. - /// - /// # Errors - /// This function returns an error if the `match_algorithm` field does not - /// contain a valid [`MatchAlgorithm`]. - pub fn match_algorithm(&self) -> Result { - MatchAlgorithm::try_from(self.match_algorithm as u8).map_err(|_| { - Error::conversion::(format!("{:#04X}", self.match_algorithm as u8)) - }) - } -} - impl ImbalanceMsg { /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` /// if `ts_recv` contains the sentinel for a null timestamp. @@ -866,32 +725,6 @@ impl StatMsg { } } -impl ErrorMsgV1 { - /// Creates a new `ErrorMsgV1`. - /// - /// # Errors - /// This function returns an error if `msg` is too long. - pub fn new(ts_event: u64, msg: &str) -> Self { - let mut error = Self { - hd: RecordHeader::new::(rtype::ERROR, 0, 0, ts_event), - ..Default::default() - }; - // leave at least one null byte - for (i, byte) in msg.as_bytes().iter().take(error.err.len() - 1).enumerate() { - error.err[i] = *byte as c_char; - } - error - } - - /// Returns `err` as a `&str`. - /// - /// # Errors - /// This function returns an error if `err` contains invalid UTF-8. - pub fn err(&self) -> Result<&str> { - c_chars_to_str(&self.err) - } -} - impl ErrorMsg { /// Creates a new `ErrorMsg`. /// @@ -993,62 +826,8 @@ impl SymbolMappingMsg { } } -impl SymbolMappingMsgV1 { - /// Creates a new `SymbolMappingMsg`. - /// - /// # Errors - /// This function returns an error if `stype_in_symbol` or `stype_out_symbol` - /// contain more than maximum number of characters of 21. - pub fn new( - instrument_id: u32, - ts_event: u64, - stype_in_symbol: &str, - stype_out_symbol: &str, - start_ts: u64, - end_ts: u64, - ) -> crate::Result { - Ok(Self { - // symbol mappings aren't publisher-specific - hd: RecordHeader::new::(rtype::SYMBOL_MAPPING, 0, instrument_id, ts_event), - stype_in_symbol: str_to_c_chars(stype_in_symbol)?, - stype_out_symbol: str_to_c_chars(stype_out_symbol)?, - _dummy: Default::default(), - start_ts, - end_ts, - }) - } - - /// Returns the input symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. - pub fn stype_in_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.stype_in_symbol) - } - - /// Returns the output symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. - pub fn stype_out_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.stype_out_symbol) - } - - /// Parses the raw start of the mapping interval into a datetime. Returns `None` if - /// `start_ts` contains the sentinel for a null timestamp. - pub fn start_ts(&self) -> Option { - ts_to_dt(self.start_ts) - } - - /// Parses the raw end of the mapping interval into a datetime. Returns `None` if - /// `end_ts` contains the sentinel for a null timestamp. - pub fn end_ts(&self) -> Option { - ts_to_dt(self.end_ts) - } -} - impl SystemMsg { - const HEARTBEAT: &'static str = "Heartbeat"; + pub(crate) const HEARTBEAT: &'static str = "Heartbeat"; /// Creates a new `SystemMsg`. /// @@ -1088,43 +867,6 @@ impl SystemMsg { } } -impl SystemMsgV1 { - /// Creates a new `SystemMsgV1`. - /// - /// # Errors - /// This function returns an error if `msg` is too long. - pub fn new(ts_event: u64, msg: &str) -> Result { - Ok(Self { - hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), - msg: str_to_c_chars(msg)?, - }) - } - - /// Creates a new heartbeat `SystemMsg`. - pub fn heartbeat(ts_event: u64) -> Self { - Self { - hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), - msg: str_to_c_chars(SystemMsg::HEARTBEAT).unwrap(), - } - } - - /// Checks whether the message is a heartbeat from the gateway. - pub fn is_heartbeat(&self) -> bool { - self.msg() - .map(|msg| msg == SystemMsg::HEARTBEAT) - .unwrap_or_default() - } - - /// Returns the message from the Databento Live Subscription Gateway (LSG) as - /// a `&str`. - /// - /// # Errors - /// This function returns an error if `msg` contains invalid UTF-8. - pub fn msg(&self) -> Result<&str> { - c_chars_to_str(&self.msg) - } -} - impl Record for WithTsOut { fn header(&self) -> &RecordHeader { self.rec.header() diff --git a/rust/dbn/src/symbol_map.rs b/rust/dbn/src/symbol_map.rs index d470f54..da706a9 100644 --- a/rust/dbn/src/symbol_map.rs +++ b/rust/dbn/src/symbol_map.rs @@ -4,7 +4,9 @@ use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc}; use time::{macros::time, PrimitiveDateTime}; -use crate::{compat, Error, HasRType, Metadata, RType, Record, RecordRef, SType, SymbolMappingMsg}; +use crate::{ + compat, v1, Error, HasRType, Metadata, RType, Record, RecordRef, SType, SymbolMappingMsg, +}; /// A timeseries symbol map. Generally useful for working with historical data /// and is commonly built from a [`Metadata`] object via [`Self::from_metadata()`]. @@ -236,7 +238,7 @@ impl PitSymbolMap { self.on_symbol_mapping(unsafe { record.get_unchecked::() }) } else { // Use get here to get still perform length checks - self.on_symbol_mapping(record.get::().unwrap()) + self.on_symbol_mapping(record.get::().unwrap()) } } else { Ok(()) diff --git a/rust/dbn/src/v1.rs b/rust/dbn/src/v1.rs new file mode 100644 index 0000000..04384c1 --- /dev/null +++ b/rust/dbn/src/v1.rs @@ -0,0 +1,35 @@ +//! Record data types for encoding different Databento [`Schema`](crate::enums::Schema)s +//! in DBN version 1. + +pub use crate::compat::ErrorMsgV1 as ErrorMsg; +pub use crate::compat::InstrumentDefMsgV1 as InstrumentDefMsg; +pub use crate::compat::SymbolMappingMsgV1 as SymbolMappingMsg; +pub use crate::compat::SystemMsgV1 as SystemMsg; +pub use crate::compat::SYMBOL_CSTR_LEN_V1 as SYMBOL_CSTR_LEN; +pub use crate::record::{ + Bbo1MMsg, Bbo1SMsg, BboMsg, Cbbo1MMsg, Cbbo1SMsg, CbboMsg, Cmbp1Msg, ImbalanceMsg, MboMsg, + OhlcvMsg, StatMsg, StatusMsg, TbboMsg, TcbboMsg, TradeMsg, WithTsOut, +}; + +mod impl_default; +mod methods; + +use crate::compat::SymbolMappingRec; + +impl SymbolMappingRec for SymbolMappingMsg { + fn stype_in_symbol(&self) -> crate::Result<&str> { + Self::stype_in_symbol(self) + } + + fn stype_out_symbol(&self) -> crate::Result<&str> { + Self::stype_out_symbol(self) + } + + fn start_ts(&self) -> Option { + Self::start_ts(self) + } + + fn end_ts(&self) -> Option { + Self::end_ts(self) + } +} diff --git a/rust/dbn/src/v1/impl_default.rs b/rust/dbn/src/v1/impl_default.rs new file mode 100644 index 0000000..f1959a3 --- /dev/null +++ b/rust/dbn/src/v1/impl_default.rs @@ -0,0 +1,111 @@ +use std::os::raw::c_char; + +use crate::{ + rtype, MatchAlgorithm, RecordHeader, SecurityUpdateAction, UserDefinedInstrument, UNDEF_PRICE, + UNDEF_TIMESTAMP, +}; + +use super::{ErrorMsg, InstrumentDefMsg, SymbolMappingMsg, SystemMsg, SYMBOL_CSTR_LEN}; + +impl Default for InstrumentDefMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::INSTRUMENT_DEF), + ts_recv: UNDEF_TIMESTAMP, + min_price_increment: UNDEF_PRICE, + display_factor: UNDEF_PRICE, + expiration: UNDEF_TIMESTAMP, + activation: UNDEF_TIMESTAMP, + high_limit_price: UNDEF_PRICE, + low_limit_price: UNDEF_PRICE, + max_price_variation: UNDEF_PRICE, + trading_reference_price: UNDEF_PRICE, + unit_of_measure_qty: UNDEF_PRICE, + min_price_increment_amount: UNDEF_PRICE, + price_ratio: UNDEF_PRICE, + inst_attrib_value: i32::MAX, + underlying_id: 0, + raw_instrument_id: 0, + market_depth_implied: i32::MAX, + market_depth: i32::MAX, + market_segment_id: u32::MAX, + max_trade_vol: u32::MAX, + min_lot_size: i32::MAX, + min_lot_size_block: i32::MAX, + min_lot_size_round_lot: i32::MAX, + min_trade_vol: u32::MAX, + contract_multiplier: i32::MAX, + decay_quantity: i32::MAX, + original_contract_size: i32::MAX, + trading_reference_date: u16::MAX, + appl_id: i16::MAX, + maturity_year: u16::MAX, + decay_start_date: u16::MAX, + channel_id: u16::MAX, + currency: Default::default(), + settl_currency: Default::default(), + secsubtype: Default::default(), + raw_symbol: Default::default(), + group: Default::default(), + exchange: Default::default(), + asset: Default::default(), + cfi: Default::default(), + security_type: Default::default(), + unit_of_measure: Default::default(), + underlying: Default::default(), + strike_price_currency: Default::default(), + instrument_class: 0, + strike_price: UNDEF_PRICE, + match_algorithm: MatchAlgorithm::Undefined as c_char, + md_security_trading_status: u8::MAX, + main_fraction: u8::MAX, + price_display_format: u8::MAX, + settl_price_type: u8::MAX, + sub_fraction: u8::MAX, + underlying_product: u8::MAX, + security_update_action: SecurityUpdateAction::Add, + maturity_month: u8::MAX, + maturity_day: u8::MAX, + maturity_week: u8::MAX, + user_defined_instrument: UserDefinedInstrument::No, + contract_multiplier_unit: i8::MAX, + flow_schedule_type: i8::MAX, + tick_rule: u8::MAX, + _reserved2: Default::default(), + _reserved3: Default::default(), + _reserved4: Default::default(), + _reserved5: Default::default(), + _dummy: Default::default(), + } + } +} + +impl Default for ErrorMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::ERROR), + err: [0; 64], + } + } +} +impl Default for SymbolMappingMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::SYMBOL_MAPPING), + stype_in_symbol: [0; SYMBOL_CSTR_LEN], + stype_out_symbol: [0; SYMBOL_CSTR_LEN], + _dummy: Default::default(), + start_ts: UNDEF_TIMESTAMP, + end_ts: UNDEF_TIMESTAMP, + } + } +} + +impl Default for SystemMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::SYSTEM), + msg: [0; 64], + } + } +} diff --git a/rust/dbn/src/v1/methods.rs b/rust/dbn/src/v1/methods.rs new file mode 100644 index 0000000..cb9e4b2 --- /dev/null +++ b/rust/dbn/src/v1/methods.rs @@ -0,0 +1,265 @@ +use std::os::raw::c_char; + +use crate::{ + record::{c_chars_to_str, str_to_c_chars, ts_to_dt}, + rtype, Error, InstrumentClass, MatchAlgorithm, RecordHeader, Result, +}; + +use super::{ErrorMsg, InstrumentDefMsg, SymbolMappingMsg, SystemMsg}; + +impl InstrumentDefMsg { + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + ts_to_dt(self.ts_recv) + } + + /// Parses the raw last eligible trade time into a datetime. Returns `None` if + /// `expiration` contains the sentinel for a null timestamp. + pub fn expiration(&self) -> Option { + ts_to_dt(self.expiration) + } + + /// Parses the raw time of instrument action into a datetime. Returns `None` if + /// `activation` contains the sentinel for a null timestamp. + pub fn activation(&self) -> Option { + ts_to_dt(self.activation) + } + + /// Returns currency used for price fields as a `&str`. + /// + /// # Errors + /// This function returns an error if `currency` contains invalid UTF-8. + pub fn currency(&self) -> Result<&str> { + c_chars_to_str(&self.currency) + } + + /// Returns currency used for settlement as a `&str`. + /// + /// # Errors + /// This function returns an error if `settl_currency` contains invalid UTF-8. + pub fn settl_currency(&self) -> Result<&str> { + c_chars_to_str(&self.settl_currency) + } + + /// Returns the strategy type of the spread as a `&str`. + /// + /// # Errors + /// This function returns an error if `secsubtype` contains invalid UTF-8. + pub fn secsubtype(&self) -> Result<&str> { + c_chars_to_str(&self.secsubtype) + } + + /// Returns the instrument raw symbol assigned by the publisher as a `&str`. + /// + /// # Errors + /// This function returns an error if `raw_symbol` contains invalid UTF-8. + pub fn raw_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.raw_symbol) + } + + /// Returns exchange used to identify the instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `exchange` contains invalid UTF-8. + pub fn exchange(&self) -> Result<&str> { + c_chars_to_str(&self.exchange) + } + + /// Returns the underlying asset code (product code) of the instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `asset` contains invalid UTF-8. + pub fn asset(&self) -> Result<&str> { + c_chars_to_str(&self.asset) + } + + /// Returns the ISO standard instrument categorization code as a `&str`. + /// + /// # Errors + /// This function returns an error if `cfi` contains invalid UTF-8. + pub fn cfi(&self) -> Result<&str> { + c_chars_to_str(&self.cfi) + } + + /// Returns the type of the strument, e.g. FUT for future or future spread as + /// a `&str`. + /// + /// # Errors + /// This function returns an error if `security_type` contains invalid UTF-8. + pub fn security_type(&self) -> Result<&str> { + c_chars_to_str(&self.security_type) + } + + /// Returns the unit of measure for the instrument's original contract size, e.g. + /// USD or LBS, as a `&str`. + /// + /// # Errors + /// This function returns an error if `unit_of_measure` contains invalid UTF-8. + pub fn unit_of_measure(&self) -> Result<&str> { + c_chars_to_str(&self.unit_of_measure) + } + + /// Returns the symbol of the first underlying instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `underlying` contains invalid UTF-8. + pub fn underlying(&self) -> Result<&str> { + c_chars_to_str(&self.underlying) + } + + /// Returns the currency of [`strike_price`](Self::strike_price) as a `&str`. + /// + /// # Errors + /// This function returns an error if `strike_price_currency` contains invalid UTF-8. + pub fn strike_price_currency(&self) -> Result<&str> { + c_chars_to_str(&self.strike_price_currency) + } + + /// Returns the security group code of the instrumnet as a `&str`. + /// + /// # Errors + /// This function returns an error if `group` contains invalid UTF-8. + pub fn group(&self) -> Result<&str> { + c_chars_to_str(&self.group) + } + + /// Tries to convert the raw classification of the instrument to an enum. + /// + /// # Errors + /// This function returns an error if the `instrument_class` field does not + /// contain a valid [`InstrumentClass`]. + pub fn instrument_class(&self) -> Result { + InstrumentClass::try_from(self.instrument_class as u8).map_err(|_| { + Error::conversion::(format!("{:#04X}", self.instrument_class as u8)) + }) + } + + /// Tries to convert the raw matching algorithm used for the instrument to an enum. + /// + /// # Errors + /// This function returns an error if the `match_algorithm` field does not + /// contain a valid [`MatchAlgorithm`]. + pub fn match_algorithm(&self) -> Result { + MatchAlgorithm::try_from(self.match_algorithm as u8).map_err(|_| { + Error::conversion::(format!("{:#04X}", self.match_algorithm as u8)) + }) + } +} + +impl ErrorMsg { + /// Creates a new `ErrorMsgV1`. + /// + /// # Errors + /// This function returns an error if `msg` is too long. + pub fn new(ts_event: u64, msg: &str) -> Self { + let mut error = Self { + hd: RecordHeader::new::(rtype::ERROR, 0, 0, ts_event), + ..Default::default() + }; + // leave at least one null byte + for (i, byte) in msg.as_bytes().iter().take(error.err.len() - 1).enumerate() { + error.err[i] = *byte as c_char; + } + error + } + + /// Returns `err` as a `&str`. + /// + /// # Errors + /// This function returns an error if `err` contains invalid UTF-8. + pub fn err(&self) -> Result<&str> { + c_chars_to_str(&self.err) + } +} + +impl SymbolMappingMsg { + /// Creates a new `SymbolMappingMsg`. + /// + /// # Errors + /// This function returns an error if `stype_in_symbol` or `stype_out_symbol` + /// contain more than maximum number of characters of 21. + pub fn new( + instrument_id: u32, + ts_event: u64, + stype_in_symbol: &str, + stype_out_symbol: &str, + start_ts: u64, + end_ts: u64, + ) -> crate::Result { + Ok(Self { + // symbol mappings aren't publisher-specific + hd: RecordHeader::new::(rtype::SYMBOL_MAPPING, 0, instrument_id, ts_event), + stype_in_symbol: str_to_c_chars(stype_in_symbol)?, + stype_out_symbol: str_to_c_chars(stype_out_symbol)?, + _dummy: Default::default(), + start_ts, + end_ts, + }) + } + + /// Returns the input symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. + pub fn stype_in_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.stype_in_symbol) + } + + /// Returns the output symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. + pub fn stype_out_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.stype_out_symbol) + } + + /// Parses the raw start of the mapping interval into a datetime. Returns `None` if + /// `start_ts` contains the sentinel for a null timestamp. + pub fn start_ts(&self) -> Option { + ts_to_dt(self.start_ts) + } + + /// Parses the raw end of the mapping interval into a datetime. Returns `None` if + /// `end_ts` contains the sentinel for a null timestamp. + pub fn end_ts(&self) -> Option { + ts_to_dt(self.end_ts) + } +} + +impl SystemMsg { + /// Creates a new `SystemMsgV1`. + /// + /// # Errors + /// This function returns an error if `msg` is too long. + pub fn new(ts_event: u64, msg: &str) -> Result { + Ok(Self { + hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), + msg: str_to_c_chars(msg)?, + }) + } + + /// Creates a new heartbeat `SystemMsg`. + pub fn heartbeat(ts_event: u64) -> Self { + Self { + hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), + msg: str_to_c_chars(crate::SystemMsg::HEARTBEAT).unwrap(), + } + } + + /// Checks whether the message is a heartbeat from the gateway. + pub fn is_heartbeat(&self) -> bool { + self.msg() + .map(|msg| msg == crate::SystemMsg::HEARTBEAT) + .unwrap_or_default() + } + + /// Returns the message from the Databento Live Subscription Gateway (LSG) as + /// a `&str`. + /// + /// # Errors + /// This function returns an error if `msg` contains invalid UTF-8. + pub fn msg(&self) -> Result<&str> { + c_chars_to_str(&self.msg) + } +} diff --git a/rust/dbn/src/v2.rs b/rust/dbn/src/v2.rs new file mode 100644 index 0000000..0b066b8 --- /dev/null +++ b/rust/dbn/src/v2.rs @@ -0,0 +1,182 @@ +//! Record data types for encoding different Databento [`Schema`](crate::enums::Schema)s +//! in DBN version 2. + +use std::os::raw::c_char; + +pub use crate::compat::SYMBOL_CSTR_LEN_V2 as SYMBOL_CSTR_LEN; +pub use crate::record::{ + Bbo1MMsg, Bbo1SMsg, BboMsg, Cbbo1MMsg, Cbbo1SMsg, CbboMsg, Cmbp1Msg, ErrorMsg, ImbalanceMsg, + InstrumentDefMsg, MboMsg, OhlcvMsg, StatMsg, StatusMsg, SymbolMappingMsg, SystemMsg, TbboMsg, + TcbboMsg, TradeMsg, WithTsOut, +}; + +use crate::{compat::SymbolMappingRec, rtype, v1, RecordHeader}; + +impl From<&v1::InstrumentDefMsg> for InstrumentDefMsg { + fn from(old: &v1::InstrumentDefMsg) -> Self { + let mut res = Self { + // recalculate length + hd: RecordHeader::new::( + rtype::INSTRUMENT_DEF, + old.hd.publisher_id, + old.hd.instrument_id, + old.hd.ts_event, + ), + ts_recv: old.ts_recv, + min_price_increment: old.min_price_increment, + display_factor: old.display_factor, + expiration: old.expiration, + activation: old.activation, + high_limit_price: old.high_limit_price, + low_limit_price: old.low_limit_price, + max_price_variation: old.max_price_variation, + trading_reference_price: old.trading_reference_price, + unit_of_measure_qty: old.unit_of_measure_qty, + min_price_increment_amount: old.min_price_increment_amount, + price_ratio: old.price_ratio, + inst_attrib_value: old.inst_attrib_value, + underlying_id: old.underlying_id, + raw_instrument_id: old.raw_instrument_id, + market_depth_implied: old.market_depth_implied, + market_depth: old.market_depth, + market_segment_id: old.market_segment_id, + max_trade_vol: old.max_trade_vol, + min_lot_size: old.min_lot_size, + min_lot_size_block: old.min_lot_size_block, + min_lot_size_round_lot: old.min_lot_size_round_lot, + min_trade_vol: old.min_trade_vol, + contract_multiplier: old.contract_multiplier, + decay_quantity: old.decay_quantity, + original_contract_size: old.original_contract_size, + trading_reference_date: old.trading_reference_date, + appl_id: old.appl_id, + maturity_year: old.maturity_year, + decay_start_date: old.decay_start_date, + channel_id: old.channel_id, + currency: old.currency, + settl_currency: old.settl_currency, + secsubtype: old.secsubtype, + group: old.group, + exchange: old.exchange, + asset: old.asset, + cfi: old.cfi, + security_type: old.security_type, + unit_of_measure: old.unit_of_measure, + underlying: old.underlying, + strike_price_currency: old.strike_price_currency, + instrument_class: old.instrument_class, + strike_price: old.strike_price, + match_algorithm: old.match_algorithm, + md_security_trading_status: old.md_security_trading_status, + main_fraction: old.main_fraction, + price_display_format: old.price_display_format, + settl_price_type: old.settl_price_type, + sub_fraction: old.sub_fraction, + underlying_product: old.underlying_product, + security_update_action: old.security_update_action as c_char, + maturity_month: old.maturity_month, + maturity_day: old.maturity_day, + maturity_week: old.maturity_week, + user_defined_instrument: old.user_defined_instrument, + contract_multiplier_unit: old.contract_multiplier_unit, + flow_schedule_type: old.flow_schedule_type, + tick_rule: old.tick_rule, + ..Default::default() + }; + // Safety: SYMBOL_CSTR_LEN_V1 is less than SYMBOL_CSTR_LEN + unsafe { + std::ptr::copy_nonoverlapping( + old.raw_symbol.as_ptr(), + res.raw_symbol.as_mut_ptr(), + v1::SYMBOL_CSTR_LEN, + ); + } + res + } +} + +impl From<&v1::ErrorMsg> for ErrorMsg { + fn from(old: &v1::ErrorMsg) -> Self { + let mut new = Self { + hd: RecordHeader::new::( + rtype::ERROR, + old.hd.publisher_id, + old.hd.instrument_id, + old.hd.ts_event, + ), + ..Default::default() + }; + // Safety: new `err` is longer than older + unsafe { + std::ptr::copy_nonoverlapping(old.err.as_ptr(), new.err.as_mut_ptr(), new.err.len()); + } + new + } +} + +impl From<&v1::SymbolMappingMsg> for SymbolMappingMsg { + fn from(old: &v1::SymbolMappingMsg) -> Self { + let mut res = Self { + hd: RecordHeader::new::( + rtype::SYMBOL_MAPPING, + old.hd.publisher_id, + old.hd.instrument_id, + old.hd.ts_event, + ), + start_ts: old.start_ts, + end_ts: old.end_ts, + ..Default::default() + }; + // Safety: SYMBOL_CSTR_LEN_V1 is less than SYMBOL_CSTR_LEN + unsafe { + std::ptr::copy_nonoverlapping( + old.stype_in_symbol.as_ptr(), + res.stype_in_symbol.as_mut_ptr(), + v1::SYMBOL_CSTR_LEN, + ); + std::ptr::copy_nonoverlapping( + old.stype_out_symbol.as_ptr(), + res.stype_out_symbol.as_mut_ptr(), + v1::SYMBOL_CSTR_LEN, + ); + } + res + } +} + +impl From<&v1::SystemMsg> for SystemMsg { + fn from(old: &v1::SystemMsg) -> Self { + let mut new = Self { + hd: RecordHeader::new::( + rtype::SYSTEM, + old.hd.publisher_id, + old.hd.instrument_id, + old.hd.ts_event, + ), + ..Default::default() + }; + // Safety: new `msg` is longer than older + unsafe { + std::ptr::copy_nonoverlapping(old.msg.as_ptr(), new.msg.as_mut_ptr(), new.msg.len()); + } + new + } +} + +impl SymbolMappingRec for SymbolMappingMsg { + fn stype_in_symbol(&self) -> crate::Result<&str> { + Self::stype_in_symbol(self) + } + + fn stype_out_symbol(&self) -> crate::Result<&str> { + Self::stype_out_symbol(self) + } + + fn start_ts(&self) -> Option { + Self::start_ts(self) + } + + fn end_ts(&self) -> Option { + Self::end_ts(self) + } +}