Skip to content

Commit

Permalink
Introduce FlatContainer, container function for hinting
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 23, 2024
1 parent b407978 commit 79af07d
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 9 deletions.
1 change: 1 addition & 0 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ license = "MIT"

[dependencies]
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.1"
serde = { version = "1.0"}
50 changes: 50 additions & 0 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Present a [`FlatStack`] as a timely container.
pub use flatcontainer::*;
use crate::{buffer, Container, PushContainer, PushInto};

impl<R: Region + Clone + 'static> Container for FlatStack<R> {
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
type Item<'a> = R::ReadItem<'a> where Self: 'a;

fn len(&self) -> usize {
self.len()
}

fn clear(&mut self) {
self.clear()
}

type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;

fn iter<'a>(&'a self) -> Self::Iter<'a> {
IntoIterator::into_iter(self)
}

type DrainIter<'a> = Self::Iter<'a>;

fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
IntoIterator::into_iter(&*self)
}
}

impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
fn capacity(&self) -> usize {
self.capacity()
}

fn preferred_capacity() -> usize {
buffer::default_capacity::<R::Index>()
}

fn reserve(&mut self, additional: usize) {
self.reserve(additional);
}
}

impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
#[inline]
fn push_into(self, target: &mut FlatStack<R>) {
target.copy(self);
}
}
1 change: 1 addition & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![forbid(missing_docs)]

pub mod columnation;
pub mod flatcontainer;

/// A container transferring data through dataflow edges
///
Expand Down
1 change: 1 addition & 0 deletions timely/examples/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fn main() {
}
})
})
.container::<Vec<_>>()
.inspect(move |x| println!("worker {}:\tvalue {}", index, x))
.probe_with(&mut probe);
});
Expand Down
93 changes: 93 additions & 0 deletions timely/examples/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Wordcount based on flatcontainer.
#[cfg(feature = "bincode")]
use {
std::collections::HashMap,
timely::container::flatcontainer::{Containerized, FlatStack},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::operators::core::InputHandle,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

#[cfg(feature = "bincode")]
fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let mut input =
<InputHandle<_, FlatStack<<(String, i64) as Containerized>::Region>>>::new();
let mut probe = ProbeHandle::new();

// create a new input, exchange data, and inspect its output
worker.dataflow::<usize, _, _>(|scope| {
input
.to_stream(scope)
.unary::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
Pipeline,
"Split",
|_cap, _info| {
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session(&time);
for (text, diff) in data.iter().flat_map(|(text, diff)| {
text.split_whitespace().map(move |s| (s, diff))
}) {
session.give((text, diff));
}
}
}
},
)
.unary_frontier::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
let mut counts = HashMap::new();

move |input, output| {
while let Some((time, data)) = input.next() {
queues
.entry(time.retain())
.or_insert(Vec::new())
.push(data.take());
}

for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for batch in val.drain(..) {
for (word, diff) in batch.iter() {
let entry =
counts.entry(word.to_string()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
}
}

queues.retain(|_key, val| !val.is_empty());
}
},
)
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});

// introduce data and watch!
for round in 0..10 {
input.send(("flat container", 1));
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
})
.unwrap();
}

#[cfg(not(feature = "bincode"))]
fn main() {
eprintln!("Example requires feature bincode.");
}
1 change: 1 addition & 0 deletions timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ fn main() {
});
}
})
.container::<Vec<_>>()
.probe_with(&mut probe);
});

