Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
segeljakt committed Feb 27, 2024
1 parent 9939ac7 commit 7a84851
Show file tree
Hide file tree
Showing 33 changed files with 778 additions and 113 deletions.
5 changes: 4 additions & 1 deletion crates/runtime/src/builtins/keyed_stream/window.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod time_tumbling_holistic;
pub mod count_tumbling_holistic;
pub mod count_sliding_holistic;
pub mod time_tumbling_holistic_vec;
// pub mod time_sliding_holistic;
pub mod time_sliding_aligned_commutative_associative;
pub mod time_sliding_aligned_holistic;
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::collections::BTreeMap;

use crate::builtins::duration::Duration;
use crate::builtins::keyed_stream::KeyedEvent;
use crate::builtins::keyed_stream::KeyedStream;
use crate::builtins::stream::window::WindowRange;
use crate::builtins::time::Time;
use crate::runner::context::Context;
use crate::traits::Data;
use crate::traits::Key;
use crate::HashMap;

impl<K: Key, T: Data> KeyedStream<K, T> {
// Requires that duration % step == 0
#[allow(clippy::too_many_arguments)]
pub fn time_sliding_aligned_commutative_associative_window<P, O>(
mut self,
ctx: &mut Context,
duration: Duration,
step: Duration,
_init: P,
lift: impl Fn(&T) -> P + Send + 'static,
combine: impl Fn(&P, &P) -> P + Send + 'static,
lower: impl Fn(&K, &P, WindowRange) -> O + Send + 'static,
) -> KeyedStream<K, O>
where
O: Data,
P: Data,
{
assert!(duration % step == Duration::from_seconds(0));
ctx.keyed_operator(|tx| async move {
let mut slices: BTreeMap<Time, HashMap<K, P>> = BTreeMap::new();
let mut output: HashMap<K, P> = HashMap::default();
loop {
match self.recv().await {
KeyedEvent::Data(time, key, data) => {
let data = lift(&data);
let wr = WindowRange::of(time, step, step);
slices
.entry(wr.t0)
.or_default()
.entry(key)
.and_modify(|e| *e = combine(e, &data))
.or_insert(data);
}
KeyedEvent::Watermark(time) => {
while let Some(entry) = slices.first_entry() {
let t0 = *entry.key();
let t1 = t0 + duration;
let wr = WindowRange::new(t0, t1);
if wr.t1 <= time {
for (_, kvs) in slices.range(..t1) {
for (k, v) in kvs {
output
.entry(k.clone())
.and_modify(|e| *e = combine(e, v))
.or_insert_with(|| v.clone());
}
}
for (k, v) in output.drain() {
let output = lower(&k, &v, wr);
tx.send(KeyedEvent::Data(time, k, output.deep_clone()))
.await;
}
slices.pop_first();
} else {
break;
}
}
tx.send(KeyedEvent::Watermark(time)).await;
}
KeyedEvent::Snapshot(i) => {
tx.send(KeyedEvent::Snapshot(i)).await;
}
KeyedEvent::Sentinel => {
tx.send(KeyedEvent::Sentinel).await;
break;
}
}
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::builtins::duration::Duration;
use crate::builtins::keyed_stream::KeyedEvent;
use crate::builtins::keyed_stream::KeyedStream;
use crate::builtins::stream::window::align;
use crate::builtins::stream::window::WindowRange;
use crate::builtins::time::Time;
use crate::runner::context::Context;
use crate::traits::Data;
use crate::traits::Key;
use crate::BTreeMap;
use crate::HashMap;

impl<K: Key, T: Data> KeyedStream<K, T> {
pub fn time_sliding_aligned_holistic_window<O>(
mut self,
ctx: &mut Context,
size: Duration,
slide: Duration,
compute: impl for<'a, 'b> Fn(&K, Window<'a, 'b, T>, WindowRange) -> O + Send + 'static,
) -> KeyedStream<K, O>
where
O: Data,
{
ctx.keyed_operator(|tx| async move {
let mut slices: BTreeMap<Time, HashMap<K, Vec<T>>> = BTreeMap::new();
loop {
match self.recv().await {
KeyedEvent::Data(time, key, data) => {
slices
.entry(align(time, slide))
.or_default()
.entry(key)
.or_default()
.push(data);
}
KeyedEvent::Watermark(time) => {
while let Some((t0, _)) = slices.first_key_value() {
let t1 = *t0 + size;
let wr = WindowRange::new(*t0, t1);
if wr.t1 < time {
let mut output: HashMap<K, Vec<&[T]>> = HashMap::default();
for (_, kvs) in slices.range(..wr.t1) {
for (k, vs) in kvs {
output.entry(k.clone()).or_default().push(vs);
}
}
for (k, vs) in output.drain() {
let output = compute(&k, Window::new(vs.as_slice()), wr);
tx.send(KeyedEvent::Data(time, k, output.deep_clone()))
.await;
}
slices.pop_first();
} else {
break;
}
}
tx.send(KeyedEvent::Watermark(time)).await;
}
KeyedEvent::Snapshot(i) => {
tx.send(KeyedEvent::Snapshot(i)).await;
}
KeyedEvent::Sentinel => {
tx.send(KeyedEvent::Sentinel).await;
break;
}
}
}
})
}
}

pub struct Window<'a, 'b, T> {
slices: &'a [&'b [T]],
}

impl<'a, 'b, T> Window<'a, 'b, T> {
fn new(slices: &'a [&'b [T]]) -> Self {
Self { slices }
}
}

pub struct WindowIter<'a, 'b, T> {
slices: &'a [&'b [T]],
idx: usize,
}

impl<'a, 'b, T> Iterator for WindowIter<'a, 'b, T> {
type Item = &'b T;

fn next(&mut self) -> Option<Self::Item> {
if self.idx < self.slices.len() {
let slice = self.slices[self.idx];
self.idx += 1;
slice.first()
} else {
None
}
}
}

impl<'a, 'b, T> IntoIterator for Window<'a, 'b, T> {
type Item = &'b T;
type IntoIter = WindowIter<'a, 'b, T>;

fn into_iter(self) -> Self::IntoIter {
WindowIter {
slices: self.slices,
idx: 0,
}
}
}
4 changes: 2 additions & 2 deletions crates/runtime/src/builtins/stream/keyby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use super::Event;
use super::Stream;

impl<T: Data> Stream<T> {
pub fn keyby<K: Data>(mut self, ctx: &mut Context, fun: fn(T) -> K) -> KeyedStream<K, T> {
pub fn keyby<K: Data>(mut self, ctx: &mut Context, fun: fn(&T) -> K) -> KeyedStream<K, T> {
ctx.keyed_operator(|tx1| async move {
loop {
match self.recv().await {
Event::Data(t, v) => {
let k = fun(v.clone());
let k = fun(&v);
tx1.send(KeyedEvent::Data(t, k, v)).await;
}
Event::Watermark(t) => {
Expand Down
12 changes: 4 additions & 8 deletions crates/runtime/src/builtins/stream/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ use crate::builtins::duration::Duration;
use crate::builtins::time::Time;

pub mod count_sliding_aligned_commutative_associative;
pub mod time_sliding_aligned_commutative_associative;

pub mod count_sliding_holistic;
pub mod time_sliding_holistic_btreemap;
pub mod time_sliding_holistic_vec;

pub mod count_sliding_invertible;
pub mod time_sliding_invertible;

pub mod count_tumbling_holistic;
pub mod time_tumbling_holistic;

pub mod time_sliding_aligned_commutative_associative;
pub mod time_sliding_aligned_holistic;
pub mod time_sliding_commutative_invertible;
pub mod time_sliding_invertible;
pub mod time_tumbling_holistic;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WindowRange {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl<T: Data> Stream<T> {
O: Data,
{
ctx.operator(|tx| async move {
let mut s: VecDeque<T> = VecDeque::with_capacity(size * 2);
let mut s: VecDeque<T> = VecDeque::with_capacity(size);
loop {
match self.recv().await {
Event::Data(time, data) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;

use crate::builtins::duration::Duration;
use crate::builtins::stream::window::align;
use crate::builtins::stream::Event;
use crate::builtins::stream::Stream;
use crate::builtins::time::Time;
Expand All @@ -17,7 +18,7 @@ impl<T: Data> Stream<T> {
ctx: &mut Context,
duration: Duration,
step: Duration,
init: P,
_init: P,
lift: impl Fn(&T) -> P + Send + 'static,
combine: impl Fn(&P, &P) -> P + Send + 'static,
lower: impl Fn(&P, WindowRange) -> O + Send + 'static,
Expand All @@ -33,9 +34,10 @@ impl<T: Data> Stream<T> {
match self.recv().await {
Event::Data(time, data) => {
let data = lift(&data);
let wr = WindowRange::of(time, step, step);
let agg = slices.entry(wr.t0).or_insert_with(|| init.clone());
*agg = combine(agg, &data);
slices
.entry(align(time, step))
.and_modify(|agg| *agg = combine(agg, &data))
.or_insert(data);
}
Event::Watermark(time) => {
while let Some(entry) = slices.first_entry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl<T: Data> Stream<T> {
/// This window maintains a BTreeMap that maps window starting times to slices of data, where
/// each slice is a vector of (Time, Data) pairs. When a watermark is received, the window
/// iterates over the slices before the watermark and sends the result of the compute function
pub fn time_sliding_aligned_holistic_btreemap_window<O>(
pub fn time_sliding_aligned_holistic_window<O>(
mut self,
ctx: &mut Context,
duration: Duration,
Expand All @@ -28,32 +28,20 @@ impl<T: Data> Stream<T> {
{
ctx.operator(|tx| async move {
let mut s: WindowState<T> = WindowState::new();
// Slices before this time are sorted
let mut t_sorted: Time = Time::zero();
loop {
match self.recv().await {
Event::Data(time, data) => {
let t0 = align(time, step);
s.get_mut(t0).push((time, data));
}
Event::Watermark(time) => {
let t_safe = align(time, step);
for (_, slice) in s.0.range_mut(t_sorted..t_safe) {
slice.sort_by_key(|(t, _)| *t);
}
t_sorted = t_safe;
while let Some(entry) = s.0.first_entry() {
let t0 = *entry.key();
let wr = WindowRange::new(t0, t0 + duration);
if wr.t1 > time {
break;
}
// println!("Length: {}", s.0.len());
let win = Window::new(&s.0, wr.t1);
// println!("Window: {:?}", wr.t1);
// for i in win.iter() {
// println!("Data: {:?}", i);
// }
let data = compute(win, wr);
s.0.pop_first();
tx.send(Event::Data(wr.t1, data.deep_clone())).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ impl<T: Data> Stream<T> {
Event::Watermark(time) => {
while let Some(entry) = buffer.first_entry() {
let wr = WindowRange::of(*entry.key(), duration, step);
if wr.t1 > time {
break;
}
for (_, p) in buffer.range(..wr.t1) {
agg = combine(&agg, &p.clone());
}
let data = lower(&agg, wr);
tx.send(Event::Data(wr.t1, data.deep_clone())).await;
// Evict the part of the oldest window that is no longer needed.
let after = buffer.split_off(&(wr.t0 + step));
for (_, p) in std::mem::replace(&mut buffer, after) {
agg = inverse(&agg, &p);
if wr.t1 < time {
for (_, p) in buffer.range(..wr.t1) {
agg = combine(&agg, &p);
}
let data = lower(&agg, wr);
tx.send(Event::Data(wr.t1, data.deep_clone())).await;
// Evict the part of the oldest window that is no longer needed.
let after = buffer.split_off(&(wr.t0 + step));
for (_, p) in std::mem::replace(&mut buffer, after) {
agg = inverse(&agg, &p);
}
}
}
tx.send(Event::Watermark(time)).await;
Expand Down
2 changes: 1 addition & 1 deletion experiments/nexmark/data-generator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.inspect(|(i, _)| {
let m = i + 1;
let p = n / 100;
if m % p == 0 {
if m % p == 10 {
let progress = m / p;
println!("{name}: {progress}%");
}
Expand Down
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 7a84851

Please sign in to comment.