Skip to content

Commit

Permalink
chore: Upgrade to DataFusion 44.0.0-rc2 (apache#1154)
Browse files Browse the repository at this point in the history
* move aggregate expressions to spark-expr crate

* move more expressions

* move benchmark

* normalize_nan

* bitwise not

* comet scalar funcs

* update bench imports

* save

* save

* save

* remove unused imports

* clippy

* implement more hashers

* implement Hash and PartialEq

* implement Hash and PartialEq

* implement Hash and PartialEq

* benches

* fix ScalarUDFImpl.return_type failure

* exclude test from miri

* ignore correct test

* ignore another test

* remove miri checks

* use return_type_from_exprs

* Revert "use return_type_from_exprs"

This reverts commit febc1f1.

* use DF main branch

* hacky workaround for regression in ScalarUDFImpl.return_type

* fix repo url

* pin to revision

* bump to latest rev

* bump to latest DF rev

* bump DF to rev 9f530dd

* add Cargo.lock

* bump DF version

* no default features

* Revert "remove miri checks"

This reverts commit 4638fe3.

* Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930

* update pin

* Update Cargo.toml

Bump to 44.0.0-rc2

* update cargo lock

* revert miri change

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
andygrove and alamb committed Jan 13, 2025
1 parent b49c17b commit d245f0e
Show file tree
Hide file tree
Showing 35 changed files with 718 additions and 1,016 deletions.
582 changes: 295 additions & 287 deletions native/Cargo.lock

Large diffs are not rendered by default.

29 changes: 15 additions & 14 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@ edition = "2021"
rust-version = "1.79"

[workspace.dependencies]
arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.2.0" }
arrow-buffer = { version = "53.2.0" }
arrow-data = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet"] }
datafusion-common = { version = "43.0.0" }
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
datafusion-functions-nested = { version = "43.0.0", default-features = false }
datafusion-expr = { version = "43.0.0", default-features = false }
datafusion-execution = { version = "43.0.0", default-features = false }
datafusion-physical-plan = { version = "43.0.0", default-features = false }
datafusion-physical-expr = { version = "43.0.0", default-features = false }
arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.3.0" }
arrow-buffer = { version = "53.3.0" }
arrow-data = { version = "53.3.0" }
arrow-schema = { version = "53.3.0" }
parquet = { version = "53.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "44.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { version = "44.0.0", default-features = false }
datafusion-functions = { version = "44.0.0", default-features = false, features = ["crypto_expressions"] }
datafusion-functions-nested = { version = "44.0.0", default-features = false }
datafusion-expr = { version = "44.0.0", default-features = false }
datafusion-expr-common = { version = "44.0.0", default-features = false }
datafusion-execution = { version = "44.0.0", default-features = false }
datafusion-physical-plan = { version = "44.0.0", default-features = false }
datafusion-physical-expr = { version = "44.0.0", default-features = false }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
datafusion-comet-proto = { path = "proto", version = "0.5.0" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand Down
46 changes: 19 additions & 27 deletions native/core/src/execution/expressions/bloom_filter_might_contain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
fmt::Display,
hash::{Hash, Hasher},
sync::Arc,
};
use std::hash::Hash;
use std::{any::Any, fmt::Display, sync::Arc};

/// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
/// Spark's `BloomFilterMightContain` expression.
#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
pub struct BloomFilterMightContain {
pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
pub value_expr: Arc<dyn PhysicalExpr>,
bloom_filter: Option<SparkBloomFilter>,
}

impl Hash for BloomFilterMightContain {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.bloom_filter_expr.hash(state);
self.value_expr.hash(state);
self.bloom_filter.hash(state);
}
}

impl PartialEq for BloomFilterMightContain {
fn eq(&self, other: &Self) -> bool {
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
&& self.value_expr.eq(&other.value_expr)
&& self.bloom_filter.eq(&other.bloom_filter)
}
}

impl Display for BloomFilterMightContain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
Expand All @@ -49,18 +60,6 @@ impl Display for BloomFilterMightContain {
}
}

impl PartialEq<dyn Any> for BloomFilterMightContain {
fn eq(&self, _other: &dyn Any) -> bool {
down_cast_any_ref(_other)
.downcast_ref::<Self>()
.map(|other| {
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
&& self.value_expr.eq(&other.value_expr)
})
.unwrap_or(false)
}
}

fn evaluate_bloom_filter(
bloom_filter_expr: &Arc<dyn PhysicalExpr>,
) -> Result<Option<SparkBloomFilter>> {
Expand Down Expand Up @@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain {
Arc::clone(&children[1]),
)?))
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.bloom_filter_expr.hash(&mut s);
self.value_expr.hash(&mut s);
self.hash(&mut s);
}
}
23 changes: 2 additions & 21 deletions native/core/src/execution/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema, TimeUnit};
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use jni::{
Expand All @@ -32,11 +31,11 @@ use jni::{
use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
hash::Hash,
sync::Arc,
};

#[derive(Debug, Hash)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct Subquery {
/// The ID of the execution context that owns this subquery. We use this ID to retrieve the
/// subquery result.
Expand All @@ -63,19 +62,6 @@ impl Display for Subquery {
}
}

impl PartialEq<dyn Any> for Subquery {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
self.id.eq(&x.id)
&& self.data_type.eq(&x.data_type)
&& self.exec_context_id.eq(&x.exec_context_id)
})
.unwrap_or(false)
}
}

impl PhysicalExpr for Subquery {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -209,9 +195,4 @@ impl PhysicalExpr for Subquery {
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.hash(&mut s)
}
}
9 changes: 4 additions & 5 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
Expand All @@ -51,6 +48,7 @@ use crate::{
};
use datafusion_comet_proto::spark_operator::Operator;
use datafusion_common::ScalarValue;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use futures::stream::StreamExt;
use jni::{
objects::GlobalRef,
Expand Down Expand Up @@ -191,7 +189,7 @@ fn prepare_datafusion_session_context(
memory_fraction: f64,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);

// Check if we are using unified memory manager integrated with Spark.
if use_unified_memory_manager {
Expand Down Expand Up @@ -219,6 +217,7 @@ fn prepare_datafusion_session_context(
&ScalarValue::Float64(Some(1.1)),
);

#[allow(deprecated)]
let runtime = RuntimeEnv::try_new(rt_config)?;

let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
Expand Down
4 changes: 3 additions & 1 deletion native/core/src/execution/operators/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow_array::{
use arrow_data::transform::MutableArrayData;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};

use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
Expand Down Expand Up @@ -78,7 +79,8 @@ impl CopyExec {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Self {
Expand Down
6 changes: 4 additions & 2 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::SchemaRef;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::{
execution::TaskContext,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
},
};
Expand Down Expand Up @@ -54,7 +55,8 @@ impl ExpandExec {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Self {
Expand Down
3 changes: 2 additions & 1 deletion native/core/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl FilterExec {
Ok(PlanProperties::new(
eq_properties,
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
input.pipeline_behavior(),
input.boundedness(),
))
}
}
Expand Down
4 changes: 3 additions & 1 deletion native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray;
use arrow_data::ArrayData;
use arrow_schema::ffi::FFI_ArrowSchema;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
Expand Down Expand Up @@ -122,7 +123,8 @@ impl ScanExec {
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Ok(Self {
Expand Down
Loading

0 comments on commit d245f0e

Please sign in to comment.