From 0b5c7d4a5243ce857073005712e0d3085db9ae0c Mon Sep 17 00:00:00 2001 From: zhuliquan Date: Wed, 27 Nov 2024 00:41:45 +0800 Subject: [PATCH 1/2] feat: merge sink node --- crates/arroyo-planner/src/builder.rs | 1 + crates/arroyo-planner/src/extension/sink.rs | 42 +++++----- crates/arroyo-planner/src/lib.rs | 51 +++++++++++- crates/arroyo-planner/src/rewriters.rs | 42 ++++++++++ .../src/test/queries/test_merge_sink.sql | 25 ++++++ .../golden_outputs/test_merge_sink.json | 78 +++++++++++++++++++ .../src/test/queries/test_merge_sink.sql | 25 ++++++ 7 files changed, 239 insertions(+), 25 deletions(-) create mode 100644 crates/arroyo-planner/src/test/queries/test_merge_sink.sql create mode 100644 crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json create mode 100644 crates/arroyo-sql-testing/src/test/queries/test_merge_sink.sql diff --git a/crates/arroyo-planner/src/builder.rs b/crates/arroyo-planner/src/builder.rs index 89251bb0d..f11f30200 100644 --- a/crates/arroyo-planner/src/builder.rs +++ b/crates/arroyo-planner/src/builder.rs @@ -213,6 +213,7 @@ pub(crate) enum NamedNode { Source(TableReference), Watermark(TableReference), RemoteTable(TableReference), + Sink(TableReference), } struct ArroyoExtensionPlanner {} diff --git a/crates/arroyo-planner/src/extension/sink.rs b/crates/arroyo-planner/src/extension/sink.rs index 602dd2bf7..a7e20047b 100644 --- a/crates/arroyo-planner/src/extension/sink.rs +++ b/crates/arroyo-planner/src/extension/sink.rs @@ -5,7 +5,7 @@ use arroyo_rpc::{ df::{ArroyoSchema, ArroyoSchemaRef}, UPDATING_META_FIELD, }; -use datafusion::common::{internal_err, plan_err, DFSchemaRef, Result, TableReference}; +use datafusion::common::{plan_err, DFSchemaRef, Result, TableReference}; use datafusion::logical_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; @@ -29,10 +29,10 @@ pub(crate) struct SinkExtension { pub(crate) name: TableReference, pub(crate) table: Table, pub(crate) schema: DFSchemaRef, - input: Arc, + inputs: Arc>, } -multifield_partial_ord!(SinkExtension, name, input); +multifield_partial_ord!(SinkExtension, name, inputs); impl SinkExtension { pub fn new( @@ -76,11 +76,12 @@ impl SinkExtension { } Self::add_remote_if_necessary(&schema, &mut input); + let inputs = Arc::new(vec![(*input).clone()]); Ok(Self { name, table, schema, - input, + inputs, }) } @@ -112,7 +113,7 @@ impl UserDefinedLogicalNodeCore for SinkExtension { } fn inputs(&self) -> Vec<&LogicalPlan> { - vec![&self.input] + self.inputs.iter().collect() } fn schema(&self) -> &DFSchemaRef { @@ -128,22 +129,21 @@ impl UserDefinedLogicalNodeCore for SinkExtension { } fn with_exprs_and_inputs(&self, _exprs: Vec, inputs: Vec) -> Result { - if inputs.len() != 1 { - return internal_err!("input size inconsistent"); - } - Ok(Self { name: self.name.clone(), table: self.table.clone(), schema: self.schema.clone(), - input: Arc::new(inputs[0].clone()), + inputs: Arc::new(inputs), }) } } impl ArroyoExtension for SinkExtension { fn node_name(&self) -> Option { - None + match &self.table { + Table::PreviewSink { .. } => None, + _ => Some(NamedNode::Sink(self.name.clone())), + } } fn plan_node( @@ -152,12 +152,6 @@ impl ArroyoExtension for SinkExtension { index: usize, input_schemas: Vec, ) -> Result { - if input_schemas.len() != 1 { - return plan_err!("sink should have exactly one input"); - } - // should have exactly one input - let input_schema = input_schemas[0].clone(); - let operator_config = (self .table .connector_op() @@ -170,16 +164,18 @@ impl ArroyoExtension for SinkExtension { parallelism: 1, operator_config, }; - let edge = LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone()); - Ok(NodeWithIncomingEdges { - node, - edges: vec![edge], - }) + let edges = input_schemas + .into_iter() + .map(|input_schema| { + LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone()) + }) + .collect(); + Ok(NodeWithIncomingEdges { node, edges }) } fn output_schema(&self) -> ArroyoSchema { // this is kinda fake? - ArroyoSchema::from_schema_keys(Arc::new(self.input.schema().as_ref().into()), vec![]) + ArroyoSchema::from_schema_keys(Arc::new(self.inputs[0].schema().as_ref().into()), vec![]) .unwrap() } } diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index f1ba69cf5..a1fa3379d 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -22,6 +22,8 @@ use arrow::datatypes::{self, DataType}; use arrow_schema::{Field, FieldRef, Schema}; use arroyo_datastream::WindowType; +use builder::NamedNode; +use datafusion::common::tree_node::TreeNode; use datafusion::common::{not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue}; use datafusion::datasource::DefaultTableSource; #[allow(deprecated)] @@ -33,13 +35,15 @@ use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference}; use datafusion::logical_expr::expr::ScalarFunction; use datafusion::logical_expr::{ - create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility, - WindowUDF, + create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, + UserDefinedLogicalNode, Volatility, WindowUDF, }; use datafusion::logical_expr::{AggregateUDF, TableSource}; +use extension::ArroyoExtension; use logical::LogicalBatchInput; +use rewriters::SinkInputRewriter; use schemas::window_arrow_struct; use tables::{Insert, Table}; @@ -633,6 +637,45 @@ pub fn rewrite_plan( Ok(rewritten_plan.data) } +fn build_sink_inputs(extensions: &[LogicalPlan]) -> HashMap> { + let mut sink_inputs = HashMap::>::new(); + for extension in extensions.iter() { + if let LogicalPlan::Extension(extension) = extension { + if let Some(sink_node) = extension.node.as_any().downcast_ref::() { + if let Some(named_node) = sink_node.node_name() { + let inputs = sink_node + .inputs() + .into_iter() + .cloned() + .collect::>(); + sink_inputs.entry(named_node).or_default().extend(inputs); + } + } + } + } + sink_inputs +} + +/// rewrite_sinks will rewrite sink's inputs and remove duplicated sinks +/// Collect inputs of [`SinkExtension`], it's help to merge [`SinkExtension`] with same table. +/// Each `SinkExtension` can get itself inputs (is merged previously) by named_node, +/// input [`SinkExtension`]'s named node which get by `named_node()` to get inputs. +fn rewrite_sinks(extensions: Vec) -> Result> { + let mut sink_inputs = build_sink_inputs(&extensions); + let mut new_extensions = vec![]; + for extension in extensions { + let mut is_rewrited = false; + let result = extension.rewrite(&mut SinkInputRewriter::new( + &mut sink_inputs, + &mut is_rewrited, + ))?; + if !(is_rewrited) { + new_extensions.push(result.data); + } + } + Ok(new_extensions) +} + fn try_handle_set_variable( statement: &Statement, schema_provider: &mut ArroyoSchemaProvider, @@ -798,6 +841,10 @@ pub async fn parse_and_get_arrow_program( node: Arc::new(sink?), })); } + + // rewrite sink's inputs, and remove duplicated sink + let extensions = rewrite_sinks(extensions)?; + let mut plan_to_graph_visitor = PlanToGraphVisitor::new(&schema_provider, &session_state); for extension in extensions { plan_to_graph_visitor.add_plan(extension)?; diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 3dfbd8131..304e226c9 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -1,8 +1,10 @@ +use crate::builder::NamedNode; use crate::extension::debezium::DebeziumUnrollingExtension; use crate::extension::remote_table::RemoteTableExtension; use crate::extension::sink::SinkExtension; use crate::extension::table_source::TableSourceExtension; use crate::extension::watermark_node::WatermarkNode; +use crate::extension::ArroyoExtension; use crate::schemas::add_timestamp_field; use crate::tables::ConnectorTable; use crate::tables::FieldSpec; @@ -15,6 +17,7 @@ use crate::{ use arrow_schema::DataType; use arroyo_rpc::TIMESTAMP_FIELD; use arroyo_rpc::UPDATING_META_FIELD; +use datafusion::logical_expr::UserDefinedLogicalNode; use crate::extension::AsyncUDFExtension; use arroyo_udf_host::parse::{AsyncOptions, UdfType}; @@ -29,6 +32,7 @@ use datafusion::logical_expr::expr::ScalarFunction; use datafusion::logical_expr::{ BinaryExpr, ColumnUnnestList, Expr, Extension, LogicalPlan, Projection, TableScan, Unnest, }; +use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -685,3 +689,41 @@ impl TreeNodeRewriter for TimeWindowNullCheckRemover { Ok(Transformed::no(node)) } } + +type SinkInputs = HashMap>; + +pub(crate) struct SinkInputRewriter<'a> { + sink_inputs: &'a mut SinkInputs, + is_rewrited: &'a mut bool, +} + +impl<'a> SinkInputRewriter<'a> { + pub(crate) fn new(sink_inputs: &'a mut SinkInputs, is_rewrited: &'a mut bool) -> Self { + Self { + sink_inputs, + is_rewrited, + } + } +} + +impl<'a> TreeNodeRewriter for SinkInputRewriter<'a> { + type Node = LogicalPlan; + + fn f_down(&mut self, node: Self::Node) -> DFResult> { + if let LogicalPlan::Extension(extension) = &node { + if let Some(sink_node) = extension.node.as_any().downcast_ref::() { + if let Some(named_node) = sink_node.node_name() { + if let Some(inputs) = self.sink_inputs.remove(&named_node) { + let extension = LogicalPlan::Extension(Extension { + node: sink_node.with_exprs_and_inputs(vec![], inputs)?, + }); + return Ok(Transformed::new(extension, true, TreeNodeRecursion::Jump)); + } else { + *self.is_rewrited = true; + } + } + } + } + Ok(Transformed::no(node)) + } +} diff --git a/crates/arroyo-planner/src/test/queries/test_merge_sink.sql b/crates/arroyo-planner/src/test/queries/test_merge_sink.sql new file mode 100644 index 000000000..fa58aeaa8 --- /dev/null +++ b/crates/arroyo-planner/src/test/queries/test_merge_sink.sql @@ -0,0 +1,25 @@ +CREATE TABLE cars ( + timestamp TIMESTAMP, + driver_id BIGINT, + event_type TEXT, + location TEXT +) WITH ( + connector = 'single_file', + path = 'cars.json', + format = 'json', + type = 'source' +); + +CREATE TABLE cars_output ( + timestamp TIMESTAMP, + driver_id BIGINT, + event_type TEXT, + location TEXT +) WITH ( + connector = 'single_file', + path = 'cars_output.json', + format = 'json', + type = 'sink' +); +INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 100 AND event_type = 'pickup'; +INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 101 AND event_type = 'dropoff'; diff --git a/crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json b/crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json new file mode 100644 index 000000000..c83680220 --- /dev/null +++ b/crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json @@ -0,0 +1,78 @@ +{"timestamp": "2023-09-18T14:28:56", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"} +{"timestamp": "2023-09-18T14:49:46", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"} +{"timestamp": "2023-09-18T15:30:11", "driver_id": 100, "event_type": "pickup", "location": "Parkside"} +{"timestamp": "2023-09-18T16:14:44", "driver_id": 100, "event_type": "pickup", "location": "Portola"} +{"timestamp": "2023-09-18T16:49:39", "driver_id": 100, "event_type": "pickup", "location": "Portola"} +{"timestamp": "2023-09-18T17:41:24", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"} +{"timestamp": "2023-09-18T18:07:14", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"} +{"timestamp": "2023-09-18T18:43:30", "driver_id": 100, "event_type": "pickup", "location": "Fillmore"} +{"timestamp": "2023-09-18T19:16:04", "driver_id": 100, "event_type": "pickup", "location": "Nob Hill"} +{"timestamp": "2023-09-18T19:44:47", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"} +{"timestamp": "2023-09-18T20:21:16", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"} +{"timestamp": "2023-09-18T21:09:21", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"} +{"timestamp": "2023-09-18T21:55:11", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"} +{"timestamp": "2023-09-18T22:32:54", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"} +{"timestamp": "2023-09-18T23:16:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"} +{"timestamp": "2023-09-19T00:02:29", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"} +{"timestamp": "2023-09-19T00:50:37", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"} +{"timestamp": "2023-09-19T01:10:47", "driver_id": 100, "event_type": "pickup", "location": "Japantown"} +{"timestamp": "2023-09-19T02:12:39", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"} +{"timestamp": "2023-09-19T02:41:42", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"} +{"timestamp": "2023-09-19T03:24:47", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"} +{"timestamp": "2023-09-19T03:36:12", "driver_id": 100, "event_type": "pickup", "location": "Parkside"} +{"timestamp": "2023-09-19T03:53:34", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"} +{"timestamp": "2023-09-19T04:41:47", "driver_id": 100, "event_type": "pickup", "location": "Russian Hill"} +{"timestamp": "2023-09-19T05:46:00", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"} +{"timestamp": "2023-09-19T06:47:18", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"} +{"timestamp": "2023-09-19T07:44:33", "driver_id": 100, "event_type": "pickup", "location": "Richmond"} +{"timestamp": "2023-09-19T08:49:31", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"} +{"timestamp": "2023-09-19T09:27:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"} +{"timestamp": "2023-09-19T09:46:19", "driver_id": 100, "event_type": "pickup", "location": "SOMA"} +{"timestamp": "2023-09-19T10:27:22", "driver_id": 100, "event_type": "pickup", "location": "Mission"} +{"timestamp": "2023-09-19T10:59:30", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"} +{"timestamp": "2023-09-19T11:24:27", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"} +{"timestamp": "2023-09-19T11:53:56", "driver_id": 100, "event_type": "pickup", "location": "Sunset"} +{"timestamp": "2023-09-19T12:28:58", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"} +{"timestamp": "2023-09-19T13:03:59", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"} +{"timestamp": "2023-09-19T14:07:16", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"} +{"timestamp": "2023-09-18T14:36:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"} +{"timestamp": "2023-09-18T14:56:01", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"} +{"timestamp": "2023-09-18T15:46:32", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"} +{"timestamp": "2023-09-18T16:06:24", "driver_id": 101, "event_type": "dropoff", "location": "Richmond"} +{"timestamp": "2023-09-18T17:14:04", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"} +{"timestamp": "2023-09-18T17:59:08", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"} +{"timestamp": "2023-09-18T18:24:20", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"} +{"timestamp": "2023-09-18T18:47:15", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"} +{"timestamp": "2023-09-18T19:36:34", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-18T20:33:46", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"} +{"timestamp": "2023-09-18T20:46:17", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"} +{"timestamp": "2023-09-18T21:14:45", "driver_id": 101, "event_type": "dropoff", "location": "Dogpatch"} +{"timestamp": "2023-09-18T22:15:49", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-18T22:39:11", "driver_id": 101, "event_type": "dropoff", "location": "Chinatown"} +{"timestamp": "2023-09-18T23:35:49", "driver_id": 101, "event_type": "dropoff", "location": "SOMA"} +{"timestamp": "2023-09-18T23:49:59", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-19T00:20:59", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"} +{"timestamp": "2023-09-19T00:59:00", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"} +{"timestamp": "2023-09-19T01:49:47", "driver_id": 101, "event_type": "dropoff", "location": "Portola"} +{"timestamp": "2023-09-19T02:05:42", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"} +{"timestamp": "2023-09-19T02:39:47", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"} +{"timestamp": "2023-09-19T02:56:49", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"} +{"timestamp": "2023-09-19T03:38:08", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-19T04:38:17", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"} +{"timestamp": "2023-09-19T04:58:07", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"} +{"timestamp": "2023-09-19T05:42:13", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"} +{"timestamp": "2023-09-19T06:18:40", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"} +{"timestamp": "2023-09-19T06:46:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"} +{"timestamp": "2023-09-19T07:01:08", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"} +{"timestamp": "2023-09-19T07:53:38", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"} +{"timestamp": "2023-09-19T08:12:35", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"} +{"timestamp": "2023-09-19T09:17:21", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-19T09:31:35", "driver_id": 101, "event_type": "dropoff", "location": "Mission"} +{"timestamp": "2023-09-19T10:16:05", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"} +{"timestamp": "2023-09-19T10:52:37", "driver_id": 101, "event_type": "dropoff", "location": "Portola"} +{"timestamp": "2023-09-19T11:23:49", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"} +{"timestamp": "2023-09-19T12:14:29", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"} +{"timestamp": "2023-09-19T12:35:29", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"} +{"timestamp": "2023-09-19T13:08:40", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"} +{"timestamp": "2023-09-19T13:29:18", "driver_id": 101, "event_type": "dropoff", "location": "Tenderloin"} +{"timestamp": "2023-09-19T14:18:26", "driver_id": 101, "event_type": "dropoff", "location": "Potrero Hill"} diff --git a/crates/arroyo-sql-testing/src/test/queries/test_merge_sink.sql b/crates/arroyo-sql-testing/src/test/queries/test_merge_sink.sql new file mode 100644 index 000000000..9f00b3707 --- /dev/null +++ b/crates/arroyo-sql-testing/src/test/queries/test_merge_sink.sql @@ -0,0 +1,25 @@ +CREATE TABLE cars ( + timestamp TIMESTAMP, + driver_id BIGINT, + event_type TEXT, + location TEXT +) WITH ( + connector = 'single_file', + path = '$input_dir/cars.json', + format = 'json', + type = 'source' +); + +CREATE TABLE cars_output ( + timestamp TIMESTAMP, + driver_id BIGINT, + event_type TEXT, + location TEXT +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'json', + type = 'sink' +); +INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 100 AND event_type = 'pickup'; +INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 101 AND event_type = 'dropoff'; From 7aaa079ef028825dcde4be7cb746b6ae52906b72 Mon Sep 17 00:00:00 2001 From: zhuliquan Date: Tue, 3 Dec 2024 09:52:55 +0800 Subject: [PATCH 2/2] fix: take clippy suggestion for lifetime --- crates/arroyo-planner/src/rewriters.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 304e226c9..611822719 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -706,7 +706,7 @@ impl<'a> SinkInputRewriter<'a> { } } -impl<'a> TreeNodeRewriter for SinkInputRewriter<'a> { +impl TreeNodeRewriter for SinkInputRewriter<'_> { type Node = LogicalPlan; fn f_down(&mut self, node: Self::Node) -> DFResult> {