Skip to content

Commit

Permalink
feat: merge sink node
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuliquan authored and mwylde committed Dec 2, 2024
1 parent 29e96b0 commit 993b24d
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 25 deletions.
1 change: 1 addition & 0 deletions crates/arroyo-planner/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub(crate) enum NamedNode {
Source(TableReference),
Watermark(TableReference),
RemoteTable(TableReference),
Sink(TableReference),
}

struct ArroyoExtensionPlanner {}
Expand Down
42 changes: 19 additions & 23 deletions crates/arroyo-planner/src/extension/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arroyo_rpc::{
df::{ArroyoSchema, ArroyoSchemaRef},
UPDATING_META_FIELD,
};
use datafusion::common::{internal_err, plan_err, DFSchemaRef, Result, TableReference};
use datafusion::common::{plan_err, DFSchemaRef, Result, TableReference};

use datafusion::logical_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};

Expand All @@ -29,10 +29,10 @@ pub(crate) struct SinkExtension {
pub(crate) name: TableReference,
pub(crate) table: Table,
pub(crate) schema: DFSchemaRef,
input: Arc<LogicalPlan>,
inputs: Arc<Vec<LogicalPlan>>,
}

multifield_partial_ord!(SinkExtension, name, input);
multifield_partial_ord!(SinkExtension, name, inputs);

impl SinkExtension {
pub fn new(
Expand Down Expand Up @@ -76,11 +76,12 @@ impl SinkExtension {
}
Self::add_remote_if_necessary(&schema, &mut input);

let inputs = Arc::new(vec![(*input).clone()]);
Ok(Self {
name,
table,
schema,
input,
inputs,
})
}

Expand Down Expand Up @@ -112,7 +113,7 @@ impl UserDefinedLogicalNodeCore for SinkExtension {
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
self.inputs.iter().collect()
}

fn schema(&self) -> &DFSchemaRef {
Expand All @@ -128,22 +129,21 @@ impl UserDefinedLogicalNodeCore for SinkExtension {
}

fn with_exprs_and_inputs(&self, _exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
if inputs.len() != 1 {
return internal_err!("input size inconsistent");
}

Ok(Self {
name: self.name.clone(),
table: self.table.clone(),
schema: self.schema.clone(),
input: Arc::new(inputs[0].clone()),
inputs: Arc::new(inputs),
})
}
}

impl ArroyoExtension for SinkExtension {
fn node_name(&self) -> Option<NamedNode> {
None
match &self.table {
Table::PreviewSink { .. } => None,
_ => Some(NamedNode::Sink(self.name.clone())),
}
}

fn plan_node(
Expand All @@ -152,12 +152,6 @@ impl ArroyoExtension for SinkExtension {
index: usize,
input_schemas: Vec<ArroyoSchemaRef>,
) -> Result<NodeWithIncomingEdges> {
if input_schemas.len() != 1 {
return plan_err!("sink should have exactly one input");
}
// should have exactly one input
let input_schema = input_schemas[0].clone();

let operator_config = (self
.table
.connector_op()
Expand All @@ -170,16 +164,18 @@ impl ArroyoExtension for SinkExtension {
parallelism: 1,
operator_config,
};
let edge = LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone());
Ok(NodeWithIncomingEdges {
node,
edges: vec![edge],
})
let edges = input_schemas
.into_iter()
.map(|input_schema| {
LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone())
})
.collect();
Ok(NodeWithIncomingEdges { node, edges })
}

fn output_schema(&self) -> ArroyoSchema {
// this is kinda fake?
ArroyoSchema::from_schema_keys(Arc::new(self.input.schema().as_ref().into()), vec![])
ArroyoSchema::from_schema_keys(Arc::new(self.inputs[0].schema().as_ref().into()), vec![])
.unwrap()
}
}
51 changes: 49 additions & 2 deletions crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use arrow::datatypes::{self, DataType};
use arrow_schema::{Field, FieldRef, Schema};
use arroyo_datastream::WindowType;

use builder::NamedNode;
use datafusion::common::tree_node::TreeNode;
use datafusion::common::{not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue};
use datafusion::datasource::DefaultTableSource;
#[allow(deprecated)]
Expand All @@ -33,13 +35,15 @@ use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference};

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
WindowUDF,
create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature,
UserDefinedLogicalNode, Volatility, WindowUDF,
};

use datafusion::logical_expr::{AggregateUDF, TableSource};
use extension::ArroyoExtension;
use logical::LogicalBatchInput;

use rewriters::SinkInputRewriter;
use schemas::window_arrow_struct;
use tables::{Insert, Table};

Expand Down Expand Up @@ -633,6 +637,45 @@ pub fn rewrite_plan(
Ok(rewritten_plan.data)
}

