diff --git a/crates/arroyo-api/src/pipelines.rs b/crates/arroyo-api/src/pipelines.rs index 52ce19b55..29d501297 100644 --- a/crates/arroyo-api/src/pipelines.rs +++ b/crates/arroyo-api/src/pipelines.rs @@ -159,6 +159,7 @@ async fn compile_sql<'a>( .unwrap_or(json!({})), &table.config, Some(&table.schema), + None, ) .map_err(log_and_map)?; diff --git a/crates/arroyo-connectors/src/blackhole/mod.rs b/crates/arroyo-connectors/src/blackhole/mod.rs index bed4f4726..1b3f522e6 100644 --- a/crates/arroyo-connectors/src/blackhole/mod.rs +++ b/crates/arroyo-connectors/src/blackhole/mod.rs @@ -1,5 +1,6 @@ use crate::blackhole::operator::BlackholeSinkFunc; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; use arroyo_rpc::api_types::connections::{ @@ -78,8 +79,9 @@ impl Connector for BlackholeConnector { _options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { - self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema) + self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None) } fn from_config( @@ -89,6 +91,7 @@ impl Connector for BlackholeConnector { config: Self::ProfileT, table: Self::TableT, s: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = "Blackhole".to_string(); @@ -99,6 +102,7 @@ impl Connector for BlackholeConnector { format: None, bad_data: None, framing: None, + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs index c694b9ef7..aa991afbd 100644 --- a/crates/arroyo-connectors/src/confluent/mod.rs +++ b/crates/arroyo-connectors/src/confluent/mod.rs @@ -3,6 +3,7 @@ use crate::kafka::{ }; use crate::{kafka, pull_opt}; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; use arroyo_rpc::api_types::connections::{ @@ -161,6 +162,7 @@ impl Connector for ConfluentConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let connection = profile .map(|p| { @@ -172,7 +174,7 @@ impl Connector for ConfluentConnector { let table = KafkaConnector::table_from_options(options)?; - self.from_config(None, name, connection, table, schema) + self.from_config(None, name, connection, table, schema, None) } fn from_config( @@ -182,11 +184,12 @@ impl Connector for ConfluentConnector { config: Self::ProfileT, mut table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { table .client_configs .insert("client.id".to_string(), CLIENT_ID.to_string()); - KafkaConnector {}.from_config(id, name, config.into(), table, schema) + KafkaConnector {}.from_config(id, name, config.into(), table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/filesystem/delta.rs b/crates/arroyo-connectors/src/filesystem/delta.rs index fc384668f..2c62434a1 100644 --- a/crates/arroyo-connectors/src/filesystem/delta.rs +++ b/crates/arroyo-connectors/src/filesystem/delta.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_operator::connector::Connection; use arroyo_storage::BackendConfig; use std::collections::HashMap; @@ -77,6 +78,7 @@ impl Connector for DeltaLakeConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let TableType::Sink { write_path, @@ -123,6 +125,7 @@ impl Connector for DeltaLakeConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -142,10 +145,11 @@ impl Connector for DeltaLakeConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?; - self.from_config(None, name, EmptyConfig {}, table, schema) + self.from_config(None, name, EmptyConfig {}, table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/filesystem/mod.rs b/crates/arroyo-connectors/src/filesystem/mod.rs index c81e24c35..d3b4706bd 100644 --- a/crates/arroyo-connectors/src/filesystem/mod.rs +++ b/crates/arroyo-connectors/src/filesystem/mod.rs @@ -3,6 +3,7 @@ mod sink; mod source; use anyhow::{anyhow, bail, Result}; +use arrow::datatypes::DataType; use arroyo_storage::BackendConfig; use regex::Regex; use std::collections::HashMap; @@ -114,6 +115,7 @@ impl Connector for FileSystemConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let (description, connection_type) = match table.table_type { TableType::Source { .. } => ("FileSystem".to_string(), ConnectionType::Source), @@ -168,6 +170,7 @@ impl Connector for FileSystemConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -187,6 +190,7 @@ impl Connector for FileSystemConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { match options.remove("type") { Some(t) if t == "source" => { @@ -210,12 +214,13 @@ impl Connector for FileSystemConnector { }, }, schema, + None, ) } Some(t) if t == "sink" => { let table = file_system_sink_from_options(options, schema, CommitStyle::Direct)?; - self.from_config(None, name, EmptyConfig {}, table, schema) + self.from_config(None, name, EmptyConfig {}, table, schema, None) } Some(t) => bail!("unknown type: {}", t), None => bail!("must have type set"), diff --git a/crates/arroyo-connectors/src/filesystem/source.rs b/crates/arroyo-connectors/src/filesystem/source.rs index c7abd4f35..3bf43665d 100644 --- a/crates/arroyo-connectors/src/filesystem/source.rs +++ b/crates/arroyo-connectors/src/filesystem/source.rs @@ -367,7 +367,7 @@ impl FileSystemSourceFunc { line = line_reader.next() => { match line.transpose()? { Some(line) => { - ctx.deserialize_slice(line.as_bytes(), SystemTime::now()).await?; + ctx.deserialize_slice(line.as_bytes(), SystemTime::now(), None).await?; records_read += 1; if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-connectors/src/fluvio/mod.rs b/crates/arroyo-connectors/src/fluvio/mod.rs index c5b2b5d09..98f9aa4bb 100644 --- a/crates/arroyo-connectors/src/fluvio/mod.rs +++ b/crates/arroyo-connectors/src/fluvio/mod.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; @@ -88,6 +89,7 @@ impl Connector for FluvioConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let endpoint = options.remove("endpoint"); let topic = pull_opt("topic", options)?; @@ -116,7 +118,7 @@ impl Connector for FluvioConnector { type_: table_type, }; - Self::from_config(self, None, name, EmptyConfig {}, table, schema) + Self::from_config(self, None, name, EmptyConfig {}, table, schema, None) } fn from_config( @@ -126,6 +128,7 @@ impl Connector for FluvioConnector { config: EmptyConfig, table: FluvioTable, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let (typ, desc) = match table.type_ { TableType::Source { .. } => ( @@ -154,6 +157,7 @@ impl Connector for FluvioConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/fluvio/source.rs b/crates/arroyo-connectors/src/fluvio/source.rs index e0315c6f1..4e7468a78 100644 --- a/crates/arroyo-connectors/src/fluvio/source.rs +++ b/crates/arroyo-connectors/src/fluvio/source.rs @@ -166,7 +166,7 @@ impl FluvioSourceFunc { match message { Some((_, Ok(msg))) => { let timestamp = from_millis(msg.timestamp().max(0) as u64); - ctx.deserialize_slice(msg.value(), timestamp).await?; + ctx.deserialize_slice(msg.value(), timestamp, None).await?; if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-connectors/src/impulse/mod.rs b/crates/arroyo-connectors/src/impulse/mod.rs index 2cee9b1c7..c5382a369 100644 --- a/crates/arroyo-connectors/src/impulse/mod.rs +++ b/crates/arroyo-connectors/src/impulse/mod.rs @@ -1,6 +1,7 @@ mod operator; use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; use arroyo_rpc::api_types::connections::FieldType::Primitive; @@ -101,6 +102,7 @@ impl Connector for ImpulseConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let event_rate = f64::from_str(&pull_opt("event_rate", options)?) .map_err(|_| anyhow!("invalid value for event_rate; expected float"))?; @@ -134,6 +136,7 @@ impl Connector for ImpulseConnector { message_count, }, None, + None, ) } @@ -144,6 +147,7 @@ impl Connector for ImpulseConnector { config: Self::ProfileT, table: Self::TableT, _: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!( "{}Impulse<{} eps{}>", @@ -166,6 +170,7 @@ impl Connector for ImpulseConnector { format: None, bad_data: None, framing: None, + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 9d3d97fd4..b3faf20a2 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_formats::de::ArrowDeserializer; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::Connection; @@ -188,6 +189,7 @@ impl Connector for KafkaConnector { config: KafkaConfig, table: KafkaTable, schema: Option<&ConnectionSchema>, + metadata_fields: Option>, ) -> anyhow::Result { let (typ, desc) = match table.type_ { TableType::Source { .. } => ( @@ -207,6 +209,13 @@ impl Connector for KafkaConnector { .map(|t| t.to_owned()) .ok_or_else(|| anyhow!("'format' must be set for Kafka connection"))?; + let metadata_fields = metadata_fields.map(|fields| { + fields + .into_iter() + .map(|(k, (v, _))| (k, v)) + .collect::>() + }); + let config = OperatorConfig { connection: serde_json::to_value(config).unwrap(), table: serde_json::to_value(table).unwrap(), @@ -214,6 +223,7 @@ impl Connector for KafkaConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: metadata_fields, }; Ok(Connection { @@ -312,6 +322,7 @@ impl Connector for KafkaConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + metadata_fields: Option>, ) -> anyhow::Result { let connection = profile .map(|p| { @@ -323,7 +334,37 @@ impl Connector for KafkaConnector { let table = Self::table_from_options(options)?; - Self::from_config(self, None, name, connection, table, schema) + let allowed_metadata_udf_args: HashMap<&str, DataType> = [ + ("offset_id", DataType::Int64), + ("partition", DataType::Int32), + ("topic", DataType::Utf8), + ] + .iter() + .cloned() + .collect(); + + if let Some(fields) = &metadata_fields { + for (field_name, data_type) in fields.values() { + match allowed_metadata_udf_args.get(field_name.as_str()) { + Some(expected_type) => { + if expected_type != data_type { + return Err(anyhow!( + "Invalid datatype for metadata field '{}': expected '{:?}', found '{:?}'", + field_name, expected_type, data_type + )); + } + } + None => { + return Err(anyhow!( + "Invalid metadata field name for Kafka connector: '{}'", + field_name + )); + } + } + } + } + + Self::from_config(self, None, name, connection, table, schema, metadata_fields) } fn make_operator( @@ -383,6 +424,7 @@ impl Connector for KafkaConnector { .unwrap_or(u32::MAX), ) .unwrap(), + metadata_fields: config.additional_fields, }))) } TableType::Sink { @@ -622,7 +664,7 @@ impl KafkaTester { let mut builders = aschema.builders(); let mut error = deserializer - .deserialize_slice(&mut builders, &msg, SystemTime::now()) + .deserialize_slice(&mut builders, &msg, SystemTime::now(), None) .await .into_iter() .next(); @@ -644,7 +686,7 @@ impl KafkaTester { let mut builders = aschema.builders(); let mut error = deserializer - .deserialize_slice(&mut builders, &msg, SystemTime::now()) + .deserialize_slice(&mut builders, &msg, SystemTime::now(), None) .await .into_iter() .next(); @@ -678,7 +720,7 @@ impl KafkaTester { let mut builders = aschema.builders(); let mut error = deserializer - .deserialize_slice(&mut builders, &msg, SystemTime::now()) + .deserialize_slice(&mut builders, &msg, SystemTime::now(), None) .await .into_iter() .next(); diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index d0d03dc63..f95c4a479 100644 --- a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -1,3 +1,4 @@ +use arroyo_formats::de::FieldValueType; use arroyo_rpc::formats::{BadData, Format, Framing}; use arroyo_rpc::grpc::rpc::TableConfig; use arroyo_rpc::schema_resolver::SchemaResolver; @@ -35,6 +36,7 @@ pub struct KafkaSourceFunc { pub schema_resolver: Option>, pub client_configs: HashMap, pub messages_per_second: NonZeroU32, + pub metadata_fields: Option>, } #[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)] @@ -178,7 +180,30 @@ impl KafkaSourceFunc { .ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record", "The message read from Kafka did not contain a message timestamp"))?; - ctx.deserialize_slice(v, from_millis(timestamp as u64)).await?; + let connector_metadata = if let Some(metadata_fields) = &self.metadata_fields { + let mut connector_metadata = HashMap::new(); + for (key, value) in metadata_fields { + match value.as_str() { + "offset_id" => { + connector_metadata.insert(key, FieldValueType::Int64(msg.offset())); + } + "partition" => { + connector_metadata.insert(key, FieldValueType::Int32(msg.partition())); + } + "topic" => { + connector_metadata.insert(key, FieldValueType::String(&self.topic)); + } + _ => {} + } + } + Some(connector_metadata) + } else { + None + }; + + + ctx.deserialize_slice(v, from_millis(timestamp as u64), connector_metadata).await?; + if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-connectors/src/kafka/source/test.rs b/crates/arroyo-connectors/src/kafka/source/test.rs index 0057b7ae8..a8af9cb9d 100644 --- a/crates/arroyo-connectors/src/kafka/source/test.rs +++ b/crates/arroyo-connectors/src/kafka/source/test.rs @@ -87,6 +87,7 @@ impl KafkaTopicTester { schema_resolver: None, client_configs: HashMap::new(), messages_per_second: NonZeroU32::new(100).unwrap(), + metadata_fields: None, }); let (to_control_tx, control_rx) = channel(128); @@ -342,3 +343,101 @@ async fn test_kafka() { ) .await; } + +#[tokio::test] +async fn test_kafka_with_metadata_fields() { + let mut kafka_topic_tester = KafkaTopicTester { + topic: "__arroyo-source-test_metadata".to_string(), + server: "0.0.0.0:9092".to_string(), + group_id: Some("test-consumer-group".to_string()), + }; + + let mut task_info = arroyo_types::get_test_task_info(); + task_info.job_id = format!("kafka-job-{}", random::()); + + kafka_topic_tester.create_topic().await; + + // Prepare metadata fields + let metadata_fields = Some(HashMap::from([( + "offset".to_string(), + "offset_id".to_string(), + )])); + + // Set metadata fields in KafkaSourceFunc + let mut kafka = KafkaSourceFunc { + bootstrap_servers: kafka_topic_tester.server.clone(), + topic: kafka_topic_tester.topic.clone(), + group_id: kafka_topic_tester.group_id.clone(), + group_id_prefix: None, + offset_mode: SourceOffset::Earliest, + format: Format::RawString(RawStringFormat {}), + framing: None, + bad_data: None, + schema_resolver: None, + client_configs: HashMap::new(), + messages_per_second: NonZeroU32::new(100).unwrap(), + metadata_fields, + }; + + let (_to_control_tx, control_rx) = channel(128); + let (command_tx, _from_control_rx) = channel(128); + let (data_tx, _recv) = batch_bounded(128); + + let checkpoint_metadata = None; + + let mut ctx = ArrowContext::new( + task_info.clone(), + checkpoint_metadata, + control_rx, + command_tx, + 1, + vec![], + Some(ArroyoSchema::new_unkeyed( + Arc::new(Schema::new(vec![ + Field::new( + "_timestamp", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::Utf8, false), + Field::new("offset", DataType::Int64, false), + ])), + 0, + )), + None, + vec![vec![data_tx]], + kafka.tables(), + ) + .await; + + tokio::spawn(async move { + kafka.run(&mut ctx).await; + }); + + let mut reader = kafka_topic_tester + .get_source_with_reader(task_info.clone(), None) + .await; + let mut producer = kafka_topic_tester.get_producer(); + + // Send test data + let expected_messages: Vec<_> = (1u64..=21) + .map(|i| { + let data = TestData { i }; + producer.send_data(data.clone()); + serde_json::to_string(&data).unwrap() + }) + .collect(); + + // Verify received messages + reader + .assert_next_message_record_values(expected_messages.into()) + .await; + + reader + .to_control_tx + .send(ControlMessage::Stop { + mode: arroyo_rpc::grpc::rpc::StopMode::Graceful, + }) + .await + .unwrap(); +} diff --git a/crates/arroyo-connectors/src/kinesis/mod.rs b/crates/arroyo-connectors/src/kinesis/mod.rs index 085b00ec3..4014a7a75 100644 --- a/crates/arroyo-connectors/src/kinesis/mod.rs +++ b/crates/arroyo-connectors/src/kinesis/mod.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail, Result}; +use arrow::datatypes::DataType; use std::collections::HashMap; use typify::import_types; @@ -83,6 +84,7 @@ impl Connector for KinesisConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let (connection_type, description) = match table.type_ { TableType::Source { .. } => ( @@ -111,6 +113,7 @@ impl Connector for KinesisConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -130,6 +133,7 @@ impl Connector for KinesisConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let typ = pull_opt("type", options)?; let table_type = match typ.as_str() { @@ -166,7 +170,7 @@ impl Connector for KinesisConnector { aws_region: options.remove("aws_region").map(|s| s.to_string()), }; - Self::from_config(self, None, name, EmptyConfig {}, table, schema) + Self::from_config(self, None, name, EmptyConfig {}, table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/kinesis/source.rs b/crates/arroyo-connectors/src/kinesis/source.rs index 54d9ab150..39890d575 100644 --- a/crates/arroyo-connectors/src/kinesis/source.rs +++ b/crates/arroyo-connectors/src/kinesis/source.rs @@ -434,8 +434,7 @@ impl KinesisSourceFunc { for record in records { let data = record.data.into_inner(); let timestamp = record.approximate_arrival_timestamp.unwrap(); - - ctx.deserialize_slice(&data, from_nanos(timestamp.as_nanos() as u128)) + ctx.deserialize_slice(&data, from_nanos(timestamp.as_nanos() as u128), None) .await?; if ctx.should_flush() { diff --git a/crates/arroyo-connectors/src/mqtt/mod.rs b/crates/arroyo-connectors/src/mqtt/mod.rs index 47843c420..e0e54d3fd 100644 --- a/crates/arroyo-connectors/src/mqtt/mod.rs +++ b/crates/arroyo-connectors/src/mqtt/mod.rs @@ -8,6 +8,7 @@ use crate::mqtt::sink::MqttSinkFunc; use crate::mqtt::source::MqttSourceFunc; use crate::pull_opt; use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; @@ -157,6 +158,7 @@ impl Connector for MqttConnector { config: MqttConfig, table: MqttTable, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let (typ, desc) = match table.type_ { TableType::Source { .. } => ( @@ -183,6 +185,7 @@ impl Connector for MqttConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -243,6 +246,7 @@ impl Connector for MqttConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let connection = profile .map(|p| { @@ -254,7 +258,7 @@ impl Connector for MqttConnector { let table = Self::table_from_options(options)?; - Self::from_config(self, None, name, connection, table, schema) + Self::from_config(self, None, name, connection, table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/mqtt/source/mod.rs b/crates/arroyo-connectors/src/mqtt/source/mod.rs index d9646d618..485ee5560 100644 --- a/crates/arroyo-connectors/src/mqtt/source/mod.rs +++ b/crates/arroyo-connectors/src/mqtt/source/mod.rs @@ -143,7 +143,7 @@ impl MqttSourceFunc { event = eventloop.poll() => { match event { Ok(MqttEvent::Incoming(Incoming::Publish(p))) => { - ctx.deserialize_slice(&p.payload, SystemTime::now()).await?; + ctx.deserialize_slice(&p.payload, SystemTime::now(), None).await?; rate_limiter.until_ready().await; } Ok(MqttEvent::Outgoing(Outgoing::Subscribe(_))) => { diff --git a/crates/arroyo-connectors/src/nats/mod.rs b/crates/arroyo-connectors/src/nats/mod.rs index da6a89199..095d8fce5 100644 --- a/crates/arroyo-connectors/src/nats/mod.rs +++ b/crates/arroyo-connectors/src/nats/mod.rs @@ -3,6 +3,7 @@ use crate::nats::source::NatsSourceFunc; use crate::pull_opt; use anyhow::anyhow; use anyhow::bail; +use arrow::datatypes::DataType; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; @@ -246,6 +247,7 @@ impl Connector for NatsConnector { config: NatsConfig, table: NatsTable, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let stream_or_subject = match &table.connector_type { ConnectorType::Source { source_type, .. } => { @@ -295,6 +297,7 @@ impl Connector for NatsConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -314,6 +317,7 @@ impl Connector for NatsConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let connection = profile .map(|p| { @@ -325,7 +329,7 @@ impl Connector for NatsConnector { let table = Self::table_from_options(options)?; - Self::from_config(self, None, name, connection, table, schema) + Self::from_config(self, None, name, connection, table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/nats/source/mod.rs b/crates/arroyo-connectors/src/nats/source/mod.rs index a550efc93..9a81aee41 100644 --- a/crates/arroyo-connectors/src/nats/source/mod.rs +++ b/crates/arroyo-connectors/src/nats/source/mod.rs @@ -366,8 +366,7 @@ impl NatsSourceFunc { let payload = msg.payload.as_ref(); let message_info = msg.info().expect("Couldn't get message information"); let timestamp = message_info.published.into() ; - - ctx.deserialize_slice(payload, timestamp).await?; + ctx.deserialize_slice(payload, timestamp, None).await?; debug!("---------------------------------------------->"); debug!( @@ -493,7 +492,7 @@ impl NatsSourceFunc { Some(msg) => { let payload = msg.payload.as_ref(); let timestamp = SystemTime::now(); - ctx.deserialize_slice(payload, timestamp).await?; + ctx.deserialize_slice(payload, timestamp, None).await?; if ctx.should_flush() { ctx.flush_buffer().await?; } diff --git a/crates/arroyo-connectors/src/nexmark/mod.rs b/crates/arroyo-connectors/src/nexmark/mod.rs index 5e3636921..5c110f936 100644 --- a/crates/arroyo-connectors/src/nexmark/mod.rs +++ b/crates/arroyo-connectors/src/nexmark/mod.rs @@ -3,7 +3,7 @@ mod operator; mod test; use anyhow::{anyhow, bail}; -use arrow::datatypes::{Field, Schema, TimeUnit}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; use arroyo_rpc::api_types::connections::{ @@ -158,6 +158,7 @@ impl Connector for NexmarkConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let event_rate = f64::from_str(&pull_opt("event_rate", options)?) .map_err(|_| anyhow!("invalid value for event_rate; expected float"))?; @@ -183,6 +184,7 @@ impl Connector for NexmarkConnector { runtime, }, None, + None, ) } @@ -193,6 +195,7 @@ impl Connector for NexmarkConnector { config: Self::ProfileT, table: Self::TableT, _: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!( "{}Nexmark<{} eps>", @@ -211,6 +214,7 @@ impl Connector for NexmarkConnector { format: None, bad_data: None, framing: None, + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/polling_http/mod.rs b/crates/arroyo-connectors/src/polling_http/mod.rs index d620ce4d5..ea8ec421b 100644 --- a/crates/arroyo-connectors/src/polling_http/mod.rs +++ b/crates/arroyo-connectors/src/polling_http/mod.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use std::time::Duration; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_rpc::{var_str::VarStr, OperatorConfig}; use arroyo_types::string_to_map; use reqwest::{Client, Request}; @@ -152,6 +153,7 @@ impl Connector for PollingHTTPConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; let headers = options.remove("headers"); @@ -183,6 +185,7 @@ impl Connector for PollingHTTPConnector { emit_behavior, }, schema, + None, ) } @@ -193,6 +196,7 @@ impl Connector for PollingHTTPConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!("PollingHTTPSource<{}>", table.endpoint); @@ -222,6 +226,7 @@ impl Connector for PollingHTTPConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/polling_http/operator.rs b/crates/arroyo-connectors/src/polling_http/operator.rs index c99ebad4f..ec150cebd 100644 --- a/crates/arroyo-connectors/src/polling_http/operator.rs +++ b/crates/arroyo-connectors/src/polling_http/operator.rs @@ -215,7 +215,7 @@ impl PollingHttpSourceFunc { continue; } - ctx.deserialize_slice(&buf, SystemTime::now()).await?; + ctx.deserialize_slice(&buf, SystemTime::now(), None).await?; if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-connectors/src/preview/mod.rs b/crates/arroyo-connectors/src/preview/mod.rs index d69939d6e..72dddd7f0 100644 --- a/crates/arroyo-connectors/src/preview/mod.rs +++ b/crates/arroyo-connectors/src/preview/mod.rs @@ -3,6 +3,7 @@ mod operator; use std::collections::HashMap; use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_rpc::OperatorConfig; use arroyo_operator::connector::Connection; @@ -71,6 +72,7 @@ impl Connector for PreviewConnector { _: &mut HashMap, _: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { bail!("Preview connector cannot be created in SQL"); } @@ -82,6 +84,7 @@ impl Connector for PreviewConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = "PreviewSink".to_string(); @@ -96,6 +99,7 @@ impl Connector for PreviewConnector { format: None, bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/redis/mod.rs b/crates/arroyo-connectors/src/redis/mod.rs index 16e8d7145..84323762e 100644 --- a/crates/arroyo-connectors/src/redis/mod.rs +++ b/crates/arroyo-connectors/src/redis/mod.rs @@ -1,6 +1,7 @@ mod operator; use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, Connector}; use arroyo_operator::operator::OperatorNode; @@ -227,6 +228,7 @@ impl Connector for RedisConnector { options: &mut HashMap, s: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let connection_config = match profile { Some(connection_profile) => { @@ -348,6 +350,7 @@ impl Connector for RedisConnector { connector_type: sink, }, s, + None, ) } @@ -358,6 +361,7 @@ impl Connector for RedisConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let schema = schema .map(|s| s.to_owned()) @@ -378,6 +382,7 @@ impl Connector for RedisConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/single_file/mod.rs b/crates/arroyo-connectors/src/single_file/mod.rs index 77a4f9bff..543e5ba13 100644 --- a/crates/arroyo-connectors/src/single_file/mod.rs +++ b/crates/arroyo-connectors/src/single_file/mod.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail, Result}; +use arrow::datatypes::DataType; use arroyo_formats::ser::ArrowSerializer; use std::collections::HashMap; use typify::import_types; @@ -84,6 +85,7 @@ impl Connector for SingleFileConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let schema = schema .map(|s| s.to_owned()) @@ -103,6 +105,7 @@ impl Connector for SingleFileConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -122,6 +125,7 @@ impl Connector for SingleFileConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let path = pull_opt("path", options)?; let Ok(table_type) = pull_opt("type", options)?.try_into() else { @@ -146,6 +150,7 @@ impl Connector for SingleFileConnector { wait_for_control, }, schema, + None, ) } diff --git a/crates/arroyo-connectors/src/single_file/source.rs b/crates/arroyo-connectors/src/single_file/source.rs index 125a796b8..3ea8063a5 100644 --- a/crates/arroyo-connectors/src/single_file/source.rs +++ b/crates/arroyo-connectors/src/single_file/source.rs @@ -51,8 +51,7 @@ impl SingleFileSourceFunc { i += 1; continue; } - - ctx.deserialize_slice(s.as_bytes(), SystemTime::now()) + ctx.deserialize_slice(s.as_bytes(), SystemTime::now(), None) .await .unwrap(); if ctx.should_flush() { diff --git a/crates/arroyo-connectors/src/sse/mod.rs b/crates/arroyo-connectors/src/sse/mod.rs index 5da89ebbb..52b9b3be9 100644 --- a/crates/arroyo-connectors/src/sse/mod.rs +++ b/crates/arroyo-connectors/src/sse/mod.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::time::Duration; use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; use arroyo_rpc::{var_str::VarStr, OperatorConfig}; use arroyo_types::string_to_map; use eventsource_client::Client; @@ -78,6 +79,7 @@ impl Connector for SSEConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!("SSESource<{}>", table.endpoint); @@ -107,6 +109,7 @@ impl Connector for SSEConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -126,6 +129,7 @@ impl Connector for SSEConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; let headers = options.remove("headers"); @@ -141,6 +145,7 @@ impl Connector for SSEConnector { headers: headers.map(VarStr::new), }, schema, + None, ) } diff --git a/crates/arroyo-connectors/src/sse/operator.rs b/crates/arroyo-connectors/src/sse/operator.rs index e264b8fc8..b78654d8d 100644 --- a/crates/arroyo-connectors/src/sse/operator.rs +++ b/crates/arroyo-connectors/src/sse/operator.rs @@ -172,7 +172,7 @@ impl SSESourceFunc { if events.is_empty() || events.contains(&event.event_type) { ctx.deserialize_slice( - event.data.as_bytes(), SystemTime::now()).await?; + event.data.as_bytes(), SystemTime::now(), None).await?; if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-connectors/src/stdout/mod.rs b/crates/arroyo-connectors/src/stdout/mod.rs index dfb1be021..59db510a3 100644 --- a/crates/arroyo-connectors/src/stdout/mod.rs +++ b/crates/arroyo-connectors/src/stdout/mod.rs @@ -3,6 +3,7 @@ mod operator; use std::collections::HashMap; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_rpc::OperatorConfig; use tokio::io::BufWriter; @@ -76,8 +77,9 @@ impl Connector for StdoutConnector { _options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { - self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema) + self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None) } fn from_config( @@ -87,6 +89,7 @@ impl Connector for StdoutConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = "StdoutSink".to_string(); @@ -107,6 +110,7 @@ impl Connector for StdoutConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { diff --git a/crates/arroyo-connectors/src/webhook/mod.rs b/crates/arroyo-connectors/src/webhook/mod.rs index 84e8d681c..d5e1a2a29 100644 --- a/crates/arroyo-connectors/src/webhook/mod.rs +++ b/crates/arroyo-connectors/src/webhook/mod.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::SystemTime; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_rpc::OperatorConfig; use arroyo_formats::ser::ArrowSerializer; @@ -142,6 +143,7 @@ impl Connector for WebhookConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!("WebhookSink<{}>", table.endpoint.sub_env_vars()?); @@ -162,6 +164,7 @@ impl Connector for WebhookConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -181,6 +184,7 @@ impl Connector for WebhookConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; @@ -201,7 +205,7 @@ impl Connector for WebhookConnector { )?; let _ = Self::construct_test_request(&client, &table)?; - self.from_config(None, name, EmptyConfig {}, table, schema) + self.from_config(None, name, EmptyConfig {}, table, schema, None) } fn make_operator( diff --git a/crates/arroyo-connectors/src/websocket/mod.rs b/crates/arroyo-connectors/src/websocket/mod.rs index bf6d50660..36fefca1f 100644 --- a/crates/arroyo-connectors/src/websocket/mod.rs +++ b/crates/arroyo-connectors/src/websocket/mod.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use std::time::Duration; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_operator::connector::Connection; use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, @@ -220,6 +221,7 @@ impl Connector for WebsocketConnector { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + _metadata_fields: Option>, ) -> anyhow::Result { let description = format!("WebsocketSource<{}>", table.endpoint); @@ -249,6 +251,7 @@ impl Connector for WebsocketConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), + additional_fields: None, }; Ok(Connection { @@ -268,6 +271,7 @@ impl Connector for WebsocketConnector { options: &mut HashMap, schema: Option<&ConnectionSchema>, _profile: Option<&ConnectionProfile>, + _metadata_fields: Option>, ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; let headers = options.remove("headers"); @@ -304,6 +308,7 @@ impl Connector for WebsocketConnector { subscription_messages, }, schema, + None, ) } diff --git a/crates/arroyo-connectors/src/websocket/operator.rs b/crates/arroyo-connectors/src/websocket/operator.rs index 74a746de9..c9c41b073 100644 --- a/crates/arroyo-connectors/src/websocket/operator.rs +++ b/crates/arroyo-connectors/src/websocket/operator.rs @@ -122,7 +122,7 @@ impl WebsocketSourceFunc { msg: &[u8], ctx: &mut ArrowContext, ) -> Result<(), UserError> { - ctx.deserialize_slice(msg, SystemTime::now()).await?; + ctx.deserialize_slice(msg, SystemTime::now(), None).await?; if ctx.should_flush() { ctx.flush_buffer().await?; diff --git a/crates/arroyo-formats/src/avro/de.rs b/crates/arroyo-formats/src/avro/de.rs index a771699a9..f371bf70c 100644 --- a/crates/arroyo-formats/src/avro/de.rs +++ b/crates/arroyo-formats/src/avro/de.rs @@ -281,7 +281,7 @@ mod tests { deserializer_with_schema(format.clone(), writer_schema); let errors = deserializer - .deserialize_slice(&mut builders, message, SystemTime::now()) + .deserialize_slice(&mut builders, message, SystemTime::now(), None) .await; assert_eq!(errors, vec![]); diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index 3b2edba74..0198b1193 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -1,6 +1,7 @@ use crate::avro::de; use crate::proto::schema::get_pool; use crate::{proto, should_flush}; +use arrow::array::{Int32Builder, Int64Builder}; use arrow::compute::kernels; use arrow_array::builder::{ ArrayBuilder, GenericByteBuilder, StringBuilder, TimestampNanosecondBuilder, @@ -20,6 +21,14 @@ use std::sync::Arc; use std::time::{Instant, SystemTime}; use tokio::sync::Mutex; +#[derive(Debug, Clone)] +pub enum FieldValueType<'a> { + Int64(i64), + Int32(i32), + String(&'a String), + // Extend with more types as needed +} + pub struct FramingIterator<'a> { framing: Option>, buf: &'a [u8], @@ -82,6 +91,7 @@ pub struct ArrowDeserializer { schema_registry: Arc>>, proto_pool: DescriptorPool, schema_resolver: Arc, + additional_fields_builder: Option>>, } impl ArrowDeserializer { @@ -158,6 +168,7 @@ impl ArrowDeserializer { proto_pool, buffered_count: 0, buffered_since: Instant::now(), + additional_fields_builder: None, } } @@ -166,11 +177,12 @@ impl ArrowDeserializer { buffer: &mut [Box], msg: &[u8], timestamp: SystemTime, + additional_fields: Option>>, ) -> Vec { match &*self.format { Format::Avro(_) => self.deserialize_slice_avro(buffer, msg, timestamp).await, _ => FramingIterator::new(self.framing.clone(), msg) - .map(|t| self.deserialize_single(buffer, t, timestamp)) + .map(|t| self.deserialize_single(buffer, t, timestamp, additional_fields.clone())) .filter_map(|t| t.err()) .collect(), } @@ -195,6 +207,11 @@ impl ArrowDeserializer { .map(|batch| { let mut columns = batch.columns().to_vec(); columns.insert(self.schema.timestamp_index, Arc::new(timestamp.finish())); + flush_additional_fields_builders( + &mut self.additional_fields_builder, + &self.schema, + &mut columns, + ); RecordBatch::try_new(self.schema.schema.clone(), columns).unwrap() }), ), @@ -214,6 +231,11 @@ impl ArrowDeserializer { kernels::filter::filter(×tamp.finish(), &mask).unwrap(); columns.insert(self.schema.timestamp_index, Arc::new(timestamp)); + flush_additional_fields_builders( + &mut self.additional_fields_builder, + &self.schema, + &mut columns, + ); RecordBatch::try_new(self.schema.schema.clone(), columns).unwrap() }), ), @@ -225,6 +247,7 @@ impl ArrowDeserializer { buffer: &mut [Box], msg: &[u8], timestamp: SystemTime, + additional_fields: Option>, ) -> Result<(), SourceError> { match &*self.format { Format::RawString(_) @@ -233,10 +256,20 @@ impl ArrowDeserializer { }) => { self.deserialize_raw_string(buffer, msg); add_timestamp(buffer, self.schema.timestamp_index, timestamp); + if let Some(fields) = additional_fields { + for (k, v) in fields.iter() { + add_additional_fields(buffer, &self.schema, k, v); + } + } } Format::RawBytes(_) => { self.deserialize_raw_bytes(buffer, msg); add_timestamp(buffer, self.schema.timestamp_index, timestamp); + if let Some(fields) = additional_fields { + for (k, v) in fields.iter() { + add_additional_fields(buffer, &self.schema, k, v); + } + } } Format::Json(json) => { let msg = if json.confluent_schema_registry { @@ -249,10 +282,35 @@ impl ArrowDeserializer { panic!("json decoder not initialized"); }; + if self.additional_fields_builder.is_none() { + if let Some(fields) = additional_fields.as_ref() { + let mut builders = HashMap::new(); + for (key, value) in fields.iter() { + let builder: Box = match value { + FieldValueType::Int32(_) => Box::new(Int32Builder::new()), + FieldValueType::Int64(_) => Box::new(Int64Builder::new()), + FieldValueType::String(_) => Box::new(StringBuilder::new()), + }; + builders.insert(key, builder); + } + self.additional_fields_builder = Some( + builders + .into_iter() + .map(|(k, v)| ((*k).clone(), v)) + .collect(), + ); + } + } + decoder .decode(msg) .map_err(|e| SourceError::bad_data(format!("invalid JSON: {:?}", e)))?; timestamp_builder.append_value(to_nanos(timestamp) as i64); + + add_additional_fields_using_builder( + additional_fields, + &mut self.additional_fields_builder, + ); self.buffered_count += 1; } Format::Protobuf(proto) => { @@ -269,6 +327,12 @@ impl ArrowDeserializer { .decode(json.to_string().as_bytes()) .map_err(|e| SourceError::bad_data(format!("invalid JSON: {:?}", e)))?; timestamp_builder.append_value(to_nanos(timestamp) as i64); + + add_additional_fields_using_builder( + additional_fields, + &mut self.additional_fields_builder, + ); + self.buffered_count += 1; } } @@ -400,9 +464,108 @@ pub(crate) fn add_timestamp( .append_value(to_nanos(timestamp) as i64); } +pub(crate) fn add_additional_fields( + builder: &mut [Box], + schema: &ArroyoSchema, + key: &str, + value: &FieldValueType<'_>, +) { + let (idx, _) = schema + .schema + .column_with_name(key) + .unwrap_or_else(|| panic!("no '{}' column for additional fields", key)); + match value { + FieldValueType::Int32(i) => { + builder[idx] + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(*i); + } + FieldValueType::Int64(i) => { + builder[idx] + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(*i); + } + FieldValueType::String(s) => { + builder[idx] + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(s); + } + } +} + +pub(crate) fn add_additional_fields_using_builder( + additional_fields: Option>>, + additional_fields_builder: &mut Option>>, +) { + if let Some(fields) = additional_fields { + for (k, v) in fields.iter() { + if let Some(builder) = additional_fields_builder + .as_mut() + .and_then(|b| b.get_mut(*k)) + { + match v { + FieldValueType::Int32(i) => { + builder + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(*i); + } + FieldValueType::Int64(i) => { + builder + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(*i); + } + FieldValueType::String(s) => { + builder + .as_any_mut() + .downcast_mut::() + .expect("additional field has incorrect type") + .append_value(s); + } + } + } + } + } +} + +pub(crate) fn flush_additional_fields_builders( + additional_fields_builder: &mut Option>>, + schema: &ArroyoSchema, + columns: &mut [Arc], +) { + if let Some(additional_fields) = additional_fields_builder.take() { + for (field_name, mut builder) in additional_fields { + if let Some((idx, _)) = schema.schema.column_with_name(&field_name) { + let expected_type = schema.schema.fields[idx].data_type(); + let built_column = builder.as_mut().finish(); + let actual_type = built_column.data_type(); + if expected_type != actual_type { + panic!( + "Type mismatch for column '{}': expected {:?}, got {:?}", + field_name, expected_type, actual_type + ); + } + columns[idx] = Arc::new(built_column); + } else { + panic!("Field '{}' not found in schema", field_name); + } + } + } +} + #[cfg(test)] mod tests { - use crate::de::{ArrowDeserializer, FramingIterator}; + use crate::de::{ArrowDeserializer, FieldValueType, FramingIterator}; + use arrow::datatypes::Int32Type; use arrow_array::builder::{make_builder, ArrayBuilder}; use arrow_array::cast::AsArray; use arrow_array::types::{GenericBinaryType, Int64Type, TimestampNanosecondType}; @@ -534,7 +697,8 @@ mod tests { .deserialize_slice( &mut arrays[..], json!({ "x": 5 }).to_string().as_bytes(), - now + now, + None, ) .await, vec![] @@ -544,7 +708,8 @@ mod tests { .deserialize_slice( &mut arrays[..], json!({ "x": "hello" }).to_string().as_bytes(), - now + now, + None, ) .await, vec![] @@ -570,7 +735,8 @@ mod tests { .deserialize_slice( &mut arrays[..], json!({ "x": 5 }).to_string().as_bytes(), - SystemTime::now() + SystemTime::now(), + None, ) .await, vec![] @@ -580,7 +746,8 @@ mod tests { .deserialize_slice( &mut arrays[..], json!({ "x": "hello" }).to_string().as_bytes(), - SystemTime::now() + SystemTime::now(), + None, ) .await, vec![] @@ -619,7 +786,7 @@ mod tests { let time = SystemTime::now(); let result = deserializer - .deserialize_slice(&mut arrays, &[0, 1, 2, 3, 4, 5], time) + .deserialize_slice(&mut arrays, &[0, 1, 2, 3, 4, 5], time, None) .await; assert!(result.is_empty()); @@ -640,4 +807,70 @@ mod tests { to_nanos(time) as i64 ); } + + #[tokio::test] + async fn test_additional_fields_deserialisation() { + let schema = Arc::new(Schema::new(vec![ + arrow_schema::Field::new("x", arrow_schema::DataType::Int64, true), + arrow_schema::Field::new("y", arrow_schema::DataType::Int32, true), + arrow_schema::Field::new("z", arrow_schema::DataType::Utf8, true), + arrow_schema::Field::new( + "_timestamp", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); + + let mut arrays: Vec<_> = schema + .fields + .iter() + .map(|f| make_builder(f.data_type(), 16)) + .collect(); + + let arroyo_schema = ArroyoSchema::from_schema_unkeyed(schema.clone()).unwrap(); + + let mut deserializer = ArrowDeserializer::new( + Format::Json(JsonFormat { + confluent_schema_registry: false, + schema_id: None, + include_schema: false, + debezium: false, + unstructured: false, + timestamp_format: Default::default(), + }), + arroyo_schema, + None, + BadData::Drop {}, + ); + + let time = SystemTime::now(); + let mut additional_fields = std::collections::HashMap::new(); + let binding = "y".to_string(); + additional_fields.insert(&binding, FieldValueType::Int32(5)); + let z_value = "hello".to_string(); + let binding = "z".to_string(); + additional_fields.insert(&binding, FieldValueType::String(&z_value)); + + let result = deserializer + .deserialize_slice( + &mut arrays, + json!({ "x": 5 }).to_string().as_bytes(), + time, + Some(additional_fields), + ) + .await; + assert!(result.is_empty()); + + let batch = deserializer.flush_buffer().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.columns()[0].as_primitive::().value(0), 5); + assert_eq!(batch.columns()[1].as_primitive::().value(0), 5); + assert_eq!(batch.columns()[2].as_string::().value(0), "hello"); + assert_eq!( + batch.columns()[3] + .as_primitive::() + .value(0), + to_nanos(time) as i64 + ); + } } diff --git a/crates/arroyo-operator/src/connector.rs b/crates/arroyo-operator/src/connector.rs index 5b8f82b08..8606de728 100644 --- a/crates/arroyo-operator/src/connector.rs +++ b/crates/arroyo-operator/src/connector.rs @@ -1,5 +1,6 @@ use crate::operator::OperatorNode; use anyhow::anyhow; +use arrow::datatypes::DataType; use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, }; @@ -89,6 +90,7 @@ pub trait Connector: Send { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + metadata_fields: Option>, ) -> anyhow::Result; fn from_config( @@ -98,6 +100,7 @@ pub trait Connector: Send { config: Self::ProfileT, table: Self::TableT, schema: Option<&ConnectionSchema>, + metadata_fields: Option>, ) -> anyhow::Result; #[allow(unused)] @@ -162,6 +165,7 @@ pub trait ErasedConnector: Send { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + metadata_fields: Option>, ) -> anyhow::Result; fn from_config( @@ -171,6 +175,7 @@ pub trait ErasedConnector: Send { config: &serde_json::Value, table: &serde_json::Value, schema: Option<&ConnectionSchema>, + metadata_fields: Option>, ) -> anyhow::Result; fn make_operator(&self, config: OperatorConfig) -> anyhow::Result; @@ -256,8 +261,9 @@ impl ErasedConnector for C { options: &mut HashMap, schema: Option<&ConnectionSchema>, profile: Option<&ConnectionProfile>, + metadata_fields: Option>, ) -> anyhow::Result { - self.from_options(name, options, schema, profile) + self.from_options(name, options, schema, profile, metadata_fields) } fn from_config( @@ -267,6 +273,7 @@ impl ErasedConnector for C { config: &serde_json::Value, table: &serde_json::Value, schema: Option<&ConnectionSchema>, + metadata_fields: Option>, ) -> anyhow::Result { self.from_config( id, @@ -274,6 +281,7 @@ impl ErasedConnector for C { self.parse_config(config)?, self.parse_table(table)?, schema, + metadata_fields, ) } diff --git a/crates/arroyo-operator/src/context.rs b/crates/arroyo-operator/src/context.rs index d0009a230..cf0534e32 100644 --- a/crates/arroyo-operator/src/context.rs +++ b/crates/arroyo-operator/src/context.rs @@ -2,7 +2,7 @@ use crate::{server_for_hash_array, RateLimiter}; use arrow::array::{make_builder, Array, ArrayBuilder, PrimitiveArray, RecordBatch}; use arrow::compute::{partition, sort_to_indices, take}; use arrow::datatypes::{SchemaRef, UInt64Type}; -use arroyo_formats::de::ArrowDeserializer; +use arroyo_formats::de::{ArrowDeserializer, FieldValueType}; use arroyo_formats::should_flush; use arroyo_metrics::{register_queue_gauge, QueueGauges, TaskCounters}; use arroyo_rpc::config::config; @@ -670,6 +670,7 @@ impl ArrowContext { &mut self, msg: &[u8], time: SystemTime, + additional_fields: Option>>, ) -> Result<(), UserError> { let deserializer = self .deserializer @@ -688,6 +689,7 @@ impl ArrowContext { &mut self.buffer.as_mut().expect("no out schema").buffer, msg, time, + additional_fields, ) .await; self.collect_source_errors(errors).await?; diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index 654a2875c..45f6dcc45 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -195,7 +195,18 @@ impl ArroyoSchemaProvider { ) }), ); - + // Registering kafka connector metadata function + functions.insert( + "metadata".to_string(), + Arc::new(create_udf( + "metadata", + vec![DataType::Utf8], + Arc::new(DataType::Utf8), + Volatility::Volatile, + #[allow(deprecated)] + make_scalar_function(fn_impl), + )), + ); let mut registry = Self { functions, ..Default::default() diff --git a/crates/arroyo-planner/src/tables.rs b/crates/arroyo-planner/src/tables.rs index af8a8c0f3..b13bcb14a 100644 --- a/crates/arroyo-planner/src/tables.rs +++ b/crates/arroyo-planner/src/tables.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, time::Duration}; use arrow_schema::{DataType, Field, FieldRef, Schema}; use arroyo_connectors::connector_for_type; +use datafusion::logical_expr::expr::ScalarFunction; use crate::extension::remote_table::RemoteTableExtension; use crate::types::convert_data_type; @@ -52,7 +53,7 @@ use datafusion::optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison; use datafusion::optimizer::OptimizerRule; use datafusion::sql::planner::PlannerContext; use datafusion::sql::sqlparser; -use datafusion::sql::sqlparser::ast::Query; +use datafusion::sql::sqlparser::ast::{FunctionArg, FunctionArguments, Query}; use datafusion::{ optimizer::{optimizer::Optimizer, OptimizerContext}, sql::{ @@ -98,6 +99,19 @@ impl FieldSpec { FieldSpec::VirtualField { field, .. } => field, } } + fn is_metadata_virtual(&self) -> bool { + match self { + FieldSpec::VirtualField { + expression: Expr::ScalarFunction(ScalarFunction { func, args, .. }), + .. + } => { + func.name() == "metadata" + && args.len() == 1 + && matches!(args.first(), Some(Expr::Literal(_))) + } + _ => false, + } + } } impl From for FieldSpec { @@ -210,6 +224,7 @@ impl ConnectorTable { primary_keys: Vec, options: &mut HashMap, connection_profile: Option<&ConnectionProfile>, + connector_metadata_columns: Option>, ) -> Result { // TODO: a more principled way of letting connectors dictate types to use if "delta" == connector { @@ -258,7 +273,7 @@ impl ConnectorTable { let schema_fields: Vec = input_to_schema_fields .iter() - .filter(|f| !f.is_virtual()) + .filter(|f| f.is_metadata_virtual() || !f.is_virtual()) .map(|f| { let struct_field = f.field(); struct_field.clone().try_into().map_err(|_| { @@ -285,7 +300,13 @@ impl ConnectorTable { .map_err(|e| DataFusionError::Plan(format!("could not create connection schema: {}", e)))?; let connection = connector - .from_options(name, options, Some(&schema), connection_profile) + .from_options( + name, + options, + Some(&schema), + connection_profile, + connector_metadata_columns, + ) .map_err(|e| DataFusionError::Plan(e.to_string()))?; let mut table: ConnectorTable = connection.into(); @@ -495,6 +516,7 @@ impl Table { fn schema_from_columns( columns: &[ColumnDef], schema_provider: &ArroyoSchemaProvider, + connector_metadata_columns: &mut HashMap, ) -> Result> { let struct_field_pairs = columns .iter() @@ -505,10 +527,9 @@ impl Table { .options .iter() .any(|option| matches!(option.option, ColumnOption::NotNull)); - let struct_field = ArroyoExtensionType::add_metadata( extension, - Field::new(name, data_type, nullable), + Field::new(name, data_type.clone(), nullable), ); let generating_expression = column.options.iter().find_map(|option| { @@ -516,6 +537,35 @@ impl Table { generation_expr, .. } = &option.option { + if let Some(sqlparser::ast::Expr::Function(sqlparser::ast::Function { + name, + args, + .. + })) = generation_expr + { + if name + .0 + .iter() + .any(|ident| ident.value.to_lowercase() == "metadata") + { + if let FunctionArguments::List(arg_list) = args { + if let Some(FunctionArg::Unnamed( + sqlparser::ast::FunctionArgExpr::Expr( + sqlparser::ast::Expr::Value( + sqlparser::ast::Value::SingleQuotedString(value), + ), + ), + )) = arg_list.args.first() + { + connector_metadata_columns.insert( + column.name.value.to_string(), + (value.to_string(), data_type.clone()), + ); + return None; + } + } + } + } generation_expr.clone() } else { None @@ -596,7 +646,12 @@ impl Table { } let connector = with_map.remove("connector"); - let fields = Self::schema_from_columns(columns, schema_provider)?; + let mut connector_metadata_columns = Some(HashMap::new()); + let fields = Self::schema_from_columns( + columns, + schema_provider, + connector_metadata_columns.as_mut().unwrap(), + )?; let primary_keys = columns .iter() @@ -660,6 +715,7 @@ impl Table { primary_keys, &mut with_map, connection_profile, + connector_metadata_columns, ) .map_err(|e| e.context(format!("Failed to create table {}", name)))?, ))) diff --git a/crates/arroyo-planner/src/test/mod.rs b/crates/arroyo-planner/src/test/mod.rs index d2ec51f9d..111a59f00 100644 --- a/crates/arroyo-planner/src/test/mod.rs +++ b/crates/arroyo-planner/src/test/mod.rs @@ -24,6 +24,7 @@ fn get_test_schema_provider() -> ArroyoSchemaProvider { runtime: Some(10.0 * 1_000_000.0), }, None, + None, ) .unwrap(); diff --git a/crates/arroyo-planner/src/test/queries/kafka_metadata_udf.sql b/crates/arroyo-planner/src/test/queries/kafka_metadata_udf.sql new file mode 100644 index 000000000..c88c503f9 --- /dev/null +++ b/crates/arroyo-planner/src/test/queries/kafka_metadata_udf.sql @@ -0,0 +1,15 @@ +create table users ( + id TEXT, + name TEXT, + offset BIGINT GENERATED ALWAYS AS (metadata('offset_id')) STORED, + topic TEXT GENERATED ALWAYS AS (metadata('topic')) STORED, + partition INT GENERATED ALWAYS AS (metadata('partition')) STORED +) with ( + connector = 'kafka', + topic = 'order_topic', + format='json', + bootstrap_servers = '0.0.0.0:9092', + type='source' +); + +SELECT * FROM users; \ No newline at end of file diff --git a/crates/arroyo-rpc/src/lib.rs b/crates/arroyo-rpc/src/lib.rs index a6013f4e0..21058b0c1 100644 --- a/crates/arroyo-rpc/src/lib.rs +++ b/crates/arroyo-rpc/src/lib.rs @@ -191,6 +191,7 @@ pub struct OperatorConfig { pub bad_data: Option, pub framing: Option, pub rate_limit: Option, + pub additional_fields: Option>, } impl Default for OperatorConfig { @@ -202,6 +203,7 @@ impl Default for OperatorConfig { bad_data: None, framing: None, rate_limit: None, + additional_fields: None, } } }