Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
refactor to row-oriented hash table storage
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 26, 2024
1 parent 6066b2d commit 7ab03d6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 38 deletions.
34 changes: 22 additions & 12 deletions eggstrain/src/execution/operators/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl HashJoin {
Ok(batch) => {
// TODO gather N batches and use rayon to insert all at once
let hashes = self.hash_batch::<true>(&batch)?;
record_table.insert_batch(batch, hashes);
record_table.insert_batch(batch, hashes)?;
}
Err(e) => match e {
RecvError::Closed => break,
Expand Down Expand Up @@ -113,29 +113,39 @@ impl HashJoin {
) -> Result<()> {
let hashes = self.hash_batch::<false>(&right_batch)?;

let left_column_count = self.left_schema.fields().size();
let right_column_count = self.right_schema.fields().size();
let output_columns = left_column_count + right_column_count - self.equate_on.len();
// let left_column_count = self.left_schema.fields().size();
// let right_column_count = self.right_schema.fields().size();
// let output_columns = left_column_count + right_column_count - self.equate_on.len();

for (right_row, &hash) in hashes.iter().enumerate() {
// Construct a RecordBatch for each tuple that might get joined with tuples in the hash table
let mut out_columns: Vec<(String, ArrayRef)> = Vec::with_capacity(output_columns);
let right_rows = table.buffer.record_batch_to_rows(right_batch)?;

for (row, &hash) in hashes.iter().enumerate() {
// For each of these hashes, check if it is in the table
let Some(records) = table.get_records(hash) else {
let Some(records) = table.get_record_indices(hash) else {
return Ok(());
};
assert!(!records.is_empty());

// TODO
todo!("create a new RowConverter with the joined schema");

// There are records associated with this hash value, so we need to emit things
for &record in records {
let (left_batch, left_row) = table.get(record).unwrap();
let left_tuple = table.buffer.get(record).unwrap();
let right_tuple = right_rows.row(row);

// Left tuple is in `left_batch` at `left_row` offset
// Right tuple is in `right_batch` at `right_row` offset
todo!("Join the two tuples in some way, then append to a `Rows`")
}

let joined_batch = RecordBatch::try_from_iter(out_columns)?;
let out_columns: Vec<ArrayRef> = todo!(
"Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`"
);

todo!("Figure out names for each column");

let out_columns_iter = out_columns.into_iter().map(|col| ("name", col));

let joined_batch = RecordBatch::try_from_iter(out_columns_iter)?;

tx.send(joined_batch)
.expect("Unable to send the projected batch");
Expand Down
62 changes: 50 additions & 12 deletions eggstrain/src/execution/record_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use arrow::row::{Row, RowConverter, Rows, SortField};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::Result;

/// Make this an opaque type that can be used later, we may want to make this contiguous and then
/// use offsets instead.
///
/// Locates a single encoded tuple inside a `RecordBuffer` as a
/// (row group, row offset) pair.
#[derive(Clone, Copy)]
pub struct RecordIndex {
    index: u32, // index into the vector of rows
    row: u32,   // index into the row group
}

/// TODO docs
Expand All @@ -14,32 +16,58 @@ impl RecordIndex {
Self { index, row }
}

pub fn update_row(&mut self, row: u32) {
self.row = row;
// Use functional style due to easy copying
pub fn with_row(&self, row: u32) -> Self {
Self {
index: self.index,
row,
}
}
}

/// Build one `SortField` per column of `schema`, in column order, suitable
/// for constructing a `RowConverter` over batches of that schema.
pub fn schema_to_fields(schema: SchemaRef) -> Vec<SortField> {
    let mut fields = Vec::with_capacity(schema.fields().len());
    for field in schema.fields().iter() {
        fields.push(SortField::new(field.data_type().clone()));
    }
    fields
}

pub struct RecordBuffer {
schema: SchemaRef, // The schema for all of the record batches
inner: Vec<RecordBatch>, // make this contiguous
schema: SchemaRef,
converter: RowConverter,
inner: Vec<Rows>, // vector of row groups
}

impl RecordBuffer {
/// Create an empty `RecordBuffer` that accepts batches matching `schema`.
///
/// # Panics
///
/// Panics if a `RowConverter` cannot be constructed from the schema's fields.
pub fn new(schema: SchemaRef) -> Self {
    let converter = RowConverter::new(schema_to_fields(schema.clone()))
        .expect("Unable to create a RowConverter");
    Self {
        schema,
        converter,
        inner: Vec::new(),
    }
}

/// Create an empty `RecordBuffer` pre-sized to hold `capacity` row groups.
///
/// # Panics
///
/// Panics if a `RowConverter` cannot be constructed from the schema's fields.
pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self {
    let converter = RowConverter::new(schema_to_fields(schema.clone()))
        .expect("Unable to create a RowConverter");
    Self {
        schema,
        converter,
        inner: Vec::with_capacity(capacity),
    }
}

pub fn insert(&mut self, batch: RecordBatch) -> RecordIndex {
/// Borrow the `RowConverter` used to encode rows for this buffer.
pub fn converter(&self) -> &RowConverter {
    &self.converter
}

/// Encode every row of `batch` into this buffer's arrow row format.
///
/// # Errors
///
/// Propagates any error raised by the underlying `RowConverter`.
pub fn record_batch_to_rows(&self, batch: RecordBatch) -> Result<Rows> {
    let rows = self.converter.convert_columns(batch.columns())?;
    Ok(rows)
}

pub fn insert(&mut self, batch: RecordBatch) -> Result<RecordIndex> {
assert_eq!(
self.schema,
batch.schema(),
Expand All @@ -50,20 +78,30 @@ impl RecordBuffer {
"Maximum size for a RecordBuffer is u32::MAX"
);

self.inner.push(batch);
let rows = self.record_batch_to_rows(batch)?;
self.inner.push(rows);

RecordIndex {
Ok(RecordIndex {
index: (self.inner.len() - 1) as u32,
row: 0,
}
})
}

/// Retrieve the batch and row number associated with the RecordIndex
pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
/// Retrieve the row group and row number associated with the `RecordIndex`.
///
/// Returns `None` if the index refers to a row group that does not exist.
/// The row offset is returned unchecked; callers index into the group.
pub fn get_group(&self, index: RecordIndex) -> Option<(&Rows, u32)> {
    // `Vec::get` performs the same bounds check as the manual `if` did.
    let group = self.inner.get(index.index as usize)?;
    Some((group, index.row))
}

/// Retrieve row / tuple associated with the RecordIndex
///
/// Returns `None` when either the group index or the row offset within the
/// group is out of bounds. (Previously an out-of-range row offset would
/// panic inside `Rows::row` despite the `Option` return type.)
pub fn get(&self, index: RecordIndex) -> Option<Row> {
    let group = self.inner.get(index.index as usize)?;
    if (index.row as usize) >= group.num_rows() {
        // Report "not found" instead of letting `Rows::row` panic.
        return None;
    }
    Some(group.row(index.row as usize))
}
}
30 changes: 16 additions & 14 deletions eggstrain/src/execution/record_table.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::record_buffer::{RecordBuffer, RecordIndex};
use arrow::row::RowConverter;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::Result;
use std::collections::HashMap; // TODO replace with a raw table

pub struct RecordTable {
/// Maps a Hash value to a `RecordIndex` into the `RecordBuffer`
inner: HashMap<u64, Vec<RecordIndex>>,
buffer: RecordBuffer,
pub(crate) buffer: RecordBuffer,
}

impl RecordTable {
Expand All @@ -23,32 +25,32 @@ impl RecordTable {
}
}

pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) {
/// Borrow the `RowConverter` shared by this table's underlying buffer.
pub fn converter(&self) -> &RowConverter {
    self.buffer.converter()
}

/// Insert a whole batch into the table: the batch is appended to the row
/// buffer once, then each row is linked into its hash bucket.
///
/// # Panics
///
/// Panics if `hashes` does not contain exactly one hash per batch row.
///
/// # Errors
///
/// Propagates any error from converting the batch into rows.
pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) -> Result<()> {
    assert_eq!(
        batch.num_rows(),
        hashes.len(),
        "There should be an equal amount of batch rows and hashed values"
    );

    // Points to the location of the base of the record batch
    let base_record_id = self.buffer.insert(batch)?;

    for (row, &hash) in hashes.iter().enumerate() {
        // Insert the record into the hashtable bucket; `with_row` derives a
        // per-tuple index from the base index of the inserted batch.
        self.inner
            .entry(hash)
            .or_default()
            .push(base_record_id.with_row(row as u32))
    }

    Ok(())
}

pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
self.buffer.get(index)
pub fn get_record_indices(&self, hash: u64) -> Option<&Vec<RecordIndex>> {
self.inner.get(&hash)
}
}

0 comments on commit 7ab03d6

Please sign in to comment.