Skip to content

Commit

Permalink
Fix check_not_null_constraints null detection
Browse files Browse the repository at this point in the history
`check_not_null_constraints` (aka `check_not_null_contraits`) checked
for null using `Array::null_count` which does not return real null
count.
  • Loading branch information
findepi committed Oct 21, 2024
1 parent 21ebaf2 commit 1d34535
Showing 1 changed file with 130 additions and 5 deletions.
135 changes: 130 additions & 5 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -885,7 +886,13 @@ pub fn check_not_null_constraints(
);
}

if batch.column(index).null_count() > 0 {
if batch
.column(index)
.logical_nulls()
.map(|nulls| nulls.null_count())
.unwrap_or_default()
> 0
{
return exec_err!(
"Invalid batch column at '{}' has null but schema specifies non-nullable",
index
Expand Down Expand Up @@ -920,11 +927,11 @@ pub enum CardinalityEffect {
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{DictionaryArray, Int32Array, NullArray, RunArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use std::any::Any;
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};

use datafusion_common::{Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};

Expand Down Expand Up @@ -1068,6 +1075,124 @@ mod tests {
fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
let _ = plan.name();
}
}

// pub mod test;
#[test]
fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
)?,
&vec![0],
)?;
Ok(())
}

#[test]
fn test_check_not_null_constraints_reject_null() -> Result<()> {
let result = check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
)?,
&vec![0],
);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().message().to_owned().as_ref(),
"Invalid batch column at '0' has null but schema specifies non-nullable"
);
Ok(())
}

#[test]
fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
// some null value inside REE array
let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
let run_end_array = RunArray::try_new(&run_ends, &values)?;
let result = check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"a",
run_end_array.data_type().to_owned(),
true,
)])),
vec![Arc::new(run_end_array)],
)?,
&vec![0],
);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().message().to_owned().as_ref(),
"Invalid batch column at '0' has null but schema specifies non-nullable"
);
Ok(())
}

#[test]
fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
let keys = Int32Array::from(vec![0, 1, 2, 3]);
let dictionary = DictionaryArray::new(keys, values);
let result = check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"a",
dictionary.data_type().to_owned(),
true,
)])),
vec![Arc::new(dictionary)],
)?,
&vec![0],
);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().message().to_owned().as_ref(),
"Invalid batch column at '0' has null but schema specifies non-nullable"
);
Ok(())
}

#[test]
fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
// some null value marked out by dictionary array
let values = Arc::new(Int32Array::from(vec![
Some(1),
None, // this null value is masked by dictionary keys
Some(3),
Some(4)]));
let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
let dictionary = DictionaryArray::new(keys, values);
check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new(
"a",
dictionary.data_type().to_owned(),
true,
)])),
vec![Arc::new(dictionary)],
)?,
&vec![0],
)?;
Ok(())
}

#[test]
fn test_check_not_null_constraints_on_null_type() -> Result<()> {
// null value of Null type
let result = check_not_null_constraints(
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
vec![Arc::new(NullArray::new(3))],
)?,
&vec![0],
);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().message().to_owned().as_ref(),
"Invalid batch column at '0' has null but schema specifies non-nullable"
);
Ok(())
}
}

0 comments on commit 1d34535

Please sign in to comment.