Skip to content

Commit

Permalink
feat: pull input batches in parallel
Browse files (browse the repository at this point in the history)
  • Loading branch information
viirya committed Sep 16, 2024
1 parent 2108180 commit 5efd919
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-spark-expr = { workspace = true }
datafusion-comet-proto = { workspace = true }
rayon = "1.10.0"

[dev-dependencies]
pprof = { version = "0.13.0", features = ["flamegraph"] }
Expand Down
4 changes: 3 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use jni::{
};
use std::{collections::HashMap, sync::Arc, task::Poll};

use rayon::prelude::*;

use super::{serde, utils::SparkArrowConvert, CometMemoryPool};

use crate::{
Expand Down Expand Up @@ -322,7 +324,7 @@ fn prepare_output(
/// operators before polling the stream,
#[inline]
fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometError> {
- exec_context.scans.iter_mut().try_for_each(|scan| {
+ exec_context.scans.par_iter_mut().try_for_each(|scan| {
scan.get_next_batch()?;
Ok::<(), CometError>(())
})
Expand Down

0 comments on commit 5efd919

Please sign in to comment.