add mime-type checking; better panic handling #2304

Merged
merged 8 commits on Nov 21, 2024
233 changes: 143 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -124,6 +124,7 @@ gzp = { version = "0.11", default-features = false, features = [
"snappy_default",
] }
hashbrown = { version = "0.15", optional = true }
human-panic = "2"
indexmap = "2.5"
indicatif = "0.17"
itertools = "0.13"
8 changes: 4 additions & 4 deletions docs/ENVIRONMENT_VARIABLES.md
@@ -15,12 +15,12 @@
| `QSV_MAX_JOBS` | number of jobs to use for multithreaded commands (currently `apply`, `applydp`, `dedup`, `diff`, `extsort`, `frequency`, `joinp`, `schema`, `snappy`, `sort`, `split`, `stats`, `to`, `tojsonl` & `validate`). If not set, max_jobs is set to the detected number of logical processors. See [Multithreading](docs/PERFORMANCE.md#multithreading) for more info. |
| `QSV_NO_UPDATE` | if set, prohibit self-update version check for the latest qsv release published on GitHub. |
| `QSV_LLM_APIKEY` | The API key of the supported LLM service to use with the `describegpt` command. |
| `QSV_OUTPUT_BOM` | if set, the output will have a Byte Order Mark (BOM) at the beginning. This is
used to generate Excel-friendly CSVs on Windows. |
| `QSV_OUTPUT_BOM` | if set, the output will have a Byte Order Mark (BOM) at the beginning. This is used to generate Excel-friendly CSVs on Windows. |
| `QSV_PREFER_DMY` | if set, date parsing will use DMY format. Otherwise, use MDY format (used with `datefmt`, `schema`, `sniff` & `stats` commands). |
| `QSV_REGEX_UNICODE` | if set, makes `search`, `searchset` & `replace` commands unicode-aware. For increased performance, these commands are not unicode-aware by default & will ignore unicode values when matching & will abort when unicode characters are used in the regex. Note that the `apply operations regex_replace` operation is always unicode-aware. |
| `QSV_RDR_BUFFER_CAPACITY` | reader buffer size (default (bytes): 16384) |
| `QSV_WTR_BUFFER_CAPACITY` | writer buffer size (default (bytes): 65536) |
| `QSV_RDR_BUFFER_CAPACITY` | reader buffer size (default - 128k (bytes): 131072) |
| `QSV_SKIP_FORMAT_CHECK` | if set, skips mime-type checking of input files. Set this when optimizing for performance or when encountering false positives, as the format check scans the input file to infer its mime-type/format. |
| `QSV_WTR_BUFFER_CAPACITY` | writer buffer size (default - 512k (bytes): 524288) |
| `QSV_FREEMEMORY_HEADROOM_PCT` | the percentage of free available memory required when running qsv in "non-streaming" mode (i.e. the entire file needs to be loaded into memory). If the incoming file is greater than the available memory after the headroom is subtracted, qsv will not proceed. See [Memory Management](#memory-management) for more info. (default: (percent) 20 ) |
| `QSV_MEMORY_CHECK` | if set, check if input file size < AVAILABLE memory - HEADROOM (CONSERVATIVE mode) when running in "non-streaming" mode. Otherwise, qsv will only check if the input file size < TOTAL memory - HEADROOM (NORMAL mode). This is done to prevent Out-of-Memory errors. See [Memory Management](#memory-management) for more info. |
| `QSV_LOG_LEVEL` | desired level (default - off; `error`, `warn`, `info`, `trace`, `debug`). |
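For orientation, the new `QSV_SKIP_FORMAT_CHECK` variable is read as a simple truthy flag, like the other boolean variables above. A minimal sketch of that kind of check, assuming the usual truthy spellings (the real logic lives in qsv's `util::get_envvar_flag` and may accept a slightly different set of values):

```rust
use std::env;

// Illustrative only: treat common truthy spellings of an env var as "set".
fn envvar_flag(key: &str) -> bool {
    match env::var(key) {
        Ok(val) => matches!(
            val.to_lowercase().as_str(),
            "true" | "t" | "yes" | "y" | "1"
        ),
        Err(_) => false,
    }
}

fn main() {
    // e.g. QSV_SKIP_FORMAT_CHECK=true qsv stats data.csv
    if envvar_flag("QSV_SKIP_FORMAT_CHECK") {
        println!("mime-type checking will be skipped");
    }
}
```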
101 changes: 78 additions & 23 deletions src/config.rs
@@ -76,26 +76,28 @@ impl<'de> Deserialize<'de> for Delimiter {

#[derive(Clone, Debug)]
pub struct Config {
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
pub autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
pub autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
pub skip_format_check: bool,
pub format_error: Option<String>,
}

// Empty trait as an alias for Seek and Read that avoids auto trait errors
@@ -137,11 +139,14 @@ impl Config {
/// - `QSV_PREFER_DMY`: Sets date format preference.
/// - `QSV_RDR_BUFFER_CAPACITY`: Sets read buffer capacity.
/// - `QSV_WTR_BUFFER_CAPACITY`: Sets write buffer capacity.
/// - `QSV_SKIP_FORMAT_CHECK`: Set to skip mime-type checking.
pub fn new(path: Option<&String>) -> Config {
let default_delim = match env::var("QSV_DEFAULT_DELIMITER") {
Ok(delim) => Delimiter::decode_delimiter(&delim).unwrap().as_byte(),
_ => b',',
};
let mut skip_format_check = true;
let mut format_error = None;
let (path, mut delim, snappy) = match path {
None => (None, default_delim, false),
// WIP: support remote files; currently only http(s) is supported
@@ -163,6 +168,30 @@
Some(s) if s == "-" => (None, default_delim, false),
Some(ref s) => {
let path = PathBuf::from(s);
skip_format_check = util::get_envvar_flag("QSV_SKIP_FORMAT_CHECK");
if !skip_format_check {
if let Ok(file_format) = file_format::FileFormat::from_file(&path) {
let detected_mime = file_format.media_type();
// determine the file type by scanning the file
// we support the following mime-types:
// x-empty: empty file
// octet-stream: the file-format crate falls back to this when it cannot
// figure out the mime-type, so it's not necessarily binary data
// x-snappy-framed: for snappy compressed files
// text/*: it's a text file type of some sort that is a possible CSV
// candidate that we will trap later on with the csv crate
if !(detected_mime == "application/x-empty"
|| detected_mime == "application/octet-stream"
|| detected_mime == "application/x-snappy-framed"
|| detected_mime.starts_with("text/"))
{
format_error = Some(format!(
"{} is using an unsupported file format: {detected_mime}",
path.display()
));
}
}
}
let (file_extension, delim, snappy) = get_delim_by_extension(&path, default_delim);
(Some(path), delim, snappy || file_extension.ends_with("sz"))
},
@@ -228,6 +257,8 @@ impl Config {
.unwrap_or_else(|_| DEFAULT_WTR_BUFFER_CAPACITY.to_string())
.parse()
.unwrap_or(DEFAULT_WTR_BUFFER_CAPACITY as u32),
format_error,
skip_format_check,
}
}
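The mime-type gate added to `Config::new` above boils down to a small allow-list over whatever the `file-format` crate infers. A standalone sketch of the same check (the crate calls mirror the diff; the helper name is ours):

```rust
use std::path::Path;

// Empty files, octet-stream (the file-format crate's fallback when it cannot
// classify a file), snappy frames, and any text/* type pass; everything else
// is reported as an unsupported format.
fn check_input_format(path: &Path) -> Result<(), String> {
    let format = file_format::FileFormat::from_file(path).map_err(|e| e.to_string())?;
    let mime = format.media_type();
    let supported = mime == "application/x-empty"
        || mime == "application/octet-stream"
        || mime == "application/x-snappy-framed"
        || mime.starts_with("text/");
    if supported {
        Ok(())
    } else {
        Err(format!(
            "{} is using an unsupported file format: {mime}",
            path.display()
        ))
    }
}
```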

@@ -391,7 +422,14 @@ impl Config {
}

pub fn reader(&self) -> io::Result<csv::Reader<Box<dyn io::Read + Send + 'static>>> {
Ok(self.from_reader(self.io_reader()?))
if !self.skip_format_check && self.format_error.is_some() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
self.format_error.clone().unwrap(),
))
} else {
Ok(self.from_reader(self.io_reader()?))
}
}

pub fn reader_file(&self) -> io::Result<csv::Reader<fs::File>> {
@@ -400,7 +438,16 @@ impl Config {
io::ErrorKind::InvalidInput,
"Cannot use <stdin> here",
)),
Some(ref p) => fs::File::open(p).map(|f| self.from_reader(f)),
Some(ref p) => {
if !self.skip_format_check && self.format_error.is_some() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
self.format_error.clone().unwrap(),
))
} else {
fs::File::open(p).map(|f| self.from_reader(f))
}
},
}
}

@@ -414,7 +461,15 @@ impl Config {
stdin.lock().read_to_end(&mut buffer)?;
self.from_reader(Box::new(io::Cursor::new(buffer)))
},
Some(ref p) => self.from_reader(Box::new(fs::File::open(p).unwrap())),
Some(ref p) => {
if !self.skip_format_check && self.format_error.is_some() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
self.format_error.clone().unwrap(),
));
}
self.from_reader(Box::new(fs::File::open(p)?))
},
})
}
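Downstream commands need no changes: anything that goes through `reader()`, `reader_file()` or `io_reader_file()` now surfaces the stored format error as a plain `io::Error`. A hypothetical in-crate caller (the file name and mime-type are only illustrative):

```rust
// Hypothetical usage sketch; "photo.png" stands in for any non-CSV input.
fn open_input() {
    let cfg = Config::new(Some(&"photo.png".to_string()));
    match cfg.reader() {
        Ok(_rdr) => println!("format check passed (or QSV_SKIP_FORMAT_CHECK is set)"),
        // e.g. "photo.png is using an unsupported file format: image/png"
        Err(e) => eprintln!("{e}"),
    }
}
```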

2 changes: 2 additions & 0 deletions src/main.rs
@@ -87,6 +87,8 @@ struct Args {
}

fn main() -> QsvExitCode {
util::qsv_custom_panic();

let mut enabled_commands = String::new();
#[cfg(all(feature = "apply", feature = "feature_capable"))]
enabled_commands.push_str(" apply Apply series of transformations to a column\n");
2 changes: 2 additions & 0 deletions src/maindp.rs
@@ -121,6 +121,8 @@ struct Args {
}

fn main() -> QsvExitCode {
util::qsv_custom_panic();

let now = Instant::now();
let (qsv_args, _) = match util::init_logger() {
Ok((qsv_args, logger_handle)) => (qsv_args, logger_handle),
2 changes: 2 additions & 0 deletions src/mainlite.rs
@@ -103,6 +103,8 @@ struct Args {
}

fn main() -> QsvExitCode {
util::qsv_custom_panic();

let now = Instant::now();
let (qsv_args, _) = match util::init_logger() {
Ok((qsv_args, logger_handle)) => (qsv_args, logger_handle),
17 changes: 14 additions & 3 deletions src/util.rs
@@ -16,6 +16,7 @@ use std::{
use csv::ByteRecord;
use docopt::Docopt;
use filetime::FileTime;
use human_panic::setup_panic;
#[cfg(any(feature = "feature_capable", feature = "lite"))]
use indicatif::{HumanCount, ProgressBar, ProgressDrawTarget, ProgressStyle};
use log::{info, log_enabled};
@@ -104,6 +105,15 @@ const QSV_POLARS_REV: &str = match option_env!("QSV_POLARS_REV") {
None => "",
};

pub fn qsv_custom_panic() {
setup_panic!(
human_panic::Metadata::new(env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
.authors("datHere qsv maintainers")
.homepage("https://qsv.dathere.com")
.support("- Open a GitHub issue at https://github.com/jqnatividad/qsv/issues")
);
}
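For context, `human_panic`'s `setup_panic!` macro installs a custom panic hook: in release builds a panic prints a short human-readable message built from the metadata above (name, version, authors, homepage, support link) and writes a crash-report file instead of dumping a raw backtrace, while debug builds keep the standard panic behavior. A minimal self-contained sketch of the same wiring (illustrative only; qsv wraps it in `util::qsv_custom_panic()` and calls that at the top of each `main`, as the `main.rs`, `maindp.rs` and `mainlite.rs` diffs below show):

```rust
// Illustrative standalone wiring; qsv wraps this in util::qsv_custom_panic().
fn main() {
    human_panic::setup_panic!(
        human_panic::Metadata::new(env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
            .support("- Open a GitHub issue at https://github.com/jqnatividad/qsv/issues")
    );
    // Any panic past this point in a release build produces the friendly
    // report and a crash-report file instead of a raw backtrace.
}
```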

fn default_user_agent() -> String {
let unknown_command = "Unknown".to_string();
let current_command = CURRENT_COMMAND.get().unwrap_or(&unknown_command);
@@ -1380,6 +1390,7 @@ pub fn load_dotenv() -> CliResult<()> {
Ok(())
}

#[inline]
pub fn get_envvar_flag(key: &str) -> bool {
if let Ok(tf_val) = std::env::var(key) {
let tf_val = tf_val.to_lowercase();
@@ -1981,7 +1992,7 @@ pub fn get_stats_records(
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
match simd_json::serde::from_slice(&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
@@ -2112,7 +2123,7 @@ pub fn get_stats_records(
}

// create a statsdatajson from the output of the stats command
csv_to_jsonl(&tempfile_path, &get_stats_data_types(), &statsdatajson_path)?;
csv_to_jsonl(&tempfile_path, &get_stats_data_types(), statsdatajson_path)?;

let statsdatajson_rdr =
BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdatajson_path)?);
@@ -2125,7 +2136,7 @@ pub fn get_stats_records(
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
match simd_json::serde::from_slice(&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
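The two `from_slice` call sites above also drop the needless `&mut **&mut` dance: `simd_json` deserializes in place, so all it needs is a plain mutable byte buffer. A small sketch of that pattern (the `Row` struct and its fields are illustrative, not qsv's actual stats schema):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Row {
    field:  String,
    r#type: String,
}

fn main() {
    // simd_json mutates the buffer while parsing, hence the owned Vec<u8>.
    let mut buf = br#"{"field":"name","type":"String"}"#.to_vec();
    let row: Row = simd_json::serde::from_slice(&mut buf).expect("valid JSON");
    println!("{row:?}");
}
```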
1 change: 1 addition & 0 deletions tests/test_cat.rs
@@ -24,6 +24,7 @@

let mut cmd = wrk.command("cat");
modify_cmd(cmd.arg(which).arg("in1.csv").arg("in2.csv"));
wrk.assert_success(&mut cmd);
wrk.read_stdout(&mut cmd)
}

2 changes: 2 additions & 0 deletions tests/test_snappy.rs
@@ -164,6 +164,8 @@ fn snappy_automatic_decompression() {
let mut cmd = wrk.command("count");
cmd.arg(test_file);

wrk.assert_success(&mut cmd);

let got: String = wrk.stdout(&mut cmd);
let expected = "100";
assert_eq!(got, expected);