Skip to content

Commit

Permalink
refactor: replaced asterisk with constraint name in get_constraints f…
Browse files Browse the repository at this point in the history
…or table_config and added as_any to DeltaCheck to allow type checking in enforce_checks

Signed-off-by: Alexander Falk <[email protected]>
  • Loading branch information
Nordalf authored and ion-elgreco committed Mar 1, 2025
1 parent d9a8b26 commit f013cbc
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 3 deletions.
60 changes: 59 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,9 +1415,14 @@ impl DeltaDataChecker {
));
}

let field_to_select = if check.as_any().is::<Constraint>() {
"*"
} else {
check.get_name()
};
let sql = format!(
"SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
check.get_name(),
field_to_select,
check.get_expression()
);

Expand Down Expand Up @@ -2160,6 +2165,59 @@ mod tests {
assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
}

/// Verifies that `DeltaDataChecker` enforces check constraints on a record batch.
///
/// Covers four cases: no constraints, satisfied constraints, violated
/// constraints, and a constraint whose expression references a column ("c")
/// that the batch does not contain.
#[tokio::test]
async fn test_enforce_constraints() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", ArrowDataType::Utf8, false),
        Field::new("b", ArrowDataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
        ],
    )
    .unwrap();

    // An empty list of constraints is okay
    let constraints: Vec<Constraint> = vec![];
    assert!(DeltaDataChecker::new_with_constraints(constraints)
        .check_batch(&batch)
        .await
        .is_ok());

    // Satisfied constraints return Ok(())
    let constraints = vec![
        Constraint::new("custom_a", "a is not null"),
        Constraint::new("custom_b", "b < 1000"),
    ];
    assert!(DeltaDataChecker::new_with_constraints(constraints)
        .check_batch(&batch)
        .await
        .is_ok());

    // Violated constraints return an error carrying the list of violations;
    // both constraints below are violated by the batch, so two are expected.
    let constraints = vec![
        Constraint::new("custom_a", "a is null"),
        Constraint::new("custom_B", "b < 100"),
    ];
    let result = DeltaDataChecker::new_with_constraints(constraints)
        .check_batch(&batch)
        .await;
    // A single match checks the error variant and the violation count together,
    // so the count assertion can never be skipped silently.
    match result {
        Err(DeltaTableError::InvalidData { violations }) => assert_eq!(violations.len(), 2),
        _ => panic!("expected DeltaTableError::InvalidData"),
    }

    // A constraint on a column that is not in the batch returns an error as well
    let constraints = vec![Constraint::new("custom_c", "c > 2000")];
    let result = DeltaDataChecker::new_with_constraints(constraints)
        .check_batch(&batch)
        .await;
    assert!(result.is_err());
}

#[test]
fn roundtrip_test_delta_exec_plan() {
let ctx = SessionContext::new();
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.
use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;
use std::sync::LazyLock;
use std::{any::Any, sync::LazyLock};

pub mod arrow;
pub mod error;
Expand All @@ -21,6 +21,8 @@ pub trait DataCheck {
fn get_name(&self) -> &str;
/// The SQL expression to use for the check
fn get_expression(&self) -> &str;

/// The check as [`Any`], so callers can test for its concrete type
fn as_any(&self) -> &dyn Any;
}

static ARROW_HANDLER: LazyLock<ArrowExpressionHandler> =
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/kernel/models/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Delta table schema
use std::any::Any;
use std::sync::Arc;

pub use delta_kernel::schema::{
Expand Down Expand Up @@ -44,6 +45,10 @@ impl DataCheck for Invariant {
fn get_expression(&self) -> &str {
&self.invariant_sql
}

/// Returns this invariant as `&dyn Any` so its concrete type can be inspected at runtime.
fn as_any(&self) -> &dyn Any {
self
}
}

/// Trait to add convenience functions to struct type
Expand Down
26 changes: 26 additions & 0 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,32 @@ mod tests {
.to_owned()
}

/// Ensures that a constraint read back from the table configuration keeps the
/// custom name it was added with.
#[tokio::test]
async fn test_get_constraints_with_correct_names() -> DeltaResult<()> {
    // The key of a constraint is allowed to be custom
    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#check-constraints
    let batch = get_record_batch(None, false);
    let write = DeltaOps(create_bare_table()).write(vec![batch]).await?;
    let table = DeltaOps(write);

    // Propagate a failure with `?` so the underlying error is reported by the
    // test harness instead of being hidden behind a bare `is_ok()` assertion.
    let table = table
        .add_constraint()
        .with_constraint("my_custom_constraint", "value < 100")
        .await?;
    let constraints = table
        .state
        .expect("table state should be present after adding a constraint")
        .table_config()
        .get_constraints();

    // `assert_eq!` prints the actual length when the expectation is not met.
    assert_eq!(constraints.len(), 1);
    assert_eq!(constraints[0].name, "my_custom_constraint");
    Ok(())
}

#[tokio::test]
async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ impl TableConfig<'_> {
.iter()
.filter_map(|(field, value)| {
if field.starts_with("delta.constraints") {
value.as_ref().map(|f| Constraint::new("*", f))
let constraint_name = field.replace("delta.constraints.", "");
value.as_ref().map(|f| Constraint::new(&constraint_name, f))
} else {
None
}
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Delta Table read and write implementation
use std::any::Any;
use std::cmp::{min, Ordering};
use std::collections::HashMap;
use std::fmt;
Expand Down Expand Up @@ -155,6 +156,10 @@ impl DataCheck for Constraint {
fn get_expression(&self) -> &str {
&self.expr
}

/// Returns this constraint as `&dyn Any` so its concrete type can be inspected at runtime.
fn as_any(&self) -> &dyn Any {
self
}
}

/// A generated column
Expand Down Expand Up @@ -195,6 +200,10 @@ impl DataCheck for GeneratedColumn {
fn get_expression(&self) -> &str {
&self.validation_expr
}

/// Returns this generated column as `&dyn Any` so its concrete type can be inspected at runtime.
fn as_any(&self) -> &dyn Any {
self
}
}

/// Return partition fields along with their data type from the current schema.
Expand Down

0 comments on commit f013cbc

Please sign in to comment.