Commit 8743b28

Splitting base case aggregation to multi threading (#32)

ameyc authored Aug 30, 2024
1 parent 09eed58 commit 8743b28

Showing 5 changed files with 319 additions and 34 deletions.
@@ -58,10 +58,7 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate {
         };
 
         let coalesce_exec = if streaming_aggr_exec.group_by.is_empty() {
-            Arc::new(RepartitionExec::try_new(
-                input_exec.clone(),
-                Partitioning::RoundRobinBatch(1),
-            )?)
+            return Ok(Transformed::no(original));
         } else {
             Arc::new(RepartitionExec::try_new(
                 input_exec.clone(),
@@ -80,6 +77,7 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate {
                     coalesce_exec.clone(),
                     input.schema(),
                     streaming_aggr_exec.window_type,
+                    streaming_aggr_exec.upstream_partitioning,
                 )?,
             )))
         } else {
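Note the shape of this change: when there is no GROUP BY, the rule no longer forces a Partitioning::RoundRobinBatch(1) repartition in front of the streaming aggregate and instead leaves the plan untouched (Transformed::no). The single-partition constraint for the final aggregation stage moves into FranzStreamingWindowExec itself, via the new required_input_distribution override shown below, which is what allows the partial aggregation stage to keep running on every upstream partition.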
286 changes: 271 additions & 15 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -1,4 +1,5 @@
 use std::{
+    alloc::System,
     any::Any,
     borrow::Cow,
     collections::BTreeMap,
@@ -9,14 +10,13 @@ use std::{
 };
 
 use arrow::{
-    compute::{concat_batches, filter_record_batch},
+    compute::{concat_batches, filter_record_batch, max},
     datatypes::TimestampMillisecondType,
 };
 use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
 use arrow_ord::cmp;
 use arrow_schema::{Field, Schema, SchemaRef};
 
-use datafusion::common::{internal_err, stats::Precision, DataFusionError, Statistics};
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::{
     equivalence::{collapse_lex_req, ProjectionMapping},
@@ -33,17 +33,26 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
     InputOrderMode, PlanProperties,
 };
+use datafusion::{
+    common::{internal_err, stats::Precision, DataFusionError, Statistics},
+    physical_plan::Distribution,
+};
 use futures::{Stream, StreamExt};
 use tracing::debug;
 
-use crate::physical_plan::{
-    continuous::grouped_window_agg_stream::GroupedWindowAggStream,
-    utils::{
-        accumulators::{create_accumulators, AccumulatorItem},
-        time::RecordBatchWatermark,
+use crate::{
+    accumulators,
+    physical_plan::{
+        continuous::grouped_window_agg_stream::GroupedWindowAggStream,
+        utils::{
+            accumulators::{create_accumulators, AccumulatorItem},
+            time::{system_time_from_epoch, RecordBatchWatermark},
+        },
     },
 };
 
+type WatermarkMutex = Arc<Mutex<Option<SystemTime>>>;
+
 pub struct FranzWindowFrame {
     pub window_start_time: SystemTime,
     window_end_time: SystemTime,
@@ -202,6 +211,7 @@ pub struct FranzStreamingWindowExec {
     cache: PlanProperties,
     pub mode: AggregateMode,
     pub window_type: FranzStreamingWindowType,
+    pub upstream_partitioning: Option<usize>,
 }
 
 impl FranzStreamingWindowExec {
@@ -215,6 +225,7 @@ impl FranzStreamingWindowExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         window_type: FranzStreamingWindowType,
+        upstream_partitioning: Option<usize>,
     ) -> Result<Self> {
         let schema = create_schema(
             &input.schema(),
@@ -234,6 +245,7 @@ impl FranzStreamingWindowExec {
             input_schema,
             schema,
             window_type,
+            upstream_partitioning,
         )
     }
 
@@ -246,6 +258,7 @@ impl FranzStreamingWindowExec {
         input_schema: SchemaRef,
         schema: SchemaRef,
         window_type: FranzStreamingWindowType,
+        upstream_partitioning: Option<usize>,
     ) -> Result<Self> {
         if aggr_expr.len() != filter_expr.len() {
             return internal_err!("Inconsistent aggregate expr: {:?} and filter expr: {:?} for AggregateExec, their size should match", aggr_expr, filter_expr);
@@ -306,6 +319,7 @@ impl FranzStreamingWindowExec {
             cache,
             mode,
             window_type,
+            upstream_partitioning,
         })
     }
 
@@ -399,6 +413,7 @@ impl ExecutionPlan for FranzStreamingWindowExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.window_type,
+            self.upstream_partitioning,
         )?))
     }
 
@@ -409,14 +424,24 @@ impl ExecutionPlan for FranzStreamingWindowExec {
     ) -> Result<SendableRecordBatchStream> {
         if self.group_by.is_empty() {
             debug!("GROUP BY expression is empty creating a SimpleWindowAggStream");
-            Ok(Box::pin(WindowAggStream::new(
-                self,
-                context,
-                partition,
-                self.watermark.clone(),
-                self.window_type,
-                self.mode,
-            )?))
+            if self.mode == AggregateMode::Partial {
+                Ok(Box::pin(WindowAggStream::new(
+                    self,
+                    context,
+                    partition,
+                    self.watermark.clone(),
+                    self.window_type,
+                    self.mode,
+                )?))
+            } else {
+                Ok(Box::pin(FullWindowAggStream::try_new(
+                    self,
+                    context,
+                    partition,
+                    Duration::from_millis(100),
+                    self.upstream_partitioning,
+                )?))
+            }
         } else {
             debug!("Creating a GroupedWindowAggStream");
             Ok(Box::pin(GroupedWindowAggStream::new(
@@ -430,6 +455,13 @@
         }
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.mode == AggregateMode::Final {
+            return vec![Distribution::SinglePartition; self.children().len()];
+        }
+        vec![Distribution::UnspecifiedDistribution; self.children().len()]
+    }
+
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
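Together with the execute() change above, this is the heart of the base-case split: a Partial-mode node still answers execute() with a per-partition WindowAggStream, while the Final-mode node runs the new FullWindowAggStream and declares Distribution::SinglePartition so that all partial results are funneled into that one stream (in stock DataFusion that requirement would typically be satisfied by the EnforceDistribution pass inserting a coalesce).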
@@ -750,6 +782,230 @@ impl Stream for WindowAggStream {
     }
 }
 
+struct FullWindowAggFrame {
+    window_start_time: SystemTime,
+    window_end_time: SystemTime,
+    accumulators: Vec<AccumulatorItem>,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+    schema: SchemaRef,
+    baseline_metrics: BaselineMetrics,
+    batches_accumulated: usize,
+}
+
+impl FullWindowAggFrame {
+    pub fn new(
+        start_time: SystemTime,
+        end_time: SystemTime,
+        exec_aggregate_expressions: &[Arc<dyn AggregateExpr>],
+        aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+        filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+        schema: SchemaRef,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
+        let accumulators = create_accumulators(exec_aggregate_expressions).unwrap();
+        Self {
+            window_start_time: start_time,
+            window_end_time: end_time,
+            accumulators,
+            aggregate_expressions,
+            filter_expressions,
+            schema: schema.clone(),
+            baseline_metrics: baseline_metrics,
+            batches_accumulated: 0,
+        }
+    }
+
+    fn aggregate_batch(&mut self, batch: RecordBatch) {
+        let _ = aggregate_batch(
+            &AggregateMode::Final,
+            batch,
+            &mut self.accumulators,
+            &self.aggregate_expressions,
+            &self.filter_expressions,
+        );
+        self.batches_accumulated += 1;
+    }
+
+    fn evaluate(&mut self) -> Result<RecordBatch, DataFusionError> {
+        let result = finalize_aggregation(&mut self.accumulators, &AggregateMode::Final).and_then(
+            |columns| RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into),
+        );
+        result
+    }
+}
+struct FullWindowAggStream {
+    pub schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    //exec_operator: FranzStreamingWindowExec,
+    baseline_metrics: BaselineMetrics,
+    exec_aggregate_expressions: Vec<Arc<dyn AggregateExpr>>,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+    cached_frames: BTreeMap<SystemTime, FullWindowAggFrame>,
+    watermark: Option<SystemTime>, // This stream needs to be run with only one partition in the Exec operator.
+    upstream_partitions: usize,
+    lateness_threshold: Duration,
+    seen_windows: std::collections::HashSet<SystemTime>,
+}
+
+impl FullWindowAggStream {
+    pub fn try_new(
+        exec_operator: &FranzStreamingWindowExec,
+        context: Arc<TaskContext>,
+        partition: usize,
+        lateness_threshold: Duration,
+        upstream_partitioning: Option<usize>,
+    ) -> Result<Self> {
+        debug!(">>>>>> FullWindowAggStream for partition {}", partition);
+        let agg_schema = Arc::clone(&exec_operator.schema);
+        let agg_filter_expr = exec_operator.filter_expressions.clone();
+
+        let baseline_metrics = BaselineMetrics::new(&exec_operator.metrics, partition);
+        let input = exec_operator
+            .input
+            .execute(partition, Arc::clone(&context))?;
+
+        let aggregate_expressions =
+            aggregate_expressions(&exec_operator.aggregate_expressions, &exec_operator.mode, 0)?;
+        let filter_expressions = vec![None; exec_operator.aggregate_expressions.len()];
+        let exec_aggregate_expressions = exec_operator.aggregate_expressions.clone();
+        Ok(Self {
+            schema: agg_schema,
+            input,
+            baseline_metrics,
+            exec_aggregate_expressions,
+            aggregate_expressions,
+            filter_expressions,
+            cached_frames: BTreeMap::new(),
+            watermark: None,
+            upstream_partitions: upstream_partitioning.map_or(1, |x| x),
+            lateness_threshold,
+            seen_windows: std::collections::HashSet::<SystemTime>::new(),
+        })
+    }
+
+    fn finalize_windows(&mut self) -> Result<RecordBatch, DataFusionError> {
+        let mut results: Vec<RecordBatch> = Vec::new();
+        let mut frame_to_remove: Vec<SystemTime> = Vec::new();
+        for (timestamp, frame) in self.cached_frames.iter_mut() {
+            //TODO: May need to add this frame.batches_accumulated == self.upstream_partitions
+            if self
+                .watermark
+                .map_or_else(|| false, |ts| ts > frame.window_end_time)
+            {
+                let rb = frame.evaluate()?;
+                let result = add_window_columns_to_record_batch(
+                    rb,
+                    frame.window_start_time,
+                    frame.window_end_time,
+                );
+                results.push(result);
+                frame_to_remove.push(*timestamp);
+            }
+        }
+        for timestamp in frame_to_remove {
+            self.cached_frames.remove(&timestamp);
+        }
+        concat_batches(
+            &Arc::new(add_window_columns_to_schema(self.schema.clone())),
+            &results,
+        )
+        .map_err(|err| DataFusionError::ArrowError(err, None))
+    }
+
+    #[inline]
+    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
+        let result = match self.input.poll_next_unpin(cx) {
+            Poll::Ready(rdy) => match rdy {
+                Some(Ok(batch)) => {
+                    if batch.num_rows() > 0 {
+                        let mut rb = batch;
+                        let batch_start_time = rb.column_by_name("window_start_time").map(|col| {
+                            let timestamp = col
+                                .as_any()
+                                .downcast_ref::<PrimitiveArray<TimestampMillisecondType>>()
+                                .unwrap();
+                            system_time_from_epoch(
+                                max::<TimestampMillisecondType>(timestamp).unwrap() as _, // these batches should only have 1 row per batch
+                            )
+                        });
+                        let batch_end_time = rb.column_by_name("window_end_time").map(|col| {
+                            let timestamp = col
+                                .as_any()
+                                .downcast_ref::<PrimitiveArray<TimestampMillisecondType>>()
+                                .unwrap();
+                            system_time_from_epoch(
+                                max::<TimestampMillisecondType>(timestamp).unwrap() as _, // these batches should only have 1 row per batch
+                            )
+                        });
+                        let start_time = batch_start_time.unwrap(); //TODO: Remove unwrap.
+
+                        if self.seen_windows.contains(&start_time)
+                            && !self.cached_frames.contains_key(&start_time)
+                        {
+                            panic!("we are reopening a window already seen.")
+                        }
+                        let frame = self.cached_frames.entry(start_time).or_insert({
+                            FullWindowAggFrame::new(
+                                start_time,
+                                batch_end_time.unwrap(),
+                                &self.exec_aggregate_expressions,
+                                self.aggregate_expressions.clone(),
+                                self.filter_expressions.clone(),
+                                self.schema.clone(),
+                                self.baseline_metrics.clone(),
+                            )
+                        });
+
+                        self.seen_windows.insert(start_time);
+
+                        //last two columns are timestamp columns, so remove them before pushing them onto a frame.
+                        let col_size = rb.num_columns();
+                        rb.remove_column(col_size - 1);
+                        rb.remove_column(col_size - 2);
+
+                        let _ = frame.aggregate_batch(rb);
+
+                        self.watermark = self
+                            .watermark
+                            .map_or(Some(start_time), |w| Some(w.max(start_time)));
+
+                        self.finalize_windows()
+                    } else {
+                        Ok(RecordBatch::new_empty(Arc::new(
+                            add_window_columns_to_schema(self.schema.clone()),
+                        )))
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => Ok(RecordBatch::new_empty(Arc::new(
+                    add_window_columns_to_schema(self.schema.clone()),
+                ))),
+            },
+            Poll::Pending => return Poll::Pending,
+        };
+        Poll::Ready(Some(result))
+    }
+}
+
+impl RecordBatchStream for FullWindowAggStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for FullWindowAggStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll: Poll<Option<std::prelude::v1::Result<RecordBatch, DataFusionError>>> =
+            self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
 pub fn get_windows_for_watermark(
     watermark: &RecordBatchWatermark,
     window_type: FranzStreamingWindowType,
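The core of the new stream is the watermark-driven eviction in finalize_windows: frames live in a BTreeMap keyed by window start, the watermark advances to the largest window start seen, and any frame whose end time the watermark has passed is finalized and removed. A minimal, self-contained sketch of that pattern follows; Frame, Finalizer, the i64 millisecond timestamps, and the running sum are illustrative stand-ins (not this repo's API) for FullWindowAggFrame, FullWindowAggStream, SystemTime, and the DataFusion accumulators.

use std::collections::BTreeMap;

// Stand-in for FullWindowAggFrame: one frame per open window.
struct Frame {
    window_end: i64, // window end, in ms since the epoch
    sum: i64,        // stand-in for Vec<AccumulatorItem>
}

// Stand-in for the cached_frames + watermark pair in FullWindowAggStream.
struct Finalizer {
    frames: BTreeMap<i64, Frame>, // keyed by window start time
    watermark: Option<i64>,
}

impl Finalizer {
    fn new() -> Self {
        Finalizer { frames: BTreeMap::new(), watermark: None }
    }

    // Merge one upstream partial result, advance the watermark, and
    // return (window_start, total) for every window that is now closed.
    fn push(&mut self, window_start: i64, window_end: i64, value: i64) -> Vec<(i64, i64)> {
        let frame = self
            .frames
            .entry(window_start)
            .or_insert(Frame { window_end, sum: 0 });
        frame.sum += value;

        // As in poll_next_inner: watermark = max window start seen so far.
        let watermark = self.watermark.map_or(window_start, |w| w.max(window_start));
        self.watermark = Some(watermark);

        // As in finalize_windows: close every frame the watermark has passed.
        let closed: Vec<i64> = self
            .frames
            .iter()
            .filter(|(_, f)| watermark > f.window_end)
            .map(|(start, _)| *start)
            .collect();
        closed
            .into_iter()
            .map(|start| (start, self.frames.remove(&start).unwrap().sum))
            .collect()
    }
}

fn main() {
    let mut f = Finalizer::new();
    // Two partial results for window [0, 10), one for [10, 20):
    assert!(f.push(0, 10, 3).is_empty());
    assert!(f.push(0, 10, 4).is_empty());
    assert!(f.push(10, 20, 5).is_empty()); // watermark = 10, not yet > 10
    // Watermark advances to 20 > 10, so window [0, 10) closes with 3 + 4 = 7.
    assert_eq!(f.push(20, 30, 1), vec![(0, 7)]);
    println!("remaining open windows: {}", f.frames.len()); // [10, 20) and [20, 30)
}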