Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support interval multiplication and division by arbitrary numerics #6906

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions arrow-arith/src/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,24 @@ fn arithmetic_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<ArrayRef, A
(Interval(YearMonth), Interval(YearMonth)) => interval_op::<IntervalYearMonthType>(op, l, l_scalar, r, r_scalar),
(Interval(DayTime), Interval(DayTime)) => interval_op::<IntervalDayTimeType>(op, l, l_scalar, r, r_scalar),
(Interval(MonthDayNano), Interval(MonthDayNano)) => interval_op::<IntervalMonthDayNanoType>(op, l, l_scalar, r, r_scalar),
(Interval(unit), rhs) if rhs.is_numeric() && matches!(op, Op::Mul | Op::MulWrapping) =>
match unit {
YearMonth => interval_mul_op::<IntervalYearMonthType>(op, l, l_scalar, r, r_scalar),
DayTime => interval_mul_op::<IntervalDayTimeType>(op, l, l_scalar, r, r_scalar),
MonthDayNano => interval_mul_op::<IntervalMonthDayNanoType>(op, l, l_scalar, r, r_scalar),
},
(lhs, Interval(unit)) if lhs.is_numeric() && matches!(op, Op::Mul | Op::MulWrapping) =>
match unit {
YearMonth => interval_mul_op::<IntervalYearMonthType>(op, r, r_scalar, l, l_scalar),
DayTime => interval_mul_op::<IntervalDayTimeType>(op, r, r_scalar, l, l_scalar),
MonthDayNano => interval_mul_op::<IntervalMonthDayNanoType>(op, r, r_scalar, l, l_scalar),
},
(Interval(unit), rhs) if rhs.is_numeric() && matches!(op, Op::Div) =>
match unit {
YearMonth => interval_div_op::<IntervalYearMonthType>(op, l, l_scalar, r, r_scalar),
DayTime => interval_div_op::<IntervalDayTimeType>(op, l, l_scalar, r, r_scalar),
MonthDayNano => interval_div_op::<IntervalMonthDayNanoType>(op, l, l_scalar, r, r_scalar),
},
Comment on lines +233 to +250

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to keep these patterns concerned only with the lhs/rhs types and leaving the switch on op to interval_op instead of adding interval_mul_op/interval_div_op. Since the preparation to dispatching a mul or a div are similar, keeping it all inside interval_op and switching on op may reduce binary size.

