Minor: Improve documentation about OnceAsync (apache#13223)
* Minor: add documentation about OnceAsync

* More refinement

* Fix docs CI

* Update datafusion/physical-plan/src/joins/hash_join.rs

Co-authored-by: Eduard Karacharov <[email protected]>

---------

Co-authored-by: Eduard Karacharov <[email protected]>
alamb and korowa authored Nov 5, 2024
1 parent 0458d30 commit eeb9d58
Showing 4 changed files with 55 additions and 21 deletions.
28 changes: 21 additions & 7 deletions datafusion/physical-plan/src/joins/cross_join.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
use async_trait::async_trait;
use futures::{ready, Stream, StreamExt, TryStreamExt};

-/// Data of the left side
+/// Data of the left side that is buffered into memory
#[derive(Debug)]
struct JoinLeftData {
/// Single RecordBatch with all rows from the left side
@@ -58,12 +58,20 @@ struct JoinLeftData {
}

#[allow(rustdoc::private_intra_doc_links)]
-/// executes partitions in parallel and combines them into a set of
-/// partitions by combining all values from the left with all values on the right
+/// Cross Join Execution Plan
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// This operator is used when there are no predicates between two tables and
+/// returns the Cartesian product of the two tables.
+///
+/// Buffers the left input into memory and then streams batches from each
+/// partition on the right input, combining them with the buffered left input
+/// to generate the output.
+///
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct CrossJoinExec {
/// left (build) side which gets loaded in memory
@@ -72,10 +80,16 @@ pub struct CrossJoinExec {
pub right: Arc<dyn ExecutionPlan>,
/// The schema once the join is applied
schema: SchemaRef,
-/// Build-side data
+/// Buffered copy of left (build) side in memory.
+///
+/// This structure is *shared* across all output streams.
+///
+/// Each output stream waits on the `OnceAsync` to signal the completion of
+/// the left side loading.
left_fut: OnceAsync<JoinLeftData>,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
+/// Properties such as schema, equivalence properties, ordering, partitioning, etc.
cache: PlanProperties,
}
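
The buffering strategy described in the `CrossJoinExec` doc comment above can be sketched in plain Rust. This is a minimal illustration under simplifying assumptions, not DataFusion code: `Batch` and `cross_join` are hypothetical names, rows are plain `i64` values rather than Arrow record batches, and everything runs synchronously on one thread.

```rust
/// Hypothetical stand-in for a batch of rows (not DataFusion's `RecordBatch`).
type Batch = Vec<i64>;

/// Buffers the whole left input, then pairs every buffered left row with
/// every row of each right batch, one right batch at a time.
fn cross_join(left: Vec<Batch>, right: impl Iterator<Item = Batch>) -> Vec<(i64, i64)> {
    // Build phase: concatenate all left batches into one in-memory buffer.
    let buffered_left: Vec<i64> = left.into_iter().flatten().collect();

    // Probe phase: stream the right side batch by batch and emit the
    // Cartesian product of the buffered left rows with the current batch.
    let mut output = Vec::new();
    for right_batch in right {
        for &l in &buffered_left {
            for &r in &right_batch {
                output.push((l, r));
            }
        }
    }
    output
}
```

Calling `cross_join` with three left rows and three right rows split over two batches yields nine pairs. Only the left side is held in memory in full; each right batch can be dropped once it has been combined.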

13 changes: 10 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -295,9 +295,11 @@ impl JoinLeftData {
/// └───────────────┘ └───────────────┘
/// ```
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
@@ -314,6 +316,11 @@ pub struct HashJoinExec {
/// if there is a projection, the schema isn't the same as the output schema.
join_schema: SchemaRef,
/// Future that consumes left input and builds the hash table
+///
+/// For CollectLeft partition mode, this structure is *shared* across all output streams.
+///
+/// Each output stream waits on the `OnceAsync` to signal the completion of
+/// the hash table creation.
left_fut: OnceAsync<JoinLeftData>,
/// Shared `RandomState` for the hashing algorithm
random_state: RandomState,
19 changes: 12 additions & 7 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Defines the nested loop join plan, it supports all [`JoinType`].
-//! The nested loop join can execute in parallel by partitions and it is
-//! determined by the [`JoinType`].
+//! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates).

use std::any::Any;
use std::fmt::Formatter;
@@ -141,9 +139,11 @@ impl JoinLeftData {
/// "reports" about probe phase completion (which means that "visited" bitmap won't be
/// updated anymore), and only the last thread, reporting about completion, will return output.
///
-/// Note that the `Clone` trait is not implemented for this struct due to the
-/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
-/// left side with the processing in each output stream.
+/// # Clone / Shared State
+///
+/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
+/// loading of the left side with the processing in each output stream.
+/// Therefore it cannot be [`Clone`].
#[derive(Debug)]
pub struct NestedLoopJoinExec {
/// left side
@@ -156,7 +156,12 @@ pub struct NestedLoopJoinExec {
pub(crate) join_type: JoinType,
/// The schema once the join is applied
schema: SchemaRef,
-/// Build-side data
+/// Future that consumes left input and buffers it in memory
+///
+/// This structure is *shared* across all output streams.
+///
+/// Each output stream waits on the `OnceAsync` to signal the completion of
+/// the left side loading.
inner_table: OnceAsync<JoinLeftData>,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
16 changes: 12 additions & 4 deletions datafusion/physical-plan/src/joins/utils.rs
@@ -700,11 +700,19 @@ pub fn build_join_schema(
(fields.finish().with_metadata(metadata), column_indices)
}

-/// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls
-/// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation
+/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
+/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
+/// same computation.
///
-/// This is useful for joins where the results of one child are buffered in memory
-/// and shared across potentially multiple output partitions
+/// This is useful for joins where the results of one child are needed to proceed
+/// with multiple output streams.
+///
+///
+/// For example, in a hash join, one input is buffered and shared across
+/// potentially multiple output partitions. Each output partition must wait for
+/// the hash table to be built before proceeding.
+///
+/// Each output partition waits on the same `OnceAsync` before proceeding.
pub(crate) struct OnceAsync<T> {
fut: Mutex<Option<OnceFut<T>>>,
}
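
The "compute once, share with every output partition" behavior described in the doc comment above can be illustrated with a synchronous, thread-based analogy. This is a rough sketch and not the DataFusion implementation: it swaps the async `OnceFut` for the standard library's `OnceLock`, and `OnceShared` and `run_partitions` are hypothetical names introduced only for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread;

/// Hypothetical synchronous analogue of a "build once, share the result"
/// cell. This is NOT DataFusion's `OnceAsync`.
struct OnceShared<T> {
    cell: OnceLock<Arc<T>>,
}

impl<T> OnceShared<T> {
    fn new() -> Self {
        Self { cell: OnceLock::new() }
    }

    /// Runs `build` on the first call only. Concurrent callers block until
    /// the value is ready, then all receive a handle to the same value.
    fn once(&self, build: impl FnOnce() -> T) -> Arc<T> {
        Arc::clone(self.cell.get_or_init(|| Arc::new(build())))
    }
}

/// Simulates `partitions` output streams that all need the buffered left
/// side. Returns how many times the build ran and the number of left rows
/// seen by each partition.
fn run_partitions(partitions: usize) -> (usize, Vec<usize>) {
    let shared = Arc::new(OnceShared::<Vec<i64>>::new());
    let builds = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..partitions)
        .map(|_| {
            let shared = Arc::clone(&shared);
            let builds = Arc::clone(&builds);
            thread::spawn(move || {
                // Every partition requests the left side; only the first
                // request actually executes the build closure.
                let left = shared.once(|| {
                    builds.fetch_add(1, Ordering::SeqCst);
                    vec![1, 2, 3]
                });
                left.len()
            })
        })
        .collect();

    let lens: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (builds.load(Ordering::SeqCst), lens)
}
```

With four partitions, `run_partitions(4)` reports a single build and the same three buffered rows visible to every partition. The cell holds shared state that all partitions must observe, which fits the commit's note that these operators cannot be `Clone`.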