diff --git a/datafusion/common/src/test_util.rs b/datafusion/common/src/test_util.rs
index 422fcb5eb3e0..d3b8c8451258 100644
--- a/datafusion/common/src/test_util.rs
+++ b/datafusion/common/src/test_util.rs
@@ -347,7 +347,7 @@ macro_rules! record_batch {
         let batch = arrow_array::RecordBatch::try_new(
             schema,
             vec![$(
-                create_array!($type, $values),
+                $crate::create_array!($type, $values),
             )*]
         );
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 131b8c354ce7..80d2bf987473 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -32,11 +32,19 @@ use std::sync::Arc;
 ///
 /// This interface provides a way to implement custom schema adaptation logic
 /// for ParquetExec (for example, to fill missing columns with default value
-/// other than null)
+/// other than null).
+///
+/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
+/// more details and examples.
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
-    /// Provides `SchemaAdapter`.
-    // The design of this function is mostly modeled for the needs of DefaultSchemaAdapterFactory,
-    // read its implementation docs for the reasoning
+    /// Create a [`SchemaAdapter`]
+    ///
+    /// Arguments:
+    ///
+    /// * `projected_table_schema`: The schema for the table, projected to
+    ///   include only the fields being output (projected) by this mapping.
+    ///
+    /// * `table_schema`: The entire table schema for the table
     fn create(
         &self,
         projected_table_schema: SchemaRef,
@@ -44,53 +52,57 @@ pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     ) -> Box<dyn SchemaAdapter>;
 }
 
-/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
+/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
+/// schema, which may have a schema obtained from merging multiple file-level
+/// schemas.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the
-///    table schema to the file schema.
+/// This is useful for implementing schema evolution in partitioned datasets.
 ///
-/// 2. After reading a record batch map the read columns back to the expected
-///    columns indexes and insert null-valued columns wherever the file schema was
-///    missing a column present in the table schema.
+/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
+    /// This is used while reading a file to push down projections by mapping
+    /// projected column indexes from the table schema to the file schema
+    ///
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns
-    /// from the file schema to the table schema.
+    /// Creates a mapping for casting columns from the file schema to the table
+    /// schema.
     ///
-    /// If the provided `file_schema` contains columns of a different type to the expected
-    /// `table_schema`, the method will attempt to cast the array data from the file schema
-    /// to the table schema where possible.
+    /// This is used after reading a record batch.
+    /// The returned [`SchemaMapper`]:
     ///
-    /// Returns a [`SchemaMapper`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
+    /// 1. Maps columns to the expected column indexes
+    /// 2. Handles missing values (e.g. fills with nulls or a default value)
+    ///    for columns in the table schema that are not in the file schema
+    /// 3. Handles different types: if the column in the file schema has a
+    ///    different type than `table_schema`, the mapper will resolve this
+    ///    difference (e.g. by casting to the appropriate type)
+    ///
+    /// Returns:
+    /// * a [`SchemaMapper`]
+    /// * an ordered list of columns to project from the file
     fn map_schema(
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Maps, by casting or reordering columns from the file schema to the table
-/// schema.
+/// Maps columns from a specific file schema to the table schema.
+///
+/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
 pub trait SchemaMapper: Debug + Send + Sync {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored
-    /// mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `table_schema`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
 
     /// Adapts a [`RecordBatch`] that does not have all the columns from the
     /// file schema.
     ///
-    /// This method is used when applying a filter to a subset of the columns as
-    /// part of `DataFusionArrowPredicate` when `filter_pushdown` is enabled.
+    /// This method is used, for example, when applying a filter to a subset of
+    /// the columns as part of `DataFusionArrowPredicate` when `filter_pushdown`
+    /// is enabled.
     ///
     /// This method is slower than `map_batch` as it looks up columns by name.
     fn map_partial_batch(
@@ -99,11 +111,106 @@ pub trait SchemaMapper: Debug + Send + Sync {
     ) -> datafusion_common::Result<RecordBatch>;
 }
 
-/// Implementation of [`SchemaAdapterFactory`] that maps columns by name
-/// and casts columns to the expected type.
+/// Default [`SchemaAdapterFactory`] for mapping schemas.
+///
+/// This can be used to adapt file-level record batches to a table schema and
+/// implement schema evolution.
+///
+/// Given an input file schema and a table schema, this factory returns a
+/// [`SchemaAdapter`] that creates [`SchemaMapper`]s that:
+///
+/// 1. Reorder columns
+/// 2. Cast columns to the correct type
+/// 3. Fill missing columns with nulls
+///
+/// # Errors:
+///
+/// * If a column in the table schema is non-nullable but is not present in the
+///   file schema (i.e. it is missing), the returned mapper tries to fill it
+///   with nulls, resulting in a schema error.
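+///
+/// # Example of a missing non-nullable column
+///
+/// A sketch of that failure mode, mirroring the
+/// `default_schema_adapter_non_nullable_columns` test added below:
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
+/// # use datafusion_common::record_batch;
+/// // "a" is declared non-nullable in the table but is missing from the file
+/// let table_schema = Schema::new(vec![
+///     Field::new("a", DataType::Int32, false),
+///     Field::new("b", DataType::Utf8, true),
+/// ]);
+/// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
+///
+/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
+/// let file_batch = record_batch!(("b", Utf8, vec!["foo", "bar"])).unwrap();
+///
+/// // filling "a" with nulls violates its non-null constraint, so mapping fails
+/// assert!(mapper.map_batch(file_batch).is_err());
+/// ```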
+///
+/// # Illustration of Schema Mapping
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                 ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
+/// ││  1.0  │   │ "foo" │                  ││ NULL  │   │ "foo" │   │ "1.0" │
+///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
+/// ││  2.0  │   │ "bar" │                  ││ NULL  │   │ "bar" │   │ "2.0" │
+///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
+/// │                                       │
+///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
+/// │  Float64      Utf8                    │   Int32       Utf8        Utf8
+///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///     Input Record Batch                         Output Record Batch
+///
+///     Schema {                                   Schema {
+///       "c": Float64,                              "a": Int32,
+///       "b": Utf8,                                 "b": Utf8,
+///     }                                            "c": Utf8,
+///                                                 }
+/// ```
+///
+/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
+///
+/// Note `SchemaMapping` also supports mapping partial batches, which is used as
+/// part of predicate pushdown.
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
+/// # use datafusion_common::record_batch;
+/// // Table has fields "a", "b" and "c"
+/// let table_schema = Schema::new(vec![
+///     Field::new("a", DataType::Int32, true),
+///     Field::new("b", DataType::Utf8, true),
+///     Field::new("c", DataType::Utf8, true),
+/// ]);
+///
+/// // create an adapter to map the table schema to the file schema
+/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+///
+/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
+/// // instead of 'Utf8'
+/// let file_schema = Schema::new(vec![
+///     Field::new("c", DataType::Utf8, true),
+///     Field::new("b", DataType::Float64, true),
+/// ]);
+///
+/// // Get a mapping from the file schema to the table schema
+/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
+///
+/// let file_batch = record_batch!(
+///     ("c", Utf8, vec!["foo", "bar"]),
+///     ("b", Float64, vec![1.0, 2.0])
+/// ).unwrap();
+///
+/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
+///
+/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
+/// let expected_batch = record_batch!(
+///     ("a", Int32, vec![None, None]),  // missing column filled with nulls
+///     ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
+///     ("c", Utf8, vec!["foo", "bar"])
+/// ).unwrap();
+/// assert_eq!(mapped_batch, expected_batch);
+/// ```
 #[derive(Clone, Debug, Default)]
 pub struct DefaultSchemaAdapterFactory;
 
+impl DefaultSchemaAdapterFactory {
+    /// Create a new factory for mapping batches from a file schema to a table
+    /// schema.
+    ///
+    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
+    /// the same schema for both the projected table schema and the table
+    /// schema.
+    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Self.create(Arc::clone(&table_schema), table_schema)
+    }
+}
+
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     fn create(
         &self,
@@ -117,8 +224,8 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     }
 }
 
-/// This SchemaAdapter requires both the table schema and the projected table schema because of the
-/// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
+/// This SchemaAdapter requires both the table schema and the projected table
+/// schema.
+/// See [`SchemaMapping`] for more details.
 #[derive(Clone, Debug)]
 pub(crate) struct DefaultSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
@@ -142,11 +249,12 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         Some(file_schema.fields.find(field.name())?.0)
     }
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` for casting or mapping the columns from the
+    /// file schema to the table schema.
     ///
-    /// If the provided `file_schema` contains columns of a different type to the expected
-    /// `table_schema`, the method will attempt to cast the array data from the file schema
-    /// to the table schema where possible.
+    /// If the provided `file_schema` contains columns of a different type to
+    /// the expected `table_schema`, the method will attempt to cast the array
+    /// data from the file schema to the table schema where possible.
     ///
     /// Returns a [`SchemaMapping`] that can be applied to the output batch
     /// along with an ordered list of columns to project from the file
@@ -189,36 +297,45 @@ impl SchemaAdapter for DefaultSchemaAdapter {
     }
 }
 
-/// The SchemaMapping struct holds a mapping from the file schema to the table schema
-/// and any necessary type conversions that need to be applied.
+/// The SchemaMapping struct holds a mapping from the file schema to the table
+/// schema and any necessary type conversions.
+///
+/// Note: because the `map_batch` and `map_partial_batch` functions have
+/// different needs, this struct holds two schemas:
+///
+/// 1. The projected **table** schema
+/// 2. The full table schema
 ///
-/// This needs both the projected table schema and full table schema because its different
-/// functions have different needs. The [`map_batch`] function is only used by the ParquetOpener to
-/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
-/// to come out of the execution of this query. [`map_partial_batch`], however, is used to create a
-/// RecordBatch with a schema that can be used for Parquet pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet pushdown filters
-/// operate can be completely distinct from the fields that are projected (output) out of the
-/// ParquetExec).
+/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
+/// has the projected schema, since that's the schema which is supposed to come
+/// out of the execution of this query. Thus `map_batch` uses
+/// `projected_table_schema` as it can only operate on the projected fields.
 ///
-/// [`map_partial_batch`] uses `table_schema` to create the resulting RecordBatch (as it could be
-/// operating on any fields in the schema), while [`map_batch`] uses `projected_table_schema` (as
-/// it can only operate on the projected fields).
+/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
+/// can be used for Parquet predicate pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet
+/// pushdown filters operate on can be completely distinct from the fields that
+/// are projected (output) out of the ParquetExec). `map_partial_batch` thus
+/// uses `table_schema` to create the resulting RecordBatch (as it could be
+/// operating on any fields in the schema).
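+///
+/// A minimal sketch of the difference (the setup here is illustrative and
+/// assumed, not taken from this module's tests): a column used only by a
+/// pushdown filter is absent from the projected schema, yet
+/// [`map_partial_batch`] can still map it by name:
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
+/// # use datafusion_common::record_batch;
+/// let table_schema = Arc::new(Schema::new(vec![
+///     Field::new("a", DataType::Int32, true), // filter-only column
+///     Field::new("b", DataType::Utf8, true),  // projected (output) column
+/// ]));
+/// // only "b" is projected out of the query
+/// let projected = Arc::new(table_schema.project(&[1]).unwrap());
+/// let adapter = DefaultSchemaAdapterFactory.create(projected, Arc::clone(&table_schema));
+///
+/// let file_schema = Schema::new(vec![
+///     Field::new("a", DataType::Int32, true),
+///     Field::new("b", DataType::Utf8, true),
+/// ]);
+/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
+///
+/// // a batch holding only the filter column "a" is not in the projected
+/// // schema, but `map_partial_batch` still maps it via the full table schema
+/// let partial = record_batch!(("a", Int32, vec![1, 2])).unwrap();
+/// let mapped = mapper.map_partial_batch(partial).unwrap();
+/// assert_eq!(mapped.schema().field(0).name(), "a");
+/// ```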
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match
-    /// the schema of the query result.
+    /// The schema of the table. This is the expected schema after conversion
+    /// and it should match the schema of the query result.
     projected_table_schema: SchemaRef,
-    /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
-    /// They are Options instead of just plain `usize`s because the table could have fields that
-    /// don't exist in the file.
+    /// Mapping from field index in `projected_table_schema` to index in
+    /// projected file_schema.
+    ///
+    /// They are Options instead of just plain `usize`s because the table could
+    /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
-    /// The entire table schema, as opposed to the projected_table_schema (which only contains the
-    /// columns that we are projecting out of this query). This contains all fields in the table,
-    /// regardless of if they will be projected out or not.
+    /// The entire table schema, as opposed to the projected_table_schema (which
+    /// only contains the columns that we are projecting out of this query).
+    /// This contains all fields in the table, regardless of whether they will
+    /// be projected out or not.
     table_schema: SchemaRef,
 }
 
@@ -331,8 +448,9 @@ mod tests {
 
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::schema_adapter::{
-        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
+    use datafusion_common::record_batch;
     #[cfg(feature = "parquet")]
     use parquet::arrow::ArrowWriter;
     use tempfile::TempDir;
@@ -405,6 +523,58 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[test]
+    fn default_schema_adapter() {
+        let table_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        // file has a subset of the table schema fields and a different type
+        let file_schema = Schema::new(vec![
+            Field::new("c", DataType::Float64, true), // not in table schema
+            Field::new("b", DataType::Float64, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
+        assert_eq!(indices, vec![1]);
+
+        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
+
+        let mapped_batch = mapper.map_batch(file_batch).unwrap();
+
+        // the mapped batch has the correct schema and the "b" column has been cast to Utf8
+        let expected_batch = record_batch!(
+            ("a", Int32, vec![None, None]), // missing column filled with nulls
+            ("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed
+        )
+        .unwrap();
+        assert_eq!(mapped_batch, expected_batch);
+    }
+
+    #[test]
+    fn default_schema_adapter_non_nullable_columns() {
+        let table_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false), // "a" is declared non-nullable
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let file_schema = Schema::new(vec![
+            // since the file doesn't have "a" it will be filled with nulls
+            Field::new("b", DataType::Float64, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
+        assert_eq!(indices, vec![0]);
+
+        let file_batch =
+            record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
+
+        // Mapping fails because it tries to fill in a non-nullable column with nulls
+        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
+        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+    }
+
     #[derive(Debug)]
     struct TestSchemaAdapterFactory;
diff --git a/datafusion/core/tests/macro_hygiene/mod.rs b/datafusion/core/tests/macro_hygiene/mod.rs
index 72ac6e64fb0c..c35e46c0c558 100644
--- a/datafusion/core/tests/macro_hygiene/mod.rs
+++ b/datafusion/core/tests/macro_hygiene/mod.rs
@@ -37,3 +37,13 @@ mod plan_datafusion_err {
         plan_datafusion_err!("foo");
     }
 }
+
+mod record_batch {
+    // NO other imports!
+    use datafusion_common::record_batch;
+
+    #[test]
+    fn test_macro() {
+        record_batch!(("column_name", Int32, vec![1, 2, 3])).unwrap();
+    }
+}
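The diff above documents the `SchemaAdapterFactory` extension point but does not show a custom implementation. As a rough sketch (not part of this change; `WrappingSchemaAdapterFactory` is a hypothetical name, and the delegation to `DefaultSchemaAdapterFactory` stands in for real custom logic), implementing the trait looks like this:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};

/// Hypothetical factory demonstrating the trait wiring. A real implementation
/// would construct its own `SchemaAdapter` here, e.g. one that fills missing
/// columns with default values instead of nulls.
#[derive(Debug)]
struct WrappingSchemaAdapterFactory;

impl SchemaAdapterFactory for WrappingSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        // delegate to the default adapter for illustration purposes
        DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
    }
}
```

Such a factory can then be supplied wherever DataFusion accepts an `Arc<dyn SchemaAdapterFactory>`; the `TestSchemaAdapterFactory` in the tests above is wired up the same way.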