From aa88868e83645480e3851af2426d0014ced9d7f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Standa=20Luke=C5=A1?= Date: Sun, 2 Jun 2024 14:36:20 +0200 Subject: [PATCH] Add support for multi-dimensional arrays By default, the arrays are flattened (instead of failing, as before). With the --array-handling options, the stripped information about array dimensions and alternative lowerbounds can be included. resolves #22 --- README.md | 17 ++++- cli/src/appenders/array.rs | 3 +- cli/src/datatypes/array.rs | 62 ++++++++++++++++++ cli/src/datatypes/mod.rs | 1 + cli/src/main.rs | 8 ++- cli/src/postgres_cloner.rs | 130 ++++++++++++++++++++++++++++++++----- py-tests/test_arrays.py | 40 ++++++++++++ py-tests/wrappers.py | 7 +- 8 files changed, 243 insertions(+), 25 deletions(-) create mode 100644 cli/src/datatypes/array.rs diff --git a/README.md b/README.md index 1bf8aca..31287a1 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,9 @@ Options: - prefer: Attempt to connect with TLS but allow sessions without (default behavior compiled with SSL support) - require: Require the use of TLS + --ssl-root-cert + File with a TLS root certificate in PEM or DER (.crt) format. When specified, the default CA certificates are considered untrusted. The option can be specified multiple times. Using this options implies --sslmode=require + --macaddr-handling How to handle `macaddr` columns @@ -175,7 +178,7 @@ Options: --numeric-handling How to handle `numeric` columns - [default: decimal] + [default: double] Possible values: - decimal: Numeric is stored using the DECIMAL parquet type. Use --decimal-precision and --decimal-scale to set the desired precision and scale @@ -184,7 +187,7 @@ Options: - string: Convert the numeric to a string and store it as UTF8 text. This option never looses precision. Note that text "NaN" may be present if NaN is present in the database --decimal-scale - How many decimal digits after the decimal point are stored in the Parquet file + How many decimal digits after the decimal point are stored in the Parquet file in DECIMAL data type [default: 18] @@ -192,4 +195,14 @@ Options: How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY [default: 38] + + --array-handling + Parquet does not support multi-dimensional arrays and arrays with different starting index. pg2parquet flattens the arrays, and this options allows including the stripped information in additional columns + + [default: plain] + + Possible values: + - plain: Postgres arrays are simply stored as Parquet LIST + - dimensions: Postgres arrays are stored as struct of { data: List[T], dims: List[int] } + - dimensions+lowerbound: Postgres arrays are stored as struct of { data: List[T], dims: List[int], lower_bound: List[int] } ``` diff --git a/cli/src/appenders/array.rs b/cli/src/appenders/array.rs index 4460982..b836175 100644 --- a/cli/src/appenders/array.rs +++ b/cli/src/appenders/array.rs @@ -22,7 +22,7 @@ impl ArrayColumnAppender panic!("Cannot create {}, repetition levels {} must be one less than inner repetition levels {}", std::any::type_name::(), rl, inner.max_rl()); } if inner.max_dl() != dl + 1 + allow_element_null as i16 { - panic!("Cannot create {}, definition levels {} must be {} less than inner definition levels {}", std::any::type_name::(), dl, if allow_element_null { "one" } else { "two" }, inner.max_dl()); + panic!("Cannot create {}, definition levels {} must be {} less than inner definition levels {}", std::any::type_name::(), dl, 1 + allow_element_null as i16, inner.max_dl()); } if dl < allow_null as i16 { panic!("Cannot create {}, definition levels {} must be positive", std::any::type_name::(), dl); @@ -94,6 +94,7 @@ impl<'a, TPg: Clone, TInner, TArray: Clone> ColumnAppender for ArrayColu Cow::Owned(Some(value)) => self.copy_value(repetition_index, Cow::::Owned(value)), Cow::Borrowed(Some(value)) => self.copy_value(repetition_index, Cow::Borrowed(value)), Cow::Owned(None) | Cow::Borrowed(None) => { + // if !self.allow_null, this writes an empty array let nested_ri = repetition_index.new_child(); self.inner.write_null(&nested_ri, self.dl - self.allow_null as i16) }, diff --git a/cli/src/datatypes/array.rs b/cli/src/datatypes/array.rs new file mode 100644 index 0000000..7b20f04 --- /dev/null +++ b/cli/src/datatypes/array.rs @@ -0,0 +1,62 @@ +use std::vec; + +use parquet::data_type::ByteArray; +use postgres::{fallible_iterator::FallibleIterator, types::{FromSql, Kind, Type}}; +use postgres_protocol::types::array_from_sql; + +use crate::myfrom::MyFrom; + +#[derive(Debug, Clone)] +pub struct PgMultidimArray { + pub data: Vec, + pub dims: Option>, + pub lower_bounds: PgMultidimArrayLowerBounds +} + +#[derive(Debug, Clone)] +pub enum PgMultidimArrayLowerBounds { + Const(i32), + PerDim(Vec), +} + +impl<'a, T> FromSql<'a> for PgMultidimArray where T: FromSql<'a> { + fn from_sql(ty: &postgres::types::Type, raw: &'a [u8]) -> Result> { + let member_type = match *ty.kind() { + Kind::Array(ref member) => member, + _ => panic!("expected array type"), + }; + + let array = array_from_sql(raw)?; + let mut dims_iter = array.dimensions(); + let (count, dims, lower_bounds) = if let Some(dim1) = dims_iter.next()? { + if let Some(dim2) = dims_iter.next()? { + let mut dims: Vec = vec![dim1.len, dim2.len]; + let mut count = dim1.len * dim2.len; + let mut lb = vec![dim1.lower_bound, dim2.lower_bound]; + for dim in dims_iter.iterator() { + let dim = dim?; + count *= dim.len; + dims.push(dim.len); + lb.push(dim.lower_bound); + } + (count, Some(dims), PgMultidimArrayLowerBounds::PerDim(lb)) + } else { + (dim1.len, None, PgMultidimArrayLowerBounds::Const(dim1.lower_bound)) + } + } else { + (0, None, PgMultidimArrayLowerBounds::Const(1)) + }; + + let mut data: Vec = Vec::with_capacity(count as usize); + for elem in array.values().iterator() { + let elem = elem?; + data.push(T::from_sql_nullable(member_type, elem)?); + } + + Ok(PgMultidimArray { data, dims, lower_bounds }) + } + + fn accepts(ty: &postgres::types::Type) -> bool { + matches!(ty.kind(), Kind::Array(_)) + } +} diff --git a/cli/src/datatypes/mod.rs b/cli/src/datatypes/mod.rs index fc578ba..c0875d5 100644 --- a/cli/src/datatypes/mod.rs +++ b/cli/src/datatypes/mod.rs @@ -2,3 +2,4 @@ pub mod numeric; pub mod money; pub mod jsonb; pub mod interval; +pub mod array; diff --git a/cli/src/main.rs b/cli/src/main.rs index 3525458..b6be50b 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, path::PathBuf, process}; use clap::{Parser, ValueEnum, Command}; use parquet::{basic::{ZstdLevel, BrotliLevel, GzipLevel, Compression}, file::properties::DEFAULT_WRITE_BATCH_SIZE}; -use postgres_cloner::{SchemaSettingsMacaddrHandling, SchemaSettingsJsonHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling, SchemaSettingsNumericHandling}; +use postgres_cloner::{SchemaSettingsArrayHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling, SchemaSettingsJsonHandling, SchemaSettingsMacaddrHandling, SchemaSettingsNumericHandling}; mod postgresutils; mod myfrom; @@ -130,7 +130,10 @@ pub struct SchemaSettingsArgs { decimal_scale: i32, /// How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY. #[arg(long, hide_short_help = true, default_value_t = 38)] - decimal_precision: u32, + decimal_precision: u32, + /// Parquet does not support multi-dimensional arrays and arrays with different starting index. pg2parquet flattens the arrays, and this options allows including the stripped information in additional columns. + #[arg(long, hide_short_help = true, default_value = "plain")] + array_handling: SchemaSettingsArrayHandling, } @@ -232,6 +235,7 @@ fn perform_export(args: ExportArgs) { numeric_handling: args.schema_settings.numeric_handling, decimal_scale: args.schema_settings.decimal_scale, decimal_precision: args.schema_settings.decimal_precision, + array_handling: args.schema_settings.array_handling, }; let query = args.query.unwrap_or_else(|| { format!("SELECT * FROM {}", args.table.unwrap()) diff --git a/cli/src/postgres_cloner.rs b/cli/src/postgres_cloner.rs index a9ce47e..4e6ce50 100644 --- a/cli/src/postgres_cloner.rs +++ b/cli/src/postgres_cloner.rs @@ -6,6 +6,7 @@ use std::marker::PhantomData; use std::net::IpAddr; use std::path::PathBuf; use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use clap::error::Error; @@ -21,8 +22,9 @@ use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls}; use postgres::fallible_iterator::FallibleIterator; use parquet::schema::types::{Type as ParquetType, TypePtr, GroupTypeBuilder}; +use crate::datatypes::array::{PgMultidimArray, PgMultidimArrayLowerBounds}; use crate::PostgresConnArgs; -use crate::appenders::{ColumnAppender, DynamicMergedAppender, RealMemorySize, ArrayColumnAppender, ColumnAppenderBase, GenericColumnAppender, BasicPgRowColumnAppender, RcWrapperAppender, StaticMergedAppender, new_autoconv_generic_appender, PreprocessExt, new_static_merged_appender, DynColumnAppender}; +use crate::appenders::{new_autoconv_generic_appender, new_static_merged_appender, ArrayColumnAppender, BasicPgRowColumnAppender, ColumnAppender, ColumnAppenderBase, DynColumnAppender, DynamicMergedAppender, GenericColumnAppender, PreprocessAppender, PreprocessExt, RcWrapperAppender, RealMemorySize, StaticMergedAppender}; use crate::datatypes::interval::PgInterval; use crate::datatypes::jsonb::PgRawJsonb; use crate::datatypes::money::PgMoney; @@ -42,6 +44,7 @@ pub struct SchemaSettings { pub numeric_handling: SchemaSettingsNumericHandling, pub decimal_scale: i32, pub decimal_precision: u32, + pub array_handling: SchemaSettingsArrayHandling, } #[derive(clap::ValueEnum, Clone, Copy, Debug)] @@ -93,6 +96,18 @@ pub enum SchemaSettingsNumericHandling { String } +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq)] +pub enum SchemaSettingsArrayHandling { + /// Postgres arrays are simply stored as Parquet LIST + Plain, + /// Postgres arrays are stored as struct of { data: List[T], dims: List[int] } + #[clap(alias="dims")] + Dimensions, + /// Postgres arrays are stored as struct of { data: List[T], dims: List[int], lower_bound: List[int] } + #[clap(name="dimensions+lowerbound", alias="dimensions+lower_bound", alias="dimensions+lower-bound", alias="dims+lb")] + DimensionsAndLowerBound, +} + pub fn default_settings() -> SchemaSettings { SchemaSettings { macaddr_handling: SchemaSettingsMacaddrHandling::Text, @@ -102,6 +117,7 @@ pub fn default_settings() -> SchemaSettings { numeric_handling: SchemaSettingsNumericHandling::Double, decimal_scale: 18, decimal_precision: 38, + array_handling: SchemaSettingsArrayHandling::Plain, } } @@ -360,24 +376,43 @@ fn map_schema_column( let (element_appender, element_schema) = map_schema_column(element_type, &element_column, settings)?; debug_assert_eq!(element_schema.name(), "element"); - let list_schema = ParquetType::group_type_builder("list") - .with_repetition(Repetition::REPEATED) - .with_fields(&mut vec![ - Arc::new(element_schema) - ]) - .build().unwrap(); - let schema = ParquetType::group_type_builder(c.col_name()) - .with_logical_type(Some(LogicalType::List)) - .with_repetition(Repetition::OPTIONAL) - .with_fields(&mut vec![ - Arc::new(list_schema) - ]) - .build().unwrap(); + let plain_schema = settings.array_handling == SchemaSettingsArrayHandling::Plain; + + let schema = if plain_schema { + make_list_schema(c.col_name(), Repetition::OPTIONAL, element_schema) + } else { + make_list_schema("data", Repetition::REQUIRED, element_schema) + }; + assert_eq!(element_appender.max_dl(), element_column.definition_level + 1); assert_eq!(element_appender.max_rl(), element_column.repetition_level); - let array_appender = create_array_appender(element_appender, &c); - Ok((Box::new(array_appender), schema)) + let array_appender = create_array_appender(element_appender, &c, plain_schema); + let dim_appender = create_array_dim_appender::(&c); + let lb_appender = create_array_lower_bound_appender::(&c); + let dim_schema = make_list_schema("dims", Repetition::REQUIRED, ParquetType::primitive_type_builder("element", basic::Type::INT32).with_repetition(Repetition::REQUIRED).with_logical_type(Some(LogicalType::Integer { bit_width: 32, is_signed: false })).build().unwrap()); + let lb_schema = make_list_schema("lower_bound", Repetition::REQUIRED, ParquetType::primitive_type_builder("element", basic::Type::INT32).with_repetition(Repetition::REQUIRED).with_logical_type(Some(LogicalType::Integer { bit_width: 32, is_signed: true })).build().unwrap()); + match settings.array_handling { + SchemaSettingsArrayHandling::Plain => Ok((Box::new(array_appender), schema)), + SchemaSettingsArrayHandling::Dimensions => Ok(( + Box::new( + new_static_merged_appender(c.definition_level + 1, c.repetition_level).add_appender(array_appender).add_appender(dim_appender) + ), + ParquetType::group_type_builder(c.col_name()) + .with_repetition(Repetition::OPTIONAL) + .with_fields(&mut vec![ Arc::new(schema), Arc::new(dim_schema) ]) + .build().unwrap() + )), + SchemaSettingsArrayHandling::DimensionsAndLowerBound => Ok(( + Box::new( + new_static_merged_appender(c.definition_level + 1, c.repetition_level).add_appender(array_appender).add_appender(dim_appender).add_appender(lb_appender) + ), + ParquetType::group_type_builder(c.col_name()) + .with_repetition(Repetition::OPTIONAL) + .with_fields(&mut vec![ Arc::new(schema), Arc::new(dim_schema), Arc::new(lb_schema) ]) + .build().unwrap() + )) + } }, Kind::Domain(ref element_type) => { map_schema_column(element_type, c, settings) @@ -444,6 +479,20 @@ fn map_schema_column( } } +fn make_list_schema(name: &str, repetition: Repetition, element_schema: ParquetType) -> ParquetType { + ParquetType::group_type_builder(name) + .with_logical_type(Some(LogicalType::List)) + .with_repetition(repetition) + .with_fields(&mut vec![ + Arc::new(ParquetType::group_type_builder("list") + .with_repetition(Repetition::REPEATED) + .with_fields(&mut vec![ + Arc::new(element_schema) + ]) + .build().unwrap()) + ]) + .build().unwrap() +} fn map_simple_type( t: &PgType, @@ -641,11 +690,56 @@ fn create_complex_appender FromSql<'a> + Clone + 'static, TRow: PgAb wrap_pg_row_reader(c, RcWrapperAppender::new(main_cp)) } -fn create_array_appender(inner: DynColumnAppender, c: &ColumnInfo) -> impl ColumnAppender { +fn create_array_appender(inner: DynColumnAppender, c: &ColumnInfo, warn_on_multidim: bool) -> impl ColumnAppender { let outer_dl = c.definition_level + 1; debug_assert_eq!(outer_dl + 2, inner.max_dl()); let array_appender = ArrayColumnAppender::new(inner, true, true, outer_dl, c.repetition_level); - wrap_pg_row_reader::>>(c, array_appender) + let warned = AtomicBool::new(false); + let col_clone = c.clone(); + let multidim_appender = array_appender.preprocess(move |x: Cow>>| { + if warn_on_multidim && x.dims.is_some() && !warned.load(Ordering::Relaxed) { + if !warned.fetch_or(true, Ordering::SeqCst) { + eprintln!("Warning: Column {} contains a {}-dimensional array which will be flattened in Parquet (i.e. {} -> {}). Use --array-handling=dimensions, include another column with the PostgreSQL array dimensions.", + col_clone.full_name(), + x.dims.as_ref().unwrap().len(), + x.dims.as_ref().unwrap().iter().map(|x| x.to_string()).collect::>().join("x"), + x.data.len() + ) + } + } + match x { + Cow::Owned(x) => Cow::Owned(x.data), + Cow::Borrowed(x) => Cow::Borrowed(&x.data) + } + }); + wrap_pg_row_reader::>>(c, multidim_appender) +} + +fn create_array_dim_appender FromSql<'a> + 'static, TRow: PgAbstractRow + Clone>(c: &ColumnInfo) -> impl ColumnAppender { + let int_appender = new_autoconv_generic_appender::(c.definition_level + 2, c.repetition_level + 1); + let dim_appender = + ArrayColumnAppender::new(int_appender, false, false, c.definition_level + 1, c.repetition_level) + .preprocess(|x: Cow>>| Cow::>>::Owned( + x.dims.as_ref() + .map(|x| x.iter().map(|c| Some(*c)).collect()) + .unwrap_or_else(|| if x.data.len() == 0 { Vec::new() } else { vec![Some(x.data.len() as i32)] }) + )); + wrap_pg_row_reader::>>(c, dim_appender) +} + + +fn create_array_lower_bound_appender FromSql<'a> + 'static, TRow: PgAbstractRow + Clone>(c: &ColumnInfo) -> impl ColumnAppender { + let int_appender = new_autoconv_generic_appender::(c.definition_level + 2, c.repetition_level + 1); + let dim_appender = + ArrayColumnAppender::new(int_appender, false, false, c.definition_level + 1, c.repetition_level) + .preprocess(|x: Cow>>| Cow::>>::Owned( + match &x.lower_bounds { + _ if x.data.len() == 0 => Vec::new(), + PgMultidimArrayLowerBounds::Const(c) => vec![Some(*c); x.dims.as_ref().map(|x| x.len()).unwrap_or(1)], + PgMultidimArrayLowerBounds::PerDim(v) => v.iter().map(|x| Some(*x)).collect() + } + )); + wrap_pg_row_reader::>>(c, dim_appender) } fn wrap_pg_row_reader FromSql<'a>>(c: &ColumnInfo, a: impl ColumnAppender) -> impl ColumnAppender { diff --git a/py-tests/test_arrays.py b/py-tests/test_arrays.py index 74e6df4..23d9b48 100644 --- a/py-tests/test_arrays.py +++ b/py-tests/test_arrays.py @@ -88,3 +88,43 @@ def r(low, up, low_inc=True, up_inc=False, is_empty=False): # self.assertEqual(pl_df["simple_range"].to_list(), [ r[1] for r in duckdb_table ]) # self.assertEqual(pl_df["rint_"].to_list(), [ r[2] for r in duckdb_table ]) + + def test_multidim(self): + self.maxDiff = None + plain_file = wrappers.create_and_export( + "arrays_multidim", "id", + "id int, a int[], b text[]", + """ + (1, ARRAY[[1,2],[3,4],[NULL,5]], ARRAY[[NULL,'b'],['c',NULL]]), + (2, NULL, NULL), + (3, ARRAY[]::int[], ARRAY[[[]]]::text[]), + (4, ARRAY[[[[1]]]], '{{{a}}}'::text[]), + (5, '[-2:0]={1,2,3}'::int[], '[-1:0][4:5]={{a,b},{c,d}}'::text[]) + """ + ) + + expected_ids = [1, 2, 3, 4, 5] + expected_a = [ [ 1, 2, 3, 4, None, 5 ], None, [], [1], [1, 2, 3] ] + expected_b = [ [ None, "b", "c", None ], None, [], ["a"], ["a", "b", "c", "d"] ] + plain_df = pl.read_parquet(plain_file) + self.assertEqual(plain_df["id"].to_list(), expected_ids) + self.assertEqual(plain_df["a"].to_list(), expected_a) + self.assertEqual(plain_df["b"].to_list(), expected_b) + + dims_file = wrappers.run_export_table("arrays_multidim_dims", "arrays_multidim", "id", options=["--array-handling=dims"]) + dims_df = pl.read_parquet(dims_file) + self.assertEqual(dims_df["id"].to_list(), expected_ids) + self.assertEqual(dims_df["a"].struct.field("data").to_list(), expected_a) + self.assertEqual(dims_df["b"].struct.field("data").to_list(), expected_b) + self.assertEqual(dims_df["a"].struct.field("dims").to_list(), [ [3, 2], None, [], [1, 1, 1, 1], [3] ]) + self.assertEqual(dims_df["b"].struct.field("dims").to_list(), [ [2, 2], None, [], [1, 1, 1], [2, 2] ]) + + dims_lb_file = wrappers.run_export_table("arrays_multidim_dims_lb", "arrays_multidim", "id", options=["--array-handling=dims+lb"]) + dims_lb_df = pl.read_parquet(dims_lb_file) + self.assertEqual(dims_lb_df["id"].to_list(), expected_ids) + self.assertEqual(dims_lb_df["a"].struct.field("data").to_list(), expected_a) + self.assertEqual(dims_lb_df["b"].struct.field("data").to_list(), expected_b) + self.assertEqual(dims_lb_df["a"].struct.field("dims").to_list(), [ [3, 2], None, [], [1, 1, 1, 1], [3] ]) + self.assertEqual(dims_lb_df["a"].struct.field("lower_bound").to_list(), [ [1, 1], None, [], [1, 1, 1, 1], [-2] ]) + self.assertEqual(dims_lb_df["b"].struct.field("lower_bound").to_list(), [ [1, 1], None, [], [1, 1, 1], [-1, 4] ]) + diff --git a/py-tests/wrappers.py b/py-tests/wrappers.py index 32d1627..429cdf3 100644 --- a/py-tests/wrappers.py +++ b/py-tests/wrappers.py @@ -92,7 +92,7 @@ def run_pg2parquet(args: list[str]): return r -def run_export(name, query = None, options = []) -> pyarrow.Table: +def run_export(name, query = None, options = []) -> str: outfile = os.path.join(output_directory, name + ".parquet") if query is not None: query_opt = ["--query", query] @@ -113,6 +113,9 @@ def run_export(name, query = None, options = []) -> pyarrow.Table: return outfile +def run_export_table(name, table, sort_column, options = []) -> str: + return run_export(name, f"SELECT * FROM {table} ORDER BY {sort_column} NULLS LAST", options=options) + def str_to_sql(s: str) -> sql.SQL: return sql.SQL(s) # type: ignore @@ -122,4 +125,4 @@ def create_and_export(name, sort_column, schema, inserts, options=[]): f"CREATE TABLE {name} ({schema})", f"INSERT INTO {name} VALUES {inserts}" ) - return run_export(name, f"SELECT * FROM {name} ORDER BY {sort_column} NULLS LAST", options=options) + return run_export_table(name, name, sort_column, options=options)