Skip to content

Commit

Permalink
refactor internal column name to constant
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Nov 26, 2024
1 parent 137feb3 commit 58313aa
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 71 deletions.
4 changes: 4 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub mod error;

pub use error::*;

/// Name of the column that holds the streaming-internal metadata struct.
///
/// Defined once here so that every crate refers to the same column name
/// instead of repeating the string literal.
pub const INTERNAL_METADATA_COLUMN: &str = "_streaming_internal_metadata";

// Example import from a downstream crate:
//     use denormalized::common::INTERNAL_METADATA_COLUMN;
3 changes: 2 additions & 1 deletion crates/core/src/datasource/kafka/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use apache_avro::Schema as AvroSchema;
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};

use datafusion::logical_expr::SortExpr;
use denormalized_common::INTERNAL_METADATA_COLUMN;

use crate::formats::decoders::avro::AvroDecoder;
use crate::formats::decoders::json::JsonDecoder;
Expand Down Expand Up @@ -203,7 +204,7 @@ impl KafkaTopicBuilder {
fields.insert(
fields.len(),
Arc::new(Field::new(
String::from("_streaming_internal_metadata"),
String::from(INTERNAL_METADATA_COLUMN),
DataType::Struct(Fields::from(struct_fields)),
true,
)),
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::state_backend::slatedb::get_global_slatedb;
use denormalized_orchestrator::orchestrator::Orchestrator;

use denormalized_common::error::Result;
use denormalized_common::INTERNAL_METADATA_COLUMN;

/// The primary interface for building a streaming job
///
Expand Down Expand Up @@ -203,7 +204,7 @@ impl DataStream {
let qualified_fields = schema
.iter()
.map(|(qualifier, field)| (qualifier.cloned(), field.clone()))
.filter(|(_qualifier, field)| *field.name() != "_streaming_internal_metadata")
.filter(|(_qualifier, field)| *field.name() != INTERNAL_METADATA_COLUMN)
.collect::<Vec<_>>();

DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()).unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::{
},
};

use denormalized_common::INTERNAL_METADATA_COLUMN;
use denormalized_orchestrator::{
channel_manager::take_receiver, orchestrator::OrchestrationMessage,
};
Expand Down Expand Up @@ -323,35 +324,33 @@ impl GroupedWindowAggStream {

#[inline]
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> = match self
.input
.poll_next_unpin(cx)
{
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> =
match self.input.poll_next_unpin(cx) {
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
};
};

let mut checkpoint_batch = false;

Expand Down Expand Up @@ -547,9 +546,7 @@ impl GroupedAggWindowFrame {
}

pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> {
let metadata = batch
.column_by_name("_streaming_internal_metadata")
.unwrap();
let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

let ts_column = metadata_struct
Expand Down Expand Up @@ -582,7 +579,7 @@ impl GroupedAggWindowFrame {
.as_millis() as i64;

let metadata = filtered_batch
.column_by_name("_streaming_internal_metadata")
.column_by_name(INTERNAL_METADATA_COLUMN)
.unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down
72 changes: 34 additions & 38 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use arrow::{
compute::{concat_batches, filter_record_batch, max},
datatypes::TimestampMillisecondType,
};
use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
use arrow_ord::cmp;
use arrow_schema::{Field, Schema, SchemaRef};
use std::{
any::Any,
borrow::Cow,
Expand All @@ -8,14 +15,6 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use arrow::{
compute::{concat_batches, filter_record_batch, max},
datatypes::TimestampMillisecondType,
};
use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
use arrow_ord::cmp;
use arrow_schema::{Field, Schema, SchemaRef};

use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::physical_expr::{
Expand All @@ -38,6 +37,7 @@ use datafusion::{
common::{internal_err, stats::Precision, DataFusionError, Statistics},
physical_plan::Distribution,
};
use denormalized_common::INTERNAL_METADATA_COLUMN;
use denormalized_orchestrator::{
channel_manager::{create_channel, get_sender},
orchestrator::OrchestrationMessage,
Expand Down Expand Up @@ -116,9 +116,7 @@ impl PartialWindowAggFrame {
}

pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> {
let metadata = batch
.column_by_name("_streaming_internal_metadata")
.unwrap();
let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

let ts_column = metadata_struct
Expand Down Expand Up @@ -151,7 +149,7 @@ impl PartialWindowAggFrame {
.as_millis() as i64;

let metadata = filtered_batch
.column_by_name("_streaming_internal_metadata")
.column_by_name(INTERNAL_METADATA_COLUMN)
.unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down Expand Up @@ -778,35 +776,33 @@ impl WindowAggStream {

#[inline]
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> = match self
.input
.poll_next_unpin(cx)
{
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> =
match self.input.poll_next_unpin(cx) {
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
};
};
Poll::Ready(Some(result))
}
}
Expand Down
3 changes: 2 additions & 1 deletion py-denormalized/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion_python::expr::{join::PyJoinType, PyExpr};
use tokio::task::JoinHandle;

use denormalized::common::INTERNAL_METADATA_COLUMN;
use denormalized::datastream::DataStream;

use crate::errors::{py_denormalized_err, DenormalizedError, Result};
Expand Down Expand Up @@ -242,7 +243,7 @@ impl PyDataStream {
Ok(Some(batch)) => {
Python::with_gil(|py| -> PyResult<()> {
let mut batch = batch.clone();
if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") {
if let Ok(col_idx) = batch.schema_ref().index_of(INTERNAL_METADATA_COLUMN) {
batch.remove_column(col_idx);
}

Expand Down

0 comments on commit 58313aa

Please sign in to comment.