
Commit ccec805

update

XiangpengHao committed Nov 15, 2024
1 parent 8dda83e commit ccec805
Showing 15 changed files with 164 additions and 56 deletions.
14 changes: 10 additions & 4 deletions benchmarks/src/bin/cache_server.rs
@@ -100,6 +100,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
    .parquet
    .schema_force_view_types = false;

session_config
    .options_mut()
    .execution
    .parquet
    .operate_as_flight_cache = true;

let ctx = Arc::new(SessionContext::new_with_config(session_config));

// register parquet file with the execution context
@@ -293,16 +299,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
let ctx = self.get_ctx(&request)?;

let schema = execution_plan.schema();

println!("execution plan schema: {:?}", schema);
let stream = execution_plan
    .execute(fetch_results.partition as usize, ctx.task_ctx())
    .map_err(|e| status!("Error executing plan", e))?
    .unwrap()
    // .map_err(|e| status!("Error executing plan", e))?
    .map_err(|e| {
        // Lift the stream's DataFusionError into a FlightError for the encoder.
        arrow_flight::error::FlightError::from_external_error(Box::new(e))
    });

let stream = FlightDataEncoderBuilder::new()
    .with_schema(schema)
    .build(stream)
    .map_err(Status::from);

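For readers following along, here is a hedged sketch of the overall do_get pattern this handler implements: execute one partition of the plan, lift the stream's DataFusionError into an arrow_flight FlightError, and feed the batches to FlightDataEncoderBuilder. The helper name encode_partition and its parameters are illustrative, not part of this commit.

use std::sync::Arc;

use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::{Stream, TryStreamExt};

// Illustrative helper mirroring the handler: execute one partition of the
// plan and encode the resulting batches as flight data.
fn encode_partition(
    plan: Arc<dyn ExecutionPlan>,
    task_ctx: Arc<TaskContext>,
    partition: usize,
) -> datafusion::error::Result<impl Stream<Item = Result<FlightData, FlightError>>> {
    let schema = plan.schema();
    let stream = plan
        .execute(partition, task_ctx)?
        // Box the DataFusionError so FlightError can carry it.
        .map_err(|e| FlightError::from_external_error(Box::new(e)));
    Ok(FlightDataEncoderBuilder::new()
        .with_schema(schema)
        .build(stream))
}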
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -401,6 +401,9 @@ config_namespace! {
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = true

/// (reading) If true, parquet reader will operate as a flight cache server
pub operate_as_flight_cache: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
///
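A minimal sketch of how the new option surfaces, assuming the config_namespace! entry above generates the same accessors as its sibling options (direct field access plus the string-keyed setter):

use datafusion_common::config::ConfigOptions;

fn main() -> datafusion_common::Result<()> {
    let mut opts = ConfigOptions::new();
    // The new flag defaults to false, per the declaration above.
    assert!(!opts.execution.parquet.operate_as_flight_cache);

    // config_namespace! also exposes it under the usual dotted key.
    opts.set("datafusion.execution.parquet.operate_as_flight_cache", "true")?;
    assert!(opts.execution.parquet.operate_as_flight_cache);
    Ok(())
}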
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -176,6 +176,7 @@ impl ParquetOptions {
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
operate_as_flight_cache: _,
binary_as_string: _, // not used for writer props
} = self;

@@ -443,6 +444,7 @@ mod tests {
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
operate_as_flight_cache: defaults.operate_as_flight_cache,
binary_as_string: defaults.binary_as_string,
}
}
@@ -545,6 +547,7 @@
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_view_types: global_options_defaults.schema_force_view_types,
operate_as_flight_cache: global_options_defaults.operate_as_flight_cache,
binary_as_string: global_options_defaults.binary_as_string,
},
column_specific_options,
50 changes: 50 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -261,6 +261,56 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

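/// Transforms string- and binary-like columns to UInt16-keyed dictionary
/// types for the flight cache.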
pub(crate) fn transform_to_flight_cache_types(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
field_with_new_type(
field,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
)
}
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
field_with_new_type(
field,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Binary),
),
)
}
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

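/// Coerces the file schema to the flight cache dictionary types if the
/// table schema uses them.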
pub(crate) fn coerce_file_schema_to_flight_cache_types(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
for field in table_schema.fields() {
if field.data_type().equals_datatype(&DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
)) {
transform = true;
}
}

if !transform {
return None;
}

Some(transform_to_flight_cache_types(file_schema))
}

/// Coerces the file schema if the table schema uses a view type.
pub(crate) fn coerce_file_schema_to_view_type(
table_schema: &Schema,
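To make the mapping concrete, a hedged illustration of the schemas involved; the field names are invented for the example. Note that coerce_file_schema_to_flight_cache_types above probes the table schema only for the Utf8 dictionary variant before transforming.

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // A file schema as the reader might infer it.
    let file_schema = Schema::new(vec![
        Field::new("name", DataType::Utf8View, true),
        Field::new("payload", DataType::BinaryView, true),
        Field::new("id", DataType::Int64, false),
    ]);

    // What the transform maps each field to:
    //   name    -> Dictionary(UInt16, Utf8)
    //   payload -> Dictionary(UInt16, Binary)
    //   id      -> Int64 (non-string/binary fields pass through)
    let dict =
        |v: DataType| DataType::Dictionary(Box::new(DataType::UInt16), Box::new(v));
    let cache_schema = Schema::new(vec![
        Field::new("name", dict(DataType::Utf8), true),
        Field::new("payload", dict(DataType::Binary), true),
        Field::new("id", DataType::Int64, false),
    ]);
    println!("{file_schema:?} -> {cache_schema:?}");
}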
16 changes: 14 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -27,8 +27,9 @@ use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
FilePushdownSupport, FileScanConfig,
transform_binary_to_string, transform_schema_to_view,
transform_to_flight_cache_types, FileFormat, FileFormatFactory, FilePushdownSupport,
FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -258,6 +259,11 @@ impl ParquetFormat {
self.options.global.schema_force_view_types
}

/// If true, will operate as flight cache.
pub fn operate_as_flight_cache(&self) -> bool {
self.options.global.operate_as_flight_cache
}

/// If true, will use view types. See [`Self::force_view_types`] for details
pub fn with_force_view_types(mut self, use_views: bool) -> Self {
self.options.global.schema_force_view_types = use_views;
@@ -389,6 +395,12 @@ impl FileFormat for ParquetFormat {
schema
};

let schema = if self.operate_as_flight_cache() {
transform_to_flight_cache_types(&schema)
} else {
schema
};

Ok(Arc::new(schema))
}

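A small sketch of the new getter beside its existing sibling, assuming the defaults declared in config.rs (schema_force_view_types = true, operate_as_flight_cache = false):

use datafusion::datasource::file_format::parquet::ParquetFormat;

fn main() {
    let format = ParquetFormat::default();
    // Defaults mirror the config.rs declarations.
    assert!(format.force_view_types());
    assert!(!format.operate_as_flight_cache());
}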
@@ -18,7 +18,8 @@
//! [`ParquetOpener`] for opening Parquet files
use crate::datasource::file_format::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
coerce_file_schema_to_flight_cache_types, coerce_file_schema_to_string_type,
coerce_file_schema_to_view_type,
};
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
@@ -137,6 +138,12 @@ impl FileOpener for ParquetOpener {
schema = Arc::new(merged);
}

if let Some(merged) =
coerce_file_schema_to_flight_cache_types(&table_schema, &schema)
{
schema = Arc::new(merged);
}

let options = ArrowReaderOptions::new()
.with_page_index(enable_page_index)
.with_schema(Arc::clone(&schema));
93 changes: 45 additions & 48 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -68,11 +68,10 @@ use arrow::compute::kernels;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use arrow_schema::Field;
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr_common::datum::apply_cmp;
use parquet::arrow::arrow_cache::etc_array::EtcStringArray;
use parquet::arrow::arrow_cache::etc_array::{AsEtcArray, EtcArray, EtcArrayRef};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
@@ -202,66 +201,64 @@ impl ArrowPredicate for DatafusionArrowPredicate {
return self.evaluate(batch.clone());
}

if let Some(array) = input.downcast_ref::<EtcStringArray>() {
if let Some(array) = input.downcast_ref::<EtcArrayRef>() {
if let Some(binary_expr) =
self.physical_expr.as_any().downcast_ref::<BinaryExpr>()
{
if let Some(literal) =
binary_expr.right().as_any().downcast_ref::<Literal>()
{
let op = binary_expr.op();
if op == &Operator::Eq {
if let ScalarValue::Utf8(Some(v)) = literal.value() {
let result = array.compare_equals(&v);
return Ok(result);
};
} else if op == &Operator::NotEq {
if let ScalarValue::Utf8(Some(v)) = literal.value() {
let result = array.compare_not_equals(&v);
return Ok(result);
};
}

let dict_array = array.to_dict_string();
if let Some(array) = array.as_string_array_opt() {
if op == &Operator::Eq {
if let ScalarValue::Utf8(Some(v)) = literal.value() {
let result = array.compare_equals(&v);
return Ok(result);
};
} else if op == &Operator::NotEq {
if let ScalarValue::Utf8(Some(v)) = literal.value() {
let result = array.compare_not_equals(&v);
return Ok(result);
};
}

let lhs = ColumnarValue::Array(Arc::new(dict_array));
let rhs = ColumnarValue::Scalar(literal.value().clone());
let dict_array = array.to_dict_string();
let lhs = ColumnarValue::Array(Arc::new(dict_array));
let rhs = ColumnarValue::Scalar(literal.value().clone());

let result = match op {
Operator::NotEq => apply_cmp(&lhs, &rhs, kernels::cmp::neq),
Operator::Eq => apply_cmp(&lhs, &rhs, kernels::cmp::eq),
Operator::Lt => apply_cmp(&lhs, &rhs, kernels::cmp::lt),
Operator::LtEq => apply_cmp(&lhs, &rhs, kernels::cmp::lt_eq),
Operator::Gt => apply_cmp(&lhs, &rhs, kernels::cmp::gt),
Operator::GtEq => apply_cmp(&lhs, &rhs, kernels::cmp::gt_eq),
Operator::LikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::like)
}
Operator::ILikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::ilike)
}
Operator::NotLikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::nlike)
}
Operator::NotILikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::nilike)
let result = match op {
Operator::NotEq => apply_cmp(&lhs, &rhs, kernels::cmp::neq),
Operator::Eq => apply_cmp(&lhs, &rhs, kernels::cmp::eq),
Operator::Lt => apply_cmp(&lhs, &rhs, kernels::cmp::lt),
Operator::LtEq => apply_cmp(&lhs, &rhs, kernels::cmp::lt_eq),
Operator::Gt => apply_cmp(&lhs, &rhs, kernels::cmp::gt),
Operator::GtEq => apply_cmp(&lhs, &rhs, kernels::cmp::gt_eq),
Operator::LikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::like)
}
Operator::ILikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::ilike)
}
Operator::NotLikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::nlike)
}
Operator::NotILikeMatch => {
apply_cmp(&lhs, &rhs, arrow::compute::nilike)
}
_ => panic!("unsupported operator: {:?}", op),
};
if let Ok(result) = result {
let filtered =
result.into_array(array.len())?.as_boolean().clone();
return Ok(filtered);
}
_ => panic!("unsupported operator: {:?}", op),
};
if let Ok(result) = result {
let filtered =
result.into_array(array.len())?.as_boolean().clone();
return Ok(filtered);
}
}
}
let arrow_array = array.to_dict_string();
let arrow_array = array.to_arrow_array();
let schema = Schema::new(vec![Field::new(
"_",
DataType::Dictionary(
Box::new(DataType::UInt32),
Box::new(DataType::Utf8),
),
"",
arrow_array.data_type().clone(),
arrow_array.is_nullable(),
)]);
let record_batch =
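The rewritten predicate falls back to dictionary-encoding the cached column and dispatching through apply_cmp. A self-contained sketch of that dispatch on a UInt16-keyed dictionary (the key type the flight-cache schema uses; the data here is illustrative):

use std::sync::Arc;

use arrow::array::{DictionaryArray, StringArray, UInt16Array};
use arrow::compute::kernels;
use arrow::datatypes::UInt16Type;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::datum::apply_cmp;

fn main() -> datafusion_common::Result<()> {
    // Dictionary-encoded ["a", "b", "a"] with UInt16 keys.
    let keys = UInt16Array::from(vec![0u16, 1, 0]);
    let values = Arc::new(StringArray::from(vec!["a", "b"]));
    let dict = DictionaryArray::<UInt16Type>::try_new(keys, values)?;

    // Wrap both sides as ColumnarValue and dispatch to the cmp kernel,
    // as the predicate's match over `op` does for each operator.
    let lhs = ColumnarValue::Array(Arc::new(dict));
    let rhs = ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string())));
    let eq = apply_cmp(&lhs, &rhs, kernels::cmp::eq)?;
    println!("{:?}", eq.into_array(3)?); // [true, false, true]
    Ok(())
}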
1 change: 1 addition & 0 deletions datafusion/flight-table/src/table.rs
@@ -59,6 +59,7 @@ impl FlightTableFactory {
let num_rows = precision(metadata.info.total_records);
let total_byte_size = precision(metadata.info.total_bytes);
let logical_schema = metadata.schema;
println!("logical schema: {:?}", logical_schema);
let stats = Statistics {
num_rows,
total_byte_size,
3 changes: 2 additions & 1 deletion datafusion/proto-common/proto/datafusion_common.proto
@@ -496,7 +496,8 @@ message ParquetOptions {
bool bloom_filter_on_write = 27; // default = false
bool schema_force_view_types = 28; // default = false
bool binary_as_string = 29; // default = false

bool operate_as_flight_cache = 30; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
}
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -960,6 +960,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
operate_as_flight_cache: value.operate_as_flight_cache,
})
}
}
(Diffs for the remaining changed files did not load.)