Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(substrait): introduce consume_rel and consume_expression #13963

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

vbarua
Copy link
Contributor

@vbarua vbarua commented Dec 31, 2024

Which issue does this PR close?

Additional work for #13318

Rationale for this change

Improve the ergonomics of the consumer and fixed a gap in null literal handling.

What changes are included in this PR?

Consumer Improvements

Adds two handlers to the SubstraitConsumer

  • consume_rel
  • consume_expression

and routes all calls to from_substrait_rel and from_substrait_rex through them to allow users to customize behaviour at the Rel and Expression level.

Null Literal Improvements

The existing code in from_substrait_null would fail to handle nulls of user-defined types. The null handling code has been simplified to first convert the type of the null into a DataFusion DataType, which already allows for conversion of user-defined Substrait types using consume_user_defined_type and then converting that DataType into a ScalarValue.

As an added benefit, this also allows us to remove a big-match block in from_substrait_null.

Are these changes tested?

Covered by existing conversion tests.

Are there any user-facing changes?

There are user-facing changes that require not work from uses. There are now additional methods on the SubstraitConsumer trait, but they have default implementations.

Route calls to from_substrait_rel and from_substrait_rex through the
SubstraitConsumer in order to allow users to provide their own behaviour
@vbarua vbarua force-pushed the vbarua/more-substrait-consumer-improvements branch from 5b49daa to 04175bd Compare December 31, 2024 22:41
null_type: &Type,
dfs_names: &[String],
name_idx: &mut usize,
) -> Result<ScalarValue> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The match block below is effectively replaced by converting the Substrait Type to a DataFusion DataType, and then invoking ScalarValue::try_from which does the same thing effectively:

impl TryFrom<&DataType> for ScalarValue {
type Error = DataFusionError;
/// Create a Null instance of ScalarValue for this datatype
fn try_from(data_type: &DataType) -> Result<Self> {
Ok(match data_type {
DataType::Boolean => ScalarValue::Boolean(None),
DataType::Float16 => ScalarValue::Float16(None),
DataType::Float64 => ScalarValue::Float64(None),
DataType::Float32 => ScalarValue::Float32(None),
DataType::Int8 => ScalarValue::Int8(None),
DataType::Int16 => ScalarValue::Int16(None),
DataType::Int32 => ScalarValue::Int32(None),
DataType::Int64 => ScalarValue::Int64(None),
DataType::UInt8 => ScalarValue::UInt8(None),
DataType::UInt16 => ScalarValue::UInt16(None),
DataType::UInt32 => ScalarValue::UInt32(None),
DataType::UInt64 => ScalarValue::UInt64(None),
DataType::Decimal128(precision, scale) => {
ScalarValue::Decimal128(None, *precision, *scale)
}
DataType::Decimal256(precision, scale) => {
ScalarValue::Decimal256(None, *precision, *scale)
}
DataType::Utf8 => ScalarValue::Utf8(None),
DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
DataType::Utf8View => ScalarValue::Utf8View(None),
DataType::Binary => ScalarValue::Binary(None),
DataType::BinaryView => ScalarValue::BinaryView(None),
DataType::FixedSizeBinary(len) => ScalarValue::FixedSizeBinary(*len, None),
DataType::LargeBinary => ScalarValue::LargeBinary(None),
DataType::Date32 => ScalarValue::Date32(None),
DataType::Date64 => ScalarValue::Date64(None),
DataType::Time32(TimeUnit::Second) => ScalarValue::Time32Second(None),
DataType::Time32(TimeUnit::Millisecond) => {
ScalarValue::Time32Millisecond(None)
}
DataType::Time64(TimeUnit::Microsecond) => {
ScalarValue::Time64Microsecond(None)
}
DataType::Time64(TimeUnit::Nanosecond) => ScalarValue::Time64Nanosecond(None),
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
ScalarValue::TimestampSecond(None, tz_opt.clone())
}
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
ScalarValue::TimestampMillisecond(None, tz_opt.clone())
}
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
ScalarValue::TimestampMicrosecond(None, tz_opt.clone())
}
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
ScalarValue::TimestampNanosecond(None, tz_opt.clone())
}
DataType::Interval(IntervalUnit::YearMonth) => {
ScalarValue::IntervalYearMonth(None)
}
DataType::Interval(IntervalUnit::DayTime) => {
ScalarValue::IntervalDayTime(None)
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
ScalarValue::IntervalMonthDayNano(None)
}
DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(None),
DataType::Duration(TimeUnit::Millisecond) => {
ScalarValue::DurationMillisecond(None)
}
DataType::Duration(TimeUnit::Microsecond) => {
ScalarValue::DurationMicrosecond(None)
}
DataType::Duration(TimeUnit::Nanosecond) => {
ScalarValue::DurationNanosecond(None)
}
DataType::Dictionary(index_type, value_type) => ScalarValue::Dictionary(
index_type.clone(),
Box::new(value_type.as_ref().try_into()?),
),
// `ScalaValue::List` contains single element `ListArray`.
DataType::List(field_ref) => ScalarValue::List(Arc::new(
GenericListArray::new_null(Arc::clone(field_ref), 1),
)),
// `ScalarValue::LargeList` contains single element `LargeListArray`.
DataType::LargeList(field_ref) => ScalarValue::LargeList(Arc::new(
GenericListArray::new_null(Arc::clone(field_ref), 1),
)),
// `ScalaValue::FixedSizeList` contains single element `FixedSizeList`.
DataType::FixedSizeList(field_ref, fixed_length) => {
ScalarValue::FixedSizeList(Arc::new(FixedSizeListArray::new_null(
Arc::clone(field_ref),
*fixed_length,
1,
)))
}
DataType::Struct(fields) => ScalarValue::Struct(
new_null_array(&DataType::Struct(fields.to_owned()), 1)
.as_struct()
.to_owned()
.into(),
),
DataType::Map(fields, sorted) => ScalarValue::Map(
new_null_array(&DataType::Map(fields.to_owned(), sorted.to_owned()), 1)
.as_map()
.to_owned()
.into(),
),
DataType::Union(fields, mode) => {
ScalarValue::Union(None, fields.clone(), *mode)
}
DataType::Null => ScalarValue::Null,
_ => {
return _not_impl_err!(
"Can't create a scalar from data_type \"{data_type:?}\""
);
}
})
}
}

@vbarua
Copy link
Contributor Author

vbarua commented Dec 31, 2024

@Blizzara some more improvements for the consumer, for when you have a chance. I'm planning on splitting out the consumer and producer into submodules after this and the producer change land.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @vbarua -- this PR looks really nice to me

I'll plan to leave it open for a few days to give @Blizzara a chance to review if they would like to as well.

@@ -233,6 +233,10 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
// These methods have default implementations calling the common handler code, to allow for users
// to re-use common handling logic.

async fn consume_rel(&self, rel: &Rel) -> Result<LogicalPlan> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend documenting such methods with /// style comments that show up in the API docs https://docs.rs/crate/datafusion-substrait/latest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a little bit of documentation for these. I'm planning to add some module levels docs as part of #13864.

Copy link
Contributor

@Blizzara Blizzara left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks! And happy new year! :)

datafusion/substrait/src/logical_plan/consumer.rs Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants