Commit
feat: streaming support
Signed-off-by: Ion Koutsouris <[email protected]>
ion-elgreco committed Jan 21, 2025
1 parent 82ef6bf commit 92a8f7b
Showing 10 changed files with 440 additions and 81 deletions.
110 changes: 109 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -48,18 +48,23 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_common::project_schema;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
TableReference, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
use datafusion_physical_expr::{create_physical_expr, create_physical_exprs, PhysicalExpr};
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::limit::LocalLimitExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Statistics,
@@ -71,7 +76,9 @@ use either::Either;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectMeta;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use url::Url;

use crate::delta_datafusion::expr::parse_predicate_expression;
@@ -839,6 +846,107 @@ impl TableProvider for DeltaTableProvider {
}
}

#[derive(Debug)]
pub struct LazyTableProvider {
schema: Arc<ArrowSchema>,
batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
}

impl LazyTableProvider {
/// Build a LazyTableProvider
pub fn try_new(
schema: Arc<ArrowSchema>,
batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
) -> DeltaResult<Self> {
Ok(LazyTableProvider { schema, batches })
}
}

#[async_trait]
impl TableProvider for LazyTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> Arc<ArrowSchema> {
self.schema.clone()
}

fn table_type(&self) -> TableType {
TableType::Base
}

fn get_table_definition(&self) -> Option<&str> {
None
}

fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
None
}

async fn scan(
&self,
session: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(
self.schema(),
self.batches.clone(),
)?);

let df_schema: DFSchema = plan.schema().try_into()?;

if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
let physical_expr =
create_physical_expr(&filter_expr, &df_schema, &ExecutionProps::new())?;
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
}

if let Some(projection) = projection {
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection != &current_projection {
let execution_props = &ExecutionProps::new();
let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
.into_iter()
.map(|i| {
let (table_ref, field) = df_schema.qualified_field(*i);
create_physical_expr(
&Expr::Column(Column::from((table_ref, field))),
&df_schema,
execution_props,
)
.map(|expr| (expr, field.name().clone()))
.map_err(DeltaTableError::from)
})
.collect();
plan = Arc::new(ProjectionExec::try_new(fields?, plan)?);
}
}

if let Some(limit) = limit {
plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
};

Ok(plan)
}

fn supports_filters_pushdown(
&self,
filter: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(filter
.iter()
.map(|_| TableProviderFilterPushDown::Inexact)
.collect())
}

fn statistics(&self) -> Option<Statistics> {
None
}
}
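
Note the assembly order in scan() above: pushed-down filters sit directly on top of the LazyMemoryExec (they are reported as Inexact below, so DataFusion still re-applies them during the join), a ProjectionExec is added only when the requested projection differs from the identity, and any limit becomes a GlobalLimitExec at the root. Roughly, a hypothetical SELECT b FROM t WHERE a > 1 LIMIT 10 would yield a plan shaped like:

GlobalLimitExec: skip=0, fetch=10
  ProjectionExec: expr=[b@1 as b]
    FilterExec: a@0 > 1
      LazyMemoryExec: partitions=1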

// TODO: this will likely also need to perform column mapping later when we support reader protocol v2
/// A wrapper for parquet scans
#[derive(Debug)]
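As a usage sketch of the new provider (hypothetical names throughout; this assumes DataFusion's LazyBatchGenerator trait is fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>, consistent with the imports above):

use std::fmt;
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result as DataFusionResult;
use datafusion_physical_plan::memory::LazyBatchGenerator;
use parking_lot::RwLock;

/// Hypothetical generator: yields `remaining` three-row batches, then signals exhaustion.
#[derive(Debug)]
struct CountingGen {
    schema: Arc<Schema>,
    remaining: usize,
}

impl fmt::Display for CountingGen {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CountingGen(remaining={})", self.remaining)
    }
}

impl LazyBatchGenerator for CountingGen {
    fn generate_next_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
        if self.remaining == 0 {
            return Ok(None); // None ends this partition's stream
        }
        self.remaining -= 1;
        let values = Int64Array::from(vec![1, 2, 3]);
        Ok(Some(RecordBatch::try_new(
            self.schema.clone(),
            vec![Arc::new(values)],
        )?))
    }
}

// Wiring it up (each generator becomes one partition of the scan):
// let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
// let generator: Arc<RwLock<dyn LazyBatchGenerator>> =
//     Arc::new(RwLock::new(CountingGen { schema: schema.clone(), remaining: 2 }));
// let provider = LazyTableProvider::try_new(schema, vec![generator])?;

Each generator is polled lazily as the plan executes, which is what lets a record-batch stream be surfaced to DataFusion without buffering it up front.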
98 changes: 58 additions & 40 deletions crates/core/src/operations/merge/filter.rs
@@ -182,36 +182,39 @@ pub(crate) fn generalize_filter(
source_name: &TableReference,
target_name: &TableReference,
placeholders: &mut Vec<PredicatePlaceholder>,
streaming_source: bool,
) -> Option<Expr> {
match predicate {
Expr::BinaryExpr(binary) => {
if references_table(&binary.right, source_name).has_reference() {
if let ReferenceTableCheck::HasReference(left_target) =
references_table(&binary.left, target_name)
{
return construct_placeholder(
binary,
false,
partition_columns.contains(&left_target),
left_target,
placeholders,
);
if !streaming_source {
if references_table(&binary.right, source_name).has_reference() {
if let ReferenceTableCheck::HasReference(left_target) =
references_table(&binary.left, target_name)
{
return construct_placeholder(
binary,
false,
partition_columns.contains(&left_target),
left_target,
placeholders,
);
}
return None;
}
return None;
}
if references_table(&binary.left, source_name).has_reference() {
if let ReferenceTableCheck::HasReference(right_target) =
references_table(&binary.right, target_name)
{
return construct_placeholder(
binary,
true,
partition_columns.contains(&right_target),
right_target,
placeholders,
);
if references_table(&binary.left, source_name).has_reference() {
if let ReferenceTableCheck::HasReference(right_target) =
references_table(&binary.right, target_name)
{
return construct_placeholder(
binary,
true,
partition_columns.contains(&right_target),
right_target,
placeholders,
);
}
return None;
}
return None;
}

let left = generalize_filter(
@@ -220,13 +223,15 @@ pub(crate) fn generalize_filter(
source_name,
target_name,
placeholders,
streaming_source,
);
let right = generalize_filter(
*binary.right,
partition_columns,
source_name,
target_name,
placeholders,
streaming_source,
);

match (left, right) {
@@ -258,6 +263,7 @@ pub(crate) fn generalize_filter(
source_name,
target_name,
placeholders,
streaming_source,
)?;

let mut list_expr = Vec::new();
@@ -272,6 +278,7 @@ pub(crate) fn generalize_filter(
source_name,
target_name,
placeholders,
streaming_source,
) {
list_expr.push(item)
}
@@ -291,19 +298,23 @@ pub(crate) fn generalize_filter(
}
other => match references_table(&other, source_name) {
ReferenceTableCheck::HasReference(col) => {
let placeholder_name = format!("{col}_{}", placeholders.len());

let placeholder = Expr::Placeholder(Placeholder {
id: placeholder_name.clone(),
data_type: None,
});

placeholders.push(PredicatePlaceholder {
expr: other,
alias: placeholder_name,
is_aggregate: true,
});
Some(placeholder)
if !streaming_source {
let placeholder_name = format!("{col}_{}", placeholders.len());

let placeholder = Expr::Placeholder(Placeholder {
id: placeholder_name.clone(),
data_type: None,
});

placeholders.push(PredicatePlaceholder {
expr: other,
alias: placeholder_name,
is_aggregate: true,
});
Some(placeholder)
} else {
None
}
}
ReferenceTableCheck::NoReference => Some(other),
ReferenceTableCheck::Unknown => None,
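
The net effect of the new streaming_source flag in this function: conjuncts referencing the source are normally rewritten into placeholders that are later filled by aggregating the source, but a streaming source cannot be re-scanned to compute those aggregates, so such conjuncts are dropped (None) instead. Roughly, for a hypothetical join predicate:

// predicate: target.part = source.part AND target.created >= '2025-01-01'
// streaming_source = false -> target.part = $part_0 AND target.created >= '2025-01-01'
// streaming_source = true  -> target.created >= '2025-01-01'  (source conjunct dropped)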
@@ -318,6 +329,7 @@ pub(crate) async fn try_construct_early_filter(
source: &LogicalPlan,
source_name: &TableReference,
target_name: &TableReference,
streaming_source: bool,
) -> DeltaResult<Option<Expr>> {
let table_metadata = table_snapshot.metadata();
let partition_columns = &table_metadata.partition_columns;
@@ -330,10 +342,11 @@ pub(crate) async fn try_construct_early_filter(
source_name,
target_name,
&mut placeholders,
streaming_source,
) {
None => Ok(None),
Some(filter) => {
if placeholders.is_empty() {
if placeholders.is_empty() || streaming_source {
// if we haven't recognized any source predicates in the join predicate (or the source is streaming), return the filter with static-only predicates
Ok(Some(filter))
} else {
@@ -382,7 +395,6 @@ pub(crate) async fn try_construct_early_filter(
}
}
}
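
A sketch of how a caller would opt in for a streaming source (the leading parameter names are assumed from context; compare the non-streaming calls in the tests below):

// let early_filter = try_construct_early_filter(
//     join_predicate,   // e.g. target.id = source.id AND target.created >= '2025-01-01'
//     &table_snapshot,
//     &session_state,
//     &source_plan,
//     &source_name,
//     &target_name,
//     true,             // streaming_source: never aggregate the source into placeholders
// )
// .await?;
// // -> Some(target.created >= '2025-01-01'): only static predicates survive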

#[cfg(test)]
mod tests {
use crate::operations::merge::tests::setup_table;
@@ -457,6 +469,7 @@ mod tests {
&source,
&source_name,
&target_name,
false,
)
.await
.unwrap();
@@ -554,6 +567,7 @@ mod tests {
&source,
&source_name,
&target_name,
false,
)
.await
.unwrap();
@@ -632,6 +646,7 @@ mod tests {
&source,
&source_name,
&target_name,
false,
)
.await
.unwrap();
@@ -711,6 +726,7 @@ mod tests {
&source_plan,
&source_name,
&target_name,
false,
)
.await
.unwrap();
@@ -807,6 +823,7 @@ mod tests {
&source_plan,
&source_name,
&target_name,
false,
)
.await
.unwrap();
@@ -908,6 +925,7 @@ mod tests {
&source_plan,
&source_name,
&target_name,
false,
)
.await
.unwrap();