Skip to content

Commit

Permalink
ADD: Add DBN merging
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed Dec 18, 2024
1 parent f8d10c1 commit eb536e7
Show file tree
Hide file tree
Showing 20 changed files with 1,119 additions and 181 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
functions to the C API
- Updated the value of the `MAX_RECORD_LEN` constant for the changes to
`InstrumentDefMsg` in version 3
- Added initial support for merging DBN:
- Decoding streams: `MergeDecoder` and `MergeRecordDecoder` structs
- Metadata: `MergeDecoder` struct and `Metadata::merge()` method
- In the CLI: specify more than one input file to initiate a merge
- Relaxed `DecodeRecord` trait constraint on `StreamIterDecoder`'s inner decoder
- Added `DbnMetadata` implementation for `StreamIterDecoder` if the inner decoder
implements `DbnMetadata`

## 0.25.0 - 2024-12-17

Expand Down
4 changes: 2 additions & 2 deletions rust/dbn-cli/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
Err(err)
}

pub fn encode_from_dbn<D>(mut decoder: D, args: &Args) -> anyhow::Result<()>
pub fn encode_from_dbn<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
where
D: DecodeRecordRef + DbnMetadata,
{
Expand Down Expand Up @@ -67,7 +67,7 @@ where
Ok(())
}

pub fn encode_from_frag<D>(mut decoder: D, args: &Args) -> anyhow::Result<()>
pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
where
D: DecodeRecordRef,
{
Expand Down
8 changes: 5 additions & 3 deletions rust/dbn-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ pub enum OutputEncoding {
#[cfg_attr(test, derive(Default))]
pub struct Args {
#[clap(
help = "A DBN or legacy DBZ file to convert to another encoding. Pass '-' to read from standard input",
value_name = "FILE"
help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
value_name = "FILE...",
value_delimiter = ' ',
num_args = 1..,
)]
pub input: PathBuf,
pub input: Vec<PathBuf>,
#[clap(
short,
long,
Expand Down
143 changes: 86 additions & 57 deletions rust/dbn-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{
fs::File,
io::{self, BufReader},
io::{self, BufRead, BufReader},
path::Path,
};

use anyhow::Context;
use clap::Parser;
use dbn::decode::{DbnMetadata, DbnRecordDecoder, DecodeRecordRef, DynDecoder};
use dbn::decode::{
DbnMetadata, DbnRecordDecoder, DecodeRecordRef, DynDecoder, MergeDecoder, MergeRecordDecoder,
};
use dbn_cli::{
encode::{encode_from_dbn, encode_from_frag, silence_broken_pipe},
filter::{LimitFilter, SchemaFilter},
Expand All @@ -13,81 +17,106 @@ use dbn_cli::{

const STDIN_SENTINEL: &str = "-";

fn wrap_frag(args: &Args, reader: impl io::Read) -> anyhow::Result<impl DecodeRecordRef> {
// assume no ts_out for fragments
const TS_OUT: bool = false;
Ok(LimitFilter::new_no_metadata(
SchemaFilter::new_no_metadata(
DbnRecordDecoder::with_version(
reader,
args.input_version(),
args.upgrade_policy(),
TS_OUT,
)?,
args.schema_filter,
),
fn open_input_file(path: &Path) -> anyhow::Result<File> {
File::open(path).with_context(|| format!("opening file to decode at path '{}'", path.display()))
}

/// Wraps a fragment `decoder` in a [`SchemaFilter`] configured with
/// `args.schema_filter` and then a [`LimitFilter`] configured with `args.limit`,
/// using the constructors that don't require metadata.
fn wrap_frag(args: &Args, decoder: impl DecodeRecordRef) -> impl DecodeRecordRef {
    let schema_filtered = SchemaFilter::new_no_metadata(decoder, args.schema_filter);
    LimitFilter::new_no_metadata(schema_filtered, args.limit)
}

/// The `ts_out` flag passed to [`DbnRecordDecoder::with_version`] for fragment
/// inputs: assume no `ts_out` for fragments.
const FRAG_TS_OUT: bool = false;

/// Builds a record decoder for a DBN fragment read from `reader` and wraps it
/// with the fragment filters from [`wrap_frag`].
///
/// The input version and upgrade policy are taken from `args`; `ts_out` is
/// always [`FRAG_TS_OUT`].
///
/// # Errors
/// Returns an error if [`DbnRecordDecoder::with_version`] fails.
fn decode_frag(args: &Args, reader: impl io::Read) -> anyhow::Result<impl DecodeRecordRef> {
    let record_decoder = DbnRecordDecoder::with_version(
        reader,
        args.input_version(),
        args.upgrade_policy(),
        FRAG_TS_OUT,
    )?;
    Ok(wrap_frag(args, record_decoder))
}

fn wrap<R: io::BufRead>(
fn wrap(
args: &Args,
decoder: DynDecoder<'static, R>,
decoder: impl DecodeRecordRef + DbnMetadata,
) -> impl DecodeRecordRef + DbnMetadata {
LimitFilter::new(SchemaFilter::new(decoder, args.schema_filter), args.limit)
}

fn main() -> anyhow::Result<()> {
main_impl().or_else(silence_broken_pipe)
fn with_inputs(args: Args) -> anyhow::Result<()> {
if args.is_input_fragment {
let decoders = args
.input
.iter()
.map(|input| {
Ok(DbnRecordDecoder::with_version(
BufReader::new(open_input_file(input)?),
args.input_version(),
args.upgrade_policy(),
FRAG_TS_OUT,
)?)
})
.collect::<anyhow::Result<Vec<DbnRecordDecoder<BufReader<File>>>>>()?;
encode_from_frag(&args, MergeRecordDecoder::new(decoders)?)
} else if args.is_input_zstd_fragment {
let decoders = args
.input
.iter()
.map(|input| {
Ok(DbnRecordDecoder::with_version(
zstd::stream::Decoder::new(open_input_file(input)?)?,
args.input_version(),
args.upgrade_policy(),
FRAG_TS_OUT,
)?)
})
.collect::<anyhow::Result<Vec<DbnRecordDecoder<zstd::stream::Decoder<BufReader<File>>>>>>()?;
encode_from_frag(&args, MergeRecordDecoder::new(decoders)?)
} else {
let decoders = args
.input
.iter()
.map(|input| DynDecoder::from_file(input, args.upgrade_policy()))
.collect::<dbn::Result<Vec<DynDecoder<BufReader<File>>>>>()?;
encode_from_dbn(&args, wrap(&args, MergeDecoder::new(decoders)?))
}
}

fn main_impl() -> anyhow::Result<()> {
let args = Args::parse();
// DBN fragment
fn with_input(args: Args, reader: impl BufRead) -> anyhow::Result<()> {
if args.is_input_fragment {
if args.input.as_os_str() == STDIN_SENTINEL {
encode_from_frag(wrap_frag(&args, io::stdin().lock())?, &args)
} else {
encode_from_frag(
wrap_frag(&args, BufReader::new(File::open(args.input.clone())?))?,
&args,
)
}
// Zstd-compressed DBN fragment
encode_from_frag(&args, decode_frag(&args, reader)?)
} else if args.is_input_zstd_fragment {
if args.input.as_os_str() == STDIN_SENTINEL {
encode_from_frag(
wrap_frag(
&args,
zstd::stream::Decoder::with_buffer(io::stdin().lock())?,
)?,
&args,
)
} else {
encode_from_frag(
wrap_frag(
&args,
zstd::stream::Decoder::new(File::open(args.input.clone())?)?,
)?,
&args,
)
}
// DBN stream (with metadata)
} else if args.input.as_os_str() == STDIN_SENTINEL {
encode_from_dbn(
wrap(
&args,
DynDecoder::inferred_with_buffer(io::stdin().lock(), args.upgrade_policy())?,
),
encode_from_frag(
&args,
decode_frag(&args, zstd::stream::Decoder::with_buffer(reader)?)?,
)
} else {
encode_from_dbn(
&args,
wrap(
&args,
DynDecoder::from_file(&args.input, args.upgrade_policy())?,
DynDecoder::inferred_with_buffer(reader, args.upgrade_policy())?,
),
&args,
)
}
}

/// CLI entry point: parses the arguments and dispatches on the number and kind
/// of inputs.
///
/// More than one input is handled by `with_inputs`. A single input equal to
/// `STDIN_SENTINEL` is read from locked standard input. Any other single input
/// is opened as a buffered file. The result of encoding is routed through
/// `silence_broken_pipe`; a failure to open the single input file returns
/// early without going through it.
fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    let outcome = if args.input.len() > 1 {
        with_inputs(args)
    } else if args.input[0].as_os_str() == STDIN_SENTINEL {
        with_input(args, io::stdin().lock())
    } else {
        let file = open_input_file(&args.input[0])?;
        with_input(args, BufReader::new(file))
    };
    outcome.or_else(silence_broken_pipe)
}
Loading

0 comments on commit eb536e7

Please sign in to comment.