diff --git a/CHANGELOG.md b/CHANGELOG.md index a20abe4..9965a76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,27 @@ # Changelog -## 0.14.3 - TBD +## 0.15.0 - TBD ### Enhancements -- Added type definition for `Metadata.__init__` +- Added `--schema` option to `dbn` CLI tool to filter a DBN to a particular schema. This + allows outputting saved live data to CSV +- Allowed passing `--limit` option to `dbn` CLI tool with `--metadata` flag +- Improved performance of decoding uncompressed DBN fragments with the `dbn` CLI tool - Added `version` param to Python `Metadata` constructor to choose between DBNv1 and DBNv2 -- Implemented `Hash` for all record types - Implemented `Deserialize` and `Serialize` for all records and enums (with `serde` feature enabled). This allows serializing records with additional encodings not supported by the DBN crate +- Implemented `Hash` for all record types - Added new publisher value for OPRA MIAX Sapphire +- Added Python type definition for `Metadata.__init__` +- Added `metadata_mut` method to decoders to get a mutable reference to the decoded + metadata + +### Breaking changes +- Split `DecodeDbn` trait into `DecodeRecord` and `DbnMetadata` traits for more + flexibility. `DecodeDbn` continues to exist as a trait alias +- Moved `decode_stream` out of `DecodeDbn` to its own separate trait `DecodeStream` +- Changed trait bounds of `EncodeDbn::encode_decoded` and `encode_decoded_with_limit` to + `DecodeRecordRef + DbnMetadata` ### Bug fixes - Fixed panic in `TsSymbolMap` when `start_date` == `end_date` diff --git a/c/src/decode.rs b/c/src/decode.rs index f99b9a0..45beb58 100644 --- a/c/src/decode.rs +++ b/c/src/decode.rs @@ -9,7 +9,7 @@ use std::{ }; use dbn::{ - decode::{DecodeDbn, DecodeRecordRef, DynDecoder}, + decode::{DbnMetadata, DecodeRecordRef, DynDecoder}, Compression, Metadata, Record, RecordHeader, VersionUpgradePolicy, }; diff --git a/python/src/encode.rs b/python/src/encode.rs index a280954..ab40034 100644 --- a/python/src/encode.rs +++ b/python/src/encode.rs @@ -210,16 +210,15 @@ fn py_to_rs_io_err(e: PyErr) -> io::Error { #[cfg(test)] pub mod tests { - use std::io::{Cursor, Seek, Write}; - - use std::sync::{Arc, Mutex}; + use std::{ + io::{Cursor, Seek, Write}, + sync::{Arc, Mutex}, + }; - use dbn::datasets::GLBX_MDP3; use dbn::{ - decode::{dbn::Decoder as DbnDecoder, DecodeDbn}, - enums::SType, - metadata::MetadataBuilder, - record::TbboMsg, + datasets::GLBX_MDP3, + decode::{dbn::Decoder as DbnDecoder, DbnMetadata, DecodeRecord}, + SType, TbboMsg, }; use super::*; @@ -298,7 +297,7 @@ pub mod tests { let mock_file = MockPyFile::new(); let output_buf = mock_file.inner(); let mock_file = Py::new(py, mock_file).unwrap().into_py(py); - let metadata = MetadataBuilder::new() + let metadata = Metadata::builder() .dataset(DATASET.to_owned()) .schema(Some($schema)) .start(0) diff --git a/rust/dbn-cli/src/encode.rs b/rust/dbn-cli/src/encode.rs index a21693c..7f9f8c9 100644 --- a/rust/dbn-cli/src/encode.rs +++ b/rust/dbn-cli/src/encode.rs @@ -1,7 +1,7 @@ use std::io; use dbn::{ - decode::{DbnRecordDecoder, DecodeDbn, DecodeRecordRef, DynDecoder}, + decode::{DbnMetadata, DecodeRecordRef}, encode::{ json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecordRef, }, @@ -10,7 +10,10 @@ use dbn::{ use crate::{infer_encoding_and_compression, output_from_args, Args}; -pub fn encode_from_dbn(decoder: DynDecoder, args: &Args) -> anyhow::Result<()> { +pub fn encode_from_dbn(decoder: D, args: &Args) -> anyhow::Result<()> +where + D: 
DecodeRecordRef + DbnMetadata, +{ let writer = output_from_args(args)?; let (encoding, compression) = infer_encoding_and_compression(args)?; let encode_res = if args.should_output_metadata { @@ -23,21 +26,7 @@ pub fn encode_from_dbn(decoder: DynDecoder, args: &Args) -> a ) .encode_metadata(decoder.metadata()) } else if args.fragment { - encode_fragment(decoder, writer, compression, args) - } else if let Some(limit) = args.limit { - let mut metadata = decoder.metadata().clone(); - // Update metadata - metadata.limit = args.limit; - DynEncoder::new( - writer, - encoding, - compression, - &metadata, - args.should_pretty_print, - args.should_pretty_print, - args.should_pretty_print, - )? - .encode_decoded_with_limit(decoder, limit) + encode_fragment(decoder, writer, compression) } else { DynEncoder::new( writer, @@ -59,14 +48,14 @@ pub fn encode_from_dbn(decoder: DynDecoder, args: &Args) -> a } } -pub fn encode_from_frag( - mut decoder: DbnRecordDecoder, - args: &Args, -) -> anyhow::Result<()> { +pub fn encode_from_frag(mut decoder: D, args: &Args) -> anyhow::Result<()> +where + D: DecodeRecordRef, +{ let writer = output_from_args(args)?; let (encoding, compression) = infer_encoding_and_compression(args)?; if args.fragment { - encode_fragment(decoder, writer, compression, args)?; + encode_fragment(decoder, writer, compression)?; return Ok(()); } assert!(!args.should_output_metadata); @@ -87,7 +76,6 @@ pub fn encode_from_frag( args.should_pretty_print, args.should_pretty_print, )?; - let mut n = 0; let mut has_written_header = encoding != Encoding::Csv; fn write_header( _record: &T, @@ -115,10 +103,6 @@ pub fn encode_from_frag( } res => res?, }; - n += 1; - if args.limit.map_or(false, |l| n >= l.get()) { - break; - } } Ok(()) } @@ -127,16 +111,10 @@ fn encode_fragment( mut decoder: D, writer: Box, compression: Compression, - args: &Args, ) -> dbn::Result<()> { let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?); - let mut n = 0; while let Some(record) = decoder.decode_record_ref()? { encoder.encode_record_ref(record)?; - n += 1; - if args.limit.map_or(false, |l| n >= l.get()) { - break; - } } Ok(()) } diff --git a/rust/dbn-cli/src/filter.rs b/rust/dbn-cli/src/filter.rs new file mode 100644 index 0000000..ab3521c --- /dev/null +++ b/rust/dbn-cli/src/filter.rs @@ -0,0 +1,122 @@ +use std::num::NonZeroU64; + +use dbn::{ + decode::{DbnMetadata, DecodeRecordRef}, + RType, Record, RecordRef, Schema, +}; + +#[derive(Debug)] +pub struct SchemaFilter { + decoder: D, + rtype: Option, +} + +impl SchemaFilter +where + D: DbnMetadata, +{ + pub fn new(mut decoder: D, schema: Option) -> Self { + if let Some(schema) = schema { + decoder.metadata_mut().schema = Some(schema); + } + Self::new_no_metadata(decoder, schema) + } +} + +impl SchemaFilter { + pub fn new_no_metadata(decoder: D, schema: Option) -> Self { + Self { + decoder, + rtype: schema.map(RType::from), + } + } +} + +impl DbnMetadata for SchemaFilter { + fn metadata(&self) -> &dbn::Metadata { + self.decoder.metadata() + } + + fn metadata_mut(&mut self) -> &mut dbn::Metadata { + self.decoder.metadata_mut() + } +} + +impl DecodeRecordRef for SchemaFilter { + fn decode_record_ref(&mut self) -> dbn::Result> { + while let Some(record) = self.decoder.decode_record_ref()? { + if self + .rtype + .map(|rtype| rtype as u8 == record.header().rtype) + .unwrap_or(true) + { + // Safe: casting reference to pointer so the pointer will always be valid. + // Getting around borrow checker limitation. 
+ return Ok(Some(unsafe { + RecordRef::unchecked_from_header(record.header()) + })); + } + } + Ok(None) + } +} + +#[derive(Debug)] +pub struct LimitFilter { + decoder: D, + limit: Option, + record_count: u64, +} + +impl LimitFilter +where + D: DbnMetadata, +{ + pub fn new(mut decoder: D, limit: Option) -> Self { + if let Some(limit) = limit { + let metadata_limit = &mut decoder.metadata_mut().limit; + if let Some(metadata_limit) = metadata_limit { + *metadata_limit = (*metadata_limit).min(limit); + } else { + *metadata_limit = Some(limit); + } + } + Self::new_no_metadata(decoder, limit) + } +} + +impl LimitFilter { + pub fn new_no_metadata(decoder: D, limit: Option) -> Self { + Self { + decoder, + limit, + record_count: 0, + } + } +} + +impl DbnMetadata for LimitFilter { + fn metadata(&self) -> &dbn::Metadata { + self.decoder.metadata() + } + + fn metadata_mut(&mut self) -> &mut dbn::Metadata { + self.decoder.metadata_mut() + } +} + +impl DecodeRecordRef for LimitFilter { + fn decode_record_ref(&mut self) -> dbn::Result> { + if self + .limit + .map(|limit| self.record_count >= limit.get()) + .unwrap_or(false) + { + return Ok(None); + } + Ok(self.decoder.decode_record_ref()?.map(|rec| { + self.record_count += 1; + rec + })) + } +} diff --git a/rust/dbn-cli/src/lib.rs b/rust/dbn-cli/src/lib.rs index cd235d3..1bbe991 100644 --- a/rust/dbn-cli/src/lib.rs +++ b/rust/dbn-cli/src/lib.rs @@ -10,10 +10,11 @@ use clap::{ArgAction, Parser, ValueEnum}; use dbn::{ enums::{Compression, Encoding}, - VersionUpgradePolicy, + Schema, VersionUpgradePolicy, }; pub mod encode; +pub mod filter; /// How the output of the `dbn` command will be encoded. #[derive(Clone, Copy, Debug, ValueEnum)] @@ -118,7 +119,6 @@ pub struct Args { short = 'l', long = "limit", value_name = "NUM_RECORDS", - conflicts_with = "should_output_metadata", help = "Limit the number of records in the output to the specified number" )] pub limit: Option, @@ -149,6 +149,12 @@ pub struct Args { requires = "input_fragment" )] pub input_dbn_version_override: Option, + #[clap( + long = "schema", + help = "Only encode records of this schema. 
This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas", + value_name = "SCHEMA" + )] + pub schema_filter: Option, } impl Args { @@ -174,6 +180,10 @@ impl Args { VersionUpgradePolicy::AsIs } } + + pub fn input_version(&self) -> u8 { + self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION) + } } /// Infer the [`Encoding`] and [`Compression`] from `args` if they aren't already explicitly @@ -228,8 +238,7 @@ pub fn output_from_args(args: &Args) -> anyhow::Result> { fn open_output_file(path: &PathBuf, force: bool) -> anyhow::Result { let mut options = File::options(); - options.write(true); - options.truncate(true); + options.write(true).truncate(true); if force { options.create(true); } else if path.exists() { diff --git a/rust/dbn-cli/src/main.rs b/rust/dbn-cli/src/main.rs index d57e365..13005ff 100644 --- a/rust/dbn-cli/src/main.rs +++ b/rust/dbn-cli/src/main.rs @@ -1,65 +1,81 @@ -use std::{fs::File, io}; +use std::{ + fs::File, + io::{self, BufReader}, +}; use clap::Parser; -use dbn::decode::{DbnRecordDecoder, DynDecoder}; +use dbn::decode::{DbnMetadata, DbnRecordDecoder, DecodeRecordRef, DynDecoder}; use dbn_cli::{ encode::{encode_from_dbn, encode_from_frag}, + filter::{LimitFilter, SchemaFilter}, Args, }; const STDIN_SENTINEL: &str = "-"; +fn wrap_frag(args: &Args, reader: impl io::Read) -> anyhow::Result { + Ok(LimitFilter::new_no_metadata( + SchemaFilter::new_no_metadata( + DbnRecordDecoder::with_version(reader, args.input_version(), args.upgrade_policy())?, + args.schema_filter, + ), + args.limit, + )) +} + +fn wrap( + args: &Args, + decoder: DynDecoder<'static, R>, +) -> impl DecodeRecordRef + DbnMetadata { + LimitFilter::new(SchemaFilter::new(decoder, args.schema_filter), args.limit) +} + fn main() -> anyhow::Result<()> { let args = Args::parse(); - let input_version = args.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION); + // DBN fragment if args.is_input_fragment { if args.input.as_os_str() == STDIN_SENTINEL { - encode_from_frag( - DbnRecordDecoder::with_version( - io::stdin().lock(), - input_version, - args.upgrade_policy(), - )?, - &args, - ) + encode_from_frag(wrap_frag(&args, io::stdin().lock())?, &args) } else { encode_from_frag( - DbnRecordDecoder::with_version( - File::open(args.input.clone())?, - input_version, - args.upgrade_policy(), - )?, + wrap_frag(&args, BufReader::new(File::open(args.input.clone())?))?, &args, ) } + // Zstd-compressed DBN fragment } else if args.is_input_zstd_fragment { if args.input.as_os_str() == STDIN_SENTINEL { encode_from_frag( - DbnRecordDecoder::with_version( + wrap_frag( + &args, zstd::stream::Decoder::with_buffer(io::stdin().lock())?, - input_version, - args.upgrade_policy(), )?, &args, ) } else { encode_from_frag( - DbnRecordDecoder::with_version( + wrap_frag( + &args, zstd::stream::Decoder::new(File::open(args.input.clone())?)?, - input_version, - args.upgrade_policy(), )?, &args, ) } + // DBN stream (with metadata) } else if args.input.as_os_str() == STDIN_SENTINEL { encode_from_dbn( - DynDecoder::inferred_with_buffer(io::stdin().lock(), args.upgrade_policy())?, + wrap( + &args, + DynDecoder::inferred_with_buffer(io::stdin().lock(), args.upgrade_policy())?, + ), &args, ) } else { encode_from_dbn( - DynDecoder::from_file(&args.input, args.upgrade_policy())?, + wrap( + &args, + DynDecoder::from_file(&args.input, args.upgrade_policy())?, + ), &args, ) } diff --git a/rust/dbn-cli/tests/integration_tests.rs b/rust/dbn-cli/tests/integration_tests.rs index 0105851..77b9365 100644 
--- a/rust/dbn-cli/tests/integration_tests.rs +++ b/rust/dbn-cli/tests/integration_tests.rs @@ -351,18 +351,21 @@ fn convert_dbz_to_dbn() { } #[test] -fn metadata_conflicts_with_limit() { +fn limit_and_schema_filter_update_metadata() { cmd() .args([ - &format!("{TEST_DATA_PATH}/test_data.definition.dbn.zst"), + &format!("{TEST_DATA_PATH}/test_data.ohlcv-1m.dbn.zst"), "--json", "--metadata", "--limit", "1", + "--schema", + "ohlcv-1d", ]) .assert() - .failure() - .stderr(contains("'--metadata' cannot be used with '--limit")); + .success() + .stdout(contains(r#""limit":"1""#)) + .stdout(contains(r#""schema":"ohlcv-1d""#)); } #[rstest] diff --git a/rust/dbn/src/decode.rs b/rust/dbn/src/decode.rs index 648c575..3f931a9 100644 --- a/rust/dbn/src/decode.rs +++ b/rust/dbn/src/decode.rs @@ -51,11 +51,17 @@ pub trait DecodeRecordRef { fn decode_record_ref(&mut self) -> crate::Result>; } -/// Trait for types that decode DBN records of a particular type. -pub trait DecodeDbn: DecodeRecordRef + private::BufferSlice { - /// Returns a reference to the decoded [`Metadata`]. +/// Trait for decoders with metadata about what's being decoded. +pub trait DbnMetadata { + /// Returns an immutable reference to the decoded [`Metadata`]. fn metadata(&self) -> &Metadata; + /// Returns a mutable reference to the decoded [`Metadata`]. + fn metadata_mut(&mut self) -> &mut Metadata; +} + +/// Trait for types that decode DBN records of a particular type. +pub trait DecodeRecord { /// Tries to decode a reference to a single record of type `T`. Returns `Ok(None)` /// if the input has been exhausted. /// @@ -70,12 +76,6 @@ pub trait DecodeDbn: DecodeRecordRef + private::BufferSlice { /// [`Error::Decode`](crate::Error::Decode) will be returned. fn decode_record(&mut self) -> crate::Result>; - /// Converts the decoder into a streaming iterator of records of type `T`. This - /// lazily decodes the data. - fn decode_stream(self) -> StreamIterDecoder - where - Self: Sized; - /// Tries to decode all records into a `Vec`. This eagerly decodes the data. /// /// # Errors @@ -99,6 +99,18 @@ pub trait DecodeDbn: DecodeRecordRef + private::BufferSlice { } } +/// A trait alias for DBN decoders with metadata. +pub trait DecodeDbn: DecodeRecord + DecodeRecordRef + DbnMetadata {} + +/// A trait for decoders that can be converted to streaming iterators. +pub trait DecodeStream: DecodeRecord + private::BufferSlice { + /// Converts the decoder into a streaming iterator of records of type `T`. This + /// lazily decodes the data. + fn decode_stream(self) -> StreamIterDecoder + where + Self: Sized; +} + /// A decoder implementing [`DecodeDbn`] whose [`Encoding`](crate::enums::Encoding) and /// [`Compression`] are determined at runtime by peeking at the first few bytes. 
pub struct DynDecoder<'a, R>(DynDecoderImpl<'a, R>) @@ -250,7 +262,7 @@ where } #[allow(deprecated)] -impl<'a, R> DecodeDbn for DynDecoder<'a, R> +impl<'a, R> DbnMetadata for DynDecoder<'a, R> where R: io::BufRead, { @@ -262,6 +274,20 @@ where } } + fn metadata_mut(&mut self) -> &mut Metadata { + match &mut self.0 { + DynDecoderImpl::Dbn(decoder) => decoder.metadata_mut(), + DynDecoderImpl::ZstdDbn(decoder) => decoder.metadata_mut(), + DynDecoderImpl::LegacyDbz(decoder) => decoder.metadata_mut(), + } + } +} + +#[allow(deprecated)] +impl<'a, R> DecodeRecord for DynDecoder<'a, R> +where + R: io::BufRead, +{ fn decode_record(&mut self) -> crate::Result> { match &mut self.0 { DynDecoderImpl::Dbn(decoder) => decoder.decode_record(), @@ -269,7 +295,12 @@ where DynDecoderImpl::LegacyDbz(decoder) => decoder.decode_record(), } } +} +impl<'a, R> DecodeStream for DynDecoder<'a, R> +where + R: io::BufRead, +{ fn decode_stream(self) -> StreamIterDecoder where Self: Sized, diff --git a/rust/dbn/src/decode/dbn/sync.rs b/rust/dbn/src/decode/dbn/sync.rs index 97e75c3..0f2be11 100644 --- a/rust/dbn/src/decode/dbn/sync.rs +++ b/rust/dbn/src/decode/dbn/sync.rs @@ -11,8 +11,8 @@ use super::{DBN_PREFIX, DBN_PREFIX_LEN}; use crate::{ compat::{self, SYMBOL_CSTR_LEN_V1}, decode::{ - private::BufferSlice, DecodeDbn, DecodeRecordRef, FromLittleEndianSlice, StreamIterDecoder, - VersionUpgradePolicy, + private::BufferSlice, DbnMetadata, DecodeRecord, DecodeRecordRef, DecodeStream, + FromLittleEndianSlice, StreamIterDecoder, VersionUpgradePolicy, }, error::silence_eof_error, HasRType, MappingInterval, Metadata, Record, RecordHeader, RecordRef, SType, Schema, @@ -20,10 +20,7 @@ use crate::{ }; /// Type for decoding files and streams in Databento Binary Encoding (DBN), both metadata and records. -pub struct Decoder -where - R: io::Read, -{ +pub struct Decoder { metadata: Metadata, decoder: RecordDecoder, } @@ -170,18 +167,29 @@ where } } -impl DecodeDbn for Decoder -where - R: io::Read, -{ +impl DbnMetadata for Decoder { fn metadata(&self) -> &Metadata { &self.metadata } + fn metadata_mut(&mut self) -> &mut Metadata { + &mut self.metadata + } +} + +impl DecodeRecord for Decoder +where + R: io::Read, +{ fn decode_record(&mut self) -> crate::Result> { self.decoder.decode() } +} +impl DecodeStream for Decoder +where + R: io::Read, +{ fn decode_stream(self) -> StreamIterDecoder { StreamIterDecoder::new(self) } @@ -197,10 +205,7 @@ where } /// A DBN decoder of records -pub struct RecordDecoder -where - R: io::Read, -{ +pub struct RecordDecoder { /// For future use with reading different DBN versions. 
version: u8, upgrade_policy: VersionUpgradePolicy, diff --git a/rust/dbn/src/decode/dbz.rs b/rust/dbn/src/decode/dbz.rs index 515842d..9cb03a9 100644 --- a/rust/dbn/src/decode/dbz.rs +++ b/rust/dbn/src/decode/dbz.rs @@ -9,8 +9,8 @@ use std::{ }; use super::{ - private::BufferSlice, zstd::ZSTD_SKIPPABLE_MAGIC_RANGE, DecodeDbn, DecodeRecordRef, - StreamIterDecoder, VersionUpgradePolicy, + private::BufferSlice, zstd::ZSTD_SKIPPABLE_MAGIC_RANGE, DbnMetadata, DecodeRecord, + DecodeRecordRef, DecodeStream, StreamIterDecoder, VersionUpgradePolicy, }; use crate::{ compat, @@ -127,11 +127,17 @@ impl DecodeRecordRef for Decoder { } } -impl DecodeDbn for Decoder { +impl DbnMetadata for Decoder { fn metadata(&self) -> &Metadata { &self.metadata } + fn metadata_mut(&mut self) -> &mut Metadata { + &mut self.metadata + } +} + +impl DecodeRecord for Decoder { fn decode_record(&mut self) -> crate::Result> { let rec_ref = self.decode_record_ref()?; if let Some(rec_ref) = rec_ref { @@ -148,7 +154,9 @@ impl DecodeDbn for Decoder { Ok(None) } } +} +impl DecodeStream for Decoder { /// Try to decode the DBZ file into a streaming iterator. This decodes the /// data lazily. /// diff --git a/rust/dbn/src/decode/stream.rs b/rust/dbn/src/decode/stream.rs index a0bf065..991c566 100644 --- a/rust/dbn/src/decode/stream.rs +++ b/rust/dbn/src/decode/stream.rs @@ -2,16 +2,16 @@ use std::marker::PhantomData; use streaming_iterator::StreamingIterator; -use super::DecodeDbn; +use super::{DecodeRecord, DecodeStream}; use crate::record::{transmute_record_bytes, HasRType}; -/// A consuming iterator wrapping a [`DecodeDbn`]. Lazily decodes the contents of the file -/// or other input stream. +/// A consuming iterator wrapping a [`DecodeRecord`]. Lazily decodes the contents of the +/// file or other input stream. /// /// Implements [`streaming_iterator::StreamingIterator`]. pub struct StreamIterDecoder where - D: DecodeDbn, + D: DecodeRecord, T: HasRType, { /// The underlying decoder implementation. @@ -27,7 +27,7 @@ where impl StreamIterDecoder where - D: DecodeDbn, + D: DecodeRecord, T: HasRType, { pub(crate) fn new(decoder: D) -> Self { @@ -47,7 +47,7 @@ where impl StreamingIterator for StreamIterDecoder where - D: DecodeDbn, + D: DecodeStream, T: HasRType, { type Item = T; diff --git a/rust/dbn/src/encode.rs b/rust/dbn/src/encode.rs index eacd9ba..b1b0ae8 100644 --- a/rust/dbn/src/encode.rs +++ b/rust/dbn/src/encode.rs @@ -26,8 +26,9 @@ pub use self::{ }; use crate::{ - decode::DecodeDbn, rtype_method_dispatch, rtype_ts_out_method_dispatch, Compression, Encoding, - Error, HasRType, Metadata, Record, RecordRef, Result, Schema, + decode::{DbnMetadata, DecodeRecordRef}, + rtype_method_dispatch, rtype_ts_out_method_dispatch, Compression, Encoding, Error, HasRType, + Metadata, Record, RecordRef, Result, Schema, }; use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize}; @@ -112,7 +113,7 @@ pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { /// # Errors /// This function returns an error if it's unable to write to the underlying writer /// or there's a serialization error. - fn encode_decoded(&mut self, mut decoder: D) -> Result<()> { + fn encode_decoded(&mut self, mut decoder: D) -> Result<()> { let ts_out = decoder.metadata().ts_out; while let Some(record) = decoder.decode_record_ref()? 
{ // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out` @@ -129,7 +130,7 @@ pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { /// # Errors /// This function returns an error if it's unable to write to the underlying writer /// or there's a serialization error. - fn encode_decoded_with_limit( + fn encode_decoded_with_limit( &mut self, mut decoder: D, limit: NonZeroU64, @@ -417,7 +418,7 @@ where self.0.encode_stream(stream) } - fn encode_decoded(&mut self, decoder: D) -> Result<()> { + fn encode_decoded(&mut self, decoder: D) -> Result<()> { self.0.encode_decoded(decoder) } } @@ -490,7 +491,7 @@ macro_rules! encoder_enum_dispatch { } } - fn encode_decoded( + fn encode_decoded( &mut self, decoder: D, ) -> Result<()> { diff --git a/rust/dbn/src/encode/csv/sync.rs b/rust/dbn/src/encode/csv/sync.rs index 639c9de..4c5308b 100644 --- a/rust/dbn/src/encode/csv/sync.rs +++ b/rust/dbn/src/encode/csv/sync.rs @@ -3,7 +3,7 @@ use std::{io, num::NonZeroU64}; use streaming_iterator::StreamingIterator; use crate::{ - decode::DecodeDbn, + decode::{DbnMetadata, DecodeRecordRef}, encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt}, rtype_method_dispatch, rtype_ts_out_method_dispatch, schema_method_dispatch, schema_ts_out_method_dispatch, Error, RType, Record, Result, Schema, @@ -178,7 +178,7 @@ where /// # Errors /// This function returns an error if it's unable to write to the underlying writer /// or there's a serialization error. - fn encode_decoded(&mut self, mut decoder: D) -> Result<()> { + fn encode_decoded(&mut self, mut decoder: D) -> Result<()> { let ts_out = decoder.metadata().ts_out; if let Some(schema) = decoder.metadata().schema { schema_method_dispatch!(schema, self, encode_header, false)?; @@ -198,7 +198,7 @@ where } } - fn encode_decoded_with_limit( + fn encode_decoded_with_limit( &mut self, mut decoder: D, limit: NonZeroU64, diff --git a/rust/dbn/src/python/metadata.rs b/rust/dbn/src/python/metadata.rs index 066d845..ebd0b0d 100644 --- a/rust/dbn/src/python/metadata.rs +++ b/rust/dbn/src/python/metadata.rs @@ -8,7 +8,7 @@ use pyo3::{ }; use crate::{ - decode::{DecodeDbn, DynDecoder}, + decode::{DbnMetadata, DynDecoder}, encode::dbn::MetadataEncoder, enums::{SType, Schema}, MappingInterval, Metadata, SymbolMapping, VersionUpgradePolicy,
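Illustrative note on the `DecodeDbn` split (not part of the patch): generic code that previously bounded on `DecodeDbn` can usually just name the capabilities it actually uses, mirroring the new `DecodeRecordRef + DbnMetadata` bound on `EncodeDbn::encode_decoded`. A minimal sketch with a hypothetical helper:

```rust
use dbn::{
    decode::{DbnMetadata, DecodeRecord},
    TbboMsg,
};

/// Counts TBBO records while also reading the schema from the decoder's
/// metadata. Before this change the bound would have been `D: DecodeDbn`;
/// that alias still exists, but only the needed capabilities are required.
fn count_tbbos<D: DecodeRecord + DbnMetadata>(mut decoder: D) -> dbn::Result<u64> {
    println!("schema: {:?}", decoder.metadata().schema);
    let mut count = 0;
    while decoder.decode_record::<TbboMsg>()?.is_some() {
        count += 1;
    }
    Ok(count)
}
```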
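`decode_stream` now lives on the separate `DecodeStream` trait rather than `DecodeDbn`, so streaming callers add that bound or call it on a concrete decoder such as `DynDecoder`. A sketch assuming a hypothetical Zstd-compressed TBBO file:

```rust
use dbn::{
    decode::{DecodeStream, DynDecoder},
    TbboMsg, VersionUpgradePolicy,
};
use streaming_iterator::StreamingIterator;

fn print_tbbos() -> dbn::Result<()> {
    // The input path and upgrade policy are illustrative.
    let decoder = DynDecoder::from_file("tbbo.dbn.zst", VersionUpgradePolicy::AsIs)?;
    // `decode_stream` now requires the `DecodeStream` bound instead of `DecodeDbn`.
    let mut stream = decoder.decode_stream::<TbboMsg>();
    while let Some(tbbo) = stream.next() {
        println!("{tbbo:?}");
    }
    Ok(())
}
```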
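The new `--schema` and `--limit` handling is built from the decoder wrappers in `dbn-cli`'s new `filter` module, which `wrap`/`wrap_frag` compose around any decoder (CLI usage looks something like `dbn saved_live.dbn.zst --json --schema ohlcv-1d --limit 5`). A rough sketch of the same composition as library code, assuming `dbn_cli` is usable as a library dependency and an illustrative input file:

```rust
use std::num::NonZeroU64;

use dbn::{
    decode::{DbnMetadata, DecodeRecordRef, DynDecoder},
    Record, Schema, VersionUpgradePolicy,
};
use dbn_cli::filter::{LimitFilter, SchemaFilter};

fn filtered_records() -> dbn::Result<()> {
    let decoder = DynDecoder::from_file("saved_live.dbn.zst", VersionUpgradePolicy::AsIs)?;
    // Keep only OHLCV-1D records and stop after five. Both wrappers also
    // update the decoded metadata to reflect the filtering.
    let mut filtered = LimitFilter::new(
        SchemaFilter::new(decoder, Some(Schema::Ohlcv1D)),
        NonZeroU64::new(5),
    );
    println!("limit in metadata: {:?}", filtered.metadata().limit);
    while let Some(record) = filtered.decode_record_ref()? {
        println!("rtype: {:#04x}", record.header().rtype);
    }
    Ok(())
}
```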