From 7b8fd9c32c3470064b535c183aa4f0dad5746d43 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 18 Jan 2025 20:14:44 +0100
Subject: [PATCH 1/2] refactor: combine normal and cdf plan until write

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 crates/core/src/operations/merge/mod.rs    |  81 +++----
 crates/core/src/operations/merge/writer.rs | 236 +++++++++++++++++++++
 crates/core/src/operations/write.rs        |   6 +-
 3 files changed, 265 insertions(+), 58 deletions(-)
 create mode 100644 crates/core/src/operations/merge/writer.rs

diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index b9d8b2f95f..c032a321e1 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -80,14 +80,16 @@ use crate::logstore::LogStoreRef;
 use crate::operations::cdc::*;
 use crate::operations::merge::barrier::find_node;
 use crate::operations::transaction::CommitBuilder;
-use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
+use crate::operations::write::WriterStatsConfig;
 use crate::protocol::{DeltaOperation, MergePredicate};
 use crate::table::state::DeltaTableState;
 use crate::table::GeneratedColumn;
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
+use writer::write_execution_plan_v2;

 mod barrier;
 mod filter;
+mod writer;

 const SOURCE_COLUMN: &str = "__delta_rs_source";
 const TARGET_COLUMN: &str = "__delta_rs_target";
@@ -1205,6 +1207,8 @@ async fn execute(
         }),
     });

+    // We should observe the metrics before we union the merge plan with the cdf_merge plan
+    // so that we get the metrics only for the merge plan.
     let operation_count = LogicalPlan::Extension(Extension {
         node: Arc::new(MetricObserver {
             id: OUTPUT_COUNT_ID.into(),
@@ -1222,7 +1226,7 @@
                 .clone()
                 .filter(col(DELETE_COLUMN))?
                 .select(write_projection.clone())?
-                .with_column(crate::operations::cdc::CDC_COLUMN_NAME, lit("delete"))?,
+                .with_column(CDC_COLUMN_NAME, lit("delete"))?,
         );
     }

@@ -1235,7 +1239,7 @@
             lit(5),
         ))?;

-        let mut cdc_insert_df = cdc_projection
+        let cdc_insert_df = cdc_projection
             .clone()
             .filter(
                 col(SOURCE_COLUMN)
@@ -1245,30 +1249,16 @@
                     .is_true()
                     .and(col(TARGET_COLUMN).is_null()),
             )?
             .select(write_projection.clone())?
             .with_column(CDC_COLUMN_NAME, lit("insert"))?;

-        cdc_insert_df = add_generated_columns(
-            cdc_insert_df,
-            &generated_col_expressions,
-            &missing_generated_columns,
-            &state,
-        )?;
-
         change_data.push(cdc_insert_df);

-        let mut after = cdc_projection
+        let after = cdc_projection
             .clone()
             .filter(col(TARGET_COLUMN).is_true())?
             .select(write_projection.clone())?;

-        after = add_generated_columns(
-            after,
-            &generated_col_expressions,
-            &missing_generated_columns,
-            &state,
-        )?;
-
         // Extra select_columns is required so that before and after have same schema order
         // DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
-        let mut before = cdc_projection
+        let before = cdc_projection
             .clone()
             .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
             .select(
@@ -1288,18 +1278,26 @@
                     .collect::<Vec<_>>(),
             )?;

-        before = add_generated_columns(
-            before,
-            &generated_col_expressions,
-            &missing_generated_columns,
-            &state,
-        )?;
-
         let tracker = CDCTracker::new(before, after);

         change_data.push(tracker.collect()?);
     }

     let mut project = filtered.clone().select(write_projection)?;
+
+    if should_cdc && !change_data.is_empty() {
+        let mut df = change_data
+            .pop()
+            .expect("change_data should never be empty");
+        // Accumulate all the changes together into a single data frame to produce the necessary
+        // change data files
+        for change in change_data {
+            df = df.union(change)?;
+        }
+        project = project
+            .with_column(CDC_COLUMN_NAME, lit(ScalarValue::Null))?
+            .union(df)?;
+    }
+
     project = add_generated_columns(
         project,
         &generated_col_expressions,
@@ -1327,7 +1325,7 @@
     );

     let rewrite_start = Instant::now();
-    let mut add_actions = write_execution_plan(
+    let mut actions: Vec<Action> = write_execution_plan_v2(
         Some(&snapshot),
         state.clone(),
         write,
@@ -1338,38 +1336,11 @@
         writer_properties.clone(),
         writer_stats_config.clone(),
         None,
+        should_cdc, // if true, write execution plan splits batches in [normal, cdc] data before writing
     )
     .await?;

-    if should_cdc && !change_data.is_empty() {
-        let mut df = change_data
-            .pop()
-            .expect("change_data should never be empty");
-        // Accumulate all the changes together into a single data frame to produce the necessary
-        // change data files
-        for change in change_data {
-            df = df.union(change)?;
-        }
-        add_actions.extend(
-            write_execution_plan_cdc(
-                Some(&snapshot),
-                state.clone(),
-                df.create_physical_plan().await?,
-                table_partition_cols.clone(),
-                log_store.object_store(Some(operation_id)),
-                Some(snapshot.table_config().target_file_size() as usize),
-                None,
-                writer_properties,
-                writer_stats_config,
-                None,
-            )
-            .await?,
-        );
-    }
-
     metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;
-
-    let mut actions: Vec<Action> = add_actions.clone();
     metrics.num_target_files_added = actions.len();

     let survivors = barrier
diff --git a/crates/core/src/operations/merge/writer.rs b/crates/core/src/operations/merge/writer.rs
new file mode 100644
index 0000000000..a477286241
--- /dev/null
+++ b/crates/core/src/operations/merge/writer.rs
@@ -0,0 +1,236 @@
+//! Writer for MERGE operation, can write normal and CDF data at the same time
+
+use std::sync::Arc;
+use std::vec;
+
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef as ArrowSchemaRef};
+use datafusion::catalog::TableProvider;
+use datafusion::datasource::MemTable;
+use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
+use datafusion_expr::col;
+use datafusion_physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use object_store::prefix::PrefixStore;
+use parquet::file::properties::WriterProperties;
+use tracing::log::*;
+
+use crate::operations::cdc::CDC_COLUMN_NAME;
+use crate::operations::writer::{DeltaWriter, WriterConfig};
+
+use crate::delta_datafusion::DeltaDataChecker;
+use crate::errors::DeltaResult;
+use crate::kernel::{Action, AddCDCFile, StructType, StructTypeExt};
+
+use crate::operations::write::{WriteError, WriterStatsConfig};
+use crate::storage::ObjectStoreRef;
+use crate::table::state::DeltaTableState;
+
+use tokio::sync::mpsc::Sender;
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn write_execution_plan_v2(
+    snapshot: Option<&DeltaTableState>,
+    state: SessionState,
+    plan: Arc<dyn ExecutionPlan>,
+    partition_columns: Vec<String>,
+    object_store: ObjectStoreRef,
+    target_file_size: Option<usize>,
+    write_batch_size: Option<usize>,
+    writer_properties: Option<WriterProperties>,
+    writer_stats_config: WriterStatsConfig,
+    sender: Option<Sender<RecordBatch>>,
+    contains_cdc: bool,
+) -> DeltaResult<Vec<Action>> {
+    // We always take the plan Schema since the data may contain Large/View arrow types,
+    // the schema and batches were prior constructed with this in mind.
+    let schema: ArrowSchemaRef = plan.schema();
+    let checker = if let Some(snapshot) = snapshot {
+        DeltaDataChecker::new(snapshot)
+    } else {
+        debug!("Using plan schema to derive generated columns, since no snapshot was provided. Implies first write.");
Implies first write."); + let delta_schema: StructType = schema.as_ref().try_into()?; + DeltaDataChecker::new_with_generated_columns( + delta_schema.get_generated_columns().unwrap_or_default(), + ) + }; + + // Write data to disk + let mut tasks = vec![]; + if !contains_cdc { + for i in 0..plan.properties().output_partitioning().partition_count() { + let inner_plan = plan.clone(); + let inner_schema = schema.clone(); + let task_ctx = Arc::new(TaskContext::from(&state)); + let config = WriterConfig::new( + inner_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + let mut writer = DeltaWriter::new(object_store.clone(), config); + let checker_stream = checker.clone(); + let sender_stream = sender.clone(); + let mut stream = inner_plan.execute(i, task_ctx)?; + + let handle: tokio::task::JoinHandle>> = tokio::task::spawn( + async move { + let sendable = sender_stream.clone(); + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + + checker_stream.check_batch(&batch).await?; + + if let Some(s) = sendable.as_ref() { + if let Err(e) = s.send(batch.clone()).await { + error!("Failed to send data to observer: {e:#?}"); + } + } else { + debug!("write_execution_plan_with_predicate did not send any batches, no sender."); + } + writer.write(&batch).await?; + } + let add_actions = writer.close().await; + match add_actions { + Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::>()), + Err(err) => Err(err), + } + }, + ); + tasks.push(handle); + } + } else { + // Incoming plan contains the normal write_plan unioned with the cdf plan + // we split these batches during the write + let cdf_store = Arc::new(PrefixStore::new(object_store.clone(), "_change_data")); + for i in 0..plan.properties().output_partitioning().partition_count() { + let inner_plan = plan.clone(); + let write_schema = Arc::new(Schema::new( + schema + .clone() + .fields() + .into_iter() + .filter_map(|f| { + if f.name() != CDC_COLUMN_NAME { + Some(f.as_ref().clone()) + } else { + None + } + }) + .collect::>(), + )); + let cdf_schema = schema.clone(); + let task_ctx = Arc::new(TaskContext::from(&state)); + let normal_config = WriterConfig::new( + write_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + + let cdf_config = WriterConfig::new( + cdf_schema.clone(), + partition_columns.clone(), + writer_properties.clone(), + target_file_size, + write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), + ); + + let mut writer = DeltaWriter::new(object_store.clone(), normal_config); + + let mut cdf_writer = DeltaWriter::new(cdf_store.clone(), cdf_config); + + let checker_stream = checker.clone(); + let sender_stream = sender.clone(); + let mut stream = inner_plan.execute(i, task_ctx)?; + + let session_context = SessionContext::new(); + + let handle: tokio::task::JoinHandle>> = tokio::task::spawn( + async move { + let sendable = sender_stream.clone(); + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + + // split batch since we unioned upstream the operation write and cdf plan + let table_provider: Arc = Arc::new(MemTable::try_new( + batch.schema(), + vec![vec![batch.clone()]], + )?); + let batch_df = 
+                        let batch_df = session_context.read_table(table_provider).unwrap();
+
+                        let normal_df = batch_df
+                            .clone()
+                            .filter(col(CDC_COLUMN_NAME).is_null())?
+                            .drop_columns(&[CDC_COLUMN_NAME])?;
+                        let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).is_not_null())?;
+
+                        let normal_batch =
+                            concat_batches(&write_schema, &normal_df.collect().await?)?;
+                        checker_stream.check_batch(&normal_batch).await?;
+
+                        let cdf_batch = concat_batches(&cdf_schema, &cdf_df.collect().await?)?;
+                        checker_stream.check_batch(&cdf_batch).await?;
+
+                        if let Some(s) = sendable.as_ref() {
+                            if let Err(e) = s.send(batch.clone()).await {
+                                error!("Failed to send data to observer: {e:#?}");
+                            }
+                        } else {
+                            debug!("write_execution_plan_with_predicate did not send any batches, no sender.");
+                        }
+                        writer.write(&normal_batch).await?;
+                        cdf_writer.write(&cdf_batch).await?;
+                    }
+                    let mut add_actions = writer
+                        .close()
+                        .await?
+                        .into_iter()
+                        .map(Action::Add)
+                        .collect::<Vec<_>>();
+                    let cdf_actions = cdf_writer.close().await.map(|v| {
+                        v.into_iter()
+                            .map(|add| {
+                                {
+                                    Action::Cdc(AddCDCFile {
+                                        // This is a gnarly hack, but the action needs the nested path, not the
+                                        // path inside the prefixed store
+                                        path: format!("_change_data/{}", add.path),
+                                        size: add.size,
+                                        partition_values: add.partition_values,
+                                        data_change: false,
+                                        tags: add.tags,
+                                    })
+                                }
+                            })
+                            .collect::<Vec<_>>()
+                    })?;
+                    add_actions.extend(cdf_actions);
+                    Ok(add_actions)
+                },
+            );
+            tasks.push(handle);
+        }
+    }
+    let actions = futures::future::join_all(tasks)
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(|err| WriteError::WriteTask { source: err })?
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()?
+        .concat()
+        .into_iter()
+        .collect::<Vec<_>>();
+    // Collect add actions to add to commit
+    Ok(actions)
+}
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 8d17413c19..914f88887c 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -78,7 +78,7 @@ use crate::DeltaTable;
 use tokio::sync::mpsc::Sender;

 #[derive(thiserror::Error, Debug)]
-enum WriteError {
+pub(crate) enum WriteError {
     #[error("No data source supplied to write command.")]
     MissingData,

@@ -403,9 +403,9 @@ impl WriteBuilder {
 #[derive(Clone)]
 pub struct WriterStatsConfig {
     /// Number of columns to collect stats for, idx based
-    num_indexed_cols: i32,
+    pub num_indexed_cols: i32,
     /// Optional list of columns which to collect stats for, takes precedende over num_index_cols
-    stats_columns: Option<Vec<String>>,
+    pub stats_columns: Option<Vec<String>>,
 }

 impl WriterStatsConfig {

From 9c01a9e9a10aed9a78372fb28913f208d8aac4ef Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 20 Jan 2025 22:46:16 +0100
Subject: [PATCH 2/2] refactor: cdf merge, keep all ops in one single df

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 crates/core/src/operations/merge/mod.rs    | 145 +++++++++------------
 crates/core/src/operations/merge/writer.rs |  18 ++-
 2 files changed, 79 insertions(+), 84 deletions(-)

diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index c032a321e1..70087fb3b3 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -33,6 +33,7 @@ use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Instant;

+use arrow_schema::{DataType, Field};
 use async_trait::async_trait;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
@@ -56,7 +57,6 @@ use datafusion_expr::{
 use filter::try_construct_early_filter;
 use futures::future::BoxFuture;
-use itertools::Itertools;
 use parquet::file::properties::WriterProperties;
 use serde::Serialize;
 use tracing::log::*;
@@ -719,7 +719,6 @@
     // be disabled in the common case(s)
     let should_cdc = should_write_cdc(&snapshot)?;
     // Change data may be collected and then written out at the completion of the merge
-    let mut change_data = vec![];

     if should_cdc {
         debug!("Executing a merge and I should write CDC!");
@@ -790,7 +789,7 @@
 fn add_generated_columns(
     mut df: DataFrame,
     generated_cols: &Vec<GeneratedColumn>,
-    generated_cols_missing_in_source: &Vec<String>,
+    generated_cols_missing_in_source: &[String],
     state: &SessionState,
 ) -> DeltaResult<DataFrame> {
     debug!("Generating columns in dataframe");
@@ -1049,6 +1048,7 @@
     let mut new_columns = vec![];
     let mut write_projection = Vec::new();
+    let mut write_projection_with_cdf = Vec::new();

     for delta_field in snapshot.schema().fields() {
         let mut when_expr = Vec::with_capacity(operations_size);
@@ -1086,9 +1086,26 @@
             Expr::Column(Column::from_name(name.clone())).alias(delta_field.name()),
             delta_field.data_type().try_into()?,
         ));
+
+        write_projection_with_cdf.push(
+            when(
+                col(CDC_COLUMN_NAME).not_eq(lit("update_preimage")),
+                cast(
+                    Expr::Column(Column::from_name(name.clone())),
+                    delta_field.data_type().try_into()?,
+                ),
+            )
+            .otherwise(cast(
+                Expr::Column(Column::new(Some(target_name.clone()), delta_field.name())), // We take the column from target table
+                delta_field.data_type().try_into()?,
+            ))?
+            .alias(delta_field.name()),
+        );
         new_columns.push((name, case));
     }

+    write_projection_with_cdf.push(col("_change_type"));
+
     let mut insert_when = Vec::with_capacity(ops.len());
     let mut insert_then = Vec::with_capacity(ops.len());
@@ -1219,93 +1236,59 @@

     let operation_count = DataFrame::new(state.clone(), operation_count);

-    if should_cdc {
-        // Create a dataframe containing the CDC deletes which are present at this point
-        change_data.push(
-            operation_count
-                .clone()
-                .filter(col(DELETE_COLUMN))?
-                .select(write_projection.clone())?
-                .with_column(CDC_COLUMN_NAME, lit("delete"))?,
-        );
-    }
-
-    let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?;
-
-    if should_cdc {
-        debug!("The merge should triggere a CDC tracking, computing pre/insert/postimage datasets");
-        let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq(
-            // This is a copy operation, but I'm not sure how to turn that enum into an int
-            lit(5),
-        ))?;
-
-        let cdc_insert_df = cdc_projection
+    let mut projected = if should_cdc {
+        operation_count
             .clone()
-            .filter(
-                col(SOURCE_COLUMN)
-                    .is_true()
-                    .and(col(TARGET_COLUMN).is_null()),
+            .with_column(
+                CDC_COLUMN_NAME,
+                when(col(TARGET_DELETE_COLUMN).is_null(), lit("delete")) // nulls are equal to True
+                    .when(col(DELETE_COLUMN).is_null(), lit("source_delete"))
+                    .when(col(TARGET_COPY_COLUMN).is_null(), lit("copy"))
+                    .when(col(TARGET_INSERT_COLUMN).is_null(), lit("insert"))
+                    .when(col(TARGET_UPDATE_COLUMN).is_null(), lit("update"))
+                    .end()?,
             )?
-            .select(write_projection.clone())?
-            .with_column(CDC_COLUMN_NAME, lit("insert"))?;
-
-        change_data.push(cdc_insert_df);
-
-        let after = cdc_projection
-            .clone()
-            .filter(col(TARGET_COLUMN).is_true())?
-            .select(write_projection.clone())?;
-
-        // Extra select_columns is required so that before and after have same schema order
-        // DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
-        let before = cdc_projection
-            .clone()
-            .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
-            .select(
-                target_schema
-                    .columns()
-                    .iter()
-                    .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
-                    .map(|c| Expr::Column(c.clone()))
-                    .collect_vec(),
+            .drop_columns(&["__delta_rs_path"])? // WEIRD bug caused by interaction with unnest_columns, has to be dropped otherwise throws schema error
+            .with_column(
+                "__delta_rs_update_expanded",
+                when(
+                    col(CDC_COLUMN_NAME).eq(lit("update")),
+                    lit(ScalarValue::List(ScalarValue::new_list(
+                        &[
+                            ScalarValue::Utf8(Some("update_preimage".into())),
+                            ScalarValue::Utf8(Some("update_postimage".into())),
+                        ],
+                        &DataType::List(Field::new("element", DataType::Utf8, false).into()),
+                        true,
+                    ))),
+                )
+                .end()?,
             )?
-            .select_columns(
-                &after
-                    .schema()
-                    .columns()
-                    .iter()
-                    .map(|v| v.name())
-                    .collect::<Vec<_>>(),
-            )?;
-
-        let tracker = CDCTracker::new(before, after);
-
-        change_data.push(tracker.collect()?);
-    }
-
-    let mut project = filtered.clone().select(write_projection)?;
-
-    if should_cdc && !change_data.is_empty() {
-        let mut df = change_data
-            .pop()
-            .expect("change_data should never be empty");
-        // Accumulate all the changes together into a single data frame to produce the necessary
-        // change data files
-        for change in change_data {
-            df = df.union(change)?;
-        }
-        project = project
-            .with_column(CDC_COLUMN_NAME, lit(ScalarValue::Null))?
-            .union(df)?;
-    }
+            .unnest_columns(&["__delta_rs_update_expanded"])?
+            .with_column(
+                CDC_COLUMN_NAME,
+                when(
+                    col(CDC_COLUMN_NAME).eq(lit("update")),
+                    col("__delta_rs_update_expanded"),
+                )
+                .otherwise(col(CDC_COLUMN_NAME))?,
+            )?
+            .drop_columns(&["__delta_rs_update_expanded"])?
+            .select(write_projection_with_cdf)?
+    } else {
+        operation_count
+            .filter(col(DELETE_COLUMN).is_false())?
+            .select(write_projection)?
+    };

-    project = add_generated_columns(
-        project,
+    projected = add_generated_columns(
+        projected,
         &generated_col_expressions,
         &missing_generated_columns,
         &state,
     )?;

-    let merge_final = &project.into_unoptimized_plan();
+    let merge_final = &projected.into_unoptimized_plan();
     let write = state.create_physical_plan(merge_final).await?;

     let err = || DeltaTableError::Generic("Unable to locate expected metric node".into());
diff --git a/crates/core/src/operations/merge/writer.rs b/crates/core/src/operations/merge/writer.rs
index a477286241..2c563abd10 100644
--- a/crates/core/src/operations/merge/writer.rs
+++ b/crates/core/src/operations/merge/writer.rs
@@ -9,7 +9,7 @@ use arrow_schema::{Schema, SchemaRef as ArrowSchemaRef};
 use datafusion::catalog::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
-use datafusion_expr::col;
+use datafusion_expr::{col, lit};
 use datafusion_physical_plan::ExecutionPlan;
 use futures::StreamExt;
 use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
 use tracing::log::*;
@@ -170,9 +170,21 @@ pub(crate) async fn write_execution_plan_v2(

                         let normal_df = batch_df
                             .clone()
-                            .filter(col(CDC_COLUMN_NAME).is_null())?
+                            .filter(col(CDC_COLUMN_NAME).in_list(
+                                vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
+                                true,
+                            ))?
                             .drop_columns(&[CDC_COLUMN_NAME])?;
-                        let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).is_not_null())?;
+
+                        let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
+                            vec![
+                                lit("delete"),
+                                lit("insert"),
+                                lit("update_preimage"),
+                                lit("update_postimage"),
+                            ],
+                            false,
+                        ))?;

                         let normal_batch =
                             concat_batches(&write_schema, &normal_df.collect().await?)?;
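---

Illustrative appendix (not part of either patch above): a minimal, self-contained sketch of the per-batch split that the contains_cdc branch of write_execution_plan_v2 performs before handing rows to the main writer and to the _change_data writer. Only the _change_type column name, the change-type values, and the in_list/drop_columns calls mirror the patch; the toy schema, the data, and the main() harness are assumptions made purely for illustration.

// Sketch: split one unioned batch into "normal" rows and CDF rows by _change_type.
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::{col, lit, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // A toy batch: one data column plus the _change_type marker produced by the merge plan.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("_change_type", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
            Arc::new(StringArray::from(vec![
                Some("insert"),
                Some("update_postimage"),
                Some("delete"),
            ])) as ArrayRef,
        ],
    )?;

    let ctx = SessionContext::new();
    let df = ctx.read_table(Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    // Rows for the main table files: everything except deletes, source deletes and
    // update preimages, with the marker column dropped before writing.
    let normal = df
        .clone()
        .filter(col("_change_type").in_list(
            vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
            true, // negated: keep rows whose _change_type is NOT in the list
        ))?
        .drop_columns(&["_change_type"])?;

    // Rows that would be written under _change_data/ as CDC files.
    let cdf = df.filter(col("_change_type").in_list(
        vec![
            lit("delete"),
            lit("insert"),
            lit("update_preimage"),
            lit("update_postimage"),
        ],
        false,
    ))?;

    normal.show().await?;
    cdf.show().await?;
    Ok(())
}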