Expand Down
5 changes: 4 additions & 1 deletion timely/examples/wordcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ fn main() {
// create a new input, exchange data, and inspect its output
worker.dataflow::<usize,_,_>(|scope| {
input.to_stream(scope)
.flat_map(|(text, diff): (String, i64)|
.container::<Vec<_>>()
.flat_map(|(text, diff): (String, i64)|
text.split_whitespace()
.map(move |word| (word.to_owned(), diff))
.collect::<Vec<_>>()
)
.container::<Vec<_>>()
.unary_frontier(exchange, "WordCount", |_capability, _info| {

let mut queues = HashMap::new();
Expand Down Expand Up @@ -51,6 +53,7 @@ fn main() {

queues.retain(|_key, val| !val.is_empty());
}})
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ impl<T: Timestamp> CapabilitySet<T> {
/// output.session(a_cap).give(());
/// }
/// }
/// });
/// })
/// .container::<Vec<_>>();
/// });
/// ```
pub fn from_elem(cap: Capability<T>) -> Self {
Expand Down
4 changes: 4 additions & 0 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub trait Input : Scope {
/// let mut input = Handle::new();
/// worker.dataflow(|scope| {
/// scope.input_from(&mut input)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("hello {:?}", x));
/// });
///
Expand Down Expand Up @@ -198,6 +199,7 @@ impl<T: Timestamp, C: Container> Handle<T, C> {
/// let mut input = Handle::new();
/// worker.dataflow(|scope| {
/// scope.input_from(&mut input)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("hello {:?}", x));
/// });
///
Expand Down Expand Up @@ -235,6 +237,7 @@ impl<T: Timestamp, C: Container> Handle<T, C> {
/// let mut input = Handle::new();
/// worker.dataflow(|scope| {
/// input.to_stream(scope)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("hello {:?}", x));
/// });
///
Expand Down Expand Up @@ -404,6 +407,7 @@ impl<T: Timestamp, C: PushContainer> Handle<T, C> {
/// let mut input = Handle::new();
/// worker.dataflow(|scope| {
/// scope.input_from(&mut input)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("hello {:?}", x));
/// });
///
Expand Down
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/core/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub trait Map<S: Scope, C: Container> {
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .map(|x| x + 1)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
Expand All @@ -39,6 +40,7 @@ pub trait Map<S: Scope, C: Container> {
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .flat_map(|x| (0..x))
/// .container::<Vec<_>>()
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/ok_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub trait OkErr<S: Scope, C: Container> {
/// .to_stream(scope)
/// .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) });
///
/// even.inspect(|x| println!("even numbers: {:?}", x));
/// odd.inspect(|x| println!("odd numbers: {:?}", x));
/// even.container::<Vec<_>>().inspect(|x| println!("even: {:?}", x));
/// odd.container::<Vec<_>>().inspect(|x| println!("odd: {:?}", x));
/// });
/// ```
fn ok_err<C1, D1, C2, D2, L>(
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mod test {
#[test]
fn test_shared() {
let output = crate::example(|scope| {
let shared = vec![Ok(0), Err(())].to_stream(scope).shared();
let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
scope
.concatenate([
shared.unary(Pipeline, "read shared 1", |_, _| {
Expand Down
4 changes: 3 additions & 1 deletion timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ fn notificator_delivers_notifications_in_topo_order() {
/// }
/// });
/// }
/// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
/// })
/// .container::<Vec<_>>()
/// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
///
/// (in1_handle, in2_handle)
/// });
Expand Down
8 changes: 6 additions & 2 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ pub trait Operator<G: Scope, C1: Container> {
/// }
/// });
/// }
/// });
/// })
/// .container::<Vec<_>>();
/// });
/// }
/// ```
Expand Down Expand Up @@ -172,7 +173,9 @@ pub trait Operator<G: Scope, C1: Container> {
/// }
/// });
/// }
/// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
/// })
/// .container::<Vec<_>>()
/// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
///
/// (in1_handle, in2_handle)
/// });
Expand Down Expand Up @@ -552,6 +555,7 @@ impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
/// else { activator.activate(); }
/// }
/// })
/// .container::<Vec<_>>()
/// .inspect(|x| println!("number: {:?}", x));
/// });
/// ```
Expand Down
4 changes: 3 additions & 1 deletion timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub use self::broadcast::Broadcast;
pub use self::capture::Capture;
pub use self::branch::{Branch, BranchWhen};
pub use self::result::ResultStream;
pub use self::to_stream::ToStream;


pub use self::generic::Operator;
pub use self::generic::{Notificator, FrontierNotificator};
Expand All @@ -43,7 +45,7 @@ pub mod delay;
pub use self::core::exchange;
pub mod broadcast;
pub use self::core::probe::{self, Probe};
pub use self::core::to_stream::ToStream;
pub mod to_stream;
pub mod capture;
pub mod branch;
pub use self::core::ok_err::{self, OkErr};
Expand Down
32 changes: 32 additions & 0 deletions timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Conversion to the `Stream` type from iterators.
use crate::Data;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::core::{ToStream as ToStreamCore};

/// Converts to a timely `Stream`.
pub trait ToStream<D: Data> {
/// Converts to a timely `Stream`.
///
/// # Examples
///
/// ```
/// use timely::dataflow::operators::{ToStream, Capture};
/// use timely::dataflow::operators::capture::Extract;
///
/// let (data1, data2) = timely::example(|scope| {
/// let data1 = (0..3).to_stream(scope).capture();
/// let data2 = vec![0,1,2].to_stream(scope).capture();
/// (data1, data2)
/// });
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, D>;
}

impl<I: IntoIterator+'static> ToStream<I::Item> for I where I::Item: Data {
fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, I::Item> {
ToStreamCore::to_stream(self, scope)
}
}
13 changes: 13 additions & 0 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ impl<S: Scope, C: Container> StreamCore<S, C> {
pub fn name(&self) -> &Source { &self.name }
/// The scope immediately containing the stream.
pub fn scope(&self) -> S { self.scope.clone() }

/// Allows the assertion of a container type, for the benefit of type inference.
pub fn container<D: Container>(self) -> StreamCore<S, D> where Self: AsStream<S, D> { self.as_stream() }
}

/// A type that can be translated to a [StreamCore].
pub trait AsStream<S: Scope, C> {
/// Translate `self` to a [StreamCore].
fn as_stream(self) -> StreamCore<S, C>;
}

impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C> {
fn as_stream(self) -> Self { self }
}

impl<S, C> Debug for StreamCore<S, C>
Expand Down

0 comments on commit 79af07d

Please sign in to comment.