diff --git a/CHANGELOG.md b/CHANGELOG.md index 31cd25c..c1dd21d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,17 @@ # Changelog -## 0.11.2 - TBD +## 0.12.0 - TBD ### Enhancements -- Added new publisher values in preparation for DBEQ.PLUS -- Added `from_dataset_venue` function to `Publisher` to facilitate - destructuring. +- Added `map_symbols` support to Python `Transcoder` +- Added new publisher variants in preparation for DBEQ.PLUS dataset +- Added `from_dataset_venue` function to `Publisher` to facilitate destructuring +- Implemented `Default` for most records to make testing easier +- Added `from_zstd` function to `AsyncDbnEncoder` to match synchronous encoder +- Added re-exports for `enums::rtype`, `record::BidAskPair`, `record::RecordHeader`, and + `record::WithTsOut` to simplify imports + +### Breaking changes +- Changed `Default` implementation for `BidAskPair` by setting prices to `UNDEF_PRICE` ## 0.11.1 - 2023-10-05 ### Enhancements diff --git a/Cargo.lock b/Cargo.lock index 788ce1b..f29f6e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "json-writer", "num_enum", "pyo3", + "rstest", "serde", "streaming-iterator", "strum", @@ -317,6 +318,7 @@ dependencies = [ "clap", "dbn", "predicates", + "rstest", "serde", "tempfile", "zstd", @@ -402,12 +404,101 @@ dependencies = [ "num-traits", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.28.0" @@ -646,6 +737,12 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.27" @@ -818,12 +915,56 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "relative-path" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c707298afce11da2efef2f600116fa93ffa7a032b5d7b628aa17711ec81383ca" + +[[package]] +name = "rstest" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +dependencies = [ + "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.37", + "unicode-ident", +] + [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.17" @@ -855,6 +996,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" + [[package]] name = "serde" version = "1.0.188" @@ -886,6 +1033,15 @@ dependencies = [ "serde", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.1" diff --git a/rust/dbn-cli/Cargo.toml b/rust/dbn-cli/Cargo.toml index 841a5bc..fa82c48 100644 --- a/rust/dbn-cli/Cargo.toml +++ b/rust/dbn-cli/Cargo.toml @@ -32,5 +32,6 @@ zstd = "0.12.4" assert_cmd = "2.0.12" # assert_cmd companion predicates = "3.0.3" +rstest = "0.18.2" # A library for managing temporary files and directories tempfile = "3.7.0" diff --git a/rust/dbn-cli/tests/integration_tests.rs b/rust/dbn-cli/tests/integration_tests.rs index d4e4be0..799d065 100644 --- a/rust/dbn-cli/tests/integration_tests.rs +++ b/rust/dbn-cli/tests/integration_tests.rs @@ -6,6 +6,7 @@ use std::{ use assert_cmd::Command; use predicates::str::{contains, ends_with, is_empty, is_match, starts_with}; +use rstest::rstest; use tempfile::{tempdir, NamedTempFile}; fn cmd() -> Command { @@ -14,14 +15,14 @@ fn cmd() -> Command { const TEST_DATA_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data"); -#[test] -fn write_json_to_path() { +#[rstest] +fn write_json_to_path(#[values("dbn", "dbn.zst")] extension: &str) { // create a directory whose contents will be cleaned up at the end of the test let output_dir = tempdir().unwrap(); let output_path = format!("{}/a.json", output_dir.path().to_str().unwrap()); cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.mbp-1.dbn.zst"), + &format!("{TEST_DATA_PATH}/test_data.mbp-1.{extension}"), "--output", &output_path, "--json", @@ -79,14 +80,14 @@ fn read_from_nonexistent_path() { .stderr(contains("Error opening file to decode")); } -#[test] -fn write_csv() { +#[rstest] +fn write_csv(#[values("dbn", "dbn.zst")] extension: &str) { // create a directory whose contents will be cleaned up at the end of the test let output_dir = tempdir().unwrap(); - let output_path = format!("{}/a.json", output_dir.path().to_str().unwrap()); + let output_path = format!("{}/a.csv", output_dir.path().to_str().unwrap()); cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.mbp-1.dbn"), + &format!("{TEST_DATA_PATH}/test_data.mbp-1.{extension}"), "--output", &output_path, "--csv", @@ -307,20 +308,23 @@ fn pretty_print_data_metadata() { .stderr(is_empty()); } -#[test] -fn read_from_stdin() { +#[rstest] +fn read_from_stdin(#[values("csv", "json")] output_enc: &str) { let path = format!("{TEST_DATA_PATH}/test_data.mbp-10.dbn.zst"); let read_from_stdin_output = cmd() .args([ "-", // STDIN - "--json", + &format!("--{output_enc}"), ]) // Pipe input from file .pipe_stdin(&path) .unwrap() .ok() .unwrap(); - let read_from_file_output = cmd().args([&path, "--json"]).ok().unwrap(); + let read_from_file_output = cmd() + .args([&path, &format!("--{output_enc}")]) + .ok() + .unwrap(); assert_eq!(read_from_stdin_output.stdout, read_from_file_output.stdout); assert!(read_from_stdin_output.stderr.is_empty()); assert!(read_from_file_output.stderr.is_empty()); @@ -357,115 +361,59 @@ fn metadata_conflicts_with_limit() { .stderr(contains("'--metadata' cannot be used with '--limit")); } -#[test] -fn fragment_conflicts_with_metadata() { +#[rstest] +#[case::uncompressed("--fragment", "dbn.frag")] +#[case::zstd("--zstd-fragment", "dbn.frag.zst")] +fn fragment_conflicts_with_metadata(#[case] flag: &str, #[case] extension: &str) { cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), - "--fragment", + &format!("{TEST_DATA_PATH}/test_data.definition.{extension}"), + flag, "--json", "--metadata", ]) .assert() .failure() - .stderr(contains("'--fragment' cannot be used with '--metadata'")); + .stderr(contains(&format!( + "'{flag}' cannot be used with '--metadata'" + ))); } -#[test] -fn zstd_fragment_conflicts_with_metadata() { +#[rstest] +#[case::uncompressed("--fragment", "dbn.frag")] +#[case::zstd("--zstd-fragment", "dbn.frag.zst")] +fn fragment_conflicts_with_dbn_output(#[case] flag: &str, #[case] extension: &str) { cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag.zst"), - "--zstd-fragment", - "--json", - "--metadata", - ]) - .assert() - .failure() - .stderr(contains( - "'--zstd-fragment' cannot be used with '--metadata'", - )); -} - -#[test] -fn fragment_conflicts_with_dbn_output() { - cmd() - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), - "--fragment", + &format!("{TEST_DATA_PATH}/test_data.definition.{extension}"), + flag, "--dbn", ]) .assert() .failure() - .stderr(contains("'--fragment' cannot be used with '--dbn'")); + .stderr(contains(&format!("'{flag}' cannot be used with '--dbn'"))); } -#[test] -fn zstd_fragment_conflicts_with_dbn_output() { +#[rstest] +#[case::uncompressed_to_csv("csv", "--fragment", "dbn.frag", 3)] +#[case::uncompressed_to_json("json", "--fragment", "dbn.frag", 2)] +#[case::zstd_to_csv("csv", "--zstd-fragment", "dbn.frag.zst", 3)] +#[case::zstd_to_json("json", "--zstd-fragment", "dbn.frag.zst", 2)] +fn test_fragment( + #[case] output_enc: &str, + #[case] flag: &str, + #[case] extension: &str, + #[case] exp_line_count: usize, +) { cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag.zst"), - "--zstd-fragment", - "--dbn", - ]) - .assert() - .failure() - .stderr(contains("'--zstd-fragment' cannot be used with '--dbn'")); -} - -#[test] -fn test_fragment() { - cmd() - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), - "--fragment", - "--json", - ]) - .assert() - .success() - .stdout(contains('\n').count(2)) - .stderr(is_empty()); -} - -#[test] -fn test_writes_csv_header_for_fragment() { - cmd() - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), - "--fragment", - "--csv", + &format!("{TEST_DATA_PATH}/test_data.definition.{extension}"), + flag, + &format!("--{output_enc}"), ]) .assert() .success() - .stdout(contains('\n').count(3)) - .stderr(is_empty()); -} - -#[test] -fn test_zstd_fragment() { - cmd() - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag.zst"), - "--zstd-fragment", - "--json", - ]) - .assert() - .success() - .stdout(contains('\n').count(2)) - .stderr(is_empty()); -} - -#[test] -fn test_writes_csv_header_for_zstd_fragment() { - cmd() - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag.zst"), - "--zstd-fragment", - "--csv", - ]) - .assert() - .success() - .stdout(contains('\n').count(3)) + .stdout(contains('\n').count(exp_line_count)) .stderr(is_empty()); } @@ -517,50 +465,40 @@ fn test_limit_updates_metadata() { .stdout(contains('\n').count(1)); } -#[cfg(not(target_os = "windows"))] // no `false` -#[test] -fn broken_pipe_is_silent() { - let dbn_cmd = process::Command::new(assert_cmd::cargo::cargo_bin("dbn")) - .args([&format!("{TEST_DATA_PATH}/test_data.mbo.dbn.zst"), "--json"]) - .stdout(process::Stdio::piped()) - .stderr(process::Stdio::piped()) - .spawn() - .unwrap(); - let mut false_cmd = process::Command::new("false"); - false_cmd.stdin(dbn_cmd.stdout.unwrap()); - Command::from_std(false_cmd) - .assert() - .failure() - .stdout(is_empty()) - .stderr(is_empty()); - let mut stderr = String::new(); - dbn_cmd.stderr.unwrap().read_to_string(&mut stderr).unwrap(); - assert!(stderr.is_empty(), "Stderr: {stderr}"); -} - -#[cfg(not(target_os = "windows"))] // no `false` -#[test] -fn broken_pipe_is_silent_fragment() { - // Test fragment separately because it's a different code path - let dbn_cmd = process::Command::new(assert_cmd::cargo::cargo_bin("dbn")) - .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), - "--fragment", - "--csv", - ]) +#[cfg(not(target_os = "windows"))] +#[rstest] +#[case::uncompressed_to_csv("test_data.mbo.dbn", "--csv", "")] +#[case::zstd_to_csv("test_data.mbo.dbn.zst", "--csv", "")] +#[case::uncompressed_fragment_to_csv("test_data.definition.dbn.frag", "--csv", "--fragment")] +#[case::zstd_fragment_to_csv("test_data.definition.dbn.frag.zst", "--csv", "--zstd-fragment")] +#[case::uncompressed_to_json("test_data.mbo.dbn", "--json", "")] +#[case::zstd_to_json("test_data.mbo.dbn.zst", "--json", "")] +#[case::uncompressed_fragment_to_json("test_data.definition.dbn.frag", "--json", "--fragment")] +#[case::zstd_fragment_to_json("test_data.definition.dbn.frag.zst", "--json", "--zstd-fragment")] +fn broken_pipe_is_silent( + #[case] file_name: &str, + #[case] output_flag: &str, + #[case] fragment_flag: &str, +) { + let mut dbn_cmd = process::Command::new(assert_cmd::cargo::cargo_bin("dbn")); + dbn_cmd.args([&format!("{TEST_DATA_PATH}/{file_name}"), output_flag]); + if !fragment_flag.is_empty() { + dbn_cmd.arg(fragment_flag); + } + let dbn_res = dbn_cmd .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .spawn() .unwrap(); let mut false_cmd = process::Command::new("false"); - false_cmd.stdin(dbn_cmd.stdout.unwrap()); + false_cmd.stdin(dbn_res.stdout.unwrap()); Command::from_std(false_cmd) .assert() .failure() .stdout(is_empty()) .stderr(is_empty()); let mut stderr = String::new(); - dbn_cmd.stderr.unwrap().read_to_string(&mut stderr).unwrap(); + dbn_res.stderr.unwrap().read_to_string(&mut stderr).unwrap(); assert!(stderr.is_empty(), "Stderr: {stderr}"); } diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index e0ae3a4..4a10de3 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -53,5 +53,6 @@ tokio = { version = "1", features = ["fs", "io-util"], optional = true } zstd = "0.12" [dev-dependencies] +rstest = "0.18.2" strum = { version = "0.25", features = ["derive"] } tokio = { version = "1", features = ["fs", "io-util", "macros", "rt-multi-thread"] } diff --git a/rust/dbn/src/decode/dbn/async.rs b/rust/dbn/src/decode/dbn/async.rs index 596a390..801c111 100644 --- a/rust/dbn/src/decode/dbn/async.rs +++ b/rust/dbn/src/decode/dbn/async.rs @@ -358,138 +358,110 @@ where #[cfg(test)] mod tests { + use rstest::rstest; use tokio::io::AsyncWriteExt; use super::*; use crate::{ decode::tests::TEST_DATA_PATH, - encode::dbn::{AsyncMetadataEncoder, AsyncRecordEncoder}, + encode::{ + dbn::{AsyncEncoder, AsyncRecordEncoder}, + DbnEncodable, + }, enums::{rtype, Schema}, record::{ ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, RecordHeader, StatMsg, TbboMsg, TradeMsg, WithTsOut, }, - Error, + Error, Result, }; - macro_rules! test_dbn_identity { - ($test_name:ident, $record_type:ident, $schema:expr) => { - #[tokio::test] - async fn $test_name() { - let mut file = - tokio::fs::File::open(format!("{TEST_DATA_PATH}/test_data.{}.dbn", $schema)) - .await - .unwrap(); - let file_metadata = MetadataDecoder::new(&mut file).decode().await.unwrap(); - let mut file_decoder = RecordDecoder::new(&mut file); - let mut file_records = Vec::new(); - while let Ok(Some(record)) = file_decoder.decode::<$record_type>().await { - file_records.push(record.clone()); - } - let mut buffer = Vec::new(); - AsyncMetadataEncoder::new(&mut buffer) - .encode(&file_metadata) - .await - .unwrap(); - assert_eq!(file_records.is_empty(), $schema == Schema::Ohlcv1D); - let mut buf_encoder = AsyncRecordEncoder::new(&mut buffer); - for record in file_records.iter() { - buf_encoder.encode(record).await.unwrap(); - } - let mut buf_cursor = std::io::Cursor::new(&mut buffer); - let buf_metadata = MetadataDecoder::new(&mut buf_cursor) - .decode() - .await - .unwrap(); - assert_eq!(buf_metadata, file_metadata); - let mut buf_decoder = RecordDecoder::new(&mut buf_cursor); - let mut buf_records = Vec::new(); - while let Ok(Some(record)) = buf_decoder.decode::<$record_type>().await { - buf_records.push(record.clone()); - } - assert_eq!(buf_records, file_records); - } - }; - } + #[rstest] + #[case::mbo(Schema::Mbo, MboMsg::default())] + #[case::trades(Schema::Trades, TradeMsg::default())] + #[case::tbbo(Schema::Tbbo, TbboMsg::default())] + #[case::mbp1(Schema::Mbp1, Mbp1Msg::default())] + #[case::mbp10(Schema::Mbp10, Mbp10Msg::default())] + #[case::ohlcv1d(Schema::Ohlcv1D, OhlcvMsg::default_for_schema(Schema::Ohlcv1D))] + #[case::ohlcv1h(Schema::Ohlcv1H, OhlcvMsg::default_for_schema(Schema::Ohlcv1H))] + #[case::ohlcv1m(Schema::Ohlcv1M, OhlcvMsg::default_for_schema(Schema::Ohlcv1M))] + #[case::ohlcv1s(Schema::Ohlcv1S, OhlcvMsg::default_for_schema(Schema::Ohlcv1S))] + #[case::definitions(Schema::Definition, InstrumentDefMsg::default())] + #[case::imbalance(Schema::Imbalance, ImbalanceMsg::default())] + #[case::statistics(Schema::Statistics, StatMsg::default())] + #[tokio::test] + async fn test_dbn_identity( + #[case] schema: Schema, + #[case] _rec: R, + ) -> Result<()> { + let mut file_decoder = + Decoder::from_file(format!("{TEST_DATA_PATH}/test_data.{schema}.dbn")).await?; + let file_metadata = file_decoder.metadata().clone(); + let mut file_records = Vec::new(); + while let Some(record) = file_decoder.decode_record::().await? { + file_records.push(record.clone()); + } + assert_eq!(file_records.is_empty(), schema == Schema::Ohlcv1D); + let mut buffer = Vec::new(); + let mut buf_encoder = AsyncEncoder::new(&mut buffer, &file_metadata).await?; - macro_rules! test_dbn_zstd_identity { - ($test_name:ident, $record_type:ident, $schema:expr) => { - #[tokio::test] - async fn $test_name() { - let file = tokio::fs::File::open(format!( - "{TEST_DATA_PATH}/test_data.{}.dbn.zst", - $schema - )) - .await - .unwrap(); - let mut file_decoder = Decoder::with_zstd(file).await.unwrap(); - let file_metadata = file_decoder.metadata().clone(); - let mut file_records = Vec::new(); - while let Ok(Some(record)) = file_decoder.decode_record::<$record_type>().await { - file_records.push(record.clone()); - } - let mut buffer = Vec::new(); - let mut meta_encoder = AsyncMetadataEncoder::with_zstd(&mut buffer); - meta_encoder.encode(&file_metadata).await.unwrap(); - assert_eq!(file_records.is_empty(), $schema == Schema::Ohlcv1D); - let mut buf_encoder = AsyncRecordEncoder::from(meta_encoder); - for record in file_records.iter() { - buf_encoder.encode(record).await.unwrap(); - } - buf_encoder.into_inner().shutdown().await.unwrap(); - let mut buf_cursor = std::io::Cursor::new(&mut buffer); - let mut buf_decoder = Decoder::with_zstd_buffer(&mut buf_cursor).await.unwrap(); - let buf_metadata = buf_decoder.metadata().clone(); - assert_eq!(buf_metadata, file_metadata); - let mut buf_records = Vec::new(); - while let Ok(Some(record)) = buf_decoder.decode_record::<$record_type>().await { - buf_records.push(record.clone()); - } - assert_eq!(buf_records, file_records); - } - }; + for record in file_records.iter() { + buf_encoder.encode_record(record).await.unwrap(); + } + let mut buf_cursor = std::io::Cursor::new(&mut buffer); + let mut buf_decoder = Decoder::new(&mut buf_cursor).await?; + assert_eq!(*buf_decoder.metadata(), file_metadata); + let mut buf_records = Vec::new(); + while let Some(record) = buf_decoder.decode_record::().await? { + buf_records.push(record.clone()); + } + assert_eq!(buf_records, file_records); + Ok(()) } - test_dbn_identity!(test_dbn_identity_mbo, MboMsg, Schema::Mbo); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbo, MboMsg, Schema::Mbo); - test_dbn_identity!(test_dbn_identity_mbp1, Mbp1Msg, Schema::Mbp1); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbp1, Mbp1Msg, Schema::Mbp1); - test_dbn_identity!(test_dbn_identity_mbp10, Mbp10Msg, Schema::Mbp10); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbp10, Mbp10Msg, Schema::Mbp10); - test_dbn_identity!(test_dbn_identity_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D); - test_dbn_identity!(test_dbn_identity_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H); - test_dbn_identity!(test_dbn_identity_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M); - test_dbn_identity!(test_dbn_identity_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S); - test_dbn_identity!(test_dbn_identity_tbbo, TbboMsg, Schema::Tbbo); - test_dbn_zstd_identity!(test_dbn_zstd_identity_tbbo, TbboMsg, Schema::Tbbo); - test_dbn_identity!(test_dbn_identity_trades, TradeMsg, Schema::Trades); - test_dbn_zstd_identity!(test_dbn_zstd_identity_trades, TradeMsg, Schema::Trades); - test_dbn_identity!( - test_dbn_identity_instrument_def, - InstrumentDefMsg, - Schema::Definition - ); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_instrument_def, - InstrumentDefMsg, - Schema::Definition - ); - test_dbn_identity!(test_dbn_identity_imbalance, ImbalanceMsg, Schema::Imbalance); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_imbalance, - ImbalanceMsg, - Schema::Imbalance - ); - test_dbn_identity!(test_dbn_identity_statistics, StatMsg, Schema::Statistics); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_statistics, - StatMsg, - Schema::Statistics - ); + #[rstest] + #[case::mbo(Schema::Mbo, MboMsg::default())] + #[case::trades(Schema::Trades, TradeMsg::default())] + #[case::tbbo(Schema::Tbbo, TbboMsg::default())] + #[case::mbp1(Schema::Mbp1, Mbp1Msg::default())] + #[case::mbp10(Schema::Mbp10, Mbp10Msg::default())] + #[case::ohlcv1d(Schema::Ohlcv1D, OhlcvMsg::default_for_schema(Schema::Ohlcv1D))] + #[case::ohlcv1h(Schema::Ohlcv1H, OhlcvMsg::default_for_schema(Schema::Ohlcv1H))] + #[case::ohlcv1m(Schema::Ohlcv1M, OhlcvMsg::default_for_schema(Schema::Ohlcv1M))] + #[case::ohlcv1s(Schema::Ohlcv1S, OhlcvMsg::default_for_schema(Schema::Ohlcv1S))] + #[case::definitions(Schema::Definition, InstrumentDefMsg::default())] + #[case::imbalance(Schema::Imbalance, ImbalanceMsg::default())] + #[case::statistics(Schema::Statistics, StatMsg::default())] + #[tokio::test] + async fn test_dbn_zstd_identity( + #[case] schema: Schema, + #[case] _rec: R, + ) -> Result<()> { + let mut file_decoder = + Decoder::from_zstd_file(format!("{TEST_DATA_PATH}/test_data.{schema}.dbn.zst")).await?; + let file_metadata = file_decoder.metadata().clone(); + let mut file_records = Vec::new(); + while let Some(record) = file_decoder.decode_record::().await? { + file_records.push(record.clone()); + } + assert_eq!(file_records.is_empty(), schema == Schema::Ohlcv1D); + let mut buffer = Vec::new(); + let mut buf_encoder = AsyncEncoder::with_zstd(&mut buffer, &file_metadata).await?; + + for record in file_records.iter() { + buf_encoder.encode_record(record).await.unwrap(); + } + buf_encoder.get_mut().shutdown().await.unwrap(); + let mut buf_cursor = std::io::Cursor::new(&mut buffer); + let mut buf_decoder = Decoder::with_zstd(&mut buf_cursor).await?; + assert_eq!(*buf_decoder.metadata(), file_metadata); + let mut buf_records = Vec::new(); + while let Some(record) = buf_decoder.decode_record::().await? { + buf_records.push(record.clone()); + } + assert_eq!(buf_records, file_records); + Ok(()) + } #[tokio::test] async fn test_dbn_identity_with_ts_out() { diff --git a/rust/dbn/src/decode/dbn/sync.rs b/rust/dbn/src/decode/dbn/sync.rs index 57c25ee..03d27ed 100644 --- a/rust/dbn/src/decode/dbn/sync.rs +++ b/rust/dbn/src/decode/dbn/sync.rs @@ -529,17 +529,18 @@ pub(crate) fn decode_iso8601(raw: u32) -> Result { mod tests { use std::fs::File; + use rstest::rstest; + use super::*; use crate::{ datasets::XNAS_ITCH, - decode::tests::TEST_DATA_PATH, - encode::{dbn::Encoder, EncodeDbn, EncodeRecord}, - enums::rtype, - record::{ - ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, - RecordHeader, StatMsg, TbboMsg, TradeMsg, + decode::{tests::TEST_DATA_PATH, DynReader}, + encode::{ + dbn::Encoder, DbnEncodable, DbnRecordEncoder, DynWriter, EncodeDbn, EncodeRecord, }, - Error, MetadataBuilder, SYMBOL_CSTR_LEN, + rtype, Compression, Error, ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, + Mbp1Msg, MetadataBuilder, OhlcvMsg, RecordHeader, Result, StatMsg, TbboMsg, TradeMsg, + WithTsOut, SYMBOL_CSTR_LEN, }; #[test] @@ -584,93 +585,131 @@ mod tests { dbg!(&res); assert!(matches!(res, Err(e) if e.contains("a valid date"))); } - - macro_rules! test_dbn_identity { - ($test_name:ident, $record_type:ident, $schema:expr) => { - #[test] - fn $test_name() { - let file_decoder = Decoder::from_file(format!( - "{TEST_DATA_PATH}/test_data.{}.dbn", - $schema.as_str() - )) - .unwrap(); - let file_metadata = file_decoder.metadata().clone(); - let decoded_records = file_decoder.decode_records::<$record_type>().unwrap(); - let mut buffer = Vec::new(); - Encoder::new(&mut buffer, &file_metadata) - .unwrap() - .encode_records(decoded_records.as_slice()) - .unwrap(); - let buf_decoder = Decoder::new(buffer.as_slice()).unwrap(); - assert_eq!(buf_decoder.metadata(), &file_metadata); - assert_eq!(decoded_records, buf_decoder.decode_records().unwrap()); + #[rstest] + #[case::uncompressed_mbo(Schema::Mbo, Compression::None, MboMsg::default())] + #[case::uncompressed_trades(Schema::Trades, Compression::None, TradeMsg::default())] + #[case::uncompressed_tbbo(Schema::Tbbo, Compression::None, TbboMsg::default())] + #[case::uncompressed_mbp1(Schema::Mbp1, Compression::None, Mbp1Msg::default())] + #[case::uncompressed_mbp10(Schema::Mbp10, Compression::None, Mbp10Msg::default())] + #[case::uncompressed_ohlcv1d( + Schema::Ohlcv1D, + Compression::None, + OhlcvMsg::default_for_schema(Schema::Ohlcv1D) + )] + #[case::uncompressed_ohlcv1h( + Schema::Ohlcv1H, + Compression::None, + OhlcvMsg::default_for_schema(Schema::Ohlcv1H) + )] + #[case::uncompressed_ohlcv1m( + Schema::Ohlcv1M, + Compression::None, + OhlcvMsg::default_for_schema(Schema::Ohlcv1M) + )] + #[case::uncompressed_ohlcv1s( + Schema::Ohlcv1S, + Compression::None, + OhlcvMsg::default_for_schema(Schema::Ohlcv1S) + )] + #[case::uncompressed_definitions( + Schema::Definition, + Compression::None, + InstrumentDefMsg::default() + )] + #[case::uncompressed_imbalance(Schema::Imbalance, Compression::None, ImbalanceMsg::default())] + #[case::uncompressed_statistics(Schema::Statistics, Compression::None, StatMsg::default())] + #[case::zstd_mbo(Schema::Mbo, Compression::ZStd, MboMsg::default())] + #[case::zstd_trades(Schema::Trades, Compression::ZStd, TradeMsg::default())] + #[case::zstd_tbbo(Schema::Tbbo, Compression::ZStd, TbboMsg::default())] + #[case::zstd_mbp1(Schema::Mbp1, Compression::ZStd, Mbp1Msg::default())] + #[case::zstd_mbp10(Schema::Mbp10, Compression::ZStd, Mbp10Msg::default())] + #[case::zstd_ohlcv1d( + Schema::Ohlcv1D, + Compression::ZStd, + OhlcvMsg::default_for_schema(Schema::Ohlcv1D) + )] + #[case::zstd_ohlcv1h( + Schema::Ohlcv1H, + Compression::ZStd, + OhlcvMsg::default_for_schema(Schema::Ohlcv1H) + )] + #[case::zstd_ohlcv1m( + Schema::Ohlcv1M, + Compression::ZStd, + OhlcvMsg::default_for_schema(Schema::Ohlcv1M) + )] + #[case::zstd_ohlcv1s( + Schema::Ohlcv1S, + Compression::ZStd, + OhlcvMsg::default_for_schema(Schema::Ohlcv1S) + )] + #[case::zstd_definitions(Schema::Definition, Compression::ZStd, InstrumentDefMsg::default())] + #[case::zstd_imbalance(Schema::Imbalance, Compression::ZStd, ImbalanceMsg::default())] + #[case::zstd_statistics(Schema::Statistics, Compression::ZStd, StatMsg::default())] + fn test_dbn_identity( + #[case] schema: Schema, + #[case] compression: Compression, + #[case] _rec: R, + ) -> Result<()> { + let file_decoder = Decoder::new(DynReader::from_file(format!( + "{TEST_DATA_PATH}/test_data.{schema}.{}", + if compression == Compression::ZStd { + "dbn.zst" + } else { + "dbn" } - }; + ))?)?; + let file_metadata = file_decoder.metadata().clone(); + let decoded_records = file_decoder.decode_records::()?; + let mut buffer = Vec::new(); + + Encoder::new(DynWriter::new(&mut buffer, compression)?, &file_metadata)? + .encode_records(decoded_records.as_slice())?; + let buf_decoder = Decoder::new(DynReader::inferred_with_buffer(buffer.as_slice())?)?; + assert_eq!(buf_decoder.metadata(), &file_metadata); + assert_eq!(decoded_records, buf_decoder.decode_records()?); + Ok(()) } - macro_rules! test_dbn_zstd_identity { - ($test_name:ident, $record_type:ident, $schema:expr) => { - #[test] - fn $test_name() { - let file_decoder = Decoder::from_zstd_file(format!( - "{TEST_DATA_PATH}/test_data.{}.dbn.zst", - $schema.as_str() - )) - .unwrap(); - let file_metadata = file_decoder.metadata().clone(); - let decoded_records = file_decoder.decode_records::<$record_type>().unwrap(); - let mut buffer = Vec::new(); - Encoder::with_zstd(&mut buffer, &file_metadata) - .unwrap() - .encode_records(decoded_records.as_slice()) - .unwrap(); - let buf_decoder = Decoder::with_zstd(buffer.as_slice()).unwrap(); - assert_eq!(buf_decoder.metadata(), &file_metadata); - assert_eq!(decoded_records, buf_decoder.decode_records().unwrap()); - } + + #[test] + fn test_dbn_identity_with_ts_out() -> Result<()> { + let rec1 = WithTsOut { + rec: OhlcvMsg { + hd: RecordHeader::new::>(rtype::OHLCV_1D, 1, 446, 1678284110), + open: 160270000000, + high: 161870000000, + low: 157510000000, + close: 158180000000, + volume: 3170000, + }, + ts_out: 1678486110, }; + let mut rec2 = rec1.clone(); + rec2.rec.hd.instrument_id += 1; + rec2.ts_out = 1678486827; + let mut buffer = Vec::new(); + let mut encoder = DbnRecordEncoder::new(&mut buffer); + encoder.encode_record(&rec1)?; + encoder.encode_record(&rec2)?; + let mut decoder_with = RecordDecoder::new(buffer.as_slice()); + let res1_with = decoder_with + .decode::>()? + .unwrap() + .clone(); + let res2_with = decoder_with + .decode::>()? + .unwrap() + .clone(); + assert_eq!(rec1, res1_with); + assert_eq!(rec2, res2_with); + let mut decoder_without = RecordDecoder::new(buffer.as_slice()); + let res1_without = decoder_without.decode::()?.unwrap().clone(); + let res2_without = decoder_without.decode::()?.unwrap().clone(); + assert_eq!(rec1.rec, res1_without); + assert_eq!(rec2.rec, res2_without); + Ok(()) } - test_dbn_identity!(test_dbn_identity_mbo, MboMsg, Schema::Mbo); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbo, MboMsg, Schema::Mbo); - test_dbn_identity!(test_dbn_identity_mbp1, Mbp1Msg, Schema::Mbp1); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbp1, Mbp1Msg, Schema::Mbp1); - test_dbn_identity!(test_dbn_identity_mbp10, Mbp10Msg, Schema::Mbp10); - test_dbn_zstd_identity!(test_dbn_zstd_identity_mbp10, Mbp10Msg, Schema::Mbp10); - test_dbn_identity!(test_dbn_identity_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D); - test_dbn_identity!(test_dbn_identity_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H); - test_dbn_identity!(test_dbn_identity_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M); - test_dbn_identity!(test_dbn_identity_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S); - test_dbn_zstd_identity!(test_dbn_zstd_identity_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S); - test_dbn_identity!(test_dbn_identity_tbbo, TbboMsg, Schema::Tbbo); - test_dbn_zstd_identity!(test_dbn_zstd_identity_tbbo, TbboMsg, Schema::Tbbo); - test_dbn_identity!(test_dbn_identity_trades, TradeMsg, Schema::Trades); - test_dbn_zstd_identity!(test_dbn_zstd_identity_trades, TradeMsg, Schema::Trades); - test_dbn_identity!( - test_dbn_identity_instrument_def, - InstrumentDefMsg, - Schema::Definition - ); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_instrument_def, - InstrumentDefMsg, - Schema::Definition - ); - test_dbn_identity!(test_dbn_identity_imbalance, ImbalanceMsg, Schema::Imbalance); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_imbalance, - ImbalanceMsg, - Schema::Imbalance - ); - test_dbn_identity!(test_dbn_identity_statistics, StatMsg, Schema::Statistics); - test_dbn_zstd_identity!( - test_dbn_zstd_identity_statistics, - StatMsg, - Schema::Statistics - ); - #[test] fn test_decode_record_ref() { let mut buffer = Vec::new(); diff --git a/rust/dbn/src/encode/dbn/async.rs b/rust/dbn/src/encode/dbn/async.rs index 48c7197..cb05a91 100644 --- a/rust/dbn/src/encode/dbn/async.rs +++ b/rust/dbn/src/encode/dbn/async.rs @@ -68,6 +68,21 @@ where } } +impl Encoder> +where + W: io::AsyncWriteExt + Unpin, +{ + /// Creates a new async [`Encoder`] that will Zstandard compress the DBN data + /// written to `writer`. + /// + /// # Errors + /// This function will return an error if it fails to encode `metadata` to + /// `writer`. + pub async fn with_zstd(writer: W, metadata: &Metadata) -> Result { + Self::new(ZstdEncoder::new(writer), metadata).await + } +} + /// An async encoder of DBN records. pub struct RecordEncoder where diff --git a/rust/dbn/src/lib.rs b/rust/dbn/src/lib.rs index 01cd344..4129fd6 100644 --- a/rust/dbn/src/lib.rs +++ b/rust/dbn/src/lib.rs @@ -60,8 +60,9 @@ pub use crate::{ metadata::{MappingInterval, Metadata, MetadataBuilder, SymbolMapping}, publishers::{Dataset, Publisher, Venue}, record::{ - ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatMsg, - StatusMsg, SymbolMappingMsg, SystemMsg, TbboMsg, TradeMsg, WithTsOut, + BidAskPair, ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, + RecordHeader, StatMsg, StatusMsg, SymbolMappingMsg, SystemMsg, TbboMsg, TradeMsg, + WithTsOut, }, record_enum::{RecordEnum, RecordRefEnum}, record_ref::RecordRef, diff --git a/rust/dbn/src/record.rs b/rust/dbn/src/record.rs index edf4489..72e7049 100644 --- a/rust/dbn/src/record.rs +++ b/rust/dbn/src/record.rs @@ -1,6 +1,9 @@ //! Market data types for encoding different Databento [`Schema`](crate::enums::Schema)s //! and conversion functions. +mod impl_default; +mod methods; + use std::{ffi::CStr, mem, os::raw::c_char, ptr::NonNull, slice}; // Dummy derive macro to get around `cfg_attr` incompatibility of several @@ -106,7 +109,7 @@ pub struct MboMsg { /// A level. #[repr(C)] -#[derive(Clone, Debug, JsonSerialize, PartialEq, Eq, Default)] +#[derive(Clone, Debug, JsonSerialize, PartialEq, Eq)] #[cfg_attr(feature = "trivial_copy", derive(Copy))] #[cfg_attr( feature = "python", @@ -846,569 +849,6 @@ pub trait HasRType { } } -impl RecordHeader { - /// The multiplier for converting the `length` field to the number of bytes. - pub const LENGTH_MULTIPLIER: usize = 4; - - /// Creates a new `RecordHeader`. `R` and `rtype` should be compatible. - pub const fn new( - rtype: u8, - publisher_id: u16, - instrument_id: u32, - ts_event: u64, - ) -> Self { - Self { - length: (mem::size_of::() / Self::LENGTH_MULTIPLIER) as u8, - rtype, - publisher_id, - instrument_id, - ts_event, - } - } - - /// Returns the size of the **entire** record in bytes. The size of a `RecordHeader` - /// is constant. - pub const fn record_size(&self) -> usize { - self.length as usize * Self::LENGTH_MULTIPLIER - } - - /// Tries to convert the raw record type into an enum. - /// - /// # Errors - /// This function returns an error if the `rtype` field does not - /// contain a valid, known [`RType`]. - pub fn rtype(&self) -> crate::Result { - RType::try_from(self.rtype) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.rtype))) - } - - /// Tries to convert the raw `publisher_id` into an enum which is useful for - /// exhaustive pattern matching. - /// - /// # Errors - /// This function returns an error if the `publisher_id` does not correspond with - /// any known [`Publisher`]. - pub fn publisher(&self) -> crate::Result { - Publisher::try_from(self.publisher_id) - .map_err(|_| Error::conversion::(format!("{}", self.publisher_id))) - } - - /// Parses the raw matching-engine-received timestamp into a datetime. Returns - /// `None` if `ts_event` contains the sentinel for a null timestamp. - pub fn ts_event(&self) -> Option { - if self.ts_event == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_event as i128).unwrap()) - } - } -} - -impl MboMsg { - /// Tries to convert the raw order side to an enum. - /// - /// # Errors - /// This function returns an error if the `side` field does not - /// contain a valid [`Side`]. - pub fn side(&self) -> crate::Result { - Side::try_from(self.side as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) - } - - /// Tries to convert the raw event action to an enum. - /// - /// # Errors - /// This function returns an error if the `action` field does not - /// contain a valid [`Action`]. - pub fn action(&self) -> crate::Result { - Action::try_from(self.action as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) - } - - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. - pub fn ts_in_delta(&self) -> time::Duration { - time::Duration::new(0, self.ts_in_delta) - } -} - -impl TradeMsg { - /// Tries to convert the raw order side to an enum. - /// - /// # Errors - /// This function returns an error if the `side` field does not - /// contain a valid [`Side`]. - pub fn side(&self) -> crate::Result { - Side::try_from(self.side as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) - } - - /// Tries to convert the raw event action to an enum. - /// - /// # Errors - /// This function returns an error if the `action` field does not - /// contain a valid [`Action`]. - pub fn action(&self) -> crate::Result { - Action::try_from(self.action as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) - } - - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. - pub fn ts_in_delta(&self) -> time::Duration { - time::Duration::new(0, self.ts_in_delta) - } -} - -impl Mbp1Msg { - /// Tries to convert the raw `side` to an enum. - /// - /// # Errors - /// This function returns an error if the `side` field does not - /// contain a valid [`Side`]. - pub fn side(&self) -> crate::Result { - Side::try_from(self.side as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) - } - - /// Tries to convert the raw event action to an enum. - /// - /// # Errors - /// This function returns an error if the `action` field does not - /// contain a valid [`Action`]. - pub fn action(&self) -> crate::Result { - Action::try_from(self.action as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) - } - - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. - pub fn ts_in_delta(&self) -> time::Duration { - time::Duration::new(0, self.ts_in_delta) - } -} - -impl Mbp10Msg { - /// Tries to convert the raw `side` to an enum. - /// - /// # Errors - /// This function returns an error if the `side` field does not - /// contain a valid [`Side`]. - pub fn side(&self) -> Result { - Side::try_from(self.side as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) - } - - /// Tries to convert the raw event action to an enum. - /// - /// # Errors - /// This function returns an error if the `action` field does not - /// contain a valid [`Action`]. - pub fn action(&self) -> Result { - Action::try_from(self.action as u8) - .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) - } - - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. - pub fn ts_in_delta(&self) -> time::Duration { - time::Duration::new(0, self.ts_in_delta) - } -} - -impl StatusMsg { - /// Returns `group` as a `&str`. - /// - /// # Errors - /// This function returns an error if `group` contains invalid UTF-8. - pub fn group(&self) -> Result<&str> { - c_chars_to_str(&self.group) - } -} - -impl InstrumentDefMsg { - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw last eligible trade time into a datetime. Returns `None` if - /// `expiration` contains the sentinel for a null timestamp. - pub fn expiration(&self) -> Option { - if self.expiration == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.expiration as i128).unwrap()) - } - } - - /// Parses the raw time of instrument action into a datetime. Returns `None` if - /// `activation` contains the sentinel for a null timestamp. - pub fn activation(&self) -> Option { - if self.activation == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.activation as i128).unwrap()) - } - } - - /// Returns currency used for price fields as a `&str`. - /// - /// # Errors - /// This function returns an error if `currency` contains invalid UTF-8. - pub fn currency(&self) -> Result<&str> { - c_chars_to_str(&self.currency) - } - - /// Returns currency used for settlement as a `&str`. - /// - /// # Errors - /// This function returns an error if `settl_currency` contains invalid UTF-8. - pub fn settl_currency(&self) -> Result<&str> { - c_chars_to_str(&self.settl_currency) - } - - /// Returns the strategy type of the spread as a `&str`. - /// - /// # Errors - /// This function returns an error if `secsubtype` contains invalid UTF-8. - pub fn secsubtype(&self) -> Result<&str> { - c_chars_to_str(&self.secsubtype) - } - - /// Returns the instrument raw symbol assigned by the publisher as a `&str`. - /// - /// # Errors - /// This function returns an error if `raw_symbol` contains invalid UTF-8. - pub fn raw_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.raw_symbol) - } - - /// Returns exchange used to identify the instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `exchange` contains invalid UTF-8. - pub fn exchange(&self) -> Result<&str> { - c_chars_to_str(&self.exchange) - } - - /// Returns the underlying asset code (product code) of the instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `asset` contains invalid UTF-8. - pub fn asset(&self) -> Result<&str> { - c_chars_to_str(&self.asset) - } - - /// Returns the ISO standard instrument categorization code as a `&str`. - /// - /// # Errors - /// This function returns an error if `cfi` contains invalid UTF-8. - pub fn cfi(&self) -> Result<&str> { - c_chars_to_str(&self.cfi) - } - - /// Returns the type of the strument, e.g. FUT for future or future spread as - /// a `&str`. - /// - /// # Errors - /// This function returns an error if `security_type` contains invalid UTF-8. - pub fn security_type(&self) -> Result<&str> { - c_chars_to_str(&self.security_type) - } - - /// Returns the unit of measure for the instrument's original contract size, e.g. - /// USD or LBS, as a `&str`. - /// - /// # Errors - /// This function returns an error if `unit_of_measure` contains invalid UTF-8. - pub fn unit_of_measure(&self) -> Result<&str> { - c_chars_to_str(&self.unit_of_measure) - } - - /// Returns the symbol of the first underlying instrument as a `&str`. - /// - /// # Errors - /// This function returns an error if `underlying` contains invalid UTF-8. - pub fn underlying(&self) -> Result<&str> { - c_chars_to_str(&self.underlying) - } - - /// Returns the currency of [`strike_price`](Self::strike_price) as a `&str`. - /// - /// # Errors - /// This function returns an error if `strike_price_currency` contains invalid UTF-8. - pub fn strike_price_currency(&self) -> Result<&str> { - c_chars_to_str(&self.strike_price_currency) - } - - /// Returns the security group code of the instrumnet as a `&str`. - /// - /// # Errors - /// This function returns an error if `group` contains invalid UTF-8. - pub fn group(&self) -> Result<&str> { - c_chars_to_str(&self.group) - } - - /// Tries to convert the raw classification of the instrument to an enum. - /// - /// # Errors - /// This function returns an error if the `instrument_class` field does not - /// contain a valid [`InstrumentClass`]. - pub fn instrument_class(&self) -> Result { - InstrumentClass::try_from(self.instrument_class as u8).map_err(|_| { - Error::conversion::(format!("{:#02X}", self.instrument_class as u8)) - }) - } - - /// Tries to convert the raw matching algorithm used for the instrument to an enum. - /// - /// # Errors - /// This function returns an error if the `match_algorithm` field does not - /// contain a valid [`MatchAlgorithm`]. - pub fn match_algorithm(&self) -> Result { - MatchAlgorithm::try_from(self.match_algorithm as u8).map_err(|_| { - Error::conversion::(format!("{:#02X}", self.match_algorithm as u8)) - }) - } -} - -impl ImbalanceMsg { - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } -} - -impl StatMsg { - /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` - /// if `ts_recv` contains the sentinel for a null timestamp. - pub fn ts_recv(&self) -> Option { - if self.ts_recv == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) - } - } - - /// Parses the raw reference timestamp of the statistic value into a datetime. - /// Returns `None` if `ts_ref` contains the sentinel for a null timestamp. - pub fn ts_ref(&self) -> Option { - if self.ts_ref == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_ref as i128).unwrap()) - } - } - - /// Tries to convert the raw type of the statistic value to an enum. - /// - /// # Errors - /// This function returns an error if the `stat_type` field does not - /// contain a valid [`StatType`]. - pub fn stat_type(&self) -> Result { - StatType::try_from(self.stat_type) - .map_err(|_| Error::conversion::(format!("{:02X}", self.stat_type))) - } - - /// Tries to convert the raw `update_action` to an enum. - /// - /// # Errors - /// This function returns an error if the `update_action` field does not - /// contain a valid [`StatUpdateAction`]. - pub fn update_action(&self) -> Result { - StatUpdateAction::try_from(self.update_action).map_err(|_| { - Error::conversion::(format!("{:02X}", self.update_action)) - }) - } -} - -impl ErrorMsg { - /// Creates a new `ErrorMsg`. - /// - /// # Errors - /// This function returns an error if `msg` is too long. - pub fn new(ts_event: u64, msg: &str) -> Self { - let mut error = Self { - hd: RecordHeader::new::(rtype::ERROR, 0, 0, ts_event), - err: [0; 64], - }; - // leave at least one null byte - for (i, byte) in msg.as_bytes().iter().take(error.err.len() - 1).enumerate() { - error.err[i] = *byte as c_char; - } - error - } - - /// Returns `err` as a `&str`. - /// - /// # Errors - /// This function returns an error if `err` contains invalid UTF-8. - pub fn err(&self) -> Result<&str> { - c_chars_to_str(&self.err) - } -} - -impl SymbolMappingMsg { - /// Creates a new `SymbolMappingMsg`. - /// - /// # Errors - /// This function returns an error if `stype_in_symbol` or `stype_out_symbol` - /// contain more than maximum number of characters of 21. - pub fn new( - instrument_id: u32, - ts_event: u64, - stype_in_symbol: &str, - stype_out_symbol: &str, - start_ts: u64, - end_ts: u64, - ) -> crate::Result { - // symbol mappings aren't publisher-specific - Ok(Self { - hd: RecordHeader::new::(rtype::SYMBOL_MAPPING, 0, instrument_id, ts_event), - stype_in_symbol: str_to_c_chars(stype_in_symbol)?, - stype_out_symbol: str_to_c_chars(stype_out_symbol)?, - _dummy: Default::default(), - start_ts, - end_ts, - }) - } - - /// Returns the input symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. - pub fn stype_in_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.stype_in_symbol) - } - - /// Returns the output symbol as a `&str`. - /// - /// # Errors - /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. - pub fn stype_out_symbol(&self) -> Result<&str> { - c_chars_to_str(&self.stype_out_symbol) - } - - /// Parses the raw start of the mapping interval into a datetime. Returns `None` if - /// `start_ts` contains the sentinel for a null timestamp. - pub fn start_ts(&self) -> Option { - if self.start_ts == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.start_ts as i128).unwrap()) - } - } - - /// Parses the raw end of the mapping interval into a datetime. Returns `None` if - /// `end_ts` contains the sentinel for a null timestamp. - pub fn end_ts(&self) -> Option { - if self.end_ts == crate::UNDEF_TIMESTAMP { - None - } else { - // u64::MAX is within maximum allowable range - Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.end_ts as i128).unwrap()) - } - } -} - -impl SystemMsg { - const HEARTBEAT: &str = "Heartbeat"; - - /// Creates a new `SystemMsg`. - /// - /// # Errors - /// This function returns an error if `msg` is too long. - pub fn new(ts_event: u64, msg: &str) -> Result { - Ok(Self { - hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), - msg: str_to_c_chars(msg)?, - }) - } - - /// Creates a new heartbeat `SystemMsg`. - pub fn heartbeat(ts_event: u64) -> Self { - Self { - hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), - msg: str_to_c_chars(Self::HEARTBEAT).unwrap(), - } - } - - /// Checks whether the message is a heartbeat from the gateway. - pub fn is_heartbeat(&self) -> bool { - self.msg() - .map(|msg| msg == Self::HEARTBEAT) - .unwrap_or_default() - } - - /// Returns the message from the Databento Live Subscription Gateway (LSG) as - /// a `&str`. - /// - /// # Errors - /// This function returns an error if `msg` contains invalid UTF-8. - pub fn msg(&self) -> Result<&str> { - c_chars_to_str(&self.msg) - } -} - /// Wrapper object for records that include the live gateway send timestamp (`ts_out`). #[repr(C)] #[derive(Clone, Debug, PartialEq, Eq)] @@ -1420,45 +860,6 @@ pub struct WithTsOut { pub ts_out: u64, } -impl HasRType for WithTsOut { - fn has_rtype(rtype: u8) -> bool { - T::has_rtype(rtype) - } - - fn header(&self) -> &RecordHeader { - self.rec.header() - } - - fn header_mut(&mut self) -> &mut RecordHeader { - self.rec.header_mut() - } -} - -impl AsRef<[u8]> for WithTsOut -where - T: HasRType + AsRef<[u8]>, -{ - fn as_ref(&self) -> &[u8] { - unsafe { as_u8_slice(self) } - } -} - -impl WithTsOut { - /// Creates a new record with `ts_out`. Updates the `length` property in - /// [`RecordHeader`] to ensure the additional field is accounted for. - pub fn new(rec: T, ts_out: u64) -> Self { - let mut res = Self { rec, ts_out }; - res.header_mut().length = (mem::size_of_val(&res) / RecordHeader::LENGTH_MULTIPLIER) as u8; - res - } - - /// Parses the raw live gateway send timestamp into a datetime. - pub fn ts_out(&self) -> time::OffsetDateTime { - // u64::MAX is within maximum allowable range - time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_out as i128).unwrap() - } -} - /// Provides a _relatively safe_ method for converting a reference to /// [`RecordHeader`] to a struct beginning with the header. Because it accepts a /// reference, the lifetime of the returned reference is tied to the input. This diff --git a/rust/dbn/src/record/impl_default.rs b/rust/dbn/src/record/impl_default.rs new file mode 100644 index 0000000..f66484c --- /dev/null +++ b/rust/dbn/src/record/impl_default.rs @@ -0,0 +1,261 @@ +use crate::{Schema, UNDEF_ORDER_SIZE, UNDEF_PRICE, UNDEF_STAT_QUANTITY, UNDEF_TIMESTAMP}; + +use super::*; + +impl RecordHeader { + /// Creates a new `RecordHeader` with `rtype` and `length` set + /// for `R` while the other fields are set to their defaults. + pub const fn default(rtype: u8) -> Self { + Self::new::(rtype, 0, 0, UNDEF_TIMESTAMP) + } +} + +impl Default for MboMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::MBO), + order_id: 0, + price: UNDEF_PRICE, + size: UNDEF_ORDER_SIZE, + flags: 0, + channel_id: 0, + action: 0, + side: 0, + ts_recv: UNDEF_TIMESTAMP, + ts_in_delta: 0, + sequence: 0, + } + } +} + +impl Default for BidAskPair { + fn default() -> Self { + Self { + bid_px: UNDEF_PRICE, + ask_px: UNDEF_PRICE, + bid_sz: 0, + ask_sz: 0, + bid_ct: 0, + ask_ct: 0, + } + } +} + +impl Default for TradeMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::MBP_0), + price: UNDEF_PRICE, + size: UNDEF_ORDER_SIZE, + action: 0, + side: 0, + flags: 0, + depth: 0, + ts_recv: UNDEF_TIMESTAMP, + ts_in_delta: 0, + sequence: 0, + } + } +} + +impl Default for Mbp1Msg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::MBP_1), + price: UNDEF_PRICE, + size: UNDEF_ORDER_SIZE, + action: 0, + side: 0, + flags: 0, + depth: 0, + ts_recv: UNDEF_TIMESTAMP, + ts_in_delta: 0, + sequence: 0, + levels: Default::default(), + } + } +} + +impl Default for Mbp10Msg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::MBP_10), + price: UNDEF_PRICE, + size: UNDEF_ORDER_SIZE, + action: 0, + side: 0, + flags: 0, + depth: 0, + ts_recv: UNDEF_TIMESTAMP, + ts_in_delta: 0, + sequence: 0, + levels: Default::default(), + } + } +} + +impl OhlcvMsg { + /// Creates a new default OHLCV bar for the given `schema`. + pub fn default_for_schema(schema: Schema) -> Self { + Self { + hd: RecordHeader::default::(RType::from(schema) as u8), + open: UNDEF_PRICE, + high: UNDEF_PRICE, + low: UNDEF_PRICE, + close: UNDEF_PRICE, + volume: 0, + } + } +} + +impl Default for InstrumentDefMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::INSTRUMENT_DEF), + ts_recv: UNDEF_TIMESTAMP, + min_price_increment: UNDEF_PRICE, + display_factor: UNDEF_PRICE, + expiration: UNDEF_TIMESTAMP, + activation: UNDEF_TIMESTAMP, + high_limit_price: UNDEF_PRICE, + low_limit_price: UNDEF_PRICE, + max_price_variation: UNDEF_PRICE, + trading_reference_price: UNDEF_PRICE, + unit_of_measure_qty: UNDEF_PRICE, + min_price_increment_amount: UNDEF_PRICE, + price_ratio: UNDEF_PRICE, + inst_attrib_value: i32::MAX, + underlying_id: 0, + raw_instrument_id: 0, + market_depth_implied: i32::MAX, + market_depth: i32::MAX, + market_segment_id: u32::MAX, + max_trade_vol: u32::MAX, + min_lot_size: i32::MAX, + min_lot_size_block: i32::MAX, + min_lot_size_round_lot: i32::MAX, + min_trade_vol: u32::MAX, + _reserved2: Default::default(), + contract_multiplier: i32::MAX, + decay_quantity: i32::MAX, + original_contract_size: i32::MAX, + _reserved3: Default::default(), + trading_reference_date: u16::MAX, + appl_id: i16::MAX, + maturity_year: u16::MAX, + decay_start_date: u16::MAX, + channel_id: u16::MAX, + currency: Default::default(), + settl_currency: Default::default(), + secsubtype: Default::default(), + raw_symbol: Default::default(), + group: Default::default(), + exchange: Default::default(), + asset: Default::default(), + cfi: Default::default(), + security_type: Default::default(), + unit_of_measure: Default::default(), + underlying: Default::default(), + strike_price_currency: Default::default(), + instrument_class: 0, + _reserved4: Default::default(), + strike_price: UNDEF_PRICE, + _reserved5: Default::default(), + match_algorithm: MatchAlgorithm::Fifo as c_char, + md_security_trading_status: u8::MAX, + main_fraction: u8::MAX, + price_display_format: u8::MAX, + settl_price_type: u8::MAX, + sub_fraction: u8::MAX, + underlying_product: u8::MAX, + security_update_action: SecurityUpdateAction::Add, + maturity_month: u8::MAX, + maturity_day: u8::MAX, + maturity_week: u8::MAX, + user_defined_instrument: UserDefinedInstrument::No, + contract_multiplier_unit: i8::MAX, + flow_schedule_type: i8::MAX, + tick_rule: u8::MAX, + _dummy: Default::default(), + } + } +} + +impl Default for ImbalanceMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::IMBALANCE), + ts_recv: UNDEF_TIMESTAMP, + ref_price: UNDEF_PRICE, + auction_time: UNDEF_TIMESTAMP, + cont_book_clr_price: UNDEF_PRICE, + auct_interest_clr_price: UNDEF_PRICE, + ssr_filling_price: UNDEF_PRICE, + ind_match_price: UNDEF_PRICE, + upper_collar: UNDEF_PRICE, + lower_collar: UNDEF_PRICE, + paired_qty: UNDEF_ORDER_SIZE, + total_imbalance_qty: UNDEF_ORDER_SIZE, + market_imbalance_qty: UNDEF_ORDER_SIZE, + unpaired_qty: UNDEF_ORDER_SIZE, + auction_type: b'~' as c_char, + side: Side::None as c_char, + auction_status: 0, + freeze_status: 0, + num_extensions: 0, + unpaired_side: 0, + significant_imbalance: b'~' as c_char, + _dummy: Default::default(), + } + } +} + +impl Default for StatMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::STATISTICS), + ts_recv: UNDEF_TIMESTAMP, + ts_ref: UNDEF_TIMESTAMP, + price: UNDEF_PRICE, + quantity: UNDEF_STAT_QUANTITY, + sequence: 0, + ts_in_delta: 0, + stat_type: 0, + channel_id: 0, + update_action: StatUpdateAction::New as u8, + stat_flags: 0, + _dummy: Default::default(), + } + } +} + +impl Default for ErrorMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::ERROR), + err: [0; 64], + } + } +} + +impl Default for SymbolMappingMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::SYMBOL_MAPPING), + stype_in_symbol: [0; SYMBOL_CSTR_LEN], + stype_out_symbol: [0; SYMBOL_CSTR_LEN], + _dummy: Default::default(), + start_ts: UNDEF_TIMESTAMP, + end_ts: UNDEF_TIMESTAMP, + } + } +} + +impl Default for SystemMsg { + fn default() -> Self { + Self { + hd: RecordHeader::default::(rtype::SYSTEM), + msg: [0; 64], + } + } +} diff --git a/rust/dbn/src/record/methods.rs b/rust/dbn/src/record/methods.rs new file mode 100644 index 0000000..fd0dcf7 --- /dev/null +++ b/rust/dbn/src/record/methods.rs @@ -0,0 +1,603 @@ +use super::*; + +impl RecordHeader { + /// The multiplier for converting the `length` field to the number of bytes. + pub const LENGTH_MULTIPLIER: usize = 4; + + /// Creates a new `RecordHeader`. `R` and `rtype` should be compatible. + pub const fn new( + rtype: u8, + publisher_id: u16, + instrument_id: u32, + ts_event: u64, + ) -> Self { + Self { + length: (mem::size_of::() / Self::LENGTH_MULTIPLIER) as u8, + rtype, + publisher_id, + instrument_id, + ts_event, + } + } + + /// Returns the size of the **entire** record in bytes. The size of a `RecordHeader` + /// is constant. + pub const fn record_size(&self) -> usize { + self.length as usize * Self::LENGTH_MULTIPLIER + } + + /// Tries to convert the raw record type into an enum. + /// + /// # Errors + /// This function returns an error if the `rtype` field does not + /// contain a valid, known [`RType`]. + pub fn rtype(&self) -> crate::Result { + RType::try_from(self.rtype) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.rtype))) + } + + /// Tries to convert the raw `publisher_id` into an enum which is useful for + /// exhaustive pattern matching. + /// + /// # Errors + /// This function returns an error if the `publisher_id` does not correspond with + /// any known [`Publisher`]. + pub fn publisher(&self) -> crate::Result { + Publisher::try_from(self.publisher_id) + .map_err(|_| Error::conversion::(format!("{}", self.publisher_id))) + } + + /// Parses the raw matching-engine-received timestamp into a datetime. Returns + /// `None` if `ts_event` contains the sentinel for a null timestamp. + pub fn ts_event(&self) -> Option { + if self.ts_event == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_event as i128).unwrap()) + } + } +} + +impl MboMsg { + /// Tries to convert the raw order side to an enum. + /// + /// # Errors + /// This function returns an error if the `side` field does not + /// contain a valid [`Side`]. + pub fn side(&self) -> crate::Result { + Side::try_from(self.side as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) + } + + /// Tries to convert the raw event action to an enum. + /// + /// # Errors + /// This function returns an error if the `action` field does not + /// contain a valid [`Action`]. + pub fn action(&self) -> crate::Result { + Action::try_from(self.action as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) + } + + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. + pub fn ts_in_delta(&self) -> time::Duration { + time::Duration::new(0, self.ts_in_delta) + } +} + +impl TradeMsg { + /// Tries to convert the raw order side to an enum. + /// + /// # Errors + /// This function returns an error if the `side` field does not + /// contain a valid [`Side`]. + pub fn side(&self) -> crate::Result { + Side::try_from(self.side as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) + } + + /// Tries to convert the raw event action to an enum. + /// + /// # Errors + /// This function returns an error if the `action` field does not + /// contain a valid [`Action`]. + pub fn action(&self) -> crate::Result { + Action::try_from(self.action as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) + } + + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. + pub fn ts_in_delta(&self) -> time::Duration { + time::Duration::new(0, self.ts_in_delta) + } +} + +impl Mbp1Msg { + /// Tries to convert the raw `side` to an enum. + /// + /// # Errors + /// This function returns an error if the `side` field does not + /// contain a valid [`Side`]. + pub fn side(&self) -> crate::Result { + Side::try_from(self.side as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) + } + + /// Tries to convert the raw event action to an enum. + /// + /// # Errors + /// This function returns an error if the `action` field does not + /// contain a valid [`Action`]. + pub fn action(&self) -> crate::Result { + Action::try_from(self.action as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) + } + + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. + pub fn ts_in_delta(&self) -> time::Duration { + time::Duration::new(0, self.ts_in_delta) + } +} + +impl Mbp10Msg { + /// Tries to convert the raw `side` to an enum. + /// + /// # Errors + /// This function returns an error if the `side` field does not + /// contain a valid [`Side`]. + pub fn side(&self) -> Result { + Side::try_from(self.side as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.side as u8))) + } + + /// Tries to convert the raw event action to an enum. + /// + /// # Errors + /// This function returns an error if the `action` field does not + /// contain a valid [`Action`]. + pub fn action(&self) -> Result { + Action::try_from(self.action as u8) + .map_err(|_| Error::conversion::(format!("{:#02X}", self.action as u8))) + } + + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw `ts_in_delta`—the delta of `ts_recv - ts_exchange_send`—into a duration. + pub fn ts_in_delta(&self) -> time::Duration { + time::Duration::new(0, self.ts_in_delta) + } +} + +impl StatusMsg { + /// Returns `group` as a `&str`. + /// + /// # Errors + /// This function returns an error if `group` contains invalid UTF-8. + pub fn group(&self) -> Result<&str> { + c_chars_to_str(&self.group) + } +} + +impl InstrumentDefMsg { + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw last eligible trade time into a datetime. Returns `None` if + /// `expiration` contains the sentinel for a null timestamp. + pub fn expiration(&self) -> Option { + if self.expiration == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.expiration as i128).unwrap()) + } + } + + /// Parses the raw time of instrument action into a datetime. Returns `None` if + /// `activation` contains the sentinel for a null timestamp. + pub fn activation(&self) -> Option { + if self.activation == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.activation as i128).unwrap()) + } + } + + /// Returns currency used for price fields as a `&str`. + /// + /// # Errors + /// This function returns an error if `currency` contains invalid UTF-8. + pub fn currency(&self) -> Result<&str> { + c_chars_to_str(&self.currency) + } + + /// Returns currency used for settlement as a `&str`. + /// + /// # Errors + /// This function returns an error if `settl_currency` contains invalid UTF-8. + pub fn settl_currency(&self) -> Result<&str> { + c_chars_to_str(&self.settl_currency) + } + + /// Returns the strategy type of the spread as a `&str`. + /// + /// # Errors + /// This function returns an error if `secsubtype` contains invalid UTF-8. + pub fn secsubtype(&self) -> Result<&str> { + c_chars_to_str(&self.secsubtype) + } + + /// Returns the instrument raw symbol assigned by the publisher as a `&str`. + /// + /// # Errors + /// This function returns an error if `raw_symbol` contains invalid UTF-8. + pub fn raw_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.raw_symbol) + } + + /// Returns exchange used to identify the instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `exchange` contains invalid UTF-8. + pub fn exchange(&self) -> Result<&str> { + c_chars_to_str(&self.exchange) + } + + /// Returns the underlying asset code (product code) of the instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `asset` contains invalid UTF-8. + pub fn asset(&self) -> Result<&str> { + c_chars_to_str(&self.asset) + } + + /// Returns the ISO standard instrument categorization code as a `&str`. + /// + /// # Errors + /// This function returns an error if `cfi` contains invalid UTF-8. + pub fn cfi(&self) -> Result<&str> { + c_chars_to_str(&self.cfi) + } + + /// Returns the type of the strument, e.g. FUT for future or future spread as + /// a `&str`. + /// + /// # Errors + /// This function returns an error if `security_type` contains invalid UTF-8. + pub fn security_type(&self) -> Result<&str> { + c_chars_to_str(&self.security_type) + } + + /// Returns the unit of measure for the instrument's original contract size, e.g. + /// USD or LBS, as a `&str`. + /// + /// # Errors + /// This function returns an error if `unit_of_measure` contains invalid UTF-8. + pub fn unit_of_measure(&self) -> Result<&str> { + c_chars_to_str(&self.unit_of_measure) + } + + /// Returns the symbol of the first underlying instrument as a `&str`. + /// + /// # Errors + /// This function returns an error if `underlying` contains invalid UTF-8. + pub fn underlying(&self) -> Result<&str> { + c_chars_to_str(&self.underlying) + } + + /// Returns the currency of [`strike_price`](Self::strike_price) as a `&str`. + /// + /// # Errors + /// This function returns an error if `strike_price_currency` contains invalid UTF-8. + pub fn strike_price_currency(&self) -> Result<&str> { + c_chars_to_str(&self.strike_price_currency) + } + + /// Returns the security group code of the instrumnet as a `&str`. + /// + /// # Errors + /// This function returns an error if `group` contains invalid UTF-8. + pub fn group(&self) -> Result<&str> { + c_chars_to_str(&self.group) + } + + /// Tries to convert the raw classification of the instrument to an enum. + /// + /// # Errors + /// This function returns an error if the `instrument_class` field does not + /// contain a valid [`InstrumentClass`]. + pub fn instrument_class(&self) -> Result { + InstrumentClass::try_from(self.instrument_class as u8).map_err(|_| { + Error::conversion::(format!("{:#02X}", self.instrument_class as u8)) + }) + } + + /// Tries to convert the raw matching algorithm used for the instrument to an enum. + /// + /// # Errors + /// This function returns an error if the `match_algorithm` field does not + /// contain a valid [`MatchAlgorithm`]. + pub fn match_algorithm(&self) -> Result { + MatchAlgorithm::try_from(self.match_algorithm as u8).map_err(|_| { + Error::conversion::(format!("{:#02X}", self.match_algorithm as u8)) + }) + } +} + +impl ImbalanceMsg { + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } +} + +impl StatMsg { + /// Parses the raw capture-server-received timestamp into a datetime. Returns `None` + /// if `ts_recv` contains the sentinel for a null timestamp. + pub fn ts_recv(&self) -> Option { + if self.ts_recv == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_recv as i128).unwrap()) + } + } + + /// Parses the raw reference timestamp of the statistic value into a datetime. + /// Returns `None` if `ts_ref` contains the sentinel for a null timestamp. + pub fn ts_ref(&self) -> Option { + if self.ts_ref == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_ref as i128).unwrap()) + } + } + + /// Tries to convert the raw type of the statistic value to an enum. + /// + /// # Errors + /// This function returns an error if the `stat_type` field does not + /// contain a valid [`StatType`]. + pub fn stat_type(&self) -> Result { + StatType::try_from(self.stat_type) + .map_err(|_| Error::conversion::(format!("{:02X}", self.stat_type))) + } + + /// Tries to convert the raw `update_action` to an enum. + /// + /// # Errors + /// This function returns an error if the `update_action` field does not + /// contain a valid [`StatUpdateAction`]. + pub fn update_action(&self) -> Result { + StatUpdateAction::try_from(self.update_action).map_err(|_| { + Error::conversion::(format!("{:02X}", self.update_action)) + }) + } +} + +impl ErrorMsg { + /// Creates a new `ErrorMsg`. + /// + /// # Errors + /// This function returns an error if `msg` is too long. + pub fn new(ts_event: u64, msg: &str) -> Self { + let mut error = Self { + hd: RecordHeader::new::(rtype::ERROR, 0, 0, ts_event), + err: [0; 64], + }; + // leave at least one null byte + for (i, byte) in msg.as_bytes().iter().take(error.err.len() - 1).enumerate() { + error.err[i] = *byte as c_char; + } + error + } + + /// Returns `err` as a `&str`. + /// + /// # Errors + /// This function returns an error if `err` contains invalid UTF-8. + pub fn err(&self) -> Result<&str> { + c_chars_to_str(&self.err) + } +} + +impl SymbolMappingMsg { + /// Creates a new `SymbolMappingMsg`. + /// + /// # Errors + /// This function returns an error if `stype_in_symbol` or `stype_out_symbol` + /// contain more than maximum number of characters of 21. + pub fn new( + instrument_id: u32, + ts_event: u64, + stype_in_symbol: &str, + stype_out_symbol: &str, + start_ts: u64, + end_ts: u64, + ) -> crate::Result { + // symbol mappings aren't publisher-specific + Ok(Self { + hd: RecordHeader::new::(rtype::SYMBOL_MAPPING, 0, instrument_id, ts_event), + stype_in_symbol: str_to_c_chars(stype_in_symbol)?, + stype_out_symbol: str_to_c_chars(stype_out_symbol)?, + _dummy: Default::default(), + start_ts, + end_ts, + }) + } + + /// Returns the input symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_in_symbol` contains invalid UTF-8. + pub fn stype_in_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.stype_in_symbol) + } + + /// Returns the output symbol as a `&str`. + /// + /// # Errors + /// This function returns an error if `stype_out_symbol` contains invalid UTF-8. + pub fn stype_out_symbol(&self) -> Result<&str> { + c_chars_to_str(&self.stype_out_symbol) + } + + /// Parses the raw start of the mapping interval into a datetime. Returns `None` if + /// `start_ts` contains the sentinel for a null timestamp. + pub fn start_ts(&self) -> Option { + if self.start_ts == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.start_ts as i128).unwrap()) + } + } + + /// Parses the raw end of the mapping interval into a datetime. Returns `None` if + /// `end_ts` contains the sentinel for a null timestamp. + pub fn end_ts(&self) -> Option { + if self.end_ts == crate::UNDEF_TIMESTAMP { + None + } else { + // u64::MAX is within maximum allowable range + Some(time::OffsetDateTime::from_unix_timestamp_nanos(self.end_ts as i128).unwrap()) + } + } +} + +impl SystemMsg { + const HEARTBEAT: &str = "Heartbeat"; + + /// Creates a new `SystemMsg`. + /// + /// # Errors + /// This function returns an error if `msg` is too long. + pub fn new(ts_event: u64, msg: &str) -> Result { + Ok(Self { + hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), + msg: str_to_c_chars(msg)?, + }) + } + + /// Creates a new heartbeat `SystemMsg`. + pub fn heartbeat(ts_event: u64) -> Self { + Self { + hd: RecordHeader::new::(rtype::SYSTEM, 0, 0, ts_event), + msg: str_to_c_chars(Self::HEARTBEAT).unwrap(), + } + } + + /// Checks whether the message is a heartbeat from the gateway. + pub fn is_heartbeat(&self) -> bool { + self.msg() + .map(|msg| msg == Self::HEARTBEAT) + .unwrap_or_default() + } + + /// Returns the message from the Databento Live Subscription Gateway (LSG) as + /// a `&str`. + /// + /// # Errors + /// This function returns an error if `msg` contains invalid UTF-8. + pub fn msg(&self) -> Result<&str> { + c_chars_to_str(&self.msg) + } +} + +impl HasRType for WithTsOut { + fn has_rtype(rtype: u8) -> bool { + T::has_rtype(rtype) + } + + fn header(&self) -> &RecordHeader { + self.rec.header() + } + + fn header_mut(&mut self) -> &mut RecordHeader { + self.rec.header_mut() + } +} + +impl AsRef<[u8]> for WithTsOut +where + T: HasRType + AsRef<[u8]>, +{ + fn as_ref(&self) -> &[u8] { + unsafe { as_u8_slice(self) } + } +} + +impl WithTsOut { + /// Creates a new record with `ts_out`. Updates the `length` property in + /// [`RecordHeader`] to ensure the additional field is accounted for. + pub fn new(rec: T, ts_out: u64) -> Self { + let mut res = Self { rec, ts_out }; + res.header_mut().length = (mem::size_of_val(&res) / RecordHeader::LENGTH_MULTIPLIER) as u8; + res + } + + /// Parses the raw live gateway send timestamp into a datetime. + pub fn ts_out(&self) -> time::OffsetDateTime { + // u64::MAX is within maximum allowable range + time::OffsetDateTime::from_unix_timestamp_nanos(self.ts_out as i128).unwrap() + } +}