Skip to content

Commit

Permalink
Merge pull request #2224 from jqnatividad/2211-sqlp-cache-schema
Browse files Browse the repository at this point in the history
`sqlp`: add `--cache-schema` option
  • Loading branch information
jqnatividad authored Oct 21, 2024
2 parents e735d15 + 8acaead commit b1ec116
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 22 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ polars = { version = "0.43", features = [
"performant",
"pivot",
"semi_anti_join",
"serde",
"serde-lazy",
"sql",
"streaming",
Expand Down
2 changes: 1 addition & 1 deletion contrib/completions/examples/qsv.bash
Original file line number Diff line number Diff line change
Expand Up @@ -3756,7 +3756,7 @@ _qsv() {
return 0
;;
qsv__sqlp)
opts="-h --format --try-parsedates --infer-len --streaming --low-memory --no-optimizations --truncate-ragged-lines --ignore-errors --rnull-values --decimal-comma --datetime-format --date-format --time-format --float-precision --wnull-value --compression --compress-level --statistics --output --delimiter --quiet --help"
opts="-h --format --try-parsedates --infer-len --cache-schema --streaming --low-memory --no-optimizations --truncate-ragged-lines --ignore-errors --rnull-values --decimal-comma --datetime-format --date-format --time-format --float-precision --wnull-value --compression --compress-level --statistics --output --delimiter --quiet --help"
if [[ ${cur} == -* || ${COMP_CWORD} -eq 2 ]] ; then
COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") )
return 0
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/examples/qsv.elv
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ set edit:completion:arg-completer[qsv] = {|@words|
cand --format 'format'
cand --try-parsedates 'try-parsedates'
cand --infer-len 'infer-len'
cand --cache-schema 'cache-schema'
cand --streaming 'streaming'
cand --low-memory 'low-memory'
cand --no-optimizations 'no-optimizations'
Expand Down
3 changes: 3 additions & 0 deletions contrib/completions/examples/qsv.fig.js
Original file line number Diff line number Diff line change
Expand Up @@ -3404,6 +3404,9 @@ const completion: Fig.Spec = {
{
name: "--infer-len",
},
{
name: "--cache-schema",
},
{
name: "--streaming",
},
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/examples/qsv.fish
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,7 @@ complete -c qsv -n "__fish_qsv_using_subcommand split" -s h -l help -d 'Print he
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l format
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l try-parsedates
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l infer-len
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l cache-schema
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l streaming
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l low-memory
complete -c qsv -n "__fish_qsv_using_subcommand sqlp" -l no-optimizations
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/examples/qsv.nu
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ module completions {
--format
--try-parsedates
--infer-len
--cache-schema
--streaming
--low-memory
--no-optimizations
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/examples/qsv.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,7 @@ Register-ArgumentCompleter -Native -CommandName 'qsv' -ScriptBlock {
[CompletionResult]::new('--format', 'format', [CompletionResultType]::ParameterName, 'format')
[CompletionResult]::new('--try-parsedates', 'try-parsedates', [CompletionResultType]::ParameterName, 'try-parsedates')
[CompletionResult]::new('--infer-len', 'infer-len', [CompletionResultType]::ParameterName, 'infer-len')
[CompletionResult]::new('--cache-schema', 'cache-schema', [CompletionResultType]::ParameterName, 'cache-schema')
[CompletionResult]::new('--streaming', 'streaming', [CompletionResultType]::ParameterName, 'streaming')
[CompletionResult]::new('--low-memory', 'low-memory', [CompletionResultType]::ParameterName, 'low-memory')
[CompletionResult]::new('--no-optimizations', 'no-optimizations', [CompletionResultType]::ParameterName, 'no-optimizations')
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/examples/qsv.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,7 @@ _arguments "${_arguments_options[@]}" : \
'--format[]' \
'--try-parsedates[]' \
'--infer-len[]' \
'--cache-schema[]' \
'--streaming[]' \
'--low-memory[]' \
'--no-optimizations[]' \
Expand Down
1 change: 1 addition & 0 deletions contrib/completions/src/cmd/sqlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub fn sqlp_cmd() -> Command {
arg!(--format),
arg!(--"try-parsedates"),
arg!(--"infer-len"),
arg!(--"cache-schema"),
arg!(--streaming),
arg!(--"low-memory"),
arg!(--"no-optimizations"),
Expand Down
113 changes: 93 additions & 20 deletions src/cmd/sqlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ sqlp options:
--infer-len <arg> The number of rows to scan when inferring the schema of the CSV.
Set to 0 to do a full table scan (warning: can be slow).
[default: 10000]
--cache-schema Create and cache Polars schema JSON files.
If specified and the schema file/s do not exist, it will save the
inferred schemas in JSON format. Each schema file will have the same
file stem as the corresponding input file, with the extension ".pschema.json"
(data.csv's Polars schema file will be data.pschema.json)
If the file/s exists, it will load the schema instead of inferring it
(ignoring --infer-len) and attempt to use it for each corresponding
Polars "table" with the same file stem.
--streaming Use streaming mode when parsing CSVs. This will use less memory
but will be slower. Only use this when you get out of memory errors.
--low-memory Use low memory mode when parsing CSVs. This will use less memory
Expand Down Expand Up @@ -257,7 +265,7 @@ use std::{
env,
fs::File,
io,
io::{Read, Write},
io::{BufReader, BufWriter, Read, Write},
path::{Path, PathBuf},
str::FromStr,
time::Instant,
Expand All @@ -267,9 +275,9 @@ use polars::{
datatypes::PlSmallStr,
io::avro::{AvroWriter, Compression as AvroCompression},
prelude::{
CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
Arc, CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
LazyCsvReader, LazyFileListReader, NullValues, OptFlags, ParquetCompression, ParquetWriter,
SerWriter, StatisticsOptions, ZstdLevel,
Schema, SerWriter, StatisticsOptions, ZstdLevel,
},
sql::SQLContext,
};
Expand All @@ -294,6 +302,7 @@ struct Args {
flag_format: String,
flag_try_parsedates: bool,
flag_infer_len: usize,
flag_cache_schema: bool,
flag_streaming: bool,
flag_low_memory: bool,
flag_no_optimizations: bool,
Expand Down Expand Up @@ -677,6 +686,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
&& !args.flag_no_optimizations
&& !args.flag_try_parsedates
&& args.flag_infer_len == 10_000 // make sure this matches the usage text default
&& !args.flag_cache_schema
&& !args.flag_streaming
&& !args.flag_low_memory
&& !args.flag_truncate_ragged_lines
Expand Down Expand Up @@ -713,12 +723,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
log::debug!("Using fast path - Modified Query: {modified_query}");
}
} else {
// --------------------------------------------
// we have more than one input and/or we are using CSV parsing options, so we need to
// parse the CSV first, and register the input files as tables in the SQL context
// AKA the "slow path"
// --------------------------------------------

if debuglog_flag {
// Using the slow path to read and parse the CSV/s into tables in the SQL context.
log::debug!("Using the slow path...");
}
// we have more than one input and/or we are using CSV parsing options, so we need to
// parse the CSV first, and register the input files as tables in the SQL context

let cache_schemas = args.flag_cache_schema;

for (idx, table) in args.arg_input.iter().enumerate() {
// as we are using the table name as alias, we need to make sure that the table name is
// a valid identifier. if its not utf8, we use the lossy version
Expand All @@ -738,20 +754,77 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
alias = table_aliases.get(table_name).unwrap(),
);
}
let lf = LazyCsvReader::new(table)
.with_has_header(true)
.with_missing_is_null(true)
.with_comment_prefix(comment_char.clone())
.with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
.with_separator(tsvssv_delim(table, delim))
.with_infer_schema_length(Some(args.flag_infer_len))
.with_try_parse_dates(args.flag_try_parsedates)
.with_ignore_errors(args.flag_ignore_errors)
.with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory)
.finish()?;
ctx.register(table_name, lf.with_optimizations(optflags));

// we build the lazyframe, accounting for the --cache-schema flag
let mut create_schema = cache_schemas;
let mut lf = if cache_schemas {
let mut work_lf = LazyCsvReader::new(table)
.with_has_header(true)
.with_missing_is_null(true)
.with_comment_prefix(comment_char.clone())
.with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
.with_separator(tsvssv_delim(table, delim))
.with_try_parse_dates(args.flag_try_parsedates)
.with_ignore_errors(args.flag_ignore_errors)
.with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory);

// --cache-schema is enabled, check if a valid pschema.json file exists for this
// table
let schema_file = table.canonicalize()?.with_extension("pschema.json");
if schema_file.exists()
&& schema_file.metadata()?.modified()? > table.metadata()?.modified()?
{
// We have a valid pschema.json file - it exists and is newer than the table
// load the schema and deserialize it and use it with the lazy frame
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
let mut schema_json = String::with_capacity(100);
buf_reader.read_to_string(&mut schema_json)?;
let schema: Schema = serde_json::from_str(&schema_json)?;
if debuglog_flag {
log::debug!("Loaded schema from file: {}", schema_file.display());
}
work_lf = work_lf.with_schema(Some(Arc::new(schema)));
create_schema = false;
} else {
// there is no valid pschema.json file, infer the schema using --infer-len
work_lf = work_lf.with_infer_schema_length(Some(args.flag_infer_len));
create_schema = true;
}
work_lf.finish()?
} else {
// --cache-schema is not enabled, we always --infer-len schema
LazyCsvReader::new(table)
.with_has_header(true)
.with_missing_is_null(true)
.with_comment_prefix(comment_char.clone())
.with_null_values(Some(NullValues::AllColumns(rnull_values.clone())))
.with_separator(tsvssv_delim(table, delim))
.with_infer_schema_length(Some(args.flag_infer_len))
.with_try_parse_dates(args.flag_try_parsedates)
.with_ignore_errors(args.flag_ignore_errors)
.with_truncate_ragged_lines(args.flag_truncate_ragged_lines)
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory)
.finish()?
};
ctx.register(table_name, lf.clone().with_optimizations(optflags));

// the lazy frame's schema has been updated and --cache-schema is enabled
// update the pschema.json file
if create_schema {
let schema = lf.collect_schema()?;
let schema_json = serde_json::to_string_pretty(&schema)?;
let schema_file = table.canonicalize()?.with_extension("pschema.json");
let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(schema_json.as_bytes())?;
file.flush()?;
if debuglog_flag {
log::debug!("Saved schema to file: {}", schema_file.display());
}
}
}
}

Expand Down

0 comments on commit b1ec116

Please sign in to comment.