fn build_sink_inputs(extensions: &[LogicalPlan]) -> HashMap<NamedNode, Vec<LogicalPlan>> {
let mut sink_inputs = HashMap::<NamedNode, Vec<LogicalPlan>>::new();
for extension in extensions.iter() {
if let LogicalPlan::Extension(extension) = extension {
if let Some(sink_node) = extension.node.as_any().downcast_ref::<SinkExtension>() {
if let Some(named_node) = sink_node.node_name() {
let inputs = sink_node
.inputs()
.into_iter()
.cloned()
.collect::<Vec<LogicalPlan>>();
sink_inputs.entry(named_node).or_default().extend(inputs);
}
}
}
}
sink_inputs
}

/// rewrite_sinks will rewrite sink's inputs and remove duplicated sinks
/// Collect inputs of [`SinkExtension`], it's help to merge [`SinkExtension`] with same table.
/// Each `SinkExtension` can get itself inputs (is merged previously) by named_node,
/// input [`SinkExtension`]'s named node which get by `named_node()` to get inputs.
fn rewrite_sinks(extensions: Vec<LogicalPlan>) -> Result<Vec<LogicalPlan>> {
let mut sink_inputs = build_sink_inputs(&extensions);
let mut new_extensions = vec![];
for extension in extensions {
let mut is_rewrited = false;
let result = extension.rewrite(&mut SinkInputRewriter::new(
&mut sink_inputs,
&mut is_rewrited,
))?;
if !(is_rewrited) {
new_extensions.push(result.data);
}
}
Ok(new_extensions)
}

fn try_handle_set_variable(
statement: &Statement,
schema_provider: &mut ArroyoSchemaProvider,
Expand Down Expand Up @@ -798,6 +841,10 @@ pub async fn parse_and_get_arrow_program(
node: Arc::new(sink?),
}));
}

// rewrite sink's inputs, and remove duplicated sink
let extensions = rewrite_sinks(extensions)?;

let mut plan_to_graph_visitor = PlanToGraphVisitor::new(&schema_provider, &session_state);
for extension in extensions {
plan_to_graph_visitor.add_plan(extension)?;
Expand Down
42 changes: 42 additions & 0 deletions crates/arroyo-planner/src/rewriters.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::builder::NamedNode;
use crate::extension::debezium::DebeziumUnrollingExtension;
use crate::extension::remote_table::RemoteTableExtension;
use crate::extension::sink::SinkExtension;
use crate::extension::table_source::TableSourceExtension;
use crate::extension::watermark_node::WatermarkNode;
use crate::extension::ArroyoExtension;
use crate::schemas::add_timestamp_field;
use crate::tables::ConnectorTable;
use crate::tables::FieldSpec;
Expand All @@ -15,6 +17,7 @@ use crate::{
use arrow_schema::DataType;
use arroyo_rpc::TIMESTAMP_FIELD;
use arroyo_rpc::UPDATING_META_FIELD;
use datafusion::logical_expr::UserDefinedLogicalNode;

use crate::extension::AsyncUDFExtension;
use arroyo_udf_host::parse::{AsyncOptions, UdfType};
Expand All @@ -29,6 +32,7 @@ use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
BinaryExpr, ColumnUnnestList, Expr, Extension, LogicalPlan, Projection, TableScan, Unnest,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -685,3 +689,41 @@ impl TreeNodeRewriter for TimeWindowNullCheckRemover {
Ok(Transformed::no(node))
}
}

type SinkInputs = HashMap<NamedNode, Vec<LogicalPlan>>;

pub(crate) struct SinkInputRewriter<'a> {
sink_inputs: &'a mut SinkInputs,
is_rewrited: &'a mut bool,
}

impl<'a> SinkInputRewriter<'a> {
pub(crate) fn new(sink_inputs: &'a mut SinkInputs, is_rewrited: &'a mut bool) -> Self {
Self {
sink_inputs,
is_rewrited,
}
}
}

impl<'a> TreeNodeRewriter for SinkInputRewriter<'a> {
type Node = LogicalPlan;

fn f_down(&mut self, node: Self::Node) -> DFResult<Transformed<Self::Node>> {
if let LogicalPlan::Extension(extension) = &node {
if let Some(sink_node) = extension.node.as_any().downcast_ref::<SinkExtension>() {
if let Some(named_node) = sink_node.node_name() {
if let Some(inputs) = self.sink_inputs.remove(&named_node) {
let extension = LogicalPlan::Extension(Extension {
node: sink_node.with_exprs_and_inputs(vec![], inputs)?,
});
return Ok(Transformed::new(extension, true, TreeNodeRecursion::Jump));
} else {
*self.is_rewrited = true;
}
}
}
}
Ok(Transformed::no(node))
}
}
25 changes: 25 additions & 0 deletions crates/arroyo-planner/src/test/queries/test_merge_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE TABLE cars (
timestamp TIMESTAMP,
driver_id BIGINT,
event_type TEXT,
location TEXT
) WITH (
connector = 'single_file',
path = 'cars.json',
format = 'json',
type = 'source'
);

