Skip to content

Commit

Permalink
FIX: Fix ts_out decoding with DBNv1 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed Feb 28, 2024
1 parent ac0d8bf commit 13b5ac2
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 30 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
field delimiter character, allowing DBN to be encoded as tab-separated values (TSV)
- Document cancellation safety for `AsyncRecordDecoder::decode_ref` (credit: @yongqli)
- Added new publisher values for consolidated DBEQ.MAX
- Added C FFI conversion functions from `ErrorMsgV1` to `ErrorMsg` and `SystemMsgV1`
to `SystemMsg`
- Upgraded `async-compression` to 0.4.6
- Upgraded `strum` to 0.26

Expand All @@ -24,10 +26,14 @@
- Made `StatType` and `VersionUpgradePolicy` non-exhaustive to allow future additions
without breaking changes
- Renamed `_dummy` field in `ImbalanceMsg` and `StatMsg` to `_reserved`
- Added `ts_out` parameter to `RecordDecoder` and `AsyncRecordDecoder`
`with_upgrade_policy` methods

### Bug fixes
- Fixed handling of `ts_out` when upgrading DBNv1 records to version 2
- Added missing `StatType::Vwap` variant used in the ICE datasets
- Fixed an issue with Python stub file distribution
- Fixed missing handling of `ErrorMsgV1` and `SystemMsgV1` in `rtype` dispatch macros

## 0.15.1 - 2024-01-23

Expand Down
16 changes: 14 additions & 2 deletions c/src/compat.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use dbn::{
compat::{InstrumentDefMsgV1, SymbolMappingMsgV1},
InstrumentDefMsg, SymbolMappingMsg,
compat::{ErrorMsgV1, InstrumentDefMsgV1, SymbolMappingMsgV1, SystemMsgV1},
ErrorMsg, InstrumentDefMsg, SymbolMappingMsg, SystemMsg,
};

/// Converts an V1 ErrorMsg to V2.
#[no_mangle]
pub extern "C" fn from_error_v1_to_v2(def_v1: &ErrorMsgV1) -> ErrorMsg {
ErrorMsg::from(def_v1)
}

/// Converts an V1 InstrumentDefMsg to V2.
#[no_mangle]
pub extern "C" fn from_instrument_def_v1_to_v2(def_v1: &InstrumentDefMsgV1) -> InstrumentDefMsg {
Expand All @@ -14,3 +20,9 @@ pub extern "C" fn from_instrument_def_v1_to_v2(def_v1: &InstrumentDefMsgV1) -> I
pub extern "C" fn from_symbol_mapping_v1_to_v2(def_v1: &SymbolMappingMsgV1) -> SymbolMappingMsg {
SymbolMappingMsg::from(def_v1)
}

/// Converts an V1 SystemMsg to V2.
#[no_mangle]
pub extern "C" fn from_system_v1_to_v2(def_v1: &SystemMsgV1) -> SystemMsg {
SystemMsg::from(def_v1)
}
10 changes: 7 additions & 3 deletions python/src/dbn_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ impl DbnDecoder {
}
}
let mut read_position = self.buffer.position() as usize;
let mut decoder =
RecordDecoder::with_version(&mut self.buffer, self.input_version, self.upgrade_policy)
.map_err(to_val_err)?;
let mut decoder = RecordDecoder::with_version(
&mut self.buffer,
self.input_version,
self.upgrade_policy,
self.ts_out,
)
.map_err(to_val_err)?;
Python::with_gil(|py| -> PyResult<()> {
while let Some(rec) = decoder.decode_ref().map_err(to_val_err)? {
// Bug in clippy generates an error here. trivial_copy feature isn't enabled,
Expand Down
3 changes: 3 additions & 0 deletions python/src/transcoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl<const OUTPUT_ENC: u8> Inner<OUTPUT_ENC> {
&mut self.buffer,
self.input_version,
self.upgrade_policy,
self.ts_out,
)
.map_err(to_val_err)?;
let mut encoder = DbnRecordEncoder::new(&mut self.output);
Expand Down Expand Up @@ -246,6 +247,7 @@ impl<const OUTPUT_ENC: u8> Inner<OUTPUT_ENC> {
&mut self.buffer,
self.input_version,
self.upgrade_policy,
self.ts_out,
)
.map_err(to_val_err)?;

Expand Down Expand Up @@ -300,6 +302,7 @@ impl<const OUTPUT_ENC: u8> Inner<OUTPUT_ENC> {
&mut self.buffer,
self.input_version,
self.upgrade_policy,
self.ts_out,
)
.map_err(to_val_err)?;

Expand Down
9 changes: 8 additions & 1 deletion rust/dbn-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ use dbn_cli::{
const STDIN_SENTINEL: &str = "-";

fn wrap_frag(args: &Args, reader: impl io::Read) -> anyhow::Result<impl DecodeRecordRef> {
// assume no ts_out for fragments
const TS_OUT: bool = false;
Ok(LimitFilter::new_no_metadata(
SchemaFilter::new_no_metadata(
DbnRecordDecoder::with_version(reader, args.input_version(), args.upgrade_policy())?,
DbnRecordDecoder::with_version(
reader,
args.input_version(),
args.upgrade_policy(),
TS_OUT,
)?,
args.schema_filter,
),
args.limit,
Expand Down
106 changes: 89 additions & 17 deletions rust/dbn/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
macros::{dbn_record, CsvSerialize, JsonSerialize},
record::{transmute_header_bytes, transmute_record_bytes},
rtype, HasRType, RecordHeader, RecordRef, SecurityUpdateAction, UserDefinedInstrument,
VersionUpgradePolicy,
VersionUpgradePolicy, WithTsOut,
};

// Dummy derive macro to get around `cfg_attr` incompatibility of several
Expand Down Expand Up @@ -44,43 +44,59 @@ pub use crate::record::SystemMsg as SystemMsgV2;
pub unsafe fn decode_record_ref<'a>(
version: u8,
upgrade_policy: VersionUpgradePolicy,
ts_out: bool,
compat_buffer: &'a mut [u8; crate::MAX_RECORD_LEN],
input: &'a [u8],
) -> RecordRef<'a> {
if version == 1 && upgrade_policy == VersionUpgradePolicy::Upgrade {
let header = transmute_header_bytes(input).unwrap();
match header.rtype {
rtype::INSTRUMENT_DEF => {
let definition = InstrumentDefMsgV2::from(
transmute_record_bytes::<InstrumentDefMsgV1>(input).unwrap(),
return upgrade_record::<InstrumentDefMsgV1, InstrumentDefMsgV2>(
ts_out,
compat_buffer,
input,
);
std::ptr::copy_nonoverlapping(&definition, compat_buffer.as_mut_ptr().cast(), 1);
return RecordRef::new(compat_buffer);
}
rtype::SYMBOL_MAPPING => {
let definition = SymbolMappingMsgV2::from(
transmute_record_bytes::<SymbolMappingMsgV1>(input).unwrap(),
return upgrade_record::<SymbolMappingMsgV1, SymbolMappingMsgV2>(
ts_out,
compat_buffer,
input,
);
std::ptr::copy_nonoverlapping(&definition, compat_buffer.as_mut_ptr().cast(), 1);
return RecordRef::new(compat_buffer);
}
rtype::ERROR => {
let system = ErrorMsgV2::from(transmute_record_bytes::<ErrorMsgV1>(input).unwrap());
std::ptr::copy_nonoverlapping(&system, compat_buffer.as_mut_ptr().cast(), 1);
return RecordRef::new(compat_buffer);
return upgrade_record::<ErrorMsgV1, ErrorMsgV2>(ts_out, compat_buffer, input);
}
rtype::SYSTEM => {
let system =
SystemMsgV2::from(transmute_record_bytes::<SystemMsgV1>(input).unwrap());
std::ptr::copy_nonoverlapping(&system, compat_buffer.as_mut_ptr().cast(), 1);
return RecordRef::new(compat_buffer);
return upgrade_record::<SystemMsgV1, SystemMsgV2>(ts_out, compat_buffer, input);
}
_ => (),
}
}
RecordRef::new(input)
}

unsafe fn upgrade_record<'a, T, U>(
ts_out: bool,
compat_buffer: &'a mut [u8; crate::MAX_RECORD_LEN],
input: &'a [u8],
) -> RecordRef<'a>
where
T: HasRType,
U: HasRType + for<'b> From<&'b T>,
{
if ts_out {
let rec = transmute_record_bytes::<WithTsOut<T>>(input).unwrap();
let upgraded = WithTsOut::new(U::from(&rec.rec), rec.ts_out);
std::ptr::copy_nonoverlapping(&upgraded, compat_buffer.as_mut_ptr().cast(), 1);
} else {
let upgraded = U::from(transmute_record_bytes::<T>(input).unwrap());
std::ptr::copy_nonoverlapping(&upgraded, compat_buffer.as_mut_ptr().cast(), 1);
}
RecordRef::new(compat_buffer)
}

/// Definition of an instrument in DBN version 1. The record of the
/// [`Definition`](crate::enums::Schema::Definition) schema.
#[repr(C)]
Expand Down Expand Up @@ -621,10 +637,13 @@ impl SymbolMappingRec for SymbolMappingMsgV2 {

#[cfg(test)]
mod tests {
use std::mem;
use std::{ffi::c_char, mem};

use time::OffsetDateTime;
use type_layout::{Field, TypeLayout};

use crate::{Mbp1Msg, Record, MAX_RECORD_LEN};

use super::*;

#[cfg(feature = "python")]
Expand Down Expand Up @@ -671,4 +690,57 @@ mod tests {
);
}
}

#[test]
fn upgrade_symbol_mapping_ts_out() -> crate::Result<()> {
let orig = WithTsOut::new(
SymbolMappingMsgV1::new(1, 2, "ES.c.0", "ESH4", 0, 0)?,
OffsetDateTime::now_utc().unix_timestamp_nanos() as u64,
);
let mut compat_buffer = [0; MAX_RECORD_LEN];
let res = unsafe {
decode_record_ref(
1,
VersionUpgradePolicy::Upgrade,
true,
&mut compat_buffer,
orig.as_ref(),
)
};
let upgraded = res.get::<WithTsOut<SymbolMappingMsgV2>>().unwrap();
assert_eq!(orig.ts_out, upgraded.ts_out);
assert_eq!(orig.rec.stype_in_symbol()?, upgraded.rec.stype_in_symbol()?);
assert_eq!(
orig.rec.stype_out_symbol()?,
upgraded.rec.stype_out_symbol()?
);
assert_eq!(upgraded.record_size(), std::mem::size_of_val(upgraded));
// used compat buffer
assert!(std::ptr::addr_eq(upgraded.header(), compat_buffer.as_ptr()));
Ok(())
}

#[test]
fn upgrade_mbp1_ts_out() -> crate::Result<()> {
let rec = Mbp1Msg {
price: 1_250_000_000,
side: b'A' as c_char,
..Default::default()
};
let orig = WithTsOut::new(rec, OffsetDateTime::now_utc().unix_timestamp_nanos() as u64);
let mut compat_buffer = [0; MAX_RECORD_LEN];
let res = unsafe {
decode_record_ref(
1,
VersionUpgradePolicy::Upgrade,
true,
&mut compat_buffer,
orig.as_ref(),
)
};
let upgraded = res.get::<WithTsOut<Mbp1Msg>>().unwrap();
// compat buffer unused and pointer unchanged
assert!(std::ptr::eq(orig.header(), upgraded.header()));
Ok(())
}
}
14 changes: 12 additions & 2 deletions rust/dbn/src/decode/dbn/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ where
reader,
metadata.version,
VersionUpgradePolicy::Upgrade,
metadata.ts_out,
)?,
metadata,
})
Expand All @@ -79,7 +80,7 @@ where
let version = metadata.version;
metadata.upgrade(upgrade_policy);
Ok(Self {
decoder: RecordDecoder::with_version(reader, version, upgrade_policy)?,
decoder: RecordDecoder::with_version(reader, version, upgrade_policy, metadata.ts_out)?,
metadata,
})
}
Expand Down Expand Up @@ -227,6 +228,7 @@ where
{
version: u8,
upgrade_policy: VersionUpgradePolicy,
ts_out: bool,
reader: R,
state: DecoderState,
framer: RecordFrameDecoder,
Expand All @@ -250,7 +252,7 @@ where
/// Note: assumes the input is of the current DBN version. To decode records from a
/// previous version, use [`RecordDecoder::with_version()`].
pub fn new(reader: R) -> Self {
Self::with_version(reader, DBN_VERSION, VersionUpgradePolicy::AsIs).unwrap()
Self::with_version(reader, DBN_VERSION, VersionUpgradePolicy::AsIs, false).unwrap()
}

/// Creates a new `RecordDecoder` that will decode from `reader`
Expand All @@ -262,13 +264,15 @@ where
reader: R,
version: u8,
upgrade_policy: VersionUpgradePolicy,
ts_out: bool,
) -> crate::Result<Self> {
if version > DBN_VERSION {
return Err(crate::Error::decode(format!("can't decode newer version of DBN. Decoder version is {DBN_VERSION}, input version is {version}")));
}
Ok(Self {
version,
upgrade_policy,
ts_out,
reader,
state: DecoderState::Read,
framer: RecordFrameDecoder::Head,
Expand Down Expand Up @@ -296,6 +300,11 @@ where
self.upgrade_policy = upgrade_policy;
}

/// Sets whether to expect a send timestamp appended after every record.
pub fn set_ts_out(&mut self, ts_out: bool) {
self.ts_out = ts_out;
}

/// Tries to decode a single record and returns a reference to the record that
/// lasts until the next method call. Returns `None` if `reader` has been
/// exhausted.
Expand Down Expand Up @@ -367,6 +376,7 @@ where
compat::decode_record_ref(
self.version,
self.upgrade_policy,
self.ts_out,
&mut self.compat_buf,
// Recreate slice to get around borrow checker
std::slice::from_raw_parts(frame.as_ptr(), frame.len()),
Expand Down
Loading

0 comments on commit 13b5ac2

Please sign in to comment.