Skip to content

Commit

Permalink
ADD: Add schema filter to dbn CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed Dec 15, 2023
1 parent e9a8e72 commit b932e2f
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 121 deletions.
19 changes: 16 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
# Changelog

## 0.14.3 - TBD
## 0.15.0 - TBD
### Enhancements
- Added type definition for `Metadata.__init__`
- Added `--schema` option to `dbn` CLI tool to filter a DBN to a particular schema. This
allows outputting saved live data to CSV
- Allowed passing `--limit` option to `dbn` CLI tool with `--metadata` flag
- Improved performance of decoding uncompressed DBN fragments with the `dbn` CLI tool
- Added `version` param to Python `Metadata` constructor to choose between DBNv1 and DBNv2
- Implemented `Hash` for all record types
- Implemented `Deserialize` and `Serialize` for all records and enums (with `serde`
feature enabled). This allows serializing records with additional encodings not
supported by the DBN crate
- Implemented `Hash` for all record types
- Added new publisher value for OPRA MIAX Sapphire
- Added Python type definition for `Metadata.__init__`
- Added `metadata_mut` method to decoders to get a mutable reference to the decoded
metadata

### Breaking changes
- Split `DecodeDbn` trait into `DecodeRecord` and `DbnMetadata` traits for more
flexibility. `DecodeDbn` continues to exist as a trait alias
- Moved `decode_stream` out of `DecodeDbn` to its own separate trait `DecodeStream`
- Changed trait bounds of `EncodeDbn::encode_decoded` and `encode_decoded_with_limit` to
`DecodeRecordRef + DbnMetadata`

### Bug fixes
- Fixed panic in `TsSymbolMap` when `start_date` == `end_date`
Expand Down
2 changes: 1 addition & 1 deletion c/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use dbn::{
decode::{DecodeDbn, DecodeRecordRef, DynDecoder},
decode::{DbnMetadata, DecodeRecordRef, DynDecoder},
Compression, Metadata, Record, RecordHeader, VersionUpgradePolicy,
};

