diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs index aafa899..a7fc6ae 100644 --- a/eggstrain/src/execution/operators/hash_join.rs +++ b/eggstrain/src/execution/operators/hash_join.rs @@ -84,7 +84,7 @@ impl HashJoin { Ok(batch) => { // TODO gather N batches and use rayon to insert all at once let hashes = self.hash_batch::(&batch)?; - record_table.insert_batch(batch, hashes); + record_table.insert_batch(batch, hashes)?; } Err(e) => match e { RecvError::Closed => break, @@ -113,29 +113,39 @@ impl HashJoin { ) -> Result<()> { let hashes = self.hash_batch::(&right_batch)?; - let left_column_count = self.left_schema.fields().size(); - let right_column_count = self.right_schema.fields().size(); - let output_columns = left_column_count + right_column_count - self.equate_on.len(); + // let left_column_count = self.left_schema.fields().size(); + // let right_column_count = self.right_schema.fields().size(); + // let output_columns = left_column_count + right_column_count - self.equate_on.len(); - for (right_row, &hash) in hashes.iter().enumerate() { - // Construct a RecordBatch for each tuple that might get joined with tuples in the hash table - let mut out_columns: Vec<(String, ArrayRef)> = Vec::with_capacity(output_columns); + let right_rows = table.buffer.record_batch_to_rows(right_batch)?; + for (row, &hash) in hashes.iter().enumerate() { // For each of these hashes, check if it is in the table - let Some(records) = table.get_records(hash) else { + let Some(records) = table.get_record_indices(hash) else { return Ok(()); }; assert!(!records.is_empty()); + // TODO + todo!("create a new RowConverter with the joined schema"); + // There are records associated with this hash value, so we need to emit things for &record in records { - let (left_batch, left_row) = table.get(record).unwrap(); + let left_tuple = table.buffer.get(record).unwrap(); + let right_tuple = right_rows.row(row); - // Left tuple is in 
`left_batch` at `left_row` offset - // Right tuple is in `right_batch` at `right_row` offset + todo!("Join the two tuples in some way, then append to a `Rows`") } - let joined_batch = RecordBatch::try_from_iter(out_columns)?; + let out_columns: Vec<ArrayRef> = todo!( + "Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`" + ); + + todo!("Figure out names for each column"); + + let out_columns_iter = out_columns.into_iter().map(|col| ("name", col)); + + let joined_batch = RecordBatch::try_from_iter(out_columns_iter)?; tx.send(joined_batch) .expect("Unable to send the projected batch"); diff --git a/eggstrain/src/execution/record_buffer.rs b/eggstrain/src/execution/record_buffer.rs index 97de637..8043347 100644 --- a/eggstrain/src/execution/record_buffer.rs +++ b/eggstrain/src/execution/record_buffer.rs @@ -1,11 +1,13 @@ +use arrow::row::{Row, RowConverter, Rows, SortField}; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use datafusion_common::Result; /// Make this an opaque type that can be used later, we may want to make this contiguous and then /// use offsets instead.
#[derive(Clone, Copy)] pub struct RecordIndex { - index: u32, - row: u32, // by default this is just 0 + index: u32, // index into the vector of rows + row: u32, // index into the row group } /// TODO docs @@ -14,32 +16,58 @@ impl RecordIndex { Self { index, row } } - pub fn update_row(&mut self, row: u32) { - self.row = row; + // Use functional style due to easy copying + pub fn with_row(&self, row: u32) -> Self { + Self { + index: self.index, + row, + } } } +pub fn schema_to_fields(schema: SchemaRef) -> Vec<SortField> { + schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect::<Vec<_>>() +} + pub struct RecordBuffer { - schema: SchemaRef, // The schema for all of the record batches - inner: Vec<RecordBatch>, // make this contiguous + schema: SchemaRef, + converter: RowConverter, + inner: Vec<Rows>, // vector of row groups } impl RecordBuffer { pub fn new(schema: SchemaRef) -> Self { + let fields = schema_to_fields(schema.clone()); Self { schema, + converter: RowConverter::new(fields).expect("Unable to create a RowConverter"), inner: vec![], } } pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self { + let fields = schema_to_fields(schema.clone()); Self { schema, + converter: RowConverter::new(fields).expect("Unable to create a RowConverter"), inner: Vec::with_capacity(capacity), } } - pub fn insert(&mut self, batch: RecordBatch) -> RecordIndex { + pub fn converter(&self) -> &RowConverter { + &self.converter + } + + pub fn record_batch_to_rows(&self, batch: RecordBatch) -> Result<Rows> { + // `Ok` to make use of `?` behavior + Ok(self.converter.convert_columns(batch.columns())?)
+ } + + pub fn insert(&mut self, batch: RecordBatch) -> Result<RecordIndex> { assert_eq!( self.schema, batch.schema(), @@ -50,20 +78,30 @@ impl RecordBuffer { "Maximum size for a RecordBuffer is u32::MAX" ); - self.inner.push(batch); + let rows = self.record_batch_to_rows(batch)?; + self.inner.push(rows); - RecordIndex { + Ok(RecordIndex { index: (self.inner.len() - 1) as u32, row: 0, - } + }) } - /// Retrieve the batch and row number associated with the RecordIndex - pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> { + /// Retrieve the row group and row number associated with the RecordIndex + pub fn get_group(&self, index: RecordIndex) -> Option<(&Rows, u32)> { if (index.index as usize) >= self.inner.len() { return None; } Some((&self.inner[index.index as usize], index.row)) } + + /// Retrieve row / tuple associated with the RecordIndex + pub fn get(&self, index: RecordIndex) -> Option<Row> { + if (index.index as usize) >= self.inner.len() { + return None; + } + + Some(self.inner[index.index as usize].row(index.row as usize)) + } } diff --git a/eggstrain/src/execution/record_table.rs b/eggstrain/src/execution/record_table.rs index ec1ffaf..540fd84 100644 --- a/eggstrain/src/execution/record_table.rs +++ b/eggstrain/src/execution/record_table.rs @@ -1,11 +1,13 @@ use super::record_buffer::{RecordBuffer, RecordIndex}; +use arrow::row::RowConverter; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use datafusion_common::Result; use std::collections::HashMap; // TODO replace with a raw table pub struct RecordTable { /// Maps a Hash value to a `RecordIndex` into the `RecordBuffer` inner: HashMap<u64, Vec<RecordIndex>>, - buffer: RecordBuffer, + pub(crate) buffer: RecordBuffer, } impl RecordTable { @@ -23,7 +25,11 @@ impl RecordTable { } } - pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) { + pub fn converter(&self) -> &RowConverter { + self.buffer.converter() + } + + pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) -> Result<()> { assert_eq!(
batch.num_rows(), hashes.len(), @@ -31,24 +37,20 @@ ); // Points to the location of the base of the record batch - let base_record_id = self.buffer.insert(batch); + let base_record_id = self.buffer.insert(batch)?; for (row, &hash) in hashes.iter().enumerate() { - // Given the row, we can create a record id for a specific tuple by updating the row - // from the base_record_id - let mut record_id = base_record_id; - record_id.update_row(row as u32); - // Insert the record into the hashtable bucket - self.inner.entry(hash).or_default().push(record_id) + self.inner + .entry(hash) + .or_default() + .push(base_record_id.with_row(row as u32)) } - } - pub fn get_records(&self, hash: u64) -> Option<&Vec<RecordIndex>> { - self.inner.get(&hash) + Ok(()) } - pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> { - self.buffer.get(index) + pub fn get_record_indices(&self, hash: u64) -> Option<&Vec<RecordIndex>> { + self.inner.get(&hash) } }