Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/apache/datafusion into arra…
Browse files Browse the repository at this point in the history
…y-coercion
  • Loading branch information
jayzhan211 committed Oct 10, 2024
2 parents 2de7041 + 58c32cb commit 6afbb51
Show file tree
Hide file tree
Showing 70 changed files with 1,453 additions and 730 deletions.
44 changes: 40 additions & 4 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3577,9 +3577,8 @@ impl fmt::Display for ScalarValue {
columns
.iter()
.zip(fields.iter())
.enumerate()
.map(|(index, (column, field))| {
if nulls.is_some_and(|b| b.is_null(index)) {
.map(|(column, field)| {
if nulls.is_some_and(|b| b.is_null(0)) {
format!("{}:NULL", field.name())
} else if let DataType::Struct(_) = field.data_type() {
let sv = ScalarValue::Struct(Arc::new(
Expand Down Expand Up @@ -3875,7 +3874,7 @@ mod tests {
use arrow::compute::{is_null, kernels};
use arrow::error::ArrowError;
use arrow::util::pretty::pretty_format_columns;
use arrow_buffer::Buffer;
use arrow_buffer::{Buffer, NullBuffer};
use arrow_schema::Fields;
use chrono::NaiveDate;
use rand::Rng;
Expand Down Expand Up @@ -6589,6 +6588,43 @@ mod tests {
assert_batches_eq!(&expected, &[batch]);
}

#[test]
fn test_null_bug() {
let field_a = Field::new("a", DataType::Int32, true);
let field_b = Field::new("b", DataType::Int32, true);
let fields = Fields::from(vec![field_a, field_b]);

let array_a = Arc::new(Int32Array::from_iter_values([1]));
let array_b = Arc::new(Int32Array::from_iter_values([2]));
let arrays: Vec<ArrayRef> = vec![array_a, array_b];

let mut not_nulls = BooleanBufferBuilder::new(1);
not_nulls.append(true);
let not_nulls = not_nulls.finish();
let not_nulls = Some(NullBuffer::new(not_nulls));

let ar = unsafe { StructArray::new_unchecked(fields, arrays, not_nulls) };
let s = ScalarValue::Struct(Arc::new(ar));

assert_eq!(s.to_string(), "{a:1,b:2}");
assert_eq!(format!("{s:?}"), r#"Struct({a:1,b:2})"#);

let ScalarValue::Struct(arr) = s else {
panic!("Expected struct");
};

//verify compared to arrow display
let batch = RecordBatch::try_from_iter(vec![("s", arr as _)]).unwrap();
let expected = [
"+--------------+",
"| s |",
"+--------------+",
"| {a: 1, b: 2} |",
"+--------------+",
];
assert_batches_eq!(&expected, &[batch]);
}

#[test]
fn test_struct_display_null() {
let fields = vec![Field::new("a", DataType::Int32, false)];
Expand Down
23 changes: 8 additions & 15 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use datafusion::functions_window::row_number::row_number_udwf;
use datafusion_functions_window::dense_rank::dense_rank_udwf;
use datafusion_functions_window::rank::rank_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
Expand Down Expand Up @@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
WindowFunctionDefinition::WindowUDF(rank_udwf()),
// its name
"RANK",
"rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand All @@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
// its name
"DENSE_RANK",
"dense_rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand Down Expand Up @@ -382,19 +382,12 @@ fn get_random_function(
);
window_fn_map.insert(
"rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Rank,
),
vec![],
),
(WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]),
);
window_fn_map.insert(
"dense_rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
vec![],
),
);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr-common/src/interval_arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ impl NullableInterval {
}
_ => Ok(Self::MaybeNull { values }),
}
} else if op.is_comparison_operator() {
} else if op.supports_propagation() {
Ok(Self::Null {
datatype: DataType::Boolean,
})
Expand Down
16 changes: 13 additions & 3 deletions datafusion/expr-common/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ impl Operator {
)
}

/// Return true if the operator is a comparison operator.
/// Return true if the comparison operator can be used in interval arithmetic and constraint
/// propagation
///
/// For example, 'Binary(a, >, b)' would be a comparison expression.
pub fn is_comparison_operator(&self) -> bool {
/// For example, 'Binary(a, >, b)' expression supports propagation.
pub fn supports_propagation(&self) -> bool {
matches!(
self,
Operator::Eq
Expand All @@ -163,6 +164,15 @@ impl Operator {
)
}

/// Return true if the comparison operator can be used in interval arithmetic and constraint
/// propagation
///
/// For example, 'Binary(a, >, b)' expression supports propagation.
#[deprecated(since = "43.0.0", note = "please use `supports_propagation` instead")]
pub fn is_comparison_operator(&self) -> bool {
self.supports_propagation()
}

/// Return true if the operator is a logic operator.
///
/// For example, 'Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))' would
Expand Down
25 changes: 3 additions & 22 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// rank of the current row without gaps; this function counts peer groups
DenseRank,
/// relative rank of the current row: (rank - 1) / (total rows - 1)
PercentRank,
/// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
CumeDist,
/// integer ranging from 1 to the argument value, dividing the partition as equally as possible
Expand All @@ -72,9 +66,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
Rank => "RANK",
DenseRank => "DENSE_RANK",
PercentRank => "PERCENT_RANK",
CumeDist => "CUME_DIST",
Ntile => "NTILE",
Lag => "LAG",
Expand All @@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"RANK" => BuiltInWindowFunction::Rank,
"DENSE_RANK" => BuiltInWindowFunction::DenseRank,
"PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
"LAG" => BuiltInWindowFunction::Lag,
Expand Down Expand Up @@ -127,12 +115,8 @@ impl BuiltInWindowFunction {
})?;

match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
Expand All @@ -145,10 +129,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
Signature::one_of(
vec![
Expand Down
12 changes: 0 additions & 12 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2598,15 +2598,6 @@ mod test {
Ok(())
}

#[test]
fn test_percent_rank_return_type() -> Result<()> {
let fun = find_df_window_func("percent_rank").unwrap();
let observed = fun.return_type(&[], &[], "")?;
assert_eq!(DataType::Float64, observed);

Ok(())
}

#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = find_df_window_func("cume_dist").unwrap();
Expand All @@ -2628,9 +2619,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
"rank",
"dense_rank",
"percent_rank",
"cume_dist",
"ntile",
"lag",
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// # use datafusion_expr::test::function_stub::count;
/// # use sqlparser::ast::NullTreatment;
/// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
/// # use datafusion_expr::window_function::percent_rank;
/// # // first_value is an aggregate function in another crate
/// # fn first_value(_arg: Expr) -> Expr {
/// unimplemented!() }
Expand All @@ -721,6 +720,9 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// // Create a window expression for percent rank partitioned on column a
/// // equivalent to:
/// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE NULLS)`
/// // percent_rank is an udwf function in another crate
/// # fn percent_rank() -> Expr {
/// unimplemented!() }
/// let window = percent_rank()
/// .partition_by(vec![col("a")])
/// .order_by(vec![col("b").sort(true, true)])
Expand Down
21 changes: 0 additions & 21 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ use datafusion_common::ScalarValue;

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `rank` window function
pub fn rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![]))
}

/// Create an expression to represent the `dense_rank` window function
pub fn dense_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::DenseRank,
vec![],
))
}

/// Create an expression to represent the `percent_rank` window function
pub fn percent_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::PercentRank,
vec![],
))
}

/// Create an expression to represent the `cume_dist` window function
pub fn cume_dist() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, vec![]))
Expand Down
Loading

0 comments on commit 6afbb51

Please sign in to comment.