diff --git a/examples/spines.rs b/examples/spines.rs index 552b62c76..57b127c2b 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -54,16 +54,15 @@ fn main() { "slc" => { use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; - use differential_dataflow::operators::reduce::ReduceCore; let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::>(); - // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>() + .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::>(); - // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>() + .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); keys.join_core(&data, |k,_v1,_v2| { println!("{:?}", k.text); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index de19fd413..d152f7088 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -452,6 +452,49 @@ where } } +// Direct reduce implementations. +use difference::Abelian; +impl Arranged +where + G::Timestamp: Lattice+Ord, + T1: TraceReader+Clone+'static, + T1::Diff: Semigroup, +{ + /// A direct implementation of `ReduceCore::reduce_abelian`. + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + where + T2: for<'a> Trace= T1::Key<'a>, Time=G::Timestamp>+'static, + T2::ValOwned: Data, + T2::Diff: Abelian, + T2::Batch: Batch, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, + { + self.reduce_core::<_,T2>(name, move |key, input, output, change| { + if !input.is_empty() { + logic(key, input, change); + } + change.extend(output.drain(..).map(|(x,d)| (x, d.negate()))); + crate::consolidation::consolidate(change); + }) + } + + /// A direct implementation of `ReduceCore::reduce_core`. + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + where + T2: for<'a> Trace=T1::Key<'a>, Time=G::Timestamp>+'static, + T2::ValOwned: Data, + T2::Diff: Semigroup, + T2::Batch: Batch, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + { + use operators::reduce::reduce_trace; + reduce_trace(self, name, logic) + } +} + + impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 4c2d609f6..b408357b2 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine}; use trace::TraceReader; /// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce : ReduceCore where G::Timestamp: Lattice+Ord { +pub trait Reduce where G::Timestamp: Lattice+Ord { /// Applies a reduction function on records grouped by key. /// /// Input data must be structured as `(key, val)` pairs. @@ -327,28 +327,10 @@ where } } -impl ReduceCore for Arranged -where - K: ToOwned + Ord + ?Sized, - K::Owned: Data, - V: ToOwned + Ord + ?Sized, - G::Timestamp: Lattice+Ord, - T1: for<'a> TraceReader=&'a K, KeyOwned = ::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, -{ - fn reduce_core(&self, name: &str, logic: L) -> Arranged> - where - T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - T2::ValOwned: Data, - T2::Diff: Semigroup, - T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, - { - reduce_trace(self, name, logic) - } -} - -fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +/// A key-wise reduction of values in an input trace. +/// +/// This method exists to provide reduce functionality without opinions about qualifying trace types. +pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, G::Timestamp: Lattice+Ord, @@ -359,7 +341,7 @@ where T2::Diff: Semigroup, T2::Batch: Batch, T2::Builder: Builder, - L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, { let mut result_trace = None;