Skip to content

Commit

Permalink
Demonstrate columnar stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Sep 19, 2024
1 parent f54796a commit 1208fe9
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 0 deletions.
2 changes: 2 additions & 0 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ license = "MIT"
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.5"
serde = { version = "1.0"}
# columnar = { path = "../../columnar" }
columnar = { git = "https://github.com/frankmcsherry/columnar" }
88 changes: 88 additions & 0 deletions container/src/columnar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Present a columnar container as a timely container.
use serde::{Serialize, Deserialize};

pub use columnar::*;
use columnar::common::IterOwn;

use crate::{Container, SizableContainer, PushInto};

/// A container based on a `columnar` store.
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct Columnar<C> {
store: C,
}

impl<C: Len + Clear + Clone + Default + 'static> Container for Columnar<C>
where
for<'a> &'a C: columnar::IndexOwn,
{
fn len(&self) -> usize { self.store.len() }
fn clear(&mut self) { self.store.clear() }

type ItemRef<'a> = <&'a C as IndexOwn>::Ref where Self: 'a;
type Iter<'a> = IterOwn<&'a C>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.store.iter() }

type Item<'a> = <&'a C as IndexOwn>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<&'a C>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.store.iter() }
}

impl<C: Len + Clear + Clone + Default + 'static> SizableContainer for Columnar<C>
where
for<'a> &'a C: columnar::IndexOwn,
{
fn capacity(&self) -> usize { 1024 }
fn preferred_capacity() -> usize { 1024 }
fn reserve(&mut self, _additional: usize) { }
}

impl<C: columnar::Push<T>, T> PushInto<T> for Columnar<C> {
#[inline]
fn push_into(&mut self, item: T) {
self.store.push(item);
}
}


use columnar::bytes::{AsBytes, FromBytes, serialization::decode};

/// A container based on a columnar store, encoded in aligned bytes.
#[derive(Clone, Default)]
pub struct ColumnarBytes<B, C> {
bytes: B,
phantom: std::marker::PhantomData<C>,
}

impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> Container for ColumnarBytes<B, C>
where
for<'a> C::Borrowed<'a> : Len + Clear + IndexOwn,
{
fn len(&self) -> usize {
<C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(&self.bytes)).len()
}
// Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do.
fn clear(&mut self) { unimplemented!() }

type ItemRef<'a> = <C::Borrowed<'a> as IndexOwn>::Ref where Self: 'a;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).iter()
}

type Item<'a> = <C::Borrowed<'a> as IndexOwn>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
<C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(&self.bytes)).iter()
}
}

impl<B: std::ops::Deref<Target = [u64]> + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> SizableContainer for ColumnarBytes<B, C>
where
for<'a> C::Borrowed<'a> : Len + Clear + IndexOwn,
{
fn capacity(&self) -> usize { 1024 }
fn preferred_capacity() -> usize { 1024 }
fn reserve(&mut self, _additional: usize) { }
}
1 change: 1 addition & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::VecDeque;

pub mod columnation;
pub mod flatcontainer;
pub mod columnar;

/// A container transferring data through dataflow edges
///
Expand Down
98 changes: 98 additions & 0 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Wordcount based on flatcontainer.
use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::container::columnar::Columnar,
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

fn main() {

use timely_container::columnar::Strings;
type Container = Columnar<(Strings, Vec<i64>)>;

// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let mut input = <InputHandleCore<_, CapacityContainerBuilder<Container>>>::new();
let mut probe = ProbeHandle::new();

// create a new input, exchange data, and inspect its output
worker.dataflow::<usize, _, _>(|scope| {
input
.to_stream(scope)
.unary(
Pipeline,
"Split",
|_cap, _info| {
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session(&time);
for (text, diff) in data.iter().flat_map(|(text, diff)| {
text.split_whitespace().map(move |s| (s, diff))
}) {
session.give((text, diff));
}
}
}
},
)
.container::<Container>()
.unary_frontier(
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
let mut counts = HashMap::new();

move |input, output| {
while let Some((time, data)) = input.next() {
queues
.entry(time.retain())
.or_insert(Vec::new())
.push(data.take());
}

for (key, val) in queues.iter_mut() {
if !input.frontier().less_equal(key.time()) {
let mut session = output.session(key);
for batch in val.drain(..) {
for (word, diff) in batch.iter() {
let total =
if let Some(count) = counts.get_mut(word) {
*count += diff;
*count
}
else {
counts.insert(word.to_string(), *diff);
*diff
};
session.give((word, total));
}
}
}
}

queues.retain(|_key, val| !val.is_empty());
}
},
)
.container::<Container>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});

// introduce data and watch!
for round in 0..10 {
input.send(("flat container", 1));
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
})
.unwrap();
}

0 comments on commit 1208fe9

Please sign in to comment.