Skip to content

Commit

Permalink
Fix ColumnInjectionPushDown bug & add UTs
Browse files Browse the repository at this point in the history
  • Loading branch information
liuminghui233 authored Dec 20, 2023
1 parent 6351e76 commit 50d01a7
Show file tree
Hide file tree
Showing 6 changed files with 905 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.iotdb.tsfile.utils.Preconditions.checkArgument;

/**
* <b>Optimization phase:</b> Distributed plan planning.
*
Expand Down Expand Up @@ -67,34 +65,38 @@ public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext conte
// SeriesAggregationNode,
// If it is and has overlap in groupByParameter, there is SlidingWindowNode
// There will be a ColumnInjectNode on them, so we need to check if it can be pushed down.
return plan.accept(new Rewriter(), new RewriterContext());
return plan.accept(new Rewriter(), null);
}
return plan;
}

private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
private static class Rewriter extends PlanVisitor<PlanNode, Void> {

@Override
public PlanNode visitPlan(PlanNode node, RewriterContext context) {
for (PlanNode child : node.getChildren()) {
context.setParent(node);
child.accept(this, context);
}
public PlanNode visitPlan(PlanNode node, Void context) {
// other source node, just return
return node;
}

@Override
public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
List<PlanNode> children = new ArrayList<>();
public PlanNode visitSingleChildProcess(SingleChildProcessNode node, Void context) {
PlanNode rewrittenChild = node.getChild().accept(this, context);
node.setChild(rewrittenChild);
return node;
}

@Override
public PlanNode visitMultiChildProcess(MultiChildProcessNode node, Void context) {
List<PlanNode> rewrittenChildren = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
context.setParent(null);
children.add(child.accept(this, context));
rewrittenChildren.add(child.accept(this, context));
}
return node.cloneWithChildren(children);
node.setChildren(rewrittenChildren);
return node;
}

@Override
public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext context) {
public PlanNode visitColumnInject(ColumnInjectNode node, Void context) {
PlanNode child = node.getChild();

boolean columnInjectPushDown = true;
Expand All @@ -109,32 +111,9 @@ public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext context
}

if (columnInjectPushDown) {
return concatParentWithChild(context.getParent(), child);
}
return node;
}

private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) {
if (parent == null) {
return child;
}

checkArgument(parent instanceof SingleChildProcessNode);
((SingleChildProcessNode) parent).setChild(child);
return parent;
}
}

private static class RewriterContext {

private PlanNode parent;

public PlanNode getParent() {
return parent;
}

public void setParent(PlanNode parent) {
this.parent = parent;
return node;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;

@SuppressWarnings("java:S6539") // suppress "Monster class" warning
public abstract class PlanVisitor<R, C> {

public R process(PlanNode node, C context) {
Expand Down Expand Up @@ -200,7 +201,7 @@ public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
}

public R visitColumnInject(ColumnInjectNode node, C context) {
return visitPlan(node, context);
return visitSingleChildProcess(node, context);
}

public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
Expand Down
Loading

0 comments on commit 50d01a7

Please sign in to comment.