Skip to content

Commit

Permalink
Derive Clone for more ExecutionPlans (#13203)
Browse files Browse the repository at this point in the history
* Derive `Clone` for more ExecutionPlans

* improve docs
  • Loading branch information
alamb authored Nov 2, 2024
1 parent 344f089 commit 89e96b4
Show file tree
Hide file tree
Showing 24 changed files with 36 additions and 19 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl From<StreamType> for SendableRecordBatchStream {
}

/// Hash aggregate execution plan
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AggregateExec {
/// Aggregation mode (full, partial)
mode: AggregateMode,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use futures::stream::{Stream, StreamExt};
/// reaches the `fetch` value.
///
/// See [`BatchCoalescer`] for more information
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
/// The input plan
input: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_physical_expr::EquivalenceProperties;
use log::trace;

/// Execution plan for empty relation with produce_one_row=false
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct EmptyExec {
/// The schema for the produced row
schema: SchemaRef,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use log::trace;

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FilterExec {
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc<dyn PhysicalExpr>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub type FileSinkExec = DataSinkExec;
/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
#[derive(Clone)]
pub struct DataSinkExec {
/// Input plan that produces the record batches to be written.
input: Arc<dyn ExecutionPlan>,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
/// Data of the left side
type JoinLeftData = (RecordBatch, MemoryReservation);

#[allow(rustdoc::private_intra_doc_links)]
/// executes partitions in parallel and combines them into a set of
/// partitions by combining all values from the left with all values on the right
///
/// Note that the `Clone` trait is not implemented for this struct due to the
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct CrossJoinExec {
/// left (build) side which gets loaded in memory
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl JoinLeftData {
}
}

#[allow(rustdoc::private_intra_doc_links)]
/// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
Expand Down Expand Up @@ -293,6 +294,10 @@ impl JoinLeftData {
/// │ "dimension" │ │ "fact" │
/// └───────────────┘ └───────────────┘
/// ```
///
/// Note that the `Clone` trait is not implemented for this struct due to the
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl JoinLeftData {
}
}

#[allow(rustdoc::private_intra_doc_links)]
/// NestedLoopJoinExec is build-probe join operator, whose main task is to
/// perform joins without any equijoin conditions in `ON` clause.
///
Expand Down Expand Up @@ -140,6 +141,9 @@ impl JoinLeftData {
/// "reports" about probe phase completion (which means that "visited" bitmap won't be
/// updated anymore), and only the last thread, reporting about completion, will return output.
///
/// Note that the `Clone` trait is not implemented for this struct due to the
/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct NestedLoopJoinExec {
/// left side
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use crate::{

/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SortMergeJoinExec {
/// Left sorted joining execution plan
pub left: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
/// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending)
/// than that can be dropped from the inner buffer.
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
/// Left side stream
pub(crate) left: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures::stream::{Stream, StreamExt};
use log::trace;

/// Limit execution plan
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct GlobalLimitExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use futures::Stream;

/// Execution plan for reading in-memory batches of data
#[derive(Clone)]
pub struct MemoryExec {
/// The partitions to query
partitions: Vec<Vec<RecordBatch>>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/placeholder_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion_physical_expr::EquivalenceProperties;
use log::trace;

/// Execution plan for empty relation with produce_one_row=true
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PlaceholderRowExec {
/// The schema for the produced row
schema: SchemaRef,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use futures::{ready, Stream, StreamExt};
/// Note that there won't be any limit or checks applied to detect
/// an infinite recursion, so it is up to the planner to ensure that
/// it won't happen.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RecursiveQueryExec {
/// Name of the query handler
name: String,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl BatchPartitioner {
/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
/// which uses the term "Exchange" for the concept of repartitioning
/// data across threads.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RepartitionExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ pub(crate) fn lexsort_to_indices_multi_columns(
///
/// Support sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SortExec {
/// Input schema
pub(crate) input: Arc<dyn ExecutionPlan>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use log::{debug, trace};
///
/// If any of the input partitions return an error, the error is propagated to
/// the output and inputs are not polled again.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub trait PartitionStream: Debug + Send + Sync {
///
/// If your source can be represented as one or more [`PartitionStream`]s, you can
/// use this struct to implement [`ExecutionPlan`].
#[derive(Clone)]
pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<Arc<[usize]>>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use tokio::macros::support::thread_rng_n;
/// │Input 1 │ │Input 2 │
/// └─────────────────┘ └──────────────────┘
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct UnionExec {
/// Input execution plan
inputs: Vec<Arc<dyn ExecutionPlan>>,
Expand Down Expand Up @@ -298,7 +298,7 @@ impl ExecutionPlan for UnionExec {
/// | |-----------------+
/// +---------+
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InterleaveExec {
/// Input execution plan
inputs: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use log::trace;
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct UnnestExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

/// Execution plan for values list based relation (produces constant rows)
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ValuesExec {
/// The schema
schema: SchemaRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use indexmap::IndexMap;
use log::debug;

/// Window execution plan
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundedWindowAggExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{ready, Stream, StreamExt};

/// Window execution plan
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct WindowAggExec {
/// Input plan
pub(crate) input: Arc<dyn ExecutionPlan>,
Expand Down

0 comments on commit 89e96b4

Please sign in to comment.