CREATE TABLE cars_output (
timestamp TIMESTAMP,
driver_id BIGINT,
event_type TEXT,
location TEXT
) WITH (
connector = 'single_file',
path = 'cars_output.json',
format = 'json',
type = 'sink'
);
INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 100 AND event_type = 'pickup';
INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 101 AND event_type = 'dropoff';
78 changes: 78 additions & 0 deletions crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{"timestamp": "2023-09-18T14:28:56", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-18T14:49:46", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-18T15:30:11", "driver_id": 100, "event_type": "pickup", "location": "Parkside"}
{"timestamp": "2023-09-18T16:14:44", "driver_id": 100, "event_type": "pickup", "location": "Portola"}
{"timestamp": "2023-09-18T16:49:39", "driver_id": 100, "event_type": "pickup", "location": "Portola"}
{"timestamp": "2023-09-18T17:41:24", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T18:07:14", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"}
{"timestamp": "2023-09-18T18:43:30", "driver_id": 100, "event_type": "pickup", "location": "Fillmore"}
{"timestamp": "2023-09-18T19:16:04", "driver_id": 100, "event_type": "pickup", "location": "Nob Hill"}
{"timestamp": "2023-09-18T19:44:47", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T20:21:16", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"}
{"timestamp": "2023-09-18T21:09:21", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"}
{"timestamp": "2023-09-18T21:55:11", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"}
{"timestamp": "2023-09-18T22:32:54", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T23:16:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T00:02:29", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T00:50:37", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-19T01:10:47", "driver_id": 100, "event_type": "pickup", "location": "Japantown"}
{"timestamp": "2023-09-19T02:12:39", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T02:41:42", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"}
{"timestamp": "2023-09-19T03:24:47", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"}
{"timestamp": "2023-09-19T03:36:12", "driver_id": 100, "event_type": "pickup", "location": "Parkside"}
{"timestamp": "2023-09-19T03:53:34", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T04:41:47", "driver_id": 100, "event_type": "pickup", "location": "Russian Hill"}
{"timestamp": "2023-09-19T05:46:00", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"}
{"timestamp": "2023-09-19T06:47:18", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-19T07:44:33", "driver_id": 100, "event_type": "pickup", "location": "Richmond"}
{"timestamp": "2023-09-19T08:49:31", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"}
{"timestamp": "2023-09-19T09:27:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T09:46:19", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T10:27:22", "driver_id": 100, "event_type": "pickup", "location": "Mission"}
{"timestamp": "2023-09-19T10:59:30", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"}
{"timestamp": "2023-09-19T11:24:27", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"}
{"timestamp": "2023-09-19T11:53:56", "driver_id": 100, "event_type": "pickup", "location": "Sunset"}
{"timestamp": "2023-09-19T12:28:58", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"}
{"timestamp": "2023-09-19T13:03:59", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T14:07:16", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-18T14:36:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-18T14:56:01", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-18T15:46:32", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-18T16:06:24", "driver_id": 101, "event_type": "dropoff", "location": "Richmond"}
{"timestamp": "2023-09-18T17:14:04", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"}
{"timestamp": "2023-09-18T17:59:08", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-18T18:24:20", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-18T18:47:15", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"}
{"timestamp": "2023-09-18T19:36:34", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-18T20:33:46", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-18T20:46:17", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-18T21:14:45", "driver_id": 101, "event_type": "dropoff", "location": "Dogpatch"}
{"timestamp": "2023-09-18T22:15:49", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-18T22:39:11", "driver_id": 101, "event_type": "dropoff", "location": "Chinatown"}
{"timestamp": "2023-09-18T23:35:49", "driver_id": 101, "event_type": "dropoff", "location": "SOMA"}
{"timestamp": "2023-09-18T23:49:59", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T00:20:59", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T00:59:00", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T01:49:47", "driver_id": 101, "event_type": "dropoff", "location": "Portola"}
{"timestamp": "2023-09-19T02:05:42", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T02:39:47", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T02:56:49", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T03:38:08", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T04:38:17", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T04:58:07", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-19T05:42:13", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T06:18:40", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T06:46:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T07:01:08", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T07:53:38", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"}
{"timestamp": "2023-09-19T08:12:35", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"}
{"timestamp": "2023-09-19T09:17:21", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T09:31:35", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T10:16:05", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"}
{"timestamp": "2023-09-19T10:52:37", "driver_id": 101, "event_type": "dropoff", "location": "Portola"}
{"timestamp": "2023-09-19T11:23:49", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-19T12:14:29", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T12:35:29", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T13:08:40", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"}
{"timestamp": "2023-09-19T13:29:18", "driver_id": 101, "event_type": "dropoff", "location": "Tenderloin"}
{"timestamp": "2023-09-19T14:18:26", "driver_id": 101, "event_type": "dropoff", "location": "Potrero Hill"}
Loading

0 comments on commit 993b24d

Please sign in to comment.