(Date32, _) => date_op::<Date32Type>(op, l, l_scalar, r, r_scalar),
(Date64, _) => date_op::<Date64Type>(op, l, l_scalar, r, r_scalar),
(Decimal128(_, _), Decimal128(_, _)) => decimal_op::<Decimal128Type>(op, l, l_scalar, r, r_scalar),
Expand Down Expand Up @@ -550,6 +568,21 @@ date!(Date64Type);
trait IntervalOp: ArrowPrimitiveType {
fn add(left: Self::Native, right: Self::Native) -> Result<Self::Native, ArrowError>;
fn sub(left: Self::Native, right: Self::Native) -> Result<Self::Native, ArrowError>;
fn mul_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError>;
fn mul_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError>;
fn div_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError>;
fn div_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError>;
Comment on lines +571 to +574

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of instantiating a single-value interval [/*] number operation to build all the array operations we want, we can approach this in a way that avoids the combinatorial explosion.

To implement interval_array [/*] number operations we need operations that convert intervals to integers that count the number of the smallest unit of the interval type, regular int and float multiplication/divisions (which already exist), and then the conversion back to interval types. These conversion operations are parameterized by a single type so the number of specializations isn't a product of the number of interval types and the number of int and float types. All these operations are at the array level and not at the single-value level.

  • IntervalYearMonthType is already an int32 array (number of months) [1], so just fallback to int32 x ... kernels and convert the result to int32 number of months again (conversion might not even be needed depending on what rhs is)

  • IntervalDayTimeType is days and milliseconds (both int32) so if the whole array is converted to an int64 array of milliseconds you can delegate to regular * / and convert back the result

  • IntervalMonthDayNanoType similar idea.

[1] https://github.com/apache/arrow/blob/02a165922e46e5ed6dd3ed2446141cd0922a7c54/format/Schema.fbs#L398

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately intervals cannot be coerced to a duration as proposed, doing so changes their semantic meaning, 2 months is not the same as 60 days nor is 48 hours the same as 2 days (because of daylight savings). This has implications when performing arithmetic involving them

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately intervals cannot be coerced to a duration as proposed...

For the purpose of multiplication/division, some of the interval types can (internally to the kernel) -- before returning the output array, they are converted back to interval type to preserve their semantic meaning.

YEAR_MONTH

Example: the double of 1 year and 1 month is 2 years and 2 months which is what you get if you convert (1y, 1m) to 13m then 2x to 26m which should then become (2y, 2m). This is all fine because every year, no matter what, has 12 months. So 12 months can become a year in the interval. This is why the underlying implementation itself is in number of months.

Daylight savings and calendar consideration only come into play when you add an interval to a specific timestamp/date which is not the case here. It would be wrong to add 1y+1m to 2024-12-20 as just adding the number of seconds that interval in 30-day months converts to, but the operation we are doing here is the scaling of the interval by some factor while preserving the semantic meaning.

But maybe the multiplication by a float should not be available. Any system that wants to support interval[YEAR_MONTH]*float multiplication should round the float first or convert the interval to an interval type with more resolution.

DAY_TIME

IntervalDayTimeType expects the millis part to be less than a whole day worth of milliseconds (i.e. leap seconds are not accounted for) [1][2]. Perhaps the output of scaling operations on them (mul/div) should be a MONTH_DAY_NANO interval. Because unlike years that always have 12 months, days don't always have the same number of millis in them.

MONTH_DAY_NANO

For MonthDayNano my approach breaks indeed -- the components should be scaled independently. And the number of nanos is unbounded -- it doesn't have to be less than a day's worth of nanos.

Multiplication by float and division gets really confusing though.

[1] https://github.com/apache/arrow/blob/02a165922e46e5ed6dd3ed2446141cd0922a7c54/format/Schema.fbs#L398
[2] apache/arrow@7f7f72d

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkietz what are your thoughts on multiplication and division of interval types? I remember you being involved in some discussion regarding these types in the past.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are converted back to interval type to preserve their semantic meaning

Right but this conversion is not possible for anything other than YearMonth (which is just a number of moths IIRC).

For anything else this conversion is lossy as days and months do not correspond to a fixed duration.

some of the interval types can (internally to the kernel)

If by some you mean one 😅

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main point of my suggestion is to think of these operations in terms of whole-array operations instead of single-value operations.

To treat the interval components separately, we would only need functions that split an array of interval values into multiple arrays (each per component) so that all the arithmetic can be delegated to existing arithmetic operations for all int/float combinations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a sound idea in principle, and this sort of approach to avoiding codegen is something I would generally advocate for, however, I don't think it works in this case.

In particular we don't support arithmetic operations between heterogeneous primitive types, e.g. int32 * float64, instead leaving such coercion to to query planners which are better placed to do this. The result is that we would effectively need the query engines to do this explosion for us, in order for things like interval * f64 to work.

Even discounting this, the performance characteristics of such an approach would be unfortunate, at that point you'd be better off using a StructArray

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felipecrv

@bkietz what are your thoughts on multiplication and division of interval types?

My contention is that (as noted above) intervals don't really map to a duration; they map to a pair of timestamps or dates. Indeed, when considered in isolation from any associated start or end date column, the only interpretation we have left for an interval column is in terms of its (flawed) conversion to duration. Therefore, the only interactions which should be supported on intervals is arithmetic with dates and timestamps:

YearMonthInterval +/- Date32 -> Date32
DayTimeInterval +/- Date64 -> Date64
MonthDayNanoInterval +/- TimestampNS -> TimestampNS
...

From these and an appropriate start or end date column, we can make the correct conversion to duration and do well defined multiplication and division.

}

/// Helper function to safely convert f64 to i32, checking for overflow and invalid values
fn f64_to_i32(value: f64) -> Result<i32, ArrowError> {
if !value.is_finite() || value > i32::MAX as f64 || value < i32::MIN as f64 {
Err(ArrowError::ComputeError(
"Division result out of i32 range".to_string(),
))
} else {
Ok(value as i32)
}
Comment on lines +579 to +585
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a builtin faillible version of float_value as i32 that we could use instead of implementing the logic ourselves?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is approached in the way I described above you won't be converting floats to integers. When you get a floating point array representing the number of milliseconds and you need to convert that to IntervalDayTimeType you will be dividing the input to MILLIS_IN_A_DAY and round the rest which comfortably fits in a 32-bit integer.

}

impl IntervalOp for IntervalYearMonthType {
Expand All @@ -560,6 +593,33 @@ impl IntervalOp for IntervalYearMonthType {
fn sub(left: Self::Native, right: Self::Native) -> Result<Self::Native, ArrowError> {
left.sub_checked(right)
}

fn mul_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError> {
left.mul_checked(right)
}

fn mul_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError> {
let result = (left as f64 * right) as i32;
Ok(result)
}

fn div_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError> {
if right == 0 {
return Err(ArrowError::DivideByZero);
}

let result = left / right;
Ok(result)
}

fn div_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError> {
if right == 0.0 {
return Err(ArrowError::DivideByZero);
}

let result = left as f64 / right;
f64_to_i32(result)
}
}

impl IntervalOp for IntervalDayTimeType {
Expand All @@ -578,6 +638,70 @@ impl IntervalOp for IntervalDayTimeType {
let ms = l_ms.sub_checked(r_ms)?;
Ok(Self::make_value(days, ms))
}

fn mul_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError> {
let (days, ms) = Self::to_parts(left);
Ok(IntervalDayTimeType::make_value(
days.mul_checked(right)?,
ms.mul_checked(right)?,
))
}

fn mul_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError> {
let (days, ms) = Self::to_parts(left);

// Calculate total days including fractional part
let total_days = days as f64 * right;
// Split into whole and fractional days
let whole_days = total_days.trunc() as i32;
let frac_days = total_days.fract();

// Convert fractional days to milliseconds (24 * 60 * 60 * 1000 = 86_400_000 ms per day)
let frac_ms = f64_to_i32(frac_days * 86_400_000.0)?;

// Calculate total milliseconds including the fractional days
let total_ms = f64_to_i32(ms as f64 * right)? + frac_ms;

Ok(Self::make_value(whole_days, total_ms))
}

fn div_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError> {
if right == 0 {
return Err(ArrowError::DivideByZero);
}
let (days, ms) = Self::to_parts(left);

// Convert everything to milliseconds to handle remainders
let total_ms = ms as i64 + (days as i64 * 86_400_000); // 24 * 60 * 60 * 1000
let result_ms = total_ms / right as i64;

// Convert back to days and milliseconds
let result_days = result_ms as f64 / 86_400_000.0;
let result_ms = result_ms % 86_400_000;

let result_days_i32 = f64_to_i32(result_days)?;
let result_ms_i32 = f64_to_i32(result_ms as f64)?;
Ok(Self::make_value(result_days_i32, result_ms_i32))
}

fn div_float(left: Self::Native, right: f64) -> Result<Self::Native, ArrowError> {
if right == 0.0 {
return Err(ArrowError::DivideByZero);
}
let (days, ms) = Self::to_parts(left);

// Convert everything to milliseconds to handle remainders
let total_ms = (ms as f64 + (days as f64 * 86_400_000.0)) / right;

// Convert back to days and milliseconds
let result_days = (total_ms / 86_400_000.0).floor();
let result_ms = total_ms % 86_400_000.0;

let result_days_i32 = f64_to_i32(result_days)?;
let result_ms_i32 = f64_to_i32(result_ms)?;

Ok(Self::make_value(result_days_i32, result_ms_i32))
}
}

impl IntervalOp for IntervalMonthDayNanoType {
Expand All @@ -598,6 +722,33 @@ impl IntervalOp for IntervalMonthDayNanoType {
let nanos = l_nanos.sub_checked(r_nanos)?;
Ok(Self::make_value(months, days, nanos))
}

fn mul_int(left: Self::Native, right: i32) -> Result<Self::Native, ArrowError> {
let (months, days, nanos) = Self::to_parts(left);
Ok(Self::make_value(
months.mul_checked(right)?,
days.mul_checked(right)?,
nanos.mul_checked(right as i64)?,
))
}

fn mul_float(_left: Self::Native, _right: f64) -> Result<Self::Native, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"Floating point multiplication not supported for MonthDayNano intervals".to_string(),
))
}

fn div_int(_left: Self::Native, _right: i32) -> Result<Self::Native, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"Integer division not supported for MonthDayNano intervals".to_string(),
))
}

fn div_float(_left: Self::Native, _right: f64) -> Result<Self::Native, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"Floating point division not supported for MonthDayNano intervals".to_string(),
))
}
}

/// Perform arithmetic operation on an interval array
Expand All @@ -621,6 +772,98 @@ fn interval_op<T: IntervalOp>(
}
}

/// Perform multiplication between an interval array and a numeric array
fn interval_mul_op<T: IntervalOp>(
op: Op,
l: &dyn Array,
l_s: bool,
r: &dyn Array,
r_s: bool,
) -> Result<ArrayRef, ArrowError> {
Comment on lines +775 to +782

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said in the first comment, these should become different cases in

fn interval_op<T: IntervalOp>(
    op: Op,
    l: &dyn Array,
    l_s: bool,
    r: &dyn Array,
    r_s: bool,
) -> Result<ArrayRef, ArrowError> {
    let l = l.as_primitive::<T>();
    let r = r.as_primitive::<T>();
    match op {
        Op::Add | Op::AddWrapping => Ok(try_op_ref!(T, l, l_s, r, r_s, T::add(l, r))),
        Op::Sub | Op::SubWrapping => Ok(try_op_ref!(T, l, l_s, r, r_s, T::sub(l, r))),
        -- NEW CASES HERE --

And then instead of relying on try_op_ref! that produces array operations based on single-value functions, you will first convert the interval array to a integer array (for both mul and div so no need for different traits), then delegate to either mul of div of numeric inputs (no interval types involved at this point), then you take that output and convert the desired interval type with the appropriate kernel. These conversion functions might be defined with try_op_ref!.

// Assume the interval is the left argument
if let Some(l_interval) = l.as_primitive_opt::<T>() {
match r.data_type() {
DataType::Int32 => {
let r_int = r.as_primitive::<Int32Type>();
Ok(try_op_ref!(
T,
l_interval,
l_s,
r_int,
r_s,
T::mul_int(l_interval, r_int)
))
}
DataType::Float64 => {
let r_float = r.as_primitive::<Float64Type>();
Ok(try_op_ref!(
T,
l_interval,
l_s,
r_float,
r_s,
T::mul_float(l_interval, r_float)
))
}
_ => Err(ArrowError::InvalidArgumentError(format!(
"Invalid numeric type for interval multiplication: {}",
r.data_type()
))),
}
} else {
Err(ArrowError::InvalidArgumentError(format!(
"Invalid interval multiplication: {} {op} {}",
l.data_type(),
r.data_type()
)))
}
}

fn interval_div_op<T: IntervalOp>(
op: Op,
l: &dyn Array,
l_s: bool,
r: &dyn Array,
r_s: bool,
) -> Result<ArrayRef, ArrowError> {
if let Some(l_interval) = l.as_primitive_opt::<T>() {
match r.data_type() {
DataType::Int32 => {
let r_int = r.as_primitive::<Int32Type>();
Ok(try_op_ref!(
T,
l_interval,
l_s,
r_int,
r_s,
T::div_int(l_interval, r_int)
))
}
DataType::Float64 => {
let r_float = r.as_primitive::<Float64Type>();
Ok(try_op_ref!(
T,
l_interval,
l_s,
r_float,
r_s,
T::div_float(l_interval, r_float)
))
}
_ => Err(ArrowError::InvalidArgumentError(format!(
"Invalid numeric type for interval division: {}",
r.data_type()
))),
}
} else {
Err(ArrowError::InvalidArgumentError(format!(
"Invalid interval division: {} {op} {}",
l.data_type(),
r.data_type()
)))
}
}

fn duration_op<T: ArrowPrimitiveType>(
op: Op,
l: &dyn Array,
Expand Down Expand Up @@ -1356,6 +1599,79 @@ mod tests {
err,
"Arithmetic overflow: Overflow happened on: 2147483647 + 1"
);

// Test interval multiplication
let a = IntervalYearMonthArray::from(vec![IntervalYearMonthType::make_value(2, 4)]);
let b = PrimitiveArray::<Int32Type>::from(vec![5]);
let result = mul(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalYearMonthArray::from(vec![IntervalYearMonthType::make_value(11, 8),])
);

// swap a and b
let result = mul(&b, &a).unwrap();
assert_eq!(
result.as_ref(),
&IntervalYearMonthArray::from(vec![IntervalYearMonthType::make_value(11, 8),])
);

let a = IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(10, 7200000), // 10 days, 2 hours
]);
let b = PrimitiveArray::<Int32Type>::from(vec![3]);
let result = mul(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(30, 21600000), // 30 days, 6 hours
])
);

let a = IntervalMonthDayNanoArray::from(vec![
IntervalMonthDayNanoType::make_value(12, 15, 5_000_000_000), // 12 months, 15 days, 5 seconds
]);
let b = PrimitiveArray::<Int32Type>::from(vec![2]);
let result = mul(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalMonthDayNanoArray::from(vec![
IntervalMonthDayNanoType::make_value(24, 30, 10_000_000_000), // 24 months, 30 days, 10 seconds
])
);

let a = IntervalYearMonthArray::from(vec![IntervalYearMonthType::make_value(1, 6)]); // 1 year, 6 months
let b = PrimitiveArray::<Float64Type>::from(vec![2.5]);
let result = mul(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalYearMonthArray::from(vec![IntervalYearMonthType::make_value(3, 9)]) // 3 years, 9 months = 45 months
);

let a = IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(5, 3600000), // 5 days, 1 hour
]);
let b = PrimitiveArray::<Int32Type>::from(vec![-2]);
let result = mul(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(-10, -7200000), // -10 days, -2 hours
])
);

// Test interval division
let a = IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(15, 3600000), // 15 days, 1 hour
]);
let b = PrimitiveArray::<Int32Type>::from(vec![2]);
let result = div(&a, &b).unwrap();
assert_eq!(
result.as_ref(),
&IntervalDayTimeArray::from(vec![
IntervalDayTimeType::make_value(7, 45000000), // 7 days, 12.5 hours (half of 15 days, 1 hour)
])
);
}

fn test_duration_impl<T: ArrowPrimitiveType<Native = i64>>() {
Expand Down
4 changes: 2 additions & 2 deletions arrow-array/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ macro_rules! repeat_pat {
/// [`DataType`]: arrow_schema::DataType
#[macro_export]
macro_rules! downcast_integer {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => {
($($data_type:expr),+ => ($m:path $(, $args:tt)*), $($p:pat $( if $guard:expr )? => $fallback:expr $(,)*)*) => {
match ($($data_type),+) {
$crate::repeat_pat!($crate::cast::__private::DataType::Int8, $($data_type),+) => {
$m!($crate::types::Int8Type $(, $args)*)
Expand All @@ -98,7 +98,7 @@ macro_rules! downcast_integer {
$crate::repeat_pat!($crate::cast::__private::DataType::UInt64, $($data_type),+) => {
$m!($crate::types::UInt64Type $(, $args)*)
}
$($p => $fallback,)*
$($p $( if $guard )?=> $fallback,)*
}
};
}
Expand Down
Loading