From 2c71e3597c18d8ad2aebe243c791f334d52e4631 Mon Sep 17 00:00:00 2001 From: Riccardo Attilio Galli Date: Sun, 11 Feb 2024 16:46:42 -0800 Subject: [PATCH] Add a new parser that does not buffer lines --- src/bin/tuc.rs | 3 + src/lib.rs | 1 + src/stream.rs | 294 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 src/stream.rs diff --git a/src/bin/tuc.rs b/src/bin/tuc.rs index f309c37..dc1f2cf 100644 --- a/src/bin/tuc.rs +++ b/src/bin/tuc.rs @@ -9,6 +9,7 @@ use tuc::cut_lines::read_and_cut_lines; use tuc::cut_str::read_and_cut_str; use tuc::help::{get_help, get_short_help}; use tuc::options::{Opt, EOL}; +use tuc::stream::{read_and_cut_bytes_stream, StreamOpt}; #[cfg(feature = "fast-lane")] use tuc::fast_lane::{read_and_cut_text_as_bytes, FastOpt}; @@ -251,6 +252,8 @@ fn main() -> Result<()> { read_and_cut_bytes(&mut stdin, &mut stdout, &opt)?; } else if opt.bounds_type == BoundsType::Lines { read_and_cut_lines(&mut stdin, &mut stdout, &opt)?; + } else if let Ok(stream_opt) = StreamOpt::try_from(&opt) { + read_and_cut_bytes_stream(&mut stdin, &mut stdout, &stream_opt)?; } else if let Ok(fast_opt) = FastOpt::try_from(&opt) { read_and_cut_text_as_bytes(&mut stdin, &mut stdout, &fast_opt)?; } else { diff --git a/src/lib.rs b/src/lib.rs index 0e91263..76bff02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,4 @@ pub mod fast_lane; pub mod help; pub mod options; mod read_utils; +pub mod stream; diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..e96ab03 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,294 @@ +use crate::bounds::{BoundOrFiller, BoundsType, Side, UserBounds, UserBoundsList, UserBoundsTrait}; +use crate::options::{Opt, EOL}; +use anyhow::Result; +use bstr::ByteSlice; +use std::convert::TryFrom; +use std::io::BufRead; +use std::io::Write; +use std::ops::Deref; + +#[derive(Debug)] +struct ForwardBounds { + pub list: UserBoundsList, + last_bound_idx: usize, +} + +impl TryFrom<&UserBoundsList> for ForwardBounds { + type Error = &'static str; + + fn try_from(value: &UserBoundsList) -> Result { + if value.is_empty() { + Err("Cannot create ForwardBounds from an empty UserBoundsList") + } else if value.is_forward_only() { + let value: UserBoundsList = + value.iter().cloned().collect::>().into(); + let mut maybe_last_bound: Option = None; + value.iter().enumerate().rev().any(|(idx, bof)| { + if matches!(bof, BoundOrFiller::Bound(_)) { + maybe_last_bound = Some(idx); + true + } else { + false + } + }); + + if let Some(last_bound_idx) = maybe_last_bound { + Ok(ForwardBounds { + list: value, + last_bound_idx, + }) + } else { + Err("Cannot create ForwardBounds from UserBoundsList without bounds") + } + } else { + Err("The provided UserBoundsList is not forward only") + } + } +} + +impl Deref for ForwardBounds { + type Target = UserBoundsList; + + fn deref(&self) -> &Self::Target { + &self.list + } +} + +impl ForwardBounds { + fn get_last_bound(&self) -> &UserBounds { + if let Some(BoundOrFiller::Bound(b)) = self.list.get(self.last_bound_idx) { + b + } else { + panic!("Invariant error: last_bound_idx failed to match a bound") + } + } +} + +#[derive(Debug)] +pub struct StreamOpt { + delimiter: u8, + join: bool, + eol: EOL, + bounds: ForwardBounds, + // ## only_delimited: bool, ## + // We can't support it, because we read fixed blocks of data. + // If we don't find the delimiter in one block and move the next block, + // and then we find the delimiter, we can't print the content from the + // previous block, it's lost. + // The alternative would be to start buffering blocks, but who knows + // how much they'd grow: it would be not different from buffering the + // whole line, and this mode is about doing the job with fixed memory. +} + +impl TryFrom<&Opt> for StreamOpt { + type Error = &'static str; + + fn try_from(value: &Opt) -> Result { + if !value.delimiter.as_bytes().len() == 1 { + return Err("Delimiter must be 1 byte wide for FastOpt"); + } + + if value.complement + || value.greedy_delimiter + || value.compress_delimiter + || value.json + || value.bounds_type != BoundsType::Fields + || value.replace_delimiter.is_some() + || value.trim.is_some() + || value.regex_bag.is_some() + // only_delimited can't be supported without reading the full line first + // to search for delimiters, which can't be done if we read by chunks. + || value.only_delimited + { + return Err( + "StreamOpt supports solely forward fields, join and single-character delimiters", + ); + } + + if let Ok(forward_bounds) = ForwardBounds::try_from(&value.bounds) { + Ok(StreamOpt { + delimiter: value.delimiter.as_bytes().first().unwrap().to_owned(), + join: value.join, + eol: value.eol, + bounds: forward_bounds, + }) + } else { + Err("Bounds cannot be converted to ForwardBounds") + } + } +} + +pub fn read_and_cut_bytes_stream( + stdin: &mut R, + stdout: &mut W, + opt: &StreamOpt, +) -> Result<()> { + let last_interesting_field = opt.bounds.get_last_bound().r; + cut_bytes_stream(stdin, stdout, opt, last_interesting_field)?; + Ok(()) +} + +#[inline(always)] +fn print_field( + stdin: &mut W, + buffer: &[u8], + delim: u8, + prepend_delimiter: bool, +) -> Result<()> { + if prepend_delimiter { + stdin.write_all(&[delim])?; + } + stdin.write_all(buffer)?; + Ok(()) +} + +fn cut_bytes_stream( + stdin: &mut R, + stdout: &mut W, + opt: &StreamOpt, + last_interesting_field: Side, +) -> Result<()> { + //let mut buffer: Vec = Vec::with_capacity(64 * 1024); + + let eol: u8 = opt.eol.into(); + + // With this algorithm we can only move forward, so we can't + // support overlapping ranges + // XXX TODO panic (or move to something different than StreamOpt) + + 'outer: loop { + // new line + // dbg!("new line"); + + let mut bounds_idx = 0; + let mut available; + + let mut curr_field = 0; + let mut go_to_next_line = false; + let mut field_is_continuation = false; + + let mut used; + + 'fields: loop { + available = stdin.fill_buf()?; + //let tmp = available.to_str_lossy(); + + if available.is_empty() { + // end of file + stdout.write_all(&[opt.eol.into()])?; + break 'outer; + } + + let mut prev_idx = 0; + for idx in memchr::memchr2_iter(opt.delimiter, eol, available) { + used = idx + 1; + + curr_field += 1; + + if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) { + // TODO creates a dedicated match function + if b.matches(curr_field).unwrap() { + if field_is_continuation && idx == 0 { + bounds_idx += 1; + } else { + print_field( + stdout, + &available[prev_idx..idx], + //opt.delimiter, + b'\t', + !field_is_continuation + && curr_field > 1 + && (opt.join || (b.l != b.r || b.r == Side::Continue)), + )?; + + if b.r == Side::Some(curr_field) { + bounds_idx += 1; + } + } + } + } + + field_is_continuation = false; + + prev_idx = idx + 1; + + if available[idx] == eol { + // end of line reached + break 'fields; + } + + if Side::Some(curr_field) == last_interesting_field { + // There are no more fields we're interested in, + // let's move to the next line + go_to_next_line = true; + break 'fields; + } + } + + // We exhausted the buffer before reaching the next line, so + // - there could be more fields to read + // - the last byte was likely not a delimiter and there is the + // start of a field still in the buffer + + if !available[prev_idx..].is_empty() { + curr_field += 1; + + if let Some(BoundOrFiller::Bound(b)) = opt.bounds.get(bounds_idx) { + if b.matches(curr_field).unwrap() { + print_field( + stdout, + &available[prev_idx..], + // opt.delimiter, false)?; + b'\t', + !field_is_continuation && curr_field > 1 && (opt.join), + )?; + } + } + + // the field was split in two parts, let's reset its counter + curr_field -= 1; + field_is_continuation = true; + } + + // We keep `curr_field` as-is, consume the buffer and read the next block + + used = available.len(); + stdin.consume(used); + } + + // We consumed every field we were interested in in this line + + if go_to_next_line { + let mut idx = used - 1; // remove one. We know it wasn't a newline and + // it ensure that the buffer is not empty during the first loop + + // let mut must_read_more = true; + loop { + //if !must_read_more && available[idx..].is_empty() { + if available[idx..].is_empty() { + stdout.write_all(&[opt.eol.into()])?; + break 'outer; + } + + if let Some(eol_idx) = memchr::memchr(eol, &available[idx..]) { + used = idx + eol_idx + 1; + break; + } + + // Whops, eol was not found in the current buffer. Let's read some more + + used = available.len(); + stdin.consume(used); + available = stdin.fill_buf()?; + idx = 0; + //must_read_more = true; + } + } + + stdin.consume(used); + + stdout.write_all(&[opt.eol.into()])?; + } + + Ok(()) +}