Expand Down
17 changes: 8 additions & 9 deletions python/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,15 @@ fn py_to_rs_io_err(e: PyErr) -> io::Error {
#[cfg(test)]
pub mod tests {

use std::io::{Cursor, Seek, Write};

use std::sync::{Arc, Mutex};
use std::{
io::{Cursor, Seek, Write},
sync::{Arc, Mutex},
};

use dbn::datasets::GLBX_MDP3;
use dbn::{
decode::{dbn::Decoder as DbnDecoder, DecodeDbn},
enums::SType,
metadata::MetadataBuilder,
record::TbboMsg,
datasets::GLBX_MDP3,
decode::{dbn::Decoder as DbnDecoder, DbnMetadata, DecodeRecord},
SType, TbboMsg,
};

use super::*;
Expand Down Expand Up @@ -298,7 +297,7 @@ pub mod tests {
let mock_file = MockPyFile::new();
let output_buf = mock_file.inner();
let mock_file = Py::new(py, mock_file).unwrap().into_py(py);
let metadata = MetadataBuilder::new()
let metadata = Metadata::builder()
.dataset(DATASET.to_owned())
.schema(Some($schema))
.start(0)
Expand Down
44 changes: 11 additions & 33 deletions rust/dbn-cli/src/encode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use dbn::{
decode::{DbnRecordDecoder, DecodeDbn, DecodeRecordRef, DynDecoder},
decode::{DbnMetadata, DecodeRecordRef},
encode::{
json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecordRef,
},
Expand All @@ -10,7 +10,10 @@ use dbn::{

use crate::{infer_encoding_and_compression, output_from_args, Args};

pub fn encode_from_dbn<R: io::BufRead>(decoder: DynDecoder<R>, args: &Args) -> anyhow::Result<()> {
pub fn encode_from_dbn<D>(decoder: D, args: &Args) -> anyhow::Result<()>
where
D: DecodeRecordRef + DbnMetadata,
{
let writer = output_from_args(args)?;
let (encoding, compression) = infer_encoding_and_compression(args)?;
let encode_res = if args.should_output_metadata {
Expand All @@ -23,21 +26,7 @@ pub fn encode_from_dbn<R: io::BufRead>(decoder: DynDecoder<R>, args: &Args) -> a
)
.encode_metadata(decoder.metadata())
} else if args.fragment {
encode_fragment(decoder, writer, compression, args)
} else if let Some(limit) = args.limit {
let mut metadata = decoder.metadata().clone();
// Update metadata
metadata.limit = args.limit;
DynEncoder::new(
writer,
encoding,
compression,
&metadata,
args.should_pretty_print,
args.should_pretty_print,
args.should_pretty_print,
)?
.encode_decoded_with_limit(decoder, limit)
encode_fragment(decoder, writer, compression)
} else {
DynEncoder::new(
writer,
Expand All @@ -59,14 +48,14 @@ pub fn encode_from_dbn<R: io::BufRead>(decoder: DynDecoder<R>, args: &Args) -> a
}
}

pub fn encode_from_frag<R: io::Read>(
mut decoder: DbnRecordDecoder<R>,
args: &Args,
) -> anyhow::Result<()> {
pub fn encode_from_frag<D>(mut decoder: D, args: &Args) -> anyhow::Result<()>
where
D: DecodeRecordRef,
{
let writer = output_from_args(args)?;
let (encoding, compression) = infer_encoding_and_compression(args)?;
if args.fragment {
encode_fragment(decoder, writer, compression, args)?;
encode_fragment(decoder, writer, compression)?;
return Ok(());
}
assert!(!args.should_output_metadata);
Expand All @@ -87,7 +76,6 @@ pub fn encode_from_frag<R: io::Read>(
args.should_pretty_print,
args.should_pretty_print,
)?;
let mut n = 0;
let mut has_written_header = encoding != Encoding::Csv;
fn write_header<T: DbnEncodable>(
_record: &T,
Expand Down Expand Up @@ -115,10 +103,6 @@ pub fn encode_from_frag<R: io::Read>(
}
res => res?,
};
n += 1;
if args.limit.map_or(false, |l| n >= l.get()) {
break;
}
}
Ok(())
}
Expand All @@ -127,16 +111,10 @@ fn encode_fragment<D: DecodeRecordRef>(
mut decoder: D,
writer: Box<dyn io::Write>,
compression: Compression,
args: &Args,
) -> dbn::Result<()> {
let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?);
let mut n = 0;
while let Some(record) = decoder.decode_record_ref()? {
encoder.encode_record_ref(record)?;
n += 1;
if args.limit.map_or(false, |l| n >= l.get()) {
break;
}
}
Ok(())
}
122 changes: 122 additions & 0 deletions rust/dbn-cli/src/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::num::NonZeroU64;

use dbn::{
decode::{DbnMetadata, DecodeRecordRef},
RType, Record, RecordRef, Schema,
};

/// Decoder wrapper that only yields records whose `rtype` matches the record
/// type of a chosen schema.
#[derive(Debug)]
pub struct SchemaFilter<D> {
/// The wrapped decoder that records are pulled from.
decoder: D,
/// Record type derived from the requested schema. `None` means no filtering:
/// every decoded record is passed through.
rtype: Option<RType>,
}

impl<D> SchemaFilter<D>
where
    D: DbnMetadata,
{
    /// Wraps `decoder` in a schema filter and keeps its metadata in sync.
    ///
    /// When `schema` is `Some`, the decoder's metadata `schema` field is
    /// overwritten with it before the filter is built. When `schema` is `None`
    /// the metadata is left untouched.
    pub fn new(mut decoder: D, schema: Option<Schema>) -> Self {
        if schema.is_some() {
            decoder.metadata_mut().schema = schema;
        }
        Self::new_no_metadata(decoder, schema)
    }
}

impl<D> SchemaFilter<D> {
    /// Wraps `decoder` in a schema filter without touching any metadata.
    ///
    /// `schema` is converted to its `RType`; `None` produces a filter that
    /// lets every record through.
    pub fn new_no_metadata(decoder: D, schema: Option<Schema>) -> Self {
        let rtype = schema.map(RType::from);
        Self { decoder, rtype }
    }
}

// Metadata access is forwarded unchanged to the wrapped decoder.
impl<D: DbnMetadata> DbnMetadata for SchemaFilter<D> {
// Returns a shared reference to the wrapped decoder's metadata.
fn metadata(&self) -> &dbn::Metadata {
self.decoder.metadata()
}

// Returns a mutable reference to the wrapped decoder's metadata.
fn metadata_mut(&mut self) -> &mut dbn::Metadata {
self.decoder.metadata_mut()
}
}

impl<D: DecodeRecordRef> DecodeRecordRef for SchemaFilter<D> {
    /// Decodes records from the wrapped decoder until one matches the filter.
    ///
    /// Records whose header `rtype` differs from the configured record type are
    /// skipped. With no configured record type every record matches. Returns
    /// `Ok(None)` once the wrapped decoder is exhausted, and propagates any
    /// decoding error from the wrapped decoder.
    fn decode_record_ref(&mut self) -> dbn::Result<Option<dbn::RecordRef>> {
        while let Some(record) = self.decoder.decode_record_ref()? {
            let is_match = match self.rtype {
                Some(rtype) => rtype as u8 == record.header().rtype,
                None => true,
            };
            if is_match {
                // SAFETY: the pointer is produced by casting a reference to the
                // header of the record that was just decoded, so it is valid.
                // The record is rebuilt from its header only to get around a
                // borrow checker limitation.
                return Ok(Some(unsafe {
                    RecordRef::unchecked_from_header(record.header())
                }));
            }
        }
        Ok(None)
    }
}

/// Decoder wrapper that stops yielding records once a maximum count is reached.
#[derive(Debug)]
pub struct LimitFilter<D> {
/// The wrapped decoder that records are pulled from.
decoder: D,
/// Maximum number of records to yield. `None` means no limit.
limit: Option<NonZeroU64>,
/// Number of records yielded so far.
record_count: u64,
}

impl<D> LimitFilter<D>
where
    D: DbnMetadata,
{
    /// Wraps `decoder` in a limit filter and keeps its metadata in sync.
    ///
    /// When `limit` is `Some`, the decoder's metadata `limit` becomes the
    /// smaller of its existing value and `limit`, or `limit` itself if the
    /// metadata had none. When `limit` is `None` the metadata is left untouched.
    pub fn new(mut decoder: D, limit: Option<NonZeroU64>) -> Self {
        if let Some(limit) = limit {
            let metadata = decoder.metadata_mut();
            metadata.limit = Some(match metadata.limit {
                Some(existing) => existing.min(limit),
                None => limit,
            });
        }
        Self::new_no_metadata(decoder, limit)
    }
}

impl<D> LimitFilter<D> {
    /// Wraps `decoder` in a limit filter without touching any metadata.
    ///
    /// The yielded-record counter starts at zero; `limit` of `None` produces a
    /// filter that never cuts the stream short.
    pub fn new_no_metadata(decoder: D, limit: Option<NonZeroU64>) -> Self {
        let record_count = 0;
        Self {
            decoder,
            limit,
            record_count,
        }
    }
}

// Metadata access is forwarded unchanged to the wrapped decoder.
impl<D: DbnMetadata> DbnMetadata for LimitFilter<D> {
// Returns a shared reference to the wrapped decoder's metadata.
fn metadata(&self) -> &dbn::Metadata {
self.decoder.metadata()
}

// Returns a mutable reference to the wrapped decoder's metadata.
fn metadata_mut(&mut self) -> &mut dbn::Metadata {
self.decoder.metadata_mut()
}
}

impl<D: DecodeRecordRef> DecodeRecordRef for LimitFilter<D> {
    /// Decodes the next record unless the configured limit has been reached.
    ///
    /// Returns `Ok(None)` without touching the wrapped decoder once the number
    /// of yielded records is at least the limit. Otherwise the call is
    /// forwarded, and the counter is incremented only when a record is actually
    /// returned. Errors from the wrapped decoder are propagated.
    fn decode_record_ref(&mut self) -> dbn::Result<Option<RecordRef>> {
        if let Some(limit) = self.limit {
            if self.record_count >= limit.get() {
                return Ok(None);
            }
        }
        let record = self.decoder.decode_record_ref()?;
        if record.is_some() {
            self.record_count += 1;
        }
        Ok(record)
    }
}
17 changes: 13 additions & 4 deletions rust/dbn-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use clap::{ArgAction, Parser, ValueEnum};

use dbn::{
enums::{Compression, Encoding},
VersionUpgradePolicy,
Schema, VersionUpgradePolicy,
};

pub mod encode;
pub mod filter;

/// How the output of the `dbn` command will be encoded.
#[derive(Clone, Copy, Debug, ValueEnum)]
Expand Down Expand Up @@ -118,7 +119,6 @@ pub struct Args {
short = 'l',
long = "limit",
value_name = "NUM_RECORDS",
conflicts_with = "should_output_metadata",
help = "Limit the number of records in the output to the specified number"
)]
pub limit: Option<NonZeroU64>,
Expand Down Expand Up @@ -149,6 +149,12 @@ pub struct Args {
requires = "input_fragment"
)]
pub input_dbn_version_override: Option<u8>,
#[clap(
long = "schema",
help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
value_name = "SCHEMA"
)]
pub schema_filter: Option<Schema>,
}

impl Args {
Expand All @@ -174,6 +180,10 @@ impl Args {
VersionUpgradePolicy::AsIs
}
}

/// Returns the DBN version to assume for the input: the value of
/// `input_dbn_version_override` when set, otherwise `dbn::DBN_VERSION`.
pub fn input_version(&self) -> u8 {
    match self.input_dbn_version_override {
        Some(version) => version,
        None => dbn::DBN_VERSION,
    }
}
}

/// Infer the [`Encoding`] and [`Compression`] from `args` if they aren't already explicitly
Expand Down Expand Up @@ -228,8 +238,7 @@ pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {

fn open_output_file(path: &PathBuf, force: bool) -> anyhow::Result<File> {
let mut options = File::options();
options.write(true);
options.truncate(true);
options.write(true).truncate(true);
if force {
options.create(true);
} else if path.exists() {
Expand Down
Loading

0 comments on commit b932e2f

Please sign in to comment.