From 89e96b404f07900469fee31740f43edd8a410a10 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Nov 2024 06:24:37 -0400 Subject: [PATCH] Derive `Clone` for more ExecutionPlans (#13203) * Derive `Clone` for more ExecutionPlans * improve docs --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/coalesce_batches.rs | 2 +- datafusion/physical-plan/src/coalesce_partitions.rs | 2 +- datafusion/physical-plan/src/empty.rs | 2 +- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/physical-plan/src/insert.rs | 1 + datafusion/physical-plan/src/joins/cross_join.rs | 5 +++++ datafusion/physical-plan/src/joins/hash_join.rs | 5 +++++ datafusion/physical-plan/src/joins/nested_loop_join.rs | 4 ++++ datafusion/physical-plan/src/joins/sort_merge_join.rs | 2 +- datafusion/physical-plan/src/joins/symmetric_hash_join.rs | 2 +- datafusion/physical-plan/src/limit.rs | 2 +- datafusion/physical-plan/src/memory.rs | 1 + datafusion/physical-plan/src/placeholder_row.rs | 2 +- datafusion/physical-plan/src/recursive_query.rs | 2 +- datafusion/physical-plan/src/repartition/mod.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 2 +- datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | 2 +- datafusion/physical-plan/src/streaming.rs | 1 + datafusion/physical-plan/src/union.rs | 4 ++-- datafusion/physical-plan/src/unnest.rs | 2 +- datafusion/physical-plan/src/values.rs | 2 +- .../physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +- datafusion/physical-plan/src/windows/window_agg_exec.rs | 2 +- 24 files changed, 36 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4193cc187e10..5ffe797c5c26 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -344,7 +344,7 @@ impl From for SendableRecordBatchStream { } /// Hash aggregate execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 61fb3599f013..11678e7a4696 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -48,7 +48,7 @@ use futures::stream::{Stream, StreamExt}; /// reaches the `fetch` value. /// /// See [`BatchCoalescer`] for more information -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CoalesceBatchesExec { /// The input plan input: Arc, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index f9d4ec6a1a34..3da101d6092f 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext; /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CoalescePartitionsExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index f6e0abb94fa8..192619f69f6a 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -35,7 +35,7 @@ use datafusion_physical_expr::EquivalenceProperties; use log::trace; /// Execution plan for empty relation with produce_one_row=false -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EmptyExec { /// The schema for the produced row schema: SchemaRef, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 30b0af19f43b..97d8159137f4 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -54,7 +54,7 @@ use log::trace; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FilterExec { /// The expression to filter on. This expression must evaluate to a boolean value. predicate: Arc, diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 8b3ef5ae01e4..e478cecb7ffc 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -79,6 +79,7 @@ pub type FileSinkExec = DataSinkExec; /// Execution plan for writing record batches to a [`DataSink`] /// /// Returns a single row with the number of values written +#[derive(Clone)] pub struct DataSinkExec { /// Input plan that produces the record batches to be written. input: Arc, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 8f49885068fd..a67e1df47bc7 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -49,8 +49,13 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; /// Data of the left side type JoinLeftData = (RecordBatch, MemoryReservation); +#[allow(rustdoc::private_intra_doc_links)] /// executes partitions in parallel and combines them into a set of /// partitions by combining all values from the left with all values on the right +/// +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. #[derive(Debug)] pub struct CrossJoinExec { /// left (build) side which gets loaded in memory diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index c56c179c17eb..57d8a9ce7b35 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -136,6 +136,7 @@ impl JoinLeftData { } } +#[allow(rustdoc::private_intra_doc_links)] /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post /// join. @@ -293,6 +294,10 @@ impl JoinLeftData { /// │ "dimension" │ │ "fact" │ /// └───────────────┘ └───────────────┘ /// ``` +/// +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. #[derive(Debug)] pub struct HashJoinExec { /// left (build) side which gets hashed diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index a87743565adf..f36c2395e20f 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -105,6 +105,7 @@ impl JoinLeftData { } } +#[allow(rustdoc::private_intra_doc_links)] /// NestedLoopJoinExec is build-probe join operator, whose main task is to /// perform joins without any equijoin conditions in `ON` clause. /// @@ -140,6 +141,9 @@ impl JoinLeftData { /// "reports" about probe phase completion (which means that "visited" bitmap won't be /// updated anymore), and only the last thread, reporting about completion, will return output. /// +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. #[derive(Debug)] pub struct NestedLoopJoinExec { /// left side diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 90dc407fcaed..3ad892c880f6 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -71,7 +71,7 @@ use crate::{ /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortMergeJoinExec { /// Left sorted joining execution plan pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 81c13c652513..5b6dc2cd2ae9 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -167,7 +167,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; /// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending) /// than that can be dropped from the inner buffer. /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SymmetricHashJoinExec { /// Left side stream pub(crate) left: Arc, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 1fe550a93056..ab1e6cb37bc8 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -39,7 +39,7 @@ use futures::stream::{Stream, StreamExt}; use log::trace; /// Limit execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GlobalLimitExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 56ed144845a0..c9ada345afc7 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -40,6 +40,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use futures::Stream; /// Execution plan for reading in-memory batches of data +#[derive(Clone)] pub struct MemoryExec { /// The partitions to query partitions: Vec>, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 5d8ca7e76935..f9437f46f8a6 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::EquivalenceProperties; use log::trace; /// Execution plan for empty relation with produce_one_row=true -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PlaceholderRowExec { /// The schema for the produced row schema: SchemaRef, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index e9ea9d4f5032..cbf22a4b392f 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -53,7 +53,7 @@ use futures::{ready, Stream, StreamExt}; /// Note that there won't be any limit or checks applied to detect /// an infinite recursion, so it is up to the planner to ensure that /// it won't happen. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RecursiveQueryExec { /// Name of the query handler name: String, diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 06144f98c89d..bc65b251561b 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -399,7 +399,7 @@ impl BatchPartitioner { /// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf) /// which uses the term "Exchange" for the concept of repartitioning /// data across threads. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RepartitionExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 32d6d3e0073c..d90d0f64ceb4 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -675,7 +675,7 @@ pub(crate) fn lexsort_to_indices_multi_columns( /// /// Support sorting datasets that are larger than the memory allotted /// by the memory manager, by spilling to disk. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortExec { /// Input schema pub(crate) input: Arc, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index ae39cfe412ba..9ee0faaa0a44 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -71,7 +71,7 @@ use log::{debug, trace}; /// /// If any of the input partitions return an error, the error is propagated to /// the output and inputs are not polled again. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortPreservingMergeExec { /// Input plan input: Arc, diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index cdb94af1fe8a..7ccef3248069 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -55,6 +55,7 @@ pub trait PartitionStream: Debug + Send + Sync { /// /// If your source can be represented as one or more [`PartitionStream`]s, you can /// use this struct to implement [`ExecutionPlan`]. +#[derive(Clone)] pub struct StreamingTableExec { partitions: Vec>, projection: Option>, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 69cc63f8f65d..bd36753880eb 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -85,7 +85,7 @@ use tokio::macros::support::thread_rng_n; /// │Input 1 │ │Input 2 │ /// └─────────────────┘ └──────────────────┘ /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnionExec { /// Input execution plan inputs: Vec>, @@ -298,7 +298,7 @@ impl ExecutionPlan for UnionExec { /// | |-----------------+ /// +---------+ /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InterleaveExec { /// Input execution plan inputs: Vec>, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3e312b7451be..b7b9f17eb1b6 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -56,7 +56,7 @@ use log::trace; /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m') /// /// See [`UnnestOptions`] for more details and an example. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnnestExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 991146d245a7..edadf98cb10c 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ValuesExec { /// The schema schema: SchemaRef, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 2c60be49a480..8c0331f94570 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -67,7 +67,7 @@ use indexmap::IndexMap; use log::debug; /// Window execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BoundedWindowAggExec { /// Input plan input: Arc, diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 1318f36f269e..f71a0b9fd095 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{ready, Stream, StreamExt}; /// Window execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct WindowAggExec { /// Input plan pub(crate) input: Arc,