Improve documentation and examples for SchemaAdapterFactory, make `record_batch` "hygienic" (apache#13063)

* Improve documentation and examples for SchemaAdapterFactory and related classes

* fix macro

* Add macro hygiene test

* Fix example, add convenience function, update docs

* Add tests and docs showing what happens when adapting a nullable column

* review feedback

* fix clippy
alamb authored Oct 26, 2024
1 parent 412ca4e commit 62b063c
Showing 3 changed files with 239 additions and 59 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/test_util.rs
@@ -347,7 +347,7 @@ macro_rules! record_batch {
let batch = arrow_array::RecordBatch::try_new(
schema,
vec![$(
create_array!($type, $values),
$crate::create_array!($type, $values),
)*]
);

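To illustrate the hygiene fix: with `$crate::create_array!` in the expansion, a downstream crate that imports only `record_batch!` no longer needs `create_array!` in scope. A minimal sketch (the wrapper function is illustrative; the values are borrowed from the docs below):

```
use datafusion_common::record_batch;

fn demo_macro_hygiene() {
    // Expands even though `create_array!` is not imported here, because the
    // macro now refers to it through `$crate::`
    let batch = record_batch!(
        ("c", Utf8, vec!["foo", "bar"]),
        ("b", Float64, vec![1.0, 2.0])
    )
    .unwrap();
    assert_eq!(batch.num_columns(), 2);
}
```

This is the scenario the macro hygiene test mentioned in the commit message presumably guards against.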
286 changes: 228 additions & 58 deletions datafusion/core/src/datasource/schema_adapter.rs
@@ -32,65 +32,77 @@ use std::sync::Arc;
///
/// This interface provides a way to implement custom schema adaptation logic
/// for ParquetExec (for example, to fill missing columns with a default value
/// other than null)
/// other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
// The design of this function is mostly modeled for the needs of DefaultSchemaAdapterFactory,
// read its implementation docs for the reasoning
/// Create a [`SchemaAdapter`]
///
/// Arguments:
///
/// * `projected_table_schema`: The schema for the table, projected to
/// include only the fields being output (projected) by this mapping.
///
/// * `table_schema`: The entire table schema for the table
fn create(
&self,
projected_table_schema: SchemaRef,
table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter>;
}
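A rough sketch of calling `create` directly, with a made-up table of three columns of which only two are projected (the schemas are illustrative, not taken from this patch):

```
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};

fn create_adapter_for_projection() {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Utf8, true),
    ]));
    // Suppose only columns "a" and "c" are output (projected) by the scan
    let projected_table_schema = Arc::new(table_schema.project(&[0, 2]).unwrap());
    // The factory receives both the projected and the full table schema
    let _adapter = DefaultSchemaAdapterFactory
        .create(projected_table_schema, table_schema);
}
```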

/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have a schema obtained from merging multiple file-level
/// schemas.
///
/// 1. Before reading the file, we have to map projected column indexes from the
/// table schema to the file schema.
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// 2. After reading a record batch map the read columns back to the expected
/// columns indexes and insert null-valued columns wherever the file schema was
/// missing a column present in the table schema.
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// This is used while reading a file to push down projections by mapping
/// projected column indexes from the table schema to the file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
/// Creates a mapping for casting columns from the file schema to the table
/// schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
/// This is used after reading a record batch. The returned [`SchemaMapper`]:
///
/// Returns a [`SchemaMapper`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
/// 1. Maps columns to the expected column indexes
/// 2. Handles missing values (e.g. fills with nulls or a default value) for
///    columns in the table schema that are not in the file schema
/// 3. Handles different types: if the column in the file schema has a
///    different type than `table_schema`, the mapper will resolve this
///    difference (e.g. by casting to the appropriate type)
///
/// Returns:
/// * a [`SchemaMapper`]
/// * an ordered list of columns to project from the file
fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}
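A hedged sketch of the projection pushdown step, using the `DefaultSchemaAdapterFactory` defined later in this file (the table and file schemas are invented for illustration):

```
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;

fn map_projected_indexes() {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    // The file on disk only contains column "b"
    let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);

    let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    // Table column 0 ("a") has no counterpart in the file
    assert_eq!(adapter.map_column_index(0, &file_schema), None);
    // Table column 1 ("b") is column 0 in the file
    assert_eq!(adapter.map_column_index(1, &file_schema), Some(0));
}
```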

/// Maps, by casting or reordering columns from the file schema to the table
/// schema.
/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored
/// mapping and conversions.
/// Adapts a `RecordBatch` to match the `table_schema`
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

/// Adapts a [`RecordBatch`] that does not have all the columns from the
/// file schema.
///
/// This method is used when applying a filter to a subset of the columns as
/// part of `DataFusionArrowPredicate` when `filter_pushdown` is enabled.
/// This method is used, for example, when applying a filter to a subset of
/// the columns as part of `DataFusionArrowPredicate` when `filter_pushdown`
/// is enabled.
///
/// This method is slower than `map_batch` as it looks up columns by name.
fn map_partial_batch(
@@ -99,11 +111,106 @@ pub trait SchemaMapper: Debug + Send + Sync {
) -> datafusion_common::Result<RecordBatch>;
}

/// Implementation of [`SchemaAdapterFactory`] that maps columns by name
/// and casts columns to the expected type.
/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns
/// a [`SchemaAdapter`] that returns [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but is not present in the
/// file schema (i.e. it is missing), the returned mapper tries to fill it with
/// nulls, resulting in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// ┌───────┐ ┌───────┐ │ ┌───────┐ ┌───────┐ ┌───────┐ │
/// ││ 1.0 │ │ "foo" │ ││ NULL │ │ "foo" │ │ "1.0" │
/// ├───────┤ ├───────┤ │ Schema mapping ├───────┤ ├───────┤ ├───────┤ │
/// ││ 2.0 │ │ "bar" │ ││ NULL │ │ "bar" │ │ "2.0" │
/// └───────┘ └───────┘ │────────────────▶ └───────┘ └───────┘ └───────┘ │
/// │ │
/// column "c" column "b"│ column "a" column "b" column "c"│
/// │ Float64 Utf8 │ Int32 Utf8 Utf8
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// Input Record Batch Output Record Batch
///
/// Schema { Schema {
/// "c": Float64, "a": Int32,
/// "b": Utf8, "b": Utf8,
/// } "c": Utf8,
/// }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Utf8, true),
/// Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as an 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
/// Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
/// ("c", Utf8, vec!["foo", "bar"]),
/// ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
/// ("a", Int32, vec![None, None]), // missing column filled with nulls
/// ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
/// ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
/// Create a new factory for mapping batches from a file schema to a table
/// schema.
///
/// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
/// the same schema for both the projected table schema and the table
/// schema.
pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
Self.create(Arc::clone(&table_schema), table_schema)
}
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
fn create(
&self,
Expand All @@ -117,8 +224,8 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
}
}

/// This SchemaAdapter requires both the table schema and the projected table schema because of the
/// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
/// This SchemaAdapter requires both the table schema and the projected table
/// schema. See [`SchemaMapping`] for more details
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
/// The schema for the table, projected to include only the fields being output (projected) by the
Expand All @@ -142,11 +249,12 @@ impl SchemaAdapter for DefaultSchemaAdapter {
Some(file_schema.fields.find(field.name())?.0)
}

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
/// Creates a `SchemaMapping` for casting or mapping the columns from the
/// file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
/// If the provided `file_schema` contains columns of a different type to
/// the expected `table_schema`, the method will attempt to cast the array
/// data from the file schema to the table schema where possible.
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
@@ -189,36 +297,45 @@ impl SchemaAdapter for DefaultSchemaAdapter {
}
}

/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// Note, because `map_batch` and `map_partial_batch` functions have different
/// needs, this struct holds two schemas:
///
/// 1. The projected **table** schema
/// 2. The full table schema
///
/// This needs both the projected table schema and full table schema because its different
/// functions have different needs. The [`map_batch`] function is only used by the ParquetOpener to
/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
/// to come out of the execution of this query. [`map_partial_batch`], however, is used to create a
/// RecordBatch with a schema that can be used for Parquet pushdown, meaning that it may contain
/// fields which are not in the projected schema (as the fields that parquet pushdown filters
/// operate can be completely distinct from the fields that are projected (output) out of the
/// ParquetExec).
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_partial_batch`] uses `table_schema` to create the resulting RecordBatch (as it could be
/// operating on any fields in the schema), while [`map_batch`] uses `projected_table_schema` (as
/// it can only operate on the projected fields).
/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
/// can be used for Parquet predicate pushdown, meaning that it may contain
/// fields which are not in the projected schema (as the fields that parquet
/// pushdown filters operate on can be completely distinct from the fields that are
/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
/// `table_schema` to create the resulting RecordBatch (as it could be operating
/// on any fields in the schema).
///
/// [`map_batch`]: Self::map_batch
/// [`map_partial_batch`]: Self::map_partial_batch
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match
/// the schema of the query result.
/// The schema of the table. This is the expected schema after conversion
/// and it should match the schema of the query result.
projected_table_schema: SchemaRef,
/// Mapping from field index in `projected_table_schema` to index in projected file_schema.
/// They are Options instead of just plain `usize`s because the table could have fields that
/// don't exist in the file.
/// Mapping from field index in `projected_table_schema` to index in
/// projected file_schema.
///
/// They are Options instead of just plain `usize`s because the table could
/// have fields that don't exist in the file.
field_mappings: Vec<Option<usize>>,
/// The entire table schema, as opposed to the projected_table_schema (which only contains the
/// columns that we are projecting out of this query). This contains all fields in the table,
/// regardless of if they will be projected out or not.
/// The entire table schema, as opposed to the projected_table_schema (which
/// only contains the columns that we are projecting out of this query).
/// This contains all fields in the table, regardless of whether they will be
/// projected out or not.
table_schema: SchemaRef,
}

@@ -331,8 +448,9 @@ mod tests {

use crate::datasource::listing::PartitionedFile;
use crate::datasource::schema_adapter::{
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
use datafusion_common::record_batch;
#[cfg(feature = "parquet")]
use parquet::arrow::ArrowWriter;
use tempfile::TempDir;
@@ -405,6 +523,58 @@ mod tests {
assert_batches_sorted_eq!(expected, &read);
}

#[test]
fn default_schema_adapter() {
let table_schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, true),
]);

// file has a subset of the table schema fields and different type
let file_schema = Schema::new(vec![
Field::new("c", DataType::Float64, true), // not in table schema
Field::new("b", DataType::Float64, true),
]);

let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
assert_eq!(indices, vec![1]);

let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

let mapped_batch = mapper.map_batch(file_batch).unwrap();

// the mapped batch has the correct schema and the "b" column has been cast to Utf8
let expected_batch = record_batch!(
("a", Int32, vec![None, None]), // missing column filled with nulls
("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed
)
.unwrap();
assert_eq!(mapped_batch, expected_batch);
}

#[test]
fn default_schema_adapter_non_nullable_columns() {
let table_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false), // "a"" is declared non nullable
Field::new("b", DataType::Utf8, true),
]);
let file_schema = Schema::new(vec![
// since file doesn't have "a" it will be filled with nulls
Field::new("b", DataType::Float64, true),
]);

let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
assert_eq!(indices, vec![0]);

let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

// Mapping fails because it tries to fill in a non-nullable column with nulls
let err = mapper.map_batch(file_batch).unwrap_err().to_string();
assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
}

#[derive(Debug)]
struct TestSchemaAdapterFactory;

