From 358ebbde720736194c4be520dd9234c3915fd1ee Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 22 Mar 2024 17:18:56 -0400 Subject: [PATCH] Introduce FlatContainer, container function for hinting --- container/Cargo.toml | 1 + container/src/flatcontainer.rs | 50 ++++++++++ container/src/lib.rs | 1 + timely/examples/distinct.rs | 1 + timely/examples/flatcontainer.rs | 93 +++++++++++++++++++ timely/examples/hashjoin.rs | 1 + timely/examples/simple.rs | 1 + timely/examples/wordcount.rs | 3 + timely/src/dataflow/operators/capability.rs | 3 +- timely/src/dataflow/operators/core/input.rs | 4 + timely/src/dataflow/operators/core/map.rs | 2 + timely/src/dataflow/operators/core/ok_err.rs | 4 +- timely/src/dataflow/operators/core/rc.rs | 2 +- .../dataflow/operators/generic/notificator.rs | 4 +- .../dataflow/operators/generic/operator.rs | 8 +- timely/src/dataflow/operators/mod.rs | 4 +- timely/src/dataflow/operators/to_stream.rs | 32 +++++++ timely/src/dataflow/stream.rs | 13 +++ 18 files changed, 219 insertions(+), 8 deletions(-) create mode 100644 container/src/flatcontainer.rs create mode 100644 timely/examples/flatcontainer.rs create mode 100644 timely/src/dataflow/operators/to_stream.rs diff --git a/container/Cargo.toml b/container/Cargo.toml index 6b3e2d65de..06abc91236 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -7,4 +7,5 @@ license = "MIT" [dependencies] columnation = { git = "https://github.com/frankmcsherry/columnation" } +flatcontainer = "0.1" serde = { version = "1.0"} diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs new file mode 100644 index 0000000000..53f13ed397 --- /dev/null +++ b/container/src/flatcontainer.rs @@ -0,0 +1,50 @@ +//! Present a [`FlatStack`] as a timely container. + +pub use flatcontainer::*; +use crate::{buffer, Container, PushContainer, PushInto}; + +impl Container for FlatStack { + type ItemRef<'a> = R::ReadItem<'a> where Self: 'a; + type Item<'a> = R::ReadItem<'a> where Self: 'a; + + fn len(&self) -> usize { + self.len() + } + + fn clear(&mut self) { + self.clear() + } + + type Iter<'a> = <&'a Self as IntoIterator>::IntoIter; + + fn iter<'a>(&'a self) -> Self::Iter<'a> { + IntoIterator::into_iter(self) + } + + type DrainIter<'a> = Self::Iter<'a>; + + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + IntoIterator::into_iter(&*self) + } +} + +impl PushContainer for FlatStack { + fn capacity(&self) -> usize { + self.capacity() + } + + fn preferred_capacity() -> usize { + buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } +} + +impl> PushInto> for T { + #[inline] + fn push_into(self, target: &mut FlatStack) { + target.copy(self); + } +} \ No newline at end of file diff --git a/container/src/lib.rs b/container/src/lib.rs index 6a56295f9d..61a4b32b93 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -3,6 +3,7 @@ #![forbid(missing_docs)] pub mod columnation; +pub mod flatcontainer; /// A container transferring data through dataflow edges /// diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index 28971384a6..cfe31440d1 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -35,6 +35,7 @@ fn main() { } }) }) + .container::>() .inspect(move |x| println!("worker {}:\tvalue {}", index, x)) .probe_with(&mut probe); }); diff --git a/timely/examples/flatcontainer.rs b/timely/examples/flatcontainer.rs new file mode 100644 index 0000000000..8d528c9540 --- /dev/null +++ b/timely/examples/flatcontainer.rs @@ -0,0 +1,93 @@ +//! Wordcount based on flatcontainer. + +#[cfg(feature = "bincode")] +use { + std::collections::HashMap, + timely::container::flatcontainer::{Containerized, FlatStack}, + timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, + timely::dataflow::operators::core::InputHandle, + timely::dataflow::operators::{Inspect, Operator, Probe}, + timely::dataflow::ProbeHandle, +}; + +#[cfg(feature = "bincode")] +fn main() { + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + let mut input = + ::Region>>>::new(); + let mut probe = ProbeHandle::new(); + + // create a new input, exchange data, and inspect its output + worker.dataflow::(|scope| { + input + .to_stream(scope) + .unary::::Region>, _, _, _>( + Pipeline, + "Split", + |_cap, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session(&time); + for (text, diff) in data.iter().flat_map(|(text, diff)| { + text.split_whitespace().map(move |s| (s, diff)) + }) { + session.give((text, diff)); + } + } + } + }, + ) + .unary_frontier::::Region>, _, _, _>( + ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64), + "WordCount", + |_capability, _info| { + let mut queues = HashMap::new(); + let mut counts = HashMap::new(); + + move |input, output| { + while let Some((time, data)) = input.next() { + queues + .entry(time.retain()) + .or_insert(Vec::new()) + .push(data.take()); + } + + for (key, val) in queues.iter_mut() { + if !input.frontier().less_equal(key.time()) { + let mut session = output.session(key); + for batch in val.drain(..) { + for (word, diff) in batch.iter() { + let entry = + counts.entry(word.to_string()).or_insert(0i64); + *entry += diff; + session.give((word, *entry)); + } + } + } + } + + queues.retain(|_key, val| !val.is_empty()); + } + }, + ) + .inspect(|x| println!("seen: {:?}", x)) + .probe_with(&mut probe); + }); + + // introduce data and watch! + for round in 0..10 { + input.send(("flat container", 1)); + input.advance_to(round + 1); + while probe.less_than(input.time()) { + worker.step(); + } + } + }) + .unwrap(); +} + +#[cfg(not(feature = "bincode"))] +fn main() { + eprintln!("Example requires feature bincode."); +} \ No newline at end of file diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index ad8ef8809a..43ff20878c 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -76,6 +76,7 @@ fn main() { }); } }) + .container::>() .probe_with(&mut probe); }); diff --git a/timely/examples/simple.rs b/timely/examples/simple.rs index 541d02f452..3286d864fc 100644 --- a/timely/examples/simple.rs +++ b/timely/examples/simple.rs @@ -5,6 +5,7 @@ use timely::dataflow::operators::*; fn main() { timely::example(|scope| { (0..10).to_stream(scope) + .container::>() .inspect(|x| println!("seen: {:?}", x)); }); } diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index a92ee3b6d0..20e8015e19 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -19,11 +19,13 @@ fn main() { // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { input.to_stream(scope) + .container::>() .flat_map(|(text, diff): (String, i64)| text.split_whitespace() .map(move |word| (word.to_owned(), diff)) .collect::>() ) + .container::>() .unary_frontier(exchange, "WordCount", |_capability, _info| { let mut queues = HashMap::new(); @@ -51,6 +53,7 @@ fn main() { queues.retain(|_key, val| !val.is_empty()); }}) + .container::>() .inspect(|x| println!("seen: {:?}", x)) .probe_with(&mut probe); }); diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 5caa9bce8e..233acd18e5 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -426,7 +426,8 @@ impl CapabilitySet { /// output.session(a_cap).give(()); /// } /// } - /// }); + /// }) + /// .container::>(); /// }); /// ``` pub fn from_elem(cap: Capability) -> Self { diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index ec27cab9b6..2b8ed58dc1 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -81,6 +81,7 @@ pub trait Input : Scope { /// let mut input = Handle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) + /// .container::>() /// .inspect(|x| println!("hello {:?}", x)); /// }); /// @@ -198,6 +199,7 @@ impl Handle { /// let mut input = Handle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) + /// .container::>() /// .inspect(|x| println!("hello {:?}", x)); /// }); /// @@ -235,6 +237,7 @@ impl Handle { /// let mut input = Handle::new(); /// worker.dataflow(|scope| { /// input.to_stream(scope) + /// .container::>() /// .inspect(|x| println!("hello {:?}", x)); /// }); /// @@ -404,6 +407,7 @@ impl Handle { /// let mut input = Handle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) + /// .container::>() /// .inspect(|x| println!("hello {:?}", x)); /// }); /// diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index ecbba1ea49..2b52826dce 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -18,6 +18,7 @@ pub trait Map { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .map(|x| x + 1) + /// .container::>() /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` @@ -39,6 +40,7 @@ pub trait Map { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .flat_map(|x| (0..x)) + /// .container::>() /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 6888108b34..c49b0513fd 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -24,8 +24,8 @@ pub trait OkErr { /// .to_stream(scope) /// .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) }); /// - /// even.inspect(|x| println!("even numbers: {:?}", x)); - /// odd.inspect(|x| println!("odd numbers: {:?}", x)); + /// even.container::>().inspect(|x| println!("even: {:?}", x)); + /// odd.container::>().inspect(|x| println!("odd: {:?}", x)); /// }); /// ``` fn ok_err( diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index eaae55093e..fcdc646cf2 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -50,7 +50,7 @@ mod test { #[test] fn test_shared() { let output = crate::example(|scope| { - let shared = vec![Ok(0), Err(())].to_stream(scope).shared(); + let shared = vec![Ok(0), Err(())].to_stream(scope).container::>().shared(); scope .concatenate([ shared.unary(Pipeline, "read shared 1", |_, _| { diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea94..72a4be2f94 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -217,7 +217,9 @@ fn notificator_delivers_notifications_in_topo_order() { /// } /// }); /// } -/// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); +/// }) +/// .container::>() +/// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); /// /// (in1_handle, in2_handle) /// }); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 45571153a3..24ce5b376d 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -51,7 +51,8 @@ pub trait Operator { /// } /// }); /// } - /// }); + /// }) + /// .container::>(); /// }); /// } /// ``` @@ -172,7 +173,9 @@ pub trait Operator { /// } /// }); /// } - /// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); + /// }) + /// .container::>() + /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); /// /// (in1_handle, in2_handle) /// }); @@ -552,6 +555,7 @@ impl Operator for StreamCore { /// else { activator.activate(); } /// } /// }) +/// .container::>() /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 453e8838f2..53b504d3ba 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -20,6 +20,8 @@ pub use self::broadcast::Broadcast; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::result::ResultStream; +pub use self::to_stream::ToStream; + pub use self::generic::Operator; pub use self::generic::{Notificator, FrontierNotificator}; @@ -43,7 +45,7 @@ pub mod delay; pub use self::core::exchange; pub mod broadcast; pub use self::core::probe::{self, Probe}; -pub use self::core::to_stream::ToStream; +pub mod to_stream; pub mod capture; pub mod branch; pub use self::core::ok_err::{self, OkErr}; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs new file mode 100644 index 0000000000..4a9b41dfa5 --- /dev/null +++ b/timely/src/dataflow/operators/to_stream.rs @@ -0,0 +1,32 @@ +//! Conversion to the `Stream` type from iterators. + +use crate::Data; +use crate::dataflow::{Stream, Scope}; +use crate::dataflow::operators::core::{ToStream as ToStreamCore}; + +/// Converts to a timely `Stream`. +pub trait ToStream { + /// Converts to a timely `Stream`. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::{ToStream, Capture}; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = (0..3).to_stream(scope).capture(); + /// let data2 = vec![0,1,2].to_stream(scope).capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream(self, scope: &mut S) -> Stream; +} + +impl ToStream for I where I::Item: Data { + fn to_stream(self, scope: &mut S) -> Stream { + ToStreamCore::to_stream(self, scope) + } +} \ No newline at end of file diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 8eafd5a4bb..173e777af3 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -58,6 +58,19 @@ impl StreamCore { pub fn name(&self) -> &Source { &self.name } /// The scope immediately containing the stream. pub fn scope(&self) -> S { self.scope.clone() } + + /// Allows the assertion of a container type, for the benefit of type inference. + pub fn container(self) -> StreamCore where Self: AsStream { self.as_stream() } +} + +/// A type that can be translated to a [StreamCore]. +pub trait AsStream { + /// Translate `self` to a [StreamCore]. + fn as_stream(self) -> StreamCore; +} + +impl AsStream for StreamCore { + fn as_stream(self) -> Self { self } } impl Debug for StreamCore