From b73f041453572987e90b09b25aa653d2b42c95e5 Mon Sep 17 00:00:00 2001 From: SarveshOO7 Date: Sat, 23 Mar 2024 19:38:03 -0400 Subject: [PATCH] Use Rayon thread to do sort --- eggstrain/src/execution/operators/sort.rs | 23 +++++++++++++---------- eggstrain/src/main.rs | 1 + 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/eggstrain/src/execution/operators/sort.rs b/eggstrain/src/execution/operators/sort.rs index 27ffacf..4e7edac 100644 --- a/eggstrain/src/execution/operators/sort.rs +++ b/eggstrain/src/execution/operators/sort.rs @@ -30,17 +30,21 @@ impl Sort { } } - fn sort_in_mem(&self, rb: RecordBatch) -> Result { - assert_eq!(rb.schema(), self.input_schema); + fn sort_in_mem( + rb: RecordBatch, + limit_size: Option, + sort_expr: Vec, + ) -> Result { + // assert_eq!(rb.schema(), self.input_schema); - let expressions = self.sort_expr.clone(); + let expressions = sort_expr.clone(); let sort_columns = expressions .iter() .map(|expr| expr.evaluate_to_sort_column(&rb)) .collect::>>()?; - let indices = lexsort_to_indices(&sort_columns, self.limit_size)?; + let indices = lexsort_to_indices(&sort_columns, limit_size)?; let columns = rb .columns() @@ -90,9 +94,11 @@ impl UnaryOperator for Sort { } let merged_batch = concat_batches(&self.input_schema, &batches); - match merged_batch { + let limit_size = self.limit_size; + let sort_expr = self.sort_expr.clone(); + rayon::spawn(move || match merged_batch { Ok(merged_batch) => { - let sorted_batch = self.sort_in_mem(merged_batch).unwrap(); + let sorted_batch = Sort::sort_in_mem(merged_batch, limit_size, sort_expr).unwrap(); let mut current = 0; let total_rows = sorted_batch.num_rows(); while current + BATCH_SIZE < total_rows { @@ -104,11 +110,8 @@ impl UnaryOperator for Sort { let batch_to_send = sorted_batch.slice(current, total_rows - current); tx.send(batch_to_send) .expect("Unable to send the last sorted batch"); - - // TODO: do I have to call drop here manually or will rust take care of it? - // drop(sorted_batch); } Err(_) => todo!("Could not concat the batches for sorting"), - } + }); } } diff --git a/eggstrain/src/main.rs b/eggstrain/src/main.rs index df04a2f..e97ae0c 100644 --- a/eggstrain/src/main.rs +++ b/eggstrain/src/main.rs @@ -14,6 +14,7 @@ async fn main() -> Result<()> { // Run our execution engine on the physical plan let df_physical_plan = sql.clone().create_physical_plan().await?; + let df_physical_plan = df_physical_plan.children()[0].clone(); let results = run(df_physical_plan).await; results.into_iter().for_each(|batch| {