Skip to content

Commit

Permalink
chore(query): fix pipeline display (#15729)
Browse files Browse the repository at this point in the history
* chore(query): fix pipeline display

* chore(query): fix pipeline display

* chore(query): fix pipeline display
  • Loading branch information
sundy-li authored Jun 4, 2024
1 parent e8374f6 commit 6e59dd7
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 195 deletions.
43 changes: 3 additions & 40 deletions src/query/pipeline/core/src/pipeline_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,14 @@ impl<'a> Display for PipelineIndentDisplayWrapper<'a> {
write!(f, " ")?;
}

let pipe_name = Self::pipe_name(pipe);
if pipe.input_length == pipe.output_length
|| pipe.input_length == 0
|| pipe.output_length == 0
{
write!(
f,
"{} × {} {}",
Self::pipe_name(pipe),
pipe.items.len(),
if pipe.items.len() == 1 {
"processor"
} else {
"processors"
},
)?;
write!(f, "{} × {}", Self::pipe_name(pipe), pipe.items.len(),)?;
} else {
let prev_name = Self::pipe_name(&pipes[pipes.len() - index - 2]);
if index > 0 {
let post_name = Self::pipe_name(&pipes[pipes.len() - index]);
write!(
f,
"Merge ({} × {} {}) to ({} × {})",
prev_name,
pipe.input_length,
if pipe.input_length == 1 {
"processor"
} else {
"processors"
},
post_name,
pipe.output_length,
)?;
} else {
write!(
f,
"Merge ({} × {} {})",
prev_name,
pipe.input_length,
if pipe.input_length == 1 {
"processor"
} else {
"processors"
},
)?;
}
write!(f, "Merge to {pipe_name} × {}", pipe.output_length,)?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ onlyif mysql
query T
explain pipeline select a from t1 ignore_result
----
EmptySink × 1 processor
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor
EmptySink × 1
DeserializeDataTransform × 1
SyncReadParquetDataSource × 1


statement ok
Expand Down
66 changes: 33 additions & 33 deletions tests/sqllogictests/suites/mode/standalone/explain/sort.test
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table if not exists t1(a int, b int);
create or replace table t1(a int, b int);

query T
explain select a from (select * from t1 order by a) as t2 where a > 1;
Expand Down Expand Up @@ -74,13 +74,13 @@ set sort_spilling_memory_ratio = 0;
query T
explain pipeline select a, b from t1 order by a;
----
CompoundBlockOperator(Project) × 1 processor
Merge (TransformSortMerge × 4 processors) to (CompoundBlockOperator(Project) × 1)
TransformSortMerge × 4 processors
SortPartialTransform × 4 processors
Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4)
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor
CompoundBlockOperator(Project) × 1
Merge to MultiSortMerge × 1
TransformSortMerge × 4
SortPartialTransform × 4
Merge to Resize × 4
DeserializeDataTransform × 1
SyncReadParquetDataSource × 1


# Sort spilling
Expand All @@ -90,14 +90,14 @@ set sort_spilling_memory_ratio = 60;
query T
explain pipeline select a, b from t1 order by a;
----
CompoundBlockOperator(Project) × 1 processor
Merge (TransformSortSpill × 4 processors) to (CompoundBlockOperator(Project) × 1)
TransformSortSpill × 4 processors
TransformSortMerge × 4 processors
SortPartialTransform × 4 processors
Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4)
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor
CompoundBlockOperator(Project) × 1
Merge to MultiSortMerge × 1
TransformSortSpill × 4
TransformSortMerge × 4
SortPartialTransform × 4
Merge to Resize × 4
DeserializeDataTransform × 1
SyncReadParquetDataSource × 1

statement ok
set sort_spilling_memory_ratio = 0;
Expand All @@ -106,14 +106,14 @@ set sort_spilling_memory_ratio = 0;
query T
explain pipeline select a + 1, b from t1 order by a + 1;
----
CompoundBlockOperator(Project) × 1 processor
Merge (TransformSortMerge × 4 processors) to (CompoundBlockOperator(Project) × 1)
TransformSortMerge × 4 processors
SortPartialTransform × 4 processors
Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4)
CompoundBlockOperator(Map) × 1 processor
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor
CompoundBlockOperator(Project) × 1
Merge to MultiSortMerge × 1
TransformSortMerge × 4
SortPartialTransform × 4
Merge to Resize × 4
CompoundBlockOperator(Map) × 1
DeserializeDataTransform × 1
SyncReadParquetDataSource × 1


# Sort spilling
Expand All @@ -123,15 +123,15 @@ set sort_spilling_memory_ratio = 60;
query T
explain pipeline select a + 1, b from t1 order by a + 1;
----
CompoundBlockOperator(Project) × 1 processor
Merge (TransformSortSpill × 4 processors) to (CompoundBlockOperator(Project) × 1)
TransformSortSpill × 4 processors
TransformSortMerge × 4 processors
SortPartialTransform × 4 processors
Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4)
CompoundBlockOperator(Map) × 1 processor
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor
CompoundBlockOperator(Project) × 1
Merge to MultiSortMerge × 1
TransformSortSpill × 4
TransformSortMerge × 4
SortPartialTransform × 4
Merge to Resize × 4
CompoundBlockOperator(Map) × 1
DeserializeDataTransform × 1
SyncReadParquetDataSource × 1

statement ok
drop table if exists t1;
Loading

0 comments on commit 6e59dd7

Please sign in to comment.