From 6e59dd71289dd3c957b15c68925f78d2de84ec41 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Tue, 4 Jun 2024 19:38:57 +0800 Subject: [PATCH] chore(query): fix pipeline display (#15729) * chore(query): fix pipeline display * chore(query): fix pipeline display * chore(query): fix pipeline display --- .../pipeline/core/src/pipeline_display.rs | 43 +--- .../standalone/explain/explain_pipeline.test | 6 +- .../suites/mode/standalone/explain/sort.test | 66 ++--- .../mode/standalone/explain/window.test | 230 +++++++++--------- .../explain_native/explain_pipeline.test | 6 +- .../mode/standalone/explain_native/sort.test | 2 +- 6 files changed, 158 insertions(+), 195 deletions(-) diff --git a/src/query/pipeline/core/src/pipeline_display.rs b/src/query/pipeline/core/src/pipeline_display.rs index 8a5a0307b90a..e0f1b427d33b 100644 --- a/src/query/pipeline/core/src/pipeline_display.rs +++ b/src/query/pipeline/core/src/pipeline_display.rs @@ -46,51 +46,14 @@ impl<'a> Display for PipelineIndentDisplayWrapper<'a> { write!(f, " ")?; } + let pipe_name = Self::pipe_name(pipe); if pipe.input_length == pipe.output_length || pipe.input_length == 0 || pipe.output_length == 0 { - write!( - f, - "{} × {} {}", - Self::pipe_name(pipe), - pipe.items.len(), - if pipe.items.len() == 1 { - "processor" - } else { - "processors" - }, - )?; + write!(f, "{} × {}", Self::pipe_name(pipe), pipe.items.len(),)?; } else { - let prev_name = Self::pipe_name(&pipes[pipes.len() - index - 2]); - if index > 0 { - let post_name = Self::pipe_name(&pipes[pipes.len() - index]); - write!( - f, - "Merge ({} × {} {}) to ({} × {})", - prev_name, - pipe.input_length, - if pipe.input_length == 1 { - "processor" - } else { - "processors" - }, - post_name, - pipe.output_length, - )?; - } else { - write!( - f, - "Merge ({} × {} {})", - prev_name, - pipe.input_length, - if pipe.input_length == 1 { - "processor" - } else { - "processors" - }, - )?; - } + write!(f, "Merge to {pipe_name} × {}", pipe.output_length,)?; } } diff --git a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test index 4ed0d8b08df2..bddfc8bf5c8c 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test @@ -15,9 +15,9 @@ onlyif mysql query T explain pipeline select a from t1 ignore_result ---- -EmptySink × 1 processor - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +EmptySink × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain/sort.test b/tests/sqllogictests/suites/mode/standalone/explain/sort.test index 68e3c063c332..e08c72e02bc2 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/sort.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/sort.test @@ -1,5 +1,5 @@ statement ok -create table if not exists t1(a int, b int); +create or replace table t1(a int, b int); query T explain select a from (select * from t1 order by a) as t2 where a > 1; @@ -74,13 +74,13 @@ set sort_spilling_memory_ratio = 0; query T explain pipeline select a, b from t1 order by a; ---- -CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortMerge × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Sort spilling @@ -90,14 +90,14 @@ set sort_spilling_memory_ratio = 60; query T explain pipeline select a, b from t1 order by a; ---- -CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortSpill × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Merge to MultiSortMerge × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok set sort_spilling_memory_ratio = 0; @@ -106,14 +106,14 @@ set sort_spilling_memory_ratio = 0; query T explain pipeline select a + 1, b from t1 order by a + 1; ---- -CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortMerge × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4) - CompoundBlockOperator(Map) × 1 processor - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Sort spilling @@ -123,15 +123,15 @@ set sort_spilling_memory_ratio = 60; query T explain pipeline select a + 1, b from t1 order by a + 1; ---- -CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortSpill × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4) - CompoundBlockOperator(Map) × 1 processor - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Merge to MultiSortMerge × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok drop table if exists t1; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index 2fdefccd6b07..d2483adae61a 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -43,14 +43,14 @@ set sort_spilling_memory_ratio = 0; query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- -CompoundBlockOperator(Project) × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Enable sort spilling @@ -60,15 +60,15 @@ set sort_spilling_memory_ratio = 60; query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- -CompoundBlockOperator(Project) × 1 processor - Transform Window × 1 processor - Merge (TransformSortSpill × 4 processors) to (Transform Window × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 statement ok @@ -348,15 +348,15 @@ set sort_spilling_memory_ratio = 0; query T explain pipeline select a, sum(a) over (partition by a order by a desc) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Enable sort spilling statement ok @@ -366,16 +366,16 @@ set sort_spilling_memory_ratio = 60; query T explain pipeline select a, sum(a) over (partition by a order by a desc) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortSpill × 4 processors) to (Transform Window × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortSpill × 4 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # Disable sort spilling @@ -386,84 +386,84 @@ set sort_spilling_memory_ratio = 0; query T explain pipeline select a, dense_rank() over (partition by a order by a desc) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # rows frame single window (can push down limit) query T explain pipeline select a, sum(a) over (partition by a order by a desc rows between unbounded preceding and current row) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # rows frame single window (can not push down limit) query T explain pipeline select a, sum(a) over (partition by a order by a desc rows between unbounded preceding and unbounded following) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # rows frame multi window (can not push down limit) query T explain pipeline select a, sum(a) over (partition by a order by a desc rows between unbounded preceding and current row), avg(a) over (order by a rows between unbounded preceding and current row) from t limit 3 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (Transform Window × 1 processor) to (SortPartialTransform × 4) - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # row fetch with window function(pipeline explain) query T explain pipeline select *, sum(a) over (partition by a order by a desc rows between unbounded preceding and current row) from t where a > 1 order by b limit 3; ---- -CompoundBlockOperator(Project) × 1 processor - TransformRowsFetcher × 1 processor - LimitTransform × 1 processor - Merge (TransformSortMergeLimit × 4 processors) to (LimitTransform × 1) - TransformSortMergeLimit × 4 processors - SortPartialTransform × 4 processors - Merge (Transform Window × 1 processor) to (SortPartialTransform × 4) - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (TransformFilter × 1 processor) to (SortPartialTransform × 4) - TransformFilter × 1 processor - AddInternalColumnsTransform × 1 processor - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + TransformRowsFetcher × 1 + LimitTransform × 1 + Merge to MultiSortMerge × 1 + TransformSortMergeLimit × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + TransformFilter × 1 + AddInternalColumnsTransform × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # row fetch with window function(plan explain) query T @@ -516,33 +516,33 @@ CREATE TABLE table43764_orc (rowkey VARCHAR NOT NULL, time TIMESTAMP NULL, sirc_ query T explain pipeline select time, rowkey from (select *, row_number() over(partition by rowkey order by time desc) as rn from table43764_orc) a where rn = 1 limit 4 ---- -CompoundBlockOperator(Project) × 1 processor - LimitTransform × 1 processor - TransformFilter × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor +CompoundBlockOperator(Project) × 1 + LimitTransform × 1 + TransformFilter × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + DeserializeDataTransform × 1 + SyncReadParquetDataSource × 1 # same order multi window query T explain pipeline select *,lead(number,1, 42) over (order by number), lead(number,2,44) over (order by number), lead(number,3,44) over (order by number) from numbers(5); ---- -CompoundBlockOperator(Project) × 1 processor - Transform Window × 1 processor - CompoundBlockOperator(Map) × 1 processor - Transform Window × 1 processor - CompoundBlockOperator(Map) × 1 processor - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4) - CompoundBlockOperator(Map) × 1 processor - NumbersSourceTransform × 1 processor +CompoundBlockOperator(Project) × 1 + Transform Window × 1 + CompoundBlockOperator(Map) × 1 + Transform Window × 1 + CompoundBlockOperator(Map) × 1 + Transform Window × 1 + Merge to MultiSortMerge × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + NumbersSourceTransform × 1 statement ok DROP DATABASE test_explain_window; diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test index f8c17e624110..35208a8df0ad 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test @@ -15,9 +15,9 @@ onlyif mysql query T explain pipeline select a from t1 ignore_result ---- -EmptySink × 1 processor - NativeDeserializeDataTransform × 1 processor - SyncReadNativeDataSource × 1 processor +EmptySink × 1 + NativeDeserializeDataTransform × 1 + SyncReadNativeDataSource × 1 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/sort.test b/tests/sqllogictests/suites/mode/standalone/explain_native/sort.test index 23d5a365898b..1a184227f965 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/sort.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/sort.test @@ -1,5 +1,5 @@ statement ok -create table if not exists t1(a int, b int); +create or replace table t1(a int, b int); query T explain select a from (select * from t1 order by a) as t2 where a > 1;