Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature merge sink #794

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/arroyo-planner/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub(crate) enum NamedNode {
Source(TableReference),
Watermark(TableReference),
RemoteTable(TableReference),
Sink(TableReference),
}

struct ArroyoExtensionPlanner {}
Expand Down
42 changes: 19 additions & 23 deletions crates/arroyo-planner/src/extension/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arroyo_rpc::{
df::{ArroyoSchema, ArroyoSchemaRef},
UPDATING_META_FIELD,
};
use datafusion::common::{internal_err, plan_err, DFSchemaRef, Result, TableReference};
use datafusion::common::{plan_err, DFSchemaRef, Result, TableReference};

use datafusion::logical_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};

Expand All @@ -29,10 +29,10 @@ pub(crate) struct SinkExtension {
pub(crate) name: TableReference,
pub(crate) table: Table,
pub(crate) schema: DFSchemaRef,
input: Arc<LogicalPlan>,
inputs: Arc<Vec<LogicalPlan>>,
}

multifield_partial_ord!(SinkExtension, name, input);
multifield_partial_ord!(SinkExtension, name, inputs);

impl SinkExtension {
pub fn new(
Expand Down Expand Up @@ -76,11 +76,12 @@ impl SinkExtension {
}
Self::add_remote_if_necessary(&schema, &mut input);

let inputs = Arc::new(vec![(*input).clone()]);
Ok(Self {
name,
table,
schema,
input,
inputs,
})
}

Expand Down Expand Up @@ -112,7 +113,7 @@ impl UserDefinedLogicalNodeCore for SinkExtension {
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
self.inputs.iter().collect()
}

fn schema(&self) -> &DFSchemaRef {
Expand All @@ -128,22 +129,21 @@ impl UserDefinedLogicalNodeCore for SinkExtension {
}

fn with_exprs_and_inputs(&self, _exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
if inputs.len() != 1 {
return internal_err!("input size inconsistent");
}

Ok(Self {
name: self.name.clone(),
table: self.table.clone(),
schema: self.schema.clone(),
input: Arc::new(inputs[0].clone()),
inputs: Arc::new(inputs),
})
}
}

impl ArroyoExtension for SinkExtension {
fn node_name(&self) -> Option<NamedNode> {
None
match &self.table {
Table::PreviewSink { .. } => None,
_ => Some(NamedNode::Sink(self.name.clone())),
}
}

fn plan_node(
Expand All @@ -152,12 +152,6 @@ impl ArroyoExtension for SinkExtension {
index: usize,
input_schemas: Vec<ArroyoSchemaRef>,
) -> Result<NodeWithIncomingEdges> {
if input_schemas.len() != 1 {
return plan_err!("sink should have exactly one input");
}
// should have exactly one input
let input_schema = input_schemas[0].clone();

let operator_config = (self
.table
.connector_op()
Expand All @@ -170,16 +164,18 @@ impl ArroyoExtension for SinkExtension {
parallelism: 1,
operator_config,
};
let edge = LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone());
Ok(NodeWithIncomingEdges {
node,
edges: vec![edge],
})
let edges = input_schemas
.into_iter()
.map(|input_schema| {
LogicalEdge::project_all(LogicalEdgeType::Forward, (*input_schema).clone())
})
.collect();
Ok(NodeWithIncomingEdges { node, edges })
}

fn output_schema(&self) -> ArroyoSchema {
// this is kinda fake?
ArroyoSchema::from_schema_keys(Arc::new(self.input.schema().as_ref().into()), vec![])
ArroyoSchema::from_schema_keys(Arc::new(self.inputs[0].schema().as_ref().into()), vec![])
.unwrap()
}
}
51 changes: 49 additions & 2 deletions crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use arrow::datatypes::{self, DataType};
use arrow_schema::{Field, FieldRef, Schema};
use arroyo_datastream::WindowType;

use builder::NamedNode;
use datafusion::common::tree_node::TreeNode;
use datafusion::common::{not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue};
use datafusion::datasource::DefaultTableSource;
#[allow(deprecated)]
Expand All @@ -33,13 +35,15 @@ use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference};

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
WindowUDF,
create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature,
UserDefinedLogicalNode, Volatility, WindowUDF,
};

use datafusion::logical_expr::{AggregateUDF, TableSource};
use extension::ArroyoExtension;
use logical::LogicalBatchInput;

use rewriters::SinkInputRewriter;
use schemas::window_arrow_struct;
use tables::{Insert, Table};

Expand Down Expand Up @@ -633,6 +637,45 @@ pub fn rewrite_plan(
Ok(rewritten_plan.data)
}

/// Groups the inputs of every named [`SinkExtension`] found at the root of the
/// given plans by the sink's [`NamedNode`].
///
/// Plans that are not a `LogicalPlan::Extension` wrapping a `SinkExtension`, and
/// sinks whose `node_name()` is `None`, contribute nothing. When several sinks
/// share a name, their inputs are appended to the same entry in the order the
/// plans appear in `extensions`.
fn build_sink_inputs(extensions: &[LogicalPlan]) -> HashMap<NamedNode, Vec<LogicalPlan>> {
    let mut grouped: HashMap<NamedNode, Vec<LogicalPlan>> = HashMap::new();

    // Keep only the plans whose root is a sink that reports a name.
    let named_sinks = extensions.iter().filter_map(|plan| match plan {
        LogicalPlan::Extension(ext) => {
            let sink = ext.node.as_any().downcast_ref::<SinkExtension>()?;
            Some((sink.node_name()?, sink))
        }
        _ => None,
    });

    for (name, sink) in named_sinks {
        grouped
            .entry(name)
            .or_default()
            .extend(sink.inputs().into_iter().cloned());
    }

    grouped
}

/// Merges sinks that share a name and drops the duplicates that result.
///
/// The inputs of all named [`SinkExtension`]s are first grouped by their
/// [`NamedNode`] via `build_sink_inputs`. Every plan is then run through a
/// [`SinkInputRewriter`] that borrows that shared map:
///
/// * the first named sink encountered for a given name is rebuilt with the whole
///   group of inputs, and the group is removed from the map;
/// * when the rewriter later meets a named sink whose group has already been
///   taken, it raises its flag, and the plan being rewritten is left out of the
///   returned list.
///
/// Plans for which the flag is never raised are returned in their original order.
fn rewrite_sinks(extensions: Vec<LogicalPlan>) -> Result<Vec<LogicalPlan>> {
    let mut sink_inputs = build_sink_inputs(&extensions);
    let mut kept = Vec::with_capacity(extensions.len());

    for plan in extensions {
        // Raised by the rewriter when this plan's sink was already merged elsewhere.
        let mut already_merged = false;
        let mut rewriter = SinkInputRewriter::new(&mut sink_inputs, &mut already_merged);
        let rewritten = plan.rewrite(&mut rewriter)?;

        if already_merged {
            continue;
        }
        kept.push(rewritten.data);
    }

    Ok(kept)
}

fn try_handle_set_variable(
statement: &Statement,
schema_provider: &mut ArroyoSchemaProvider,
Expand Down Expand Up @@ -798,6 +841,10 @@ pub async fn parse_and_get_arrow_program(
node: Arc::new(sink?),
}));
}

// rewrite sink's inputs, and remove duplicated sink
let extensions = rewrite_sinks(extensions)?;

let mut plan_to_graph_visitor = PlanToGraphVisitor::new(&schema_provider, &session_state);
for extension in extensions {
plan_to_graph_visitor.add_plan(extension)?;
Expand Down
42 changes: 42 additions & 0 deletions crates/arroyo-planner/src/rewriters.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::builder::NamedNode;
use crate::extension::debezium::DebeziumUnrollingExtension;
use crate::extension::remote_table::RemoteTableExtension;
use crate::extension::sink::SinkExtension;
use crate::extension::table_source::TableSourceExtension;
use crate::extension::watermark_node::WatermarkNode;
use crate::extension::ArroyoExtension;
use crate::schemas::add_timestamp_field;
use crate::tables::ConnectorTable;
use crate::tables::FieldSpec;
Expand All @@ -15,6 +17,7 @@ use crate::{
use arrow_schema::DataType;
use arroyo_rpc::TIMESTAMP_FIELD;
use arroyo_rpc::UPDATING_META_FIELD;
use datafusion::logical_expr::UserDefinedLogicalNode;

use crate::extension::AsyncUDFExtension;
use arroyo_udf_host::parse::{AsyncOptions, UdfType};
Expand All @@ -29,6 +32,7 @@ use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
BinaryExpr, ColumnUnnestList, Expr, Extension, LogicalPlan, Projection, TableScan, Unnest,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -685,3 +689,41 @@ impl TreeNodeRewriter for TimeWindowNullCheckRemover {
Ok(Transformed::no(node))
}
}

/// Plans collected per sink, keyed by the sink's [`NamedNode`].
type SinkInputs = HashMap<NamedNode, Vec<LogicalPlan>>;

/// Tree rewriter that replaces the inputs of a named [`SinkExtension`] with the
/// plans stored for its name in a shared [`SinkInputs`] map.
///
/// Both fields are mutable borrows owned by the caller, so the effects of one
/// rewrite (entries removed from the map, the flag being set) are visible to the
/// caller after the rewrite finishes.
pub(crate) struct SinkInputRewriter<'a> {
/// Inputs still waiting to be claimed; an entry is removed the first time a sink
/// with that name is visited.
sink_inputs: &'a mut SinkInputs,
/// Set to `true` when a named sink is visited but no entry is left for its name
/// in `sink_inputs`. Despite the name, it is NOT set when a sink is actually
/// rewritten with new inputs.
is_rewrited: &'a mut bool,
}

impl<'a> SinkInputRewriter<'a> {
/// Creates a rewriter over the caller-owned input map and flag.
pub(crate) fn new(sink_inputs: &'a mut SinkInputs, is_rewrited: &'a mut bool) -> Self {
Self {
sink_inputs,
is_rewrited,
}
}
}

impl TreeNodeRewriter for SinkInputRewriter<'_> {
    type Node = LogicalPlan;

    /// Visits `node` on the way down the plan tree.
    ///
    /// Only extension nodes wrapping a [`SinkExtension`] whose `node_name()` is
    /// `Some` are of interest; every other node is returned unchanged.
    ///
    /// * If the shared map still holds inputs for the sink's name, they are removed
    ///   from the map and the sink is rebuilt with them. The result is reported as
    ///   transformed with `TreeNodeRecursion::Jump`.
    /// * If no inputs are left for that name, the `is_rewrited` flag is set and the
    ///   node is returned unchanged.
    fn f_down(&mut self, node: Self::Node) -> DFResult<Transformed<Self::Node>> {
        // Resolve the node to a (sink, name) pair when it is a named sink.
        let named_sink = match &node {
            LogicalPlan::Extension(ext) => ext
                .node
                .as_any()
                .downcast_ref::<SinkExtension>()
                .and_then(|sink| sink.node_name().map(|name| (sink, name))),
            _ => None,
        };

        if let Some((sink, name)) = named_sink {
            match self.sink_inputs.remove(&name) {
                Some(inputs) => {
                    let merged = LogicalPlan::Extension(Extension {
                        node: sink.with_exprs_and_inputs(vec![], inputs)?,
                    });
                    return Ok(Transformed::new(merged, true, TreeNodeRecursion::Jump));
                }
                None => {
                    // The inputs for this name were already claimed by another sink.
                    *self.is_rewrited = true;
                }
            }
        }

        Ok(Transformed::no(node))
    }
}
25 changes: 25 additions & 0 deletions crates/arroyo-planner/src/test/queries/test_merge_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Source table: car events read from the JSON file `cars.json`.
CREATE TABLE cars (
timestamp TIMESTAMP,
driver_id BIGINT,
event_type TEXT,
location TEXT
) WITH (
connector = 'single_file',
path = 'cars.json',
format = 'json',
type = 'source'
);

-- Sink table with the same columns as `cars`, written to `cars_output.json`.
CREATE TABLE cars_output (
timestamp TIMESTAMP,
driver_id BIGINT,
event_type TEXT,
location TEXT
) WITH (
connector = 'single_file',
path = 'cars_output.json',
format = 'json',
type = 'sink'
);
-- Two INSERTs into the same sink table, each applying a different filter to `cars`.
-- NOTE(review): presumably this exercises merging both queries into a single
-- `cars_output` sink; confirm against the planner test harness.
INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 100 AND event_type = 'pickup';
INSERT INTO cars_output SELECT * FROM cars WHERE driver_id = 101 AND event_type = 'dropoff';
78 changes: 78 additions & 0 deletions crates/arroyo-sql-testing/golden_outputs/test_merge_sink.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{"timestamp": "2023-09-18T14:28:56", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-18T14:49:46", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-18T15:30:11", "driver_id": 100, "event_type": "pickup", "location": "Parkside"}
{"timestamp": "2023-09-18T16:14:44", "driver_id": 100, "event_type": "pickup", "location": "Portola"}
{"timestamp": "2023-09-18T16:49:39", "driver_id": 100, "event_type": "pickup", "location": "Portola"}
{"timestamp": "2023-09-18T17:41:24", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T18:07:14", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"}
{"timestamp": "2023-09-18T18:43:30", "driver_id": 100, "event_type": "pickup", "location": "Fillmore"}
{"timestamp": "2023-09-18T19:16:04", "driver_id": 100, "event_type": "pickup", "location": "Nob Hill"}
{"timestamp": "2023-09-18T19:44:47", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T20:21:16", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"}
{"timestamp": "2023-09-18T21:09:21", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"}
{"timestamp": "2023-09-18T21:55:11", "driver_id": 100, "event_type": "pickup", "location": "Noe Valley"}
{"timestamp": "2023-09-18T22:32:54", "driver_id": 100, "event_type": "pickup", "location": "Potrero Hill"}
{"timestamp": "2023-09-18T23:16:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T00:02:29", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T00:50:37", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-19T01:10:47", "driver_id": 100, "event_type": "pickup", "location": "Japantown"}
{"timestamp": "2023-09-19T02:12:39", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T02:41:42", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"}
{"timestamp": "2023-09-19T03:24:47", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"}
{"timestamp": "2023-09-19T03:36:12", "driver_id": 100, "event_type": "pickup", "location": "Parkside"}
{"timestamp": "2023-09-19T03:53:34", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-19T04:41:47", "driver_id": 100, "event_type": "pickup", "location": "Russian Hill"}
{"timestamp": "2023-09-19T05:46:00", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"}
{"timestamp": "2023-09-19T06:47:18", "driver_id": 100, "event_type": "pickup", "location": "Chinatown"}
{"timestamp": "2023-09-19T07:44:33", "driver_id": 100, "event_type": "pickup", "location": "Richmond"}
{"timestamp": "2023-09-19T08:49:31", "driver_id": 100, "event_type": "pickup", "location": "Tenderloin"}
{"timestamp": "2023-09-19T09:27:13", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T09:46:19", "driver_id": 100, "event_type": "pickup", "location": "SOMA"}
{"timestamp": "2023-09-19T10:27:22", "driver_id": 100, "event_type": "pickup", "location": "Mission"}
{"timestamp": "2023-09-19T10:59:30", "driver_id": 100, "event_type": "pickup", "location": "Asbury Heights"}
{"timestamp": "2023-09-19T11:24:27", "driver_id": 100, "event_type": "pickup", "location": "Dogpatch"}
{"timestamp": "2023-09-19T11:53:56", "driver_id": 100, "event_type": "pickup", "location": "Sunset"}
{"timestamp": "2023-09-19T12:28:58", "driver_id": 100, "event_type": "pickup", "location": "Ingleside"}
{"timestamp": "2023-09-19T13:03:59", "driver_id": 100, "event_type": "pickup", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T14:07:16", "driver_id": 100, "event_type": "pickup", "location": "Treasure Island"}
{"timestamp": "2023-09-18T14:36:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-18T14:56:01", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-18T15:46:32", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-18T16:06:24", "driver_id": 101, "event_type": "dropoff", "location": "Richmond"}
{"timestamp": "2023-09-18T17:14:04", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"}
{"timestamp": "2023-09-18T17:59:08", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-18T18:24:20", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-18T18:47:15", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"}
{"timestamp": "2023-09-18T19:36:34", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-18T20:33:46", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-18T20:46:17", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-18T21:14:45", "driver_id": 101, "event_type": "dropoff", "location": "Dogpatch"}
{"timestamp": "2023-09-18T22:15:49", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-18T22:39:11", "driver_id": 101, "event_type": "dropoff", "location": "Chinatown"}
{"timestamp": "2023-09-18T23:35:49", "driver_id": 101, "event_type": "dropoff", "location": "SOMA"}
{"timestamp": "2023-09-18T23:49:59", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T00:20:59", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T00:59:00", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T01:49:47", "driver_id": 101, "event_type": "dropoff", "location": "Portola"}
{"timestamp": "2023-09-19T02:05:42", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T02:39:47", "driver_id": 101, "event_type": "dropoff", "location": "Civic Center"}
{"timestamp": "2023-09-19T02:56:49", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T03:38:08", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T04:38:17", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T04:58:07", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-19T05:42:13", "driver_id": 101, "event_type": "dropoff", "location": "Yerba Buena"}
{"timestamp": "2023-09-19T06:18:40", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T06:46:19", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T07:01:08", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T07:53:38", "driver_id": 101, "event_type": "dropoff", "location": "Bayview"}
{"timestamp": "2023-09-19T08:12:35", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"}
{"timestamp": "2023-09-19T09:17:21", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T09:31:35", "driver_id": 101, "event_type": "dropoff", "location": "Mission"}
{"timestamp": "2023-09-19T10:16:05", "driver_id": 101, "event_type": "dropoff", "location": "Nob Hill"}
{"timestamp": "2023-09-19T10:52:37", "driver_id": 101, "event_type": "dropoff", "location": "Portola"}
{"timestamp": "2023-09-19T11:23:49", "driver_id": 101, "event_type": "dropoff", "location": "Treasure Island"}
{"timestamp": "2023-09-19T12:14:29", "driver_id": 101, "event_type": "dropoff", "location": "Japantown"}
{"timestamp": "2023-09-19T12:35:29", "driver_id": 101, "event_type": "dropoff", "location": "Pacific Heights"}
{"timestamp": "2023-09-19T13:08:40", "driver_id": 101, "event_type": "dropoff", "location": "Fillmore"}
{"timestamp": "2023-09-19T13:29:18", "driver_id": 101, "event_type": "dropoff", "location": "Tenderloin"}
{"timestamp": "2023-09-19T14:18:26", "driver_id": 101, "event_type": "dropoff", "location": "Potrero Hill"}
Loading
Loading