diff --git a/Cargo.toml b/Cargo.toml
index 37d2f6d3e..cb1d78f22 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -166,7 +166,6 @@ polars = { version = "0.43", features = [
     "performant",
     "pivot",
     "semi_anti_join",
-    "serde",
     "serde-lazy",
     "sql",
     "streaming",
diff --git a/contrib/completions/examples/qsv.bash b/contrib/completions/examples/qsv.bash
index 50b3a4056..5b2dc9cc2 100644
--- a/contrib/completions/examples/qsv.bash
+++ b/contrib/completions/examples/qsv.bash
@@ -3756,7 +3756,7 @@ _qsv() {
             return 0
             ;;
         qsv__sqlp)
-            opts="-h --format --try-parsedates --infer-len --streaming --low-memory --no-optimizations --truncate-ragged-lines --ignore-errors --rnull-values --decimal-comma --datetime-format --date-format --time-format --float-precision --wnull-value --compression --compress-level --statistics --output --delimiter --quiet --help"
+            opts="-h --format --try-parsedates --infer-len --cache-schema --streaming --low-memory --no-optimizations --truncate-ragged-lines --ignore-errors --rnull-values --decimal-comma --datetime-format --date-format --time-format --float-precision --wnull-value --compression --compress-level --statistics --output --delimiter --quiet --help"
             if [[ ${cur} == -* || ${COMP_CWORD} -eq 2 ]] ; then
                 COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") )
                 return 0
diff --git a/contrib/completions/examples/qsv.elv b/contrib/completions/examples/qsv.elv
index 1b808240b..4c6aff843 100644
--- a/contrib/completions/examples/qsv.elv
+++ b/contrib/completions/examples/qsv.elv
@@ -1398,6 +1398,7 @@ set edit:completion:arg-completer[qsv] = {|@words|
                 cand --format 'format'
                 cand --try-parsedates 'try-parsedates'
                 cand --infer-len 'infer-len'
+                cand --cache-schema 'cache-schema'
                 cand --streaming 'streaming'
                 cand --low-memory 'low-memory'
                 cand --no-optimizations 'no-optimizations'
diff --git a/contrib/completions/examples/qsv.fig.js b/contrib/completions/examples/qsv.fig.js
index 7cb8238cf..9d56c217a 100644
--- a/contrib/completions/examples/qsv.fig.js
+++ b/contrib/completions/examples/qsv.fig.js
@@ -3404,6 +3404,9 @@ const completion: Fig.Spec = {
         {
           name: "--infer-len",
         },
+        {
+          name: "--cache-schema",
+        },
         {
           name: "--streaming",
         },
diff --git a/contrib/completions/examples/qsv.fish b/contrib/completions/examples/qsv.fish
index 4254f968e..0e82b0897 100644
--- a/contrib/completions/examples/qsv.fish
+++ b/contrib/completions/examples/qsv.fish
@@ -1076,6 +1076,7 @@ complete -c qsv -n "__fish_qsv_using_subcommand split" -s h -l help -d 'Print he
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l format
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l try-parsedates
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l infer-len
+complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l cache-schema
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l streaming
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l low-memory
 complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l no-optimizations
diff --git a/contrib/completions/examples/qsv.nu b/contrib/completions/examples/qsv.nu
index 9d26930f7..2e32d7eb1 100644
--- a/contrib/completions/examples/qsv.nu
+++ b/contrib/completions/examples/qsv.nu
@@ -1306,6 +1306,7 @@ module completions {
     --format
     --try-parsedates
     --infer-len
+    --cache-schema
     --streaming
     --low-memory
     --no-optimizations
diff --git a/contrib/completions/examples/qsv.ps1 b/contrib/completions/examples/qsv.ps1
index 625b8dddd..4196389f4 100644
--- a/contrib/completions/examples/qsv.ps1
+++ b/contrib/completions/examples/qsv.ps1
@@ -1524,6 +1524,7 @@ Register-ArgumentCompleter -Native -CommandName 'qsv' -ScriptBlock {
             [CompletionResult]::new('--format', 'format', [CompletionResultType]::ParameterName, 'format')
             [CompletionResult]::new('--try-parsedates', 'try-parsedates', [CompletionResultType]::ParameterName, 'try-parsedates')
             [CompletionResult]::new('--infer-len', 'infer-len', [CompletionResultType]::ParameterName, 'infer-len')
+            [CompletionResult]::new('--cache-schema', 'cache-schema', [CompletionResultType]::ParameterName, 'cache-schema')
             [CompletionResult]::new('--streaming', 'streaming', [CompletionResultType]::ParameterName, 'streaming')
             [CompletionResult]::new('--low-memory', 'low-memory', [CompletionResultType]::ParameterName, 'low-memory')
             [CompletionResult]::new('--no-optimizations', 'no-optimizations', [CompletionResultType]::ParameterName, 'no-optimizations')
diff --git a/contrib/completions/examples/qsv.zsh b/contrib/completions/examples/qsv.zsh
index 378efafb7..ad63a26e6 100644
--- a/contrib/completions/examples/qsv.zsh
+++ b/contrib/completions/examples/qsv.zsh
@@ -1686,6 +1686,7 @@ _arguments "${_arguments_options[@]}" : \
 '--format[]' \
 '--try-parsedates[]' \
 '--infer-len[]' \
+'--cache-schema[]' \
 '--streaming[]' \
 '--low-memory[]' \
 '--no-optimizations[]' \
diff --git a/contrib/completions/src/cmd/sqlp.rs b/contrib/completions/src/cmd/sqlp.rs
index 36945ff8f..3afd145d8 100644
--- a/contrib/completions/src/cmd/sqlp.rs
+++ b/contrib/completions/src/cmd/sqlp.rs
@@ -5,6 +5,7 @@ pub fn sqlp_cmd() -> Command {
         arg!(--format),
         arg!(--"try-parsedates"),
         arg!(--"infer-len"),
+        arg!(--"cache-schema"),
         arg!(--streaming),
         arg!(--"low-memory"),
         arg!(--"no-optimizations"),
diff --git a/src/cmd/sqlp.rs b/src/cmd/sqlp.rs
index 9b06155c4..c7ea563fa 100644
--- a/src/cmd/sqlp.rs
+++ b/src/cmd/sqlp.rs
@@ -187,6 +187,14 @@ sqlp options:
     --infer-len            The number of rows to scan when inferring the schema of the CSV.
                            Set to 0 to do a full table scan (warning: can be slow).
                            [default: 10000]
+    --cache-schema         Create and cache Polars schema JSON files.
+                           If specified and the schema file/s do not exist, it will save the
+                           inferred schemas in JSON format. Each schema file will have the same
+                           file stem as the corresponding input file, with the extension ".pschema.json"
+                           (e.g. data.csv's Polars schema file will be data.pschema.json).
+                           If the schema file/s exist, it will load the schema instead of inferring it
+                           (ignoring --infer-len) and attempt to use it for each corresponding
+                           Polars "table" with the same file stem.
     --streaming            Use streaming mode when parsing CSVs. This will use less memory
                            but will be slower. Only use this when you get out of memory errors.
     --low-memory           Use low memory mode when parsing CSVs. This will use less memory
@@ -257,7 +265,7 @@ use std::{
     env,
     fs::File,
     io,
-    io::{Read, Write},
+    io::{BufReader, BufWriter, Read, Write},
     path::{Path, PathBuf},
     str::FromStr,
     time::Instant,
@@ -267,9 +275,9 @@ use polars::{
     datatypes::PlSmallStr,
     io::avro::{AvroWriter, Compression as AvroCompression},
     prelude::{
-        CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
+        Arc, CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
         LazyCsvReader, LazyFileListReader, NullValues, OptFlags, ParquetCompression, ParquetWriter,
-        SerWriter, StatisticsOptions, ZstdLevel,
+        Schema, SerWriter, StatisticsOptions, ZstdLevel,
     },
     sql::SQLContext,
 };
@@ -294,6 +302,7 @@ struct Args {
     flag_format: String,
     flag_try_parsedates: bool,
     flag_infer_len: usize,
+    flag_cache_schema: bool,
     flag_streaming: bool,
     flag_low_memory: bool,
     flag_no_optimizations: bool,
@@ -677,6 +686,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         && !args.flag_no_optimizations
         && !args.flag_try_parsedates
         && args.flag_infer_len == 10_000 // make sure this matches the usage text default
+        && !args.flag_cache_schema
         && !args.flag_streaming
         && !args.flag_low_memory
         && !args.flag_truncate_ragged_lines
@@ -713,12 +723,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
             log::debug!("Using fast path - Modified Query: {modified_query}");
         }
     } else {
+        // --------------------------------------------
+        // we have more than one input and/or we are using CSV parsing options, so we need to
+        // parse the CSV first, and register the input files as tables in the SQL context
+        // AKA the "slow path"
+        // --------------------------------------------
+
         if debuglog_flag {
-            // Using the slow path to read and parse the CSV/s into tables in the SQL context.
             log::debug!("Using the slow path...");
         }
-        // we have more than one input and/or we are using CSV parsing options, so we need to
-        // parse the CSV first, and register the input files as tables in the SQL context
+
+        let cache_schemas = args.flag_cache_schema;
+
         for (idx, table) in args.arg_input.iter().enumerate() {
             // as we are using the table name as alias, we need to make sure that the table name is
             // a valid identifier. if its not utf8, we use the lossy version
@@ -738,20 +754,77 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
                     alias = table_aliases.get(table_name).unwrap(),
                 );
             }
-            let lf = LazyCsvReader::new(table)
-                .with_has_header(true)
-                .with_missing_is_null(true)
-                .with_comment_prefix(comment_char.clone())
-                .with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
-                .with_separator(tsvssv_delim(table, delim))
-                .with_infer_schema_length(Some(args.flag_infer_len))
-                .with_try_parse_dates(args.flag_try_parsedates)
-                .with_ignore_errors(args.flag_ignore_errors)
-                .with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
-                .with_decimal_comma(args.flag_decimal_comma)
-                .with_low_memory(args.flag_low_memory)
-                .finish()?;
-            ctx.register(table_name, lf.with_optimizations(optflags));
+
+            // we build the lazyframe, accounting for the --cache-schema flag
+            let mut create_schema = cache_schemas;
+            let mut lf = if cache_schemas {
+                let mut work_lf = LazyCsvReader::new(table)
+                    .with_has_header(true)
+                    .with_missing_is_null(true)
+                    .with_comment_prefix(comment_char.clone())
+                    .with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
+                    .with_separator(tsvssv_delim(table, delim))
+                    .with_try_parse_dates(args.flag_try_parsedates)
+                    .with_ignore_errors(args.flag_ignore_errors)
+                    .with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
+                    .with_decimal_comma(args.flag_decimal_comma)
+                    .with_low_memory(args.flag_low_memory);
+
+                // --cache-schema is enabled; check if a valid pschema.json file exists for this
+                // table
+                let schema_file = table.canonicalize()?.with_extension("pschema.json");
+                if schema_file.exists()
+                    && schema_file.metadata()?.modified()? > table.metadata()?.modified()?
+                {
+                    // we have a valid pschema.json file - it exists and is newer than the table,
+                    // so load and deserialize the schema and use it with the lazy frame
+                    let file = File::open(&schema_file)?;
+                    let mut buf_reader = BufReader::new(file);
+                    let mut schema_json = String::with_capacity(100);
+                    buf_reader.read_to_string(&mut schema_json)?;
+                    let schema: Schema = serde_json::from_str(&schema_json)?;
+                    if debuglog_flag {
+                        log::debug!("Loaded schema from file: {}", schema_file.display());
+                    }
+                    work_lf = work_lf.with_schema(Some(Arc::new(schema)));
+                    create_schema = false;
+                } else {
+                    // there is no valid pschema.json file, so infer the schema using --infer-len
+                    work_lf = work_lf.with_infer_schema_length(Some(args.flag_infer_len));
+                    create_schema = true;
+                }
+                work_lf.finish()?
+            } else {
+                // --cache-schema is not enabled, so we always infer the schema with --infer-len
+                LazyCsvReader::new(table)
+                    .with_has_header(true)
+                    .with_missing_is_null(true)
+                    .with_comment_prefix(comment_char.clone())
+                    .with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
+                    .with_separator(tsvssv_delim(table, delim))
+                    .with_infer_schema_length(Some(args.flag_infer_len))
+                    .with_try_parse_dates(args.flag_try_parsedates)
+                    .with_ignore_errors(args.flag_ignore_errors)
+                    .with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
+                    .with_decimal_comma(args.flag_decimal_comma)
+                    .with_low_memory(args.flag_low_memory)
+                    .finish()?
+            };
+            ctx.register(table_name, lf.clone().with_optimizations(optflags));
+
+            // --cache-schema is enabled and the schema was freshly inferred,
+            // so create/update the pschema.json file
+            if create_schema {
+                let schema = lf.collect_schema()?;
+                let schema_json = serde_json::to_string_pretty(&schema)?;
+                let schema_file = table.canonicalize()?.with_extension("pschema.json");
+                let mut file = BufWriter::new(File::create(&schema_file)?);
+                file.write_all(schema_json.as_bytes())?;
+                file.flush()?;
+                if debuglog_flag {
+                    log::debug!("Saved schema to file: {}", schema_file.display());
+                }
+            }
         }
     }
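
A quick smoke test of the new flag (the file name and query are illustrative, not part of the patch):

    # first run: data.pschema.json does not exist yet, so the schema is
    # inferred per --infer-len and saved next to the input file
    qsv sqlp data.csv 'SELECT * FROM data' --cache-schema

    # second run: data.pschema.json exists and is newer than data.csv,
    # so it is loaded and --infer-len is ignored
    qsv sqlp data.csv 'SELECT * FROM data' --cache-schema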
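Cache validity here is a plain mtime comparison, not a content hash. A minimal standalone sketch of the same test (the helper name is ours, not qsv's):

    use std::{fs, io, path::Path};

    /// True if `cache` exists and was modified more recently than `source` -
    /// the freshness test the patch applies to .pschema.json files.
    fn cache_is_fresh(cache: &Path, source: &Path) -> io::Result<bool> {
        if !cache.exists() {
            return Ok(false);
        }
        Ok(fs::metadata(cache)?.modified()? > fs::metadata(source)?.modified()?)
    }

Because the CSV's mtime is part of the test, touching or regenerating the input invalidates the cached schema, so a stale .pschema.json is never silently reused.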