diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 5df1826..b17602c 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,3 +1,7 @@ pub mod error; pub use error::*; + +pub const INTERNAL_METADATA_COLUMN: &str = "_streaming_internal_metadata"; + +// use denormalized::common::INTERNAL_METADATA_COLUMN; diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs index 84b8dbf..5d22ff3 100644 --- a/crates/core/src/datasource/kafka/kafka_config.rs +++ b/crates/core/src/datasource/kafka/kafka_config.rs @@ -6,6 +6,7 @@ use apache_avro::Schema as AvroSchema; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use datafusion::logical_expr::SortExpr; +use denormalized_common::INTERNAL_METADATA_COLUMN; use crate::formats::decoders::avro::AvroDecoder; use crate::formats::decoders::json::JsonDecoder; @@ -203,7 +204,7 @@ impl KafkaTopicBuilder { fields.insert( fields.len(), Arc::new(Field::new( - String::from("_streaming_internal_metadata"), + String::from(INTERNAL_METADATA_COLUMN), DataType::Struct(Fields::from(struct_fields)), true, )), diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index f251724..6b22f72 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -26,6 +26,7 @@ use crate::state_backend::slatedb::get_global_slatedb; use denormalized_orchestrator::orchestrator::Orchestrator; use denormalized_common::error::Result; +use denormalized_common::INTERNAL_METADATA_COLUMN; /// The primary interface for building a streaming job /// @@ -203,7 +204,7 @@ impl DataStream { let qualified_fields = schema .iter() .map(|(qualifier, field)| (qualifier.cloned(), field.clone())) - .filter(|(_qualifier, field)| *field.name() != "_streaming_internal_metadata") + .filter(|(_qualifier, field)| *field.name() != INTERNAL_METADATA_COLUMN) .collect::>(); DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()).unwrap() diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 0388a23..536822b 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -36,6 +36,7 @@ use datafusion::{ }, }; +use denormalized_common::INTERNAL_METADATA_COLUMN; use denormalized_orchestrator::{ channel_manager::take_receiver, orchestrator::OrchestrationMessage, }; @@ -323,35 +324,33 @@ impl GroupedWindowAggStream { #[inline] fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { - let result: std::prelude::v1::Result = match self - .input - .poll_next_unpin(cx) - { - Poll::Ready(rdy) => match rdy { - Some(Ok(batch)) => { - if batch.num_rows() > 0 { - let watermark: RecordBatchWatermark = - RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?; - let ranges = get_windows_for_watermark(&watermark, self.window_type); - let _ = self.ensure_window_frames_for_ranges(&ranges); - for range in ranges { - let frame = self.window_frames.get_mut(&range.0).unwrap(); - let _ = frame.push(&batch); + let result: std::prelude::v1::Result = + match self.input.poll_next_unpin(cx) { + Poll::Ready(rdy) => match rdy { + Some(Ok(batch)) => { + if batch.num_rows() > 0 { + let watermark: RecordBatchWatermark = + RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?; + let ranges = get_windows_for_watermark(&watermark, self.window_type); + let _ = self.ensure_window_frames_for_ranges(&ranges); + for range in ranges { + let frame = self.window_frames.get_mut(&range.0).unwrap(); + let _ = frame.push(&batch); + } + self.process_watermark(watermark); + + self.trigger_windows() + } else { + Ok(RecordBatch::new_empty(self.output_schema_with_window())) } - self.process_watermark(watermark); - - self.trigger_windows() - } else { - Ok(RecordBatch::new_empty(self.output_schema_with_window())) } + Some(Err(e)) => Err(e), + None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), + }, + Poll::Pending => { + return Poll::Pending; } - Some(Err(e)) => Err(e), - None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), - }, - Poll::Pending => { - return Poll::Pending; - } - }; + }; let mut checkpoint_batch = false; @@ -547,9 +546,7 @@ impl GroupedAggWindowFrame { } pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> { - let metadata = batch - .column_by_name("_streaming_internal_metadata") - .unwrap(); + let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); let ts_column = metadata_struct @@ -582,7 +579,7 @@ impl GroupedAggWindowFrame { .as_millis() as i64; let metadata = filtered_batch - .column_by_name("_streaming_internal_metadata") + .column_by_name(INTERNAL_METADATA_COLUMN) .unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index 49d8e27..6551f05 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -1,3 +1,10 @@ +use arrow::{ + compute::{concat_batches, filter_record_batch, max}, + datatypes::TimestampMillisecondType, +}; +use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; +use arrow_ord::cmp; +use arrow_schema::{Field, Schema, SchemaRef}; use std::{ any::Any, borrow::Cow, @@ -8,14 +15,6 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use arrow::{ - compute::{concat_batches, filter_record_batch, max}, - datatypes::TimestampMillisecondType, -}; -use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; -use arrow_ord::cmp; -use arrow_schema::{Field, Schema, SchemaRef}; - use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::physical_expr::{ @@ -38,6 +37,7 @@ use datafusion::{ common::{internal_err, stats::Precision, DataFusionError, Statistics}, physical_plan::Distribution, }; +use denormalized_common::INTERNAL_METADATA_COLUMN; use denormalized_orchestrator::{ channel_manager::{create_channel, get_sender}, orchestrator::OrchestrationMessage, @@ -116,9 +116,7 @@ impl PartialWindowAggFrame { } pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> { - let metadata = batch - .column_by_name("_streaming_internal_metadata") - .unwrap(); + let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); let ts_column = metadata_struct @@ -151,7 +149,7 @@ impl PartialWindowAggFrame { .as_millis() as i64; let metadata = filtered_batch - .column_by_name("_streaming_internal_metadata") + .column_by_name(INTERNAL_METADATA_COLUMN) .unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); @@ -778,35 +776,33 @@ impl WindowAggStream { #[inline] fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { - let result: std::prelude::v1::Result = match self - .input - .poll_next_unpin(cx) - { - Poll::Ready(rdy) => match rdy { - Some(Ok(batch)) => { - if batch.num_rows() > 0 { - let watermark: RecordBatchWatermark = - RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?; - let ranges = get_windows_for_watermark(&watermark, self.window_type); - let _ = self.ensure_window_frames_for_ranges(&ranges); - for range in ranges { - let frame = self.window_frames.get_mut(&range.0).unwrap(); - let _ = frame.push(&batch); - } - self.process_watermark(watermark); + let result: std::prelude::v1::Result = + match self.input.poll_next_unpin(cx) { + Poll::Ready(rdy) => match rdy { + Some(Ok(batch)) => { + if batch.num_rows() > 0 { + let watermark: RecordBatchWatermark = + RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?; + let ranges = get_windows_for_watermark(&watermark, self.window_type); + let _ = self.ensure_window_frames_for_ranges(&ranges); + for range in ranges { + let frame = self.window_frames.get_mut(&range.0).unwrap(); + let _ = frame.push(&batch); + } + self.process_watermark(watermark); - self.trigger_windows() - } else { - Ok(RecordBatch::new_empty(self.output_schema_with_window())) + self.trigger_windows() + } else { + Ok(RecordBatch::new_empty(self.output_schema_with_window())) + } } + Some(Err(e)) => Err(e), + None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), + }, + Poll::Pending => { + return Poll::Pending; } - Some(Err(e)) => Err(e), - None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), - }, - Poll::Pending => { - return Poll::Pending; - } - }; + }; Poll::Ready(Some(result)) } } diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index 5551529..ae43988 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -13,6 +13,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion_python::expr::{join::PyJoinType, PyExpr}; use tokio::task::JoinHandle; +use denormalized::common::INTERNAL_METADATA_COLUMN; use denormalized::datastream::DataStream; use crate::errors::{py_denormalized_err, DenormalizedError, Result}; @@ -242,7 +243,7 @@ impl PyDataStream { Ok(Some(batch)) => { Python::with_gil(|py| -> PyResult<()> { let mut batch = batch.clone(); - if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") { + if let Ok(col_idx) = batch.schema_ref().index_of(INTERNAL_METADATA_COLUMN) { batch.remove_column(col_idx); }