Skip to content

Commit

Permalink
Add support for multi-dimensional arrays
Browse files Browse the repository at this point in the history
By default, the arrays are flattened (instead of failing, as before).
With the --array-handling options, the stripped information about array dimensions and alternative lowerbounds can be included.

resolves #22
  • Loading branch information
exyi committed Jun 2, 2024
1 parent 0be1adb commit aa88868
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 25 deletions.
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ Options:
- prefer: Attempt to connect with TLS but allow sessions without (default behavior compiled with SSL support)
- require: Require the use of TLS
--ssl-root-cert <SSL_ROOT_CERT>
File with a TLS root certificate in PEM or DER (.crt) format. When specified, the default CA certificates are considered untrusted. The option can be specified multiple times. Using this options implies --sslmode=require
--macaddr-handling <MACADDR_HANDLING>
How to handle `macaddr` columns
Expand Down Expand Up @@ -175,7 +178,7 @@ Options:
--numeric-handling <NUMERIC_HANDLING>
How to handle `numeric` columns
[default: decimal]
[default: double]
Possible values:
- decimal: Numeric is stored using the DECIMAL parquet type. Use --decimal-precision and --decimal-scale to set the desired precision and scale
Expand All @@ -184,12 +187,22 @@ Options:
- string: Convert the numeric to a string and store it as UTF8 text. This option never looses precision. Note that text "NaN" may be present if NaN is present in the database
--decimal-scale <DECIMAL_SCALE>
How many decimal digits after the decimal point are stored in the Parquet file
How many decimal digits after the decimal point are stored in the Parquet file in DECIMAL data type
[default: 18]
--decimal-precision <DECIMAL_PRECISION>
How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY
[default: 38]
--array-handling <ARRAY_HANDLING>
Parquet does not support multi-dimensional arrays and arrays with different starting index. pg2parquet flattens the arrays, and this options allows including the stripped information in additional columns
[default: plain]
Possible values:
- plain: Postgres arrays are simply stored as Parquet LIST
- dimensions: Postgres arrays are stored as struct of { data: List[T], dims: List[int] }
- dimensions+lowerbound: Postgres arrays are stored as struct of { data: List[T], dims: List[int], lower_bound: List[int] }
```
3 changes: 2 additions & 1 deletion cli/src/appenders/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl<TPg: Clone, TInner> ArrayColumnAppender<TPg, TInner>
panic!("Cannot create {}, repetition levels {} must be one less than inner repetition levels {}", std::any::type_name::<Self>(), rl, inner.max_rl());
}
if inner.max_dl() != dl + 1 + allow_element_null as i16 {
panic!("Cannot create {}, definition levels {} must be {} less than inner definition levels {}", std::any::type_name::<Self>(), dl, if allow_element_null { "one" } else { "two" }, inner.max_dl());
panic!("Cannot create {}, definition levels {} must be {} less than inner definition levels {}", std::any::type_name::<Self>(), dl, 1 + allow_element_null as i16, inner.max_dl());
}
if dl < allow_null as i16 {
panic!("Cannot create {}, definition levels {} must be positive", std::any::type_name::<Self>(), dl);
Expand Down Expand Up @@ -94,6 +94,7 @@ impl<'a, TPg: Clone, TInner, TArray: Clone> ColumnAppender<TArray> for ArrayColu
Cow::Owned(Some(value)) => self.copy_value(repetition_index, Cow::<TArray>::Owned(value)),
Cow::Borrowed(Some(value)) => self.copy_value(repetition_index, Cow::Borrowed(value)),
Cow::Owned(None) | Cow::Borrowed(None) => {
// if !self.allow_null, this writes an empty array
let nested_ri = repetition_index.new_child();
self.inner.write_null(&nested_ri, self.dl - self.allow_null as i16)
},
Expand Down
62 changes: 62 additions & 0 deletions cli/src/datatypes/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::vec;

use parquet::data_type::ByteArray;
use postgres::{fallible_iterator::FallibleIterator, types::{FromSql, Kind, Type}};
use postgres_protocol::types::array_from_sql;

use crate::myfrom::MyFrom;

#[derive(Debug, Clone)]
pub struct PgMultidimArray<T> {
pub data: Vec<T>,
pub dims: Option<Vec<i32>>,
pub lower_bounds: PgMultidimArrayLowerBounds
}

#[derive(Debug, Clone)]
pub enum PgMultidimArrayLowerBounds {
Const(i32),
PerDim(Vec<i32>),
}

impl<'a, T> FromSql<'a> for PgMultidimArray<T> where T: FromSql<'a> {
fn from_sql(ty: &postgres::types::Type, raw: &'a [u8]) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
let member_type = match *ty.kind() {
Kind::Array(ref member) => member,
_ => panic!("expected array type"),
};

let array = array_from_sql(raw)?;
let mut dims_iter = array.dimensions();
let (count, dims, lower_bounds) = if let Some(dim1) = dims_iter.next()? {
if let Some(dim2) = dims_iter.next()? {
let mut dims: Vec<i32> = vec![dim1.len, dim2.len];
let mut count = dim1.len * dim2.len;
let mut lb = vec![dim1.lower_bound, dim2.lower_bound];
for dim in dims_iter.iterator() {
let dim = dim?;
count *= dim.len;
dims.push(dim.len);
lb.push(dim.lower_bound);
}
(count, Some(dims), PgMultidimArrayLowerBounds::PerDim(lb))
} else {
(dim1.len, None, PgMultidimArrayLowerBounds::Const(dim1.lower_bound))
}
} else {
(0, None, PgMultidimArrayLowerBounds::Const(1))
};

let mut data: Vec<T> = Vec::with_capacity(count as usize);
for elem in array.values().iterator() {
let elem = elem?;
data.push(T::from_sql_nullable(member_type, elem)?);
}

Ok(PgMultidimArray { data, dims, lower_bounds })
}

fn accepts(ty: &postgres::types::Type) -> bool {
matches!(ty.kind(), Kind::Array(_))
}
}
1 change: 1 addition & 0 deletions cli/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod numeric;
pub mod money;
pub mod jsonb;
pub mod interval;
pub mod array;
8 changes: 6 additions & 2 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, path::PathBuf, process};

use clap::{Parser, ValueEnum, Command};
use parquet::{basic::{ZstdLevel, BrotliLevel, GzipLevel, Compression}, file::properties::DEFAULT_WRITE_BATCH_SIZE};
use postgres_cloner::{SchemaSettingsMacaddrHandling, SchemaSettingsJsonHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling, SchemaSettingsNumericHandling};
use postgres_cloner::{SchemaSettingsArrayHandling, SchemaSettingsEnumHandling, SchemaSettingsIntervalHandling, SchemaSettingsJsonHandling, SchemaSettingsMacaddrHandling, SchemaSettingsNumericHandling};

mod postgresutils;
mod myfrom;
Expand Down Expand Up @@ -130,7 +130,10 @@ pub struct SchemaSettingsArgs {
decimal_scale: i32,
/// How many decimal digits are allowed in numeric/DECIMAL column. By default 38, the largest value which fits in 128 bits. If <= 9, the column is stored as INT32; if <= 18, the column is stored as INT64; otherwise BYTE_ARRAY.
#[arg(long, hide_short_help = true, default_value_t = 38)]
decimal_precision: u32,
decimal_precision: u32,
/// Parquet does not support multi-dimensional arrays and arrays with different starting index. pg2parquet flattens the arrays, and this options allows including the stripped information in additional columns.
#[arg(long, hide_short_help = true, default_value = "plain")]
array_handling: SchemaSettingsArrayHandling,
}


Expand Down Expand Up @@ -232,6 +235,7 @@ fn perform_export(args: ExportArgs) {
numeric_handling: args.schema_settings.numeric_handling,
decimal_scale: args.schema_settings.decimal_scale,
decimal_precision: args.schema_settings.decimal_precision,
array_handling: args.schema_settings.array_handling,
};
let query = args.query.unwrap_or_else(|| {
format!("SELECT * FROM {}", args.table.unwrap())
Expand Down
130 changes: 112 additions & 18 deletions cli/src/postgres_cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::marker::PhantomData;
use std::net::IpAddr;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use clap::error::Error;
Expand All @@ -21,8 +22,9 @@ use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls};
use postgres::fallible_iterator::FallibleIterator;
use parquet::schema::types::{Type as ParquetType, TypePtr, GroupTypeBuilder};

use crate::datatypes::array::{PgMultidimArray, PgMultidimArrayLowerBounds};
use crate::PostgresConnArgs;
use crate::appenders::{ColumnAppender, DynamicMergedAppender, RealMemorySize, ArrayColumnAppender, ColumnAppenderBase, GenericColumnAppender, BasicPgRowColumnAppender, RcWrapperAppender, StaticMergedAppender, new_autoconv_generic_appender, PreprocessExt, new_static_merged_appender, DynColumnAppender};
use crate::appenders::{new_autoconv_generic_appender, new_static_merged_appender, ArrayColumnAppender, BasicPgRowColumnAppender, ColumnAppender, ColumnAppenderBase, DynColumnAppender, DynamicMergedAppender, GenericColumnAppender, PreprocessAppender, PreprocessExt, RcWrapperAppender, RealMemorySize, StaticMergedAppender};
use crate::datatypes::interval::PgInterval;
use crate::datatypes::jsonb::PgRawJsonb;
use crate::datatypes::money::PgMoney;
Expand All @@ -42,6 +44,7 @@ pub struct SchemaSettings {
pub numeric_handling: SchemaSettingsNumericHandling,
pub decimal_scale: i32,
pub decimal_precision: u32,
pub array_handling: SchemaSettingsArrayHandling,
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
Expand Down Expand Up @@ -93,6 +96,18 @@ pub enum SchemaSettingsNumericHandling {
String
}

#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq)]
pub enum SchemaSettingsArrayHandling {
/// Postgres arrays are simply stored as Parquet LIST
Plain,
/// Postgres arrays are stored as struct of { data: List[T], dims: List[int] }
#[clap(alias="dims")]
Dimensions,
/// Postgres arrays are stored as struct of { data: List[T], dims: List[int], lower_bound: List[int] }
#[clap(name="dimensions+lowerbound", alias="dimensions+lower_bound", alias="dimensions+lower-bound", alias="dims+lb")]
DimensionsAndLowerBound,
}

pub fn default_settings() -> SchemaSettings {
SchemaSettings {
macaddr_handling: SchemaSettingsMacaddrHandling::Text,
Expand All @@ -102,6 +117,7 @@ pub fn default_settings() -> SchemaSettings {
numeric_handling: SchemaSettingsNumericHandling::Double,
decimal_scale: 18,
decimal_precision: 38,
array_handling: SchemaSettingsArrayHandling::Plain,
}
}

Expand Down Expand Up @@ -360,24 +376,43 @@ fn map_schema_column<TRow: PgAbstractRow + Clone + 'static>(
let (element_appender, element_schema) = map_schema_column(element_type, &element_column, settings)?;

debug_assert_eq!(element_schema.name(), "element");
let list_schema = ParquetType::group_type_builder("list")
.with_repetition(Repetition::REPEATED)
.with_fields(&mut vec![
Arc::new(element_schema)
])
.build().unwrap();

let schema = ParquetType::group_type_builder(c.col_name())
.with_logical_type(Some(LogicalType::List))
.with_repetition(Repetition::OPTIONAL)
.with_fields(&mut vec![
Arc::new(list_schema)
])
.build().unwrap();
let plain_schema = settings.array_handling == SchemaSettingsArrayHandling::Plain;

let schema = if plain_schema {
make_list_schema(c.col_name(), Repetition::OPTIONAL, element_schema)
} else {
make_list_schema("data", Repetition::REQUIRED, element_schema)
};

assert_eq!(element_appender.max_dl(), element_column.definition_level + 1);
assert_eq!(element_appender.max_rl(), element_column.repetition_level);
let array_appender = create_array_appender(element_appender, &c);
Ok((Box::new(array_appender), schema))
let array_appender = create_array_appender(element_appender, &c, plain_schema);
let dim_appender = create_array_dim_appender::<PgAny, TRow>(&c);
let lb_appender = create_array_lower_bound_appender::<PgAny, TRow>(&c);
let dim_schema = make_list_schema("dims", Repetition::REQUIRED, ParquetType::primitive_type_builder("element", basic::Type::INT32).with_repetition(Repetition::REQUIRED).with_logical_type(Some(LogicalType::Integer { bit_width: 32, is_signed: false })).build().unwrap());
let lb_schema = make_list_schema("lower_bound", Repetition::REQUIRED, ParquetType::primitive_type_builder("element", basic::Type::INT32).with_repetition(Repetition::REQUIRED).with_logical_type(Some(LogicalType::Integer { bit_width: 32, is_signed: true })).build().unwrap());
match settings.array_handling {
SchemaSettingsArrayHandling::Plain => Ok((Box::new(array_appender), schema)),
SchemaSettingsArrayHandling::Dimensions => Ok((
Box::new(
new_static_merged_appender(c.definition_level + 1, c.repetition_level).add_appender(array_appender).add_appender(dim_appender)
),
ParquetType::group_type_builder(c.col_name())
.with_repetition(Repetition::OPTIONAL)
.with_fields(&mut vec![ Arc::new(schema), Arc::new(dim_schema) ])
.build().unwrap()
)),
SchemaSettingsArrayHandling::DimensionsAndLowerBound => Ok((
Box::new(
new_static_merged_appender(c.definition_level + 1, c.repetition_level).add_appender(array_appender).add_appender(dim_appender).add_appender(lb_appender)
),
ParquetType::group_type_builder(c.col_name())
.with_repetition(Repetition::OPTIONAL)
.with_fields(&mut vec![ Arc::new(schema), Arc::new(dim_schema), Arc::new(lb_schema) ])
.build().unwrap()
))
}
},
Kind::Domain(ref element_type) => {
map_schema_column(element_type, c, settings)
Expand Down Expand Up @@ -444,6 +479,20 @@ fn map_schema_column<TRow: PgAbstractRow + Clone + 'static>(
}
}

fn make_list_schema(name: &str, repetition: Repetition, element_schema: ParquetType) -> ParquetType {
ParquetType::group_type_builder(name)
.with_logical_type(Some(LogicalType::List))
.with_repetition(repetition)
.with_fields(&mut vec![
Arc::new(ParquetType::group_type_builder("list")
.with_repetition(Repetition::REPEATED)
.with_fields(&mut vec![
Arc::new(element_schema)
])
.build().unwrap())
])
.build().unwrap()
}

fn map_simple_type<TRow: PgAbstractRow + Clone + 'static>(
t: &PgType,
Expand Down Expand Up @@ -641,11 +690,56 @@ fn create_complex_appender<T: for <'a> FromSql<'a> + Clone + 'static, TRow: PgAb
wrap_pg_row_reader(c, RcWrapperAppender::new(main_cp))
}

fn create_array_appender<TRow: PgAbstractRow + Clone>(inner: DynColumnAppender<PgAny>, c: &ColumnInfo) -> impl ColumnAppender<TRow> {
fn create_array_appender<TRow: PgAbstractRow + Clone>(inner: DynColumnAppender<PgAny>, c: &ColumnInfo, warn_on_multidim: bool) -> impl ColumnAppender<TRow> {
let outer_dl = c.definition_level + 1;
debug_assert_eq!(outer_dl + 2, inner.max_dl());
let array_appender = ArrayColumnAppender::new(inner, true, true, outer_dl, c.repetition_level);
wrap_pg_row_reader::<TRow, Vec<Option<PgAny>>>(c, array_appender)
let warned = AtomicBool::new(false);
let col_clone = c.clone();
let multidim_appender = array_appender.preprocess(move |x: Cow<PgMultidimArray<Option<PgAny>>>| {
if warn_on_multidim && x.dims.is_some() && !warned.load(Ordering::Relaxed) {
if !warned.fetch_or(true, Ordering::SeqCst) {
eprintln!("Warning: Column {} contains a {}-dimensional array which will be flattened in Parquet (i.e. {} -> {}). Use --array-handling=dimensions, include another column with the PostgreSQL array dimensions.",
col_clone.full_name(),
x.dims.as_ref().unwrap().len(),
x.dims.as_ref().unwrap().iter().map(|x| x.to_string()).collect::<Vec<_>>().join("x"),
x.data.len()
)
}
}
match x {
Cow::Owned(x) => Cow::Owned(x.data),
Cow::Borrowed(x) => Cow::Borrowed(&x.data)
}
});
wrap_pg_row_reader::<TRow, PgMultidimArray<Option<PgAny>>>(c, multidim_appender)
}

fn create_array_dim_appender<T: Clone + for <'a> FromSql<'a> + 'static, TRow: PgAbstractRow + Clone>(c: &ColumnInfo) -> impl ColumnAppender<TRow> {
let int_appender = new_autoconv_generic_appender::<i32, Int32Type>(c.definition_level + 2, c.repetition_level + 1);
let dim_appender =
ArrayColumnAppender::new(int_appender, false, false, c.definition_level + 1, c.repetition_level)
.preprocess(|x: Cow<PgMultidimArray<Option<T>>>| Cow::<Vec<Option<i32>>>::Owned(
x.dims.as_ref()
.map(|x| x.iter().map(|c| Some(*c)).collect())
.unwrap_or_else(|| if x.data.len() == 0 { Vec::new() } else { vec![Some(x.data.len() as i32)] })
));
wrap_pg_row_reader::<TRow, PgMultidimArray<Option<T>>>(c, dim_appender)
}


fn create_array_lower_bound_appender<T: Clone + for <'a> FromSql<'a> + 'static, TRow: PgAbstractRow + Clone>(c: &ColumnInfo) -> impl ColumnAppender<TRow> {
let int_appender = new_autoconv_generic_appender::<i32, Int32Type>(c.definition_level + 2, c.repetition_level + 1);
let dim_appender =
ArrayColumnAppender::new(int_appender, false, false, c.definition_level + 1, c.repetition_level)
.preprocess(|x: Cow<PgMultidimArray<Option<T>>>| Cow::<Vec<Option<i32>>>::Owned(
match &x.lower_bounds {
_ if x.data.len() == 0 => Vec::new(),
PgMultidimArrayLowerBounds::Const(c) => vec![Some(*c); x.dims.as_ref().map(|x| x.len()).unwrap_or(1)],
PgMultidimArrayLowerBounds::PerDim(v) => v.iter().map(|x| Some(*x)).collect()
}
));
wrap_pg_row_reader::<TRow, PgMultidimArray<Option<T>>>(c, dim_appender)
}

fn wrap_pg_row_reader<TRow: PgAbstractRow + Clone, T: Clone + for <'a> FromSql<'a>>(c: &ColumnInfo, a: impl ColumnAppender<T>) -> impl ColumnAppender<TRow> {
Expand Down
40 changes: 40 additions & 0 deletions py-tests/test_arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,43 @@ def r(low, up, low_inc=True, up_inc=False, is_empty=False):
# self.assertEqual(pl_df["simple_range"].to_list(), [ r[1] for r in duckdb_table ])
# self.assertEqual(pl_df["rint_"].to_list(), [ r[2] for r in duckdb_table ])


def test_multidim(self):
self.maxDiff = None
plain_file = wrappers.create_and_export(
"arrays_multidim", "id",
"id int, a int[], b text[]",
"""
(1, ARRAY[[1,2],[3,4],[NULL,5]], ARRAY[[NULL,'b'],['c',NULL]]),
(2, NULL, NULL),
(3, ARRAY[]::int[], ARRAY[[[]]]::text[]),
(4, ARRAY[[[[1]]]], '{{{a}}}'::text[]),
(5, '[-2:0]={1,2,3}'::int[], '[-1:0][4:5]={{a,b},{c,d}}'::text[])
"""
)

expected_ids = [1, 2, 3, 4, 5]
expected_a = [ [ 1, 2, 3, 4, None, 5 ], None, [], [1], [1, 2, 3] ]
expected_b = [ [ None, "b", "c", None ], None, [], ["a"], ["a", "b", "c", "d"] ]
plain_df = pl.read_parquet(plain_file)
self.assertEqual(plain_df["id"].to_list(), expected_ids)
self.assertEqual(plain_df["a"].to_list(), expected_a)
self.assertEqual(plain_df["b"].to_list(), expected_b)

dims_file = wrappers.run_export_table("arrays_multidim_dims", "arrays_multidim", "id", options=["--array-handling=dims"])
dims_df = pl.read_parquet(dims_file)
self.assertEqual(dims_df["id"].to_list(), expected_ids)
self.assertEqual(dims_df["a"].struct.field("data").to_list(), expected_a)
self.assertEqual(dims_df["b"].struct.field("data").to_list(), expected_b)
self.assertEqual(dims_df["a"].struct.field("dims").to_list(), [ [3, 2], None, [], [1, 1, 1, 1], [3] ])
self.assertEqual(dims_df["b"].struct.field("dims").to_list(), [ [2, 2], None, [], [1, 1, 1], [2, 2] ])

dims_lb_file = wrappers.run_export_table("arrays_multidim_dims_lb", "arrays_multidim", "id", options=["--array-handling=dims+lb"])
dims_lb_df = pl.read_parquet(dims_lb_file)
self.assertEqual(dims_lb_df["id"].to_list(), expected_ids)
self.assertEqual(dims_lb_df["a"].struct.field("data").to_list(), expected_a)
self.assertEqual(dims_lb_df["b"].struct.field("data").to_list(), expected_b)
self.assertEqual(dims_lb_df["a"].struct.field("dims").to_list(), [ [3, 2], None, [], [1, 1, 1, 1], [3] ])
self.assertEqual(dims_lb_df["a"].struct.field("lower_bound").to_list(), [ [1, 1], None, [], [1, 1, 1, 1], [-2] ])
self.assertEqual(dims_lb_df["b"].struct.field("lower_bound").to_list(), [ [1, 1], None, [], [1, 1, 1], [-1, 4] ])

Loading

0 comments on commit aa88868

Please sign in to comment.