Fixes to chaining optimizer, visualization, and watermark handling
mwylde committed Dec 3, 2024
1 parent f11a90d commit 3b61bd6
Showing 38 changed files with 907 additions and 577 deletions.
24 changes: 16 additions & 8 deletions crates/arroyo-api/src/pipelines.rs
@@ -6,12 +6,12 @@ use axum::{debug_handler, Json};
use axum_extra::extract::WithRejection;
use http::StatusCode;

use petgraph::visit::NodeRef;
use petgraph::{Direction, EdgeDirection};
use std::collections::HashMap;
use std::num::ParseIntError;
use std::str::FromStr;
use std::string::ParseError;
use petgraph::visit::NodeRef;
use std::time::{Duration, SystemTime};

use crate::{compiler_service, connection_profiles, jobs, types};
@@ -274,9 +274,9 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
if node.operator_name == OperatorName::ConnectorSink {
let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
anyhow!(
"failed to decode configuration for connector node {:?}",
node
)
"failed to decode configuration for connector node {:?}",
node
)
})?;

try_register_confluent_schema(&mut op, &schema).await?;
@@ -329,11 +329,13 @@ pub(crate) async fn create_pipeline_int<'a>(
for idx in g.node_indices() {
let should_replace = {
let node = &g.node_weight(idx).unwrap().operator_chain;
node.is_sink() && node.iter().next().unwrap().0.operator_config != default_sink().encode_to_vec()
node.is_sink()
&& node.iter().next().unwrap().0.operator_config
!= default_sink().encode_to_vec()
};
if should_replace {
if enable_sinks {
todo!("enable sinks")
todo!("enable sinks")
// let new_idx = g.add_node(LogicalNode {
// operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
// description: "Preview sink".to_string(),
@@ -349,8 +351,14 @@
// g.add_edge(source, new_idx, weight);
// }
} else {
g.node_weight_mut(idx).unwrap().operator_chain.iter_mut().next().unwrap().0.operator_config =
default_sink().encode_to_vec();
g.node_weight_mut(idx)
.unwrap()
.operator_chain
.iter_mut()
.next()
.unwrap()
.0
.operator_config = default_sink().encode_to_vec();
}
}
}
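For context on the preview-sink rewrite in this hunk: the pipeline is held as a petgraph graph, and the code walks node indices, checks whether a node's operator chain is a non-default sink, and swaps its config for the default preview sink. Below is a minimal, self-contained sketch of that decide-then-mutate pattern over a petgraph graph; the string weights and node names are illustrative stand-ins, not Arroyo's LogicalNode types.

```rust
use petgraph::graph::DiGraph;

fn main() {
    // Toy pipeline graph; string weights stand in for operator configs.
    let mut g: DiGraph<String, ()> = DiGraph::new();
    let source = g.add_node("impulse_source".to_string());
    let sink = g.add_node("kafka_sink".to_string());
    g.add_edge(source, sink, ());

    // Decide first (immutable borrow), then rewrite (mutable borrow),
    // mirroring the should_replace / replace split in the hunk above.
    let to_replace: Vec<_> = g
        .node_indices()
        .filter(|&idx| g[idx].ends_with("_sink"))
        .collect();
    for idx in to_replace {
        *g.node_weight_mut(idx).unwrap() = "preview_sink".to_string();
    }

    for idx in g.node_indices() {
        println!("{:?} -> {}", idx, g[idx]);
    }
}
```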
81 changes: 46 additions & 35 deletions crates/arroyo-connectors/src/impulse/operator.rs
@@ -92,7 +92,11 @@ impl ImpulseSourceFunc {
}
}

async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
async fn run(
&mut self,
ctx: &mut SourceContext,
collector: &mut SourceCollector,
) -> SourceFinishType {
let delay = self.delay(ctx);
info!(
"Starting impulse source with start {} delay {:?} and limit {}",
@@ -138,18 +142,19 @@
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
collector
.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
)
.unwrap(),
)
.unwrap(),
)
.await;
.await;
items = 0;
}

@@ -163,18 +168,19 @@
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
collector
.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
)
.unwrap(),
)
.unwrap(),
)
.await;
.await;
items = 0;
}
ctx.table_manager
@@ -222,18 +228,19 @@
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
collector
.collect(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(counter_column),
Arc::new(task_index_column),
Arc::new(timestamp_column),
],
)
.unwrap(),
)
.unwrap(),
)
.await;
.await;
}

SourceFinishType::Final
@@ -262,7 +269,11 @@ impl SourceOperator for ImpulseSourceFunc {
}
}

async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
async fn run(
&mut self,
ctx: &mut SourceContext,
collector: &mut SourceCollector,
) -> SourceFinishType {
self.run(ctx, collector).await
}
}
78 changes: 41 additions & 37 deletions crates/arroyo-connectors/src/kafka/mod.rs
@@ -398,48 +398,52 @@ impl Connector for KafkaConnector {
None
};

Ok(ConstructedOperator::from_source(Box::new(KafkaSourceFunc {
topic: table.topic,
bootstrap_servers: profile.bootstrap_servers.to_string(),
group_id: group_id.clone(),
group_id_prefix: group_id_prefix.clone(),
offset_mode: *offset,
format: config.format.expect("Format must be set for Kafka source"),
framing: config.framing,
schema_resolver,
bad_data: config.bad_data,
client_configs,
context: Context::new(Some(profile.clone())),
messages_per_second: NonZeroU32::new(
config
.rate_limit
.map(|l| l.messages_per_second)
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.metadata_fields,
})))
Ok(ConstructedOperator::from_source(Box::new(
KafkaSourceFunc {
topic: table.topic,
bootstrap_servers: profile.bootstrap_servers.to_string(),
group_id: group_id.clone(),
group_id_prefix: group_id_prefix.clone(),
offset_mode: *offset,
format: config.format.expect("Format must be set for Kafka source"),
framing: config.framing,
schema_resolver,
bad_data: config.bad_data,
client_configs,
context: Context::new(Some(profile.clone())),
messages_per_second: NonZeroU32::new(
config
.rate_limit
.map(|l| l.messages_per_second)
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.metadata_fields,
},
)))
}
TableType::Sink {
commit_mode,
key_field,
timestamp_field,
} => Ok(ConstructedOperator::from_operator(Box::new(KafkaSinkFunc {
bootstrap_servers: profile.bootstrap_servers.to_string(),
producer: None,
consistency_mode: (*commit_mode).into(),
timestamp_field: timestamp_field.clone(),
timestamp_col: None,
key_field: key_field.clone(),
key_col: None,
write_futures: vec![],
client_config: client_configs(&profile, Some(table.clone()))?,
context: Context::new(Some(profile.clone())),
topic: table.topic,
serializer: ArrowSerializer::new(
config.format.expect("Format must be defined for KafkaSink"),
),
}))),
} => Ok(ConstructedOperator::from_operator(Box::new(
KafkaSinkFunc {
bootstrap_servers: profile.bootstrap_servers.to_string(),
producer: None,
consistency_mode: (*commit_mode).into(),
timestamp_field: timestamp_field.clone(),
timestamp_col: None,
key_field: key_field.clone(),
key_col: None,
write_futures: vec![],
client_config: client_configs(&profile, Some(table.clone()))?,
context: Context::new(Some(profile.clone())),
topic: table.topic,
serializer: ArrowSerializer::new(
config.format.expect("Format must be defined for KafkaSink"),
),
},
))),
}
}
}
21 changes: 18 additions & 3 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
@@ -279,7 +279,12 @@ impl ArrowOperator for KafkaSinkFunc {
.expect("Producer creation failed");
}

async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext, _: &mut dyn Collector) {
async fn process_batch(
&mut self,
batch: RecordBatch,
ctx: &mut OperatorContext,
_: &mut dyn Collector,
) {
let values = self.serializer.serialize(&batch);
let timestamps = batch
.column(
@@ -306,7 +311,12 @@
}
}

async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut OperatorContext, _: &mut dyn Collector) {
async fn handle_checkpoint(
&mut self,
_: CheckpointBarrier,
ctx: &mut OperatorContext,
_: &mut dyn Collector,
) {
self.flush(ctx).await;
if let ConsistencyMode::ExactlyOnce {
next_transaction_index,
@@ -372,7 +382,12 @@
.expect("sent commit event");
}

async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut OperatorContext, _: &mut dyn Collector) {
async fn on_close(
&mut self,
final_message: &Option<SignalMessage>,
ctx: &mut OperatorContext,
_: &mut dyn Collector,
) {
self.flush(ctx).await;
if !self.is_committing() {
return;
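These sink hunks are largely signature reflows, but the lifecycle pattern they touch is worth spelling out: buffered Kafka writes are flushed when a checkpoint barrier arrives (handle_checkpoint) and again before shutdown (on_close), so nothing is checkpointed or dropped while still sitting in the producer's buffer. A minimal sketch of that flush-before-barrier shape, with hypothetical types rather than Arroyo's OperatorContext/Collector API:

```rust
/// Hypothetical buffering sink: records are staged locally and must be
/// flushed before a checkpoint is acknowledged or the task shuts down.
struct BufferingSink {
    pending: Vec<String>,
}

impl BufferingSink {
    fn write(&mut self, record: impl Into<String>) {
        self.pending.push(record.into());
    }

    /// Stand-in for awaiting all outstanding produce futures.
    fn flush(&mut self) {
        for r in self.pending.drain(..) {
            println!("produced: {r}");
        }
    }

    /// On a checkpoint barrier: flush first, then checkpoint state.
    fn handle_checkpoint(&mut self) {
        self.flush();
    }

    /// On shutdown: flush whatever is still buffered.
    fn on_close(&mut self) {
        self.flush();
    }
}

fn main() {
    let mut sink = BufferingSink { pending: vec![] };
    sink.write("a");
    sink.write("b");
    sink.handle_checkpoint(); // everything up to here has been produced
    sink.write("c");
    sink.on_close();
}
```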
23 changes: 16 additions & 7 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
@@ -113,7 +113,9 @@ impl KafkaSourceFunc {
partitions
.iter()
.enumerate()
.filter(|(i, _)| i % ctx.task_info.parallelism as usize == ctx.task_info.task_index as usize)
.filter(|(i, _)| {
i % ctx.task_info.parallelism as usize == ctx.task_info.task_index as usize
})
.map(|(_, p)| {
let offset = state
.get(&p.id())
@@ -145,7 +147,11 @@
Ok(consumer)
}

async fn run_int(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> Result<SourceFinishType, UserError> {
async fn run_int(
&mut self,
ctx: &mut SourceContext,
collector: &mut SourceCollector,
) -> Result<SourceFinishType, UserError> {
let consumer = self
.get_consumer(ctx)
.await
Expand All @@ -157,10 +163,9 @@ impl KafkaSourceFunc {
if consumer.assignment().unwrap().count() == 0 {
warn!("Kafka Consumer {}-{} is subscribed to no partitions, as there are more subtasks than partitions... setting idle",
ctx.task_info.operator_id, ctx.task_info.task_index);
collector.broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
Watermark::Idle,
)))
.await;
collector
.broadcast(SignalMessage::Watermark(Watermark::Idle))
.await;
}

if let Some(schema_resolver) = &self.schema_resolver {
@@ -285,7 +290,11 @@

#[async_trait]
impl SourceOperator for KafkaSourceFunc {
async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
async fn run(
&mut self,
ctx: &mut SourceContext,
collector: &mut SourceCollector,
) -> SourceFinishType {
match self.run_int(ctx, collector).await {
Ok(r) => r,
Err(e) => {
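The watermark-handling change in this file has a subtask with no assigned partitions broadcast Watermark::Idle rather than staying silent. The usual rationale: a downstream operator advances its watermark to the minimum over its inputs, so an input that will never produce data has to be excluded or it stalls progress forever. The sketch below illustrates that min-over-non-idle rule only; it is not Arroyo's implementation, and it uses u64 timestamps for brevity.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Watermark {
    // u64 millis for brevity; the real type would carry a timestamp.
    EventTime(u64),
    Idle,
}

/// Combine per-input watermarks: idle inputs are skipped; if every input is
/// idle, the combined watermark is Idle as well.
fn combine(inputs: &[Watermark]) -> Watermark {
    inputs
        .iter()
        .filter_map(|w| match w {
            Watermark::EventTime(t) => Some(*t),
            Watermark::Idle => None,
        })
        .min()
        .map(Watermark::EventTime)
        .unwrap_or(Watermark::Idle)
}

fn main() {
    // One idle upstream subtask no longer blocks watermark progress.
    let inputs = [
        Watermark::EventTime(100),
        Watermark::Idle,
        Watermark::EventTime(90),
    ];
    assert_eq!(combine(&inputs), Watermark::EventTime(90));
    println!("combined watermark: {:?}", combine(&inputs));
}
```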
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kafka/source/test.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use crate::kafka::SourceOffset;
use arroyo_operator::context::{batch_bounded, OperatorContext, BatchReceiver};
use arroyo_operator::context::{batch_bounded, BatchReceiver, OperatorContext};
use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, RawStringFormat};