diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java index 40a869d18a13..b8cd9f43453c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java @@ -35,8 +35,6 @@ import java.util.ArrayList; import java.util.List; -import static org.apache.iotdb.tsfile.utils.Preconditions.checkArgument; - /** * Optimization phase: Distributed plan planning. * @@ -67,34 +65,38 @@ public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext conte // SeriesAggregationNode, // If it is and has overlap in groupByParameter, there is SlidingWindowNode // There will be a ColumnInjectNode on them, so we need to check if it can be pushed down. - return plan.accept(new Rewriter(), new RewriterContext()); + return plan.accept(new Rewriter(), null); } return plan; } - private static class Rewriter extends PlanVisitor { + private static class Rewriter extends PlanVisitor { @Override - public PlanNode visitPlan(PlanNode node, RewriterContext context) { - for (PlanNode child : node.getChildren()) { - context.setParent(node); - child.accept(this, context); - } + public PlanNode visitPlan(PlanNode node, Void context) { + // other source node, just return return node; } @Override - public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) { - List children = new ArrayList<>(); + public PlanNode visitSingleChildProcess(SingleChildProcessNode node, Void context) { + PlanNode rewrittenChild = node.getChild().accept(this, context); + node.setChild(rewrittenChild); + return node; + } + + @Override + public PlanNode visitMultiChildProcess(MultiChildProcessNode node, Void context) { + List rewrittenChildren = new ArrayList<>(); for (PlanNode child : node.getChildren()) { - context.setParent(null); - children.add(child.accept(this, context)); + rewrittenChildren.add(child.accept(this, context)); } - return node.cloneWithChildren(children); + node.setChildren(rewrittenChildren); + return node; } @Override - public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext context) { + public PlanNode visitColumnInject(ColumnInjectNode node, Void context) { PlanNode child = node.getChild(); boolean columnInjectPushDown = true; @@ -109,32 +111,9 @@ public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext context } if (columnInjectPushDown) { - return concatParentWithChild(context.getParent(), child); - } - return node; - } - - private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) { - if (parent == null) { return child; } - - checkArgument(parent instanceof SingleChildProcessNode); - ((SingleChildProcessNode) parent).setChild(child); - return parent; - } - } - - private static class RewriterContext { - - private PlanNode parent; - - public PlanNode getParent() { - return parent; - } - - public void setParent(PlanNode parent) { - this.parent = parent; + return node; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 10bde9ccf6cc..e8f6b149f0b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -103,6 +103,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +@SuppressWarnings("java:S6539") // suppress "Monster class" warning public abstract class PlanVisitor { public R process(PlanNode node, C context) { @@ -200,7 +201,7 @@ public R visitDeviceViewInto(DeviceViewIntoNode node, C context) { } public R visitColumnInject(ColumnInjectNode node, C context) { - return visitPlan(node, context); + return visitSingleChildProcess(node, context); } public R visitSingleDeviceView(SingleDeviceViewNode node, C context) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java new file mode 100644 index 000000000000..f5dd0507b271 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.TimeDuration; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ColumnInjectionPushDownTest { + + private static final Map schemaMap = new HashMap<>(); + + static { + try { + schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32)); + schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE)); + + MeasurementPath d2s1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32); + d2s1.setUnderAlignedEntity(true); + schemaMap.put("root.sg.d2.a.s1", d2s1); + + AlignedPath aligned_d2s1 = + new AlignedPath( + "root.sg.d2.a", + Collections.singletonList("s1"), + Collections.singletonList(d2s1.getMeasurementSchema())); + schemaMap.put("aligned_root.sg.d2.a.s1", aligned_d2s1); + + MeasurementPath d2s2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE); + d2s2.setUnderAlignedEntity(true); + schemaMap.put("root.sg.d2.a.s2", d2s2); + + AlignedPath aligned_d2s2 = + new AlignedPath( + "root.sg.d2.a", + Collections.singletonList("s2"), + Collections.singletonList(d2s2.getMeasurementSchema())); + schemaMap.put("aligned_root.sg.d2.a.s2", aligned_d2s2); + + AlignedPath alignedPath = + new AlignedPath( + "root.sg.d2.a", + Arrays.asList("s2", "s1"), + Arrays.asList(d2s2.getMeasurementSchema(), d2s1.getMeasurementSchema())); + schemaMap.put("root.sg.d2.a", alignedPath); + } catch (IllegalPathException e) { + e.printStackTrace(); + } + } + + private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) { + OptimizationTestUtil.checkPushDown(new ColumnInjectionPushDown(), sql, rawPlan, optPlan); + } + + private void checkCannotPushDown(String sql, PlanNode rawPlan) { + OptimizationTestUtil.checkCannotPushDown(new ColumnInjectionPushDown(), sql, rawPlan); + } + + @Test + public void testPushDownAggregationSourceAlignByTime() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms);", + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList, + groupByTimeParameter, + false) + .columnInject("1", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList, + groupByTimeParameter, + true) + .getRoot()); + checkPushDown( + "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms) fill(previous);", + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList, + groupByTimeParameter, + false) + .columnInject("1", groupByTimeParameter) + .fill("2", FillPolicy.PREVIOUS) + .getRoot(), + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList, + groupByTimeParameter, + true) + .fill("2", FillPolicy.PREVIOUS) + .getRoot()); + } + + @Test + public void testPushDownAlignedAggregationSourceAlignByTime() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d2.a.s1")); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100), 10ms);", + new TestPlanBuilder() + .alignedAggregationScan( + "0", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList, + groupByTimeParameter, + false) + .columnInject("1", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "0", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList, + groupByTimeParameter, + true) + .getRoot()); + checkPushDown( + "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100), 10ms) fill(previous);", + new TestPlanBuilder() + .alignedAggregationScan( + "0", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList, + groupByTimeParameter, + false) + .columnInject("1", groupByTimeParameter) + .fill("2", FillPolicy.PREVIOUS) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "0", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList, + groupByTimeParameter, + true) + .fill("2", FillPolicy.PREVIOUS) + .getRoot()); + } + + @Test + public void testPushDownSlidingWindowAlignByTime() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 5), true); + List aggregationDescriptorList1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.PARTIAL, "root.sg.d1.s1")); + List aggregationDescriptorList2 = + Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL, "root.sg.d1.s1")); + checkPushDown( + "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms, 5ms);", + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + false) + .slidingWindow("1", aggregationDescriptorList2, groupByTimeParameter, false) + .columnInject("2", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + false) + .slidingWindow("1", aggregationDescriptorList2, groupByTimeParameter, true) + .getRoot()); + checkPushDown( + "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms, 5ms) fill(previous);", + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + false) + .slidingWindow("1", aggregationDescriptorList2, groupByTimeParameter, false) + .columnInject("2", groupByTimeParameter) + .fill("3", FillPolicy.PREVIOUS) + .getRoot(), + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + false) + .slidingWindow("1", aggregationDescriptorList2, groupByTimeParameter, true) + .fill("3", FillPolicy.PREVIOUS) + .getRoot()); + } + + @Test + public void testPushDownRawDataAggregationAlignByTime() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d1 where s1 > 10 group by ([0, 100), 10ms);", + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .filter( + "1", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation("2", aggregationDescriptorList, groupByTimeParameter, true) + .columnInject("3", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .filter( + "1", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation("2", aggregationDescriptorList, groupByTimeParameter, true) + .getRoot()); + } + + @Test + public void testCannotPushDownTimeJoinAlignByTime() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + List aggregationDescriptorList2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2")); + + checkCannotPushDown( + "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0, 100), 10ms);", + new TestPlanBuilder() + .aggregationTimeJoin( + 0, + Arrays.asList(schemaMap.get("root.sg.d1.s2"), schemaMap.get("root.sg.d1.s1")), + Arrays.asList(aggregationDescriptorList2, aggregationDescriptorList1), + groupByTimeParameter) + .columnInject("3", groupByTimeParameter) + .getRoot()); + checkCannotPushDown( + "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0, 100), 10ms) fill(previous);", + new TestPlanBuilder() + .aggregationTimeJoin( + 0, + Arrays.asList(schemaMap.get("root.sg.d1.s2"), schemaMap.get("root.sg.d1.s1")), + Arrays.asList(aggregationDescriptorList2, aggregationDescriptorList1), + groupByTimeParameter) + .columnInject("3", groupByTimeParameter) + .fill("4", FillPolicy.PREVIOUS) + .getRoot()); + } + + @Test + public void testPushDownAggregationSourceAlignByDevice() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + List aggregationDescriptorList2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d2.a.s1")); + + List outputColumnNames = Arrays.asList("Device", "__endTime", "count(s1)"); + List devices = Arrays.asList("root.sg.d1", "root.sg.d2.a"); + Map> deviceToMeasurementIndexesMap = new HashMap<>(); + deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2)); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by ([0, 100), 10ms) align by device;", + new TestPlanBuilder() + .deviceView( + "4", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + false) + .columnInject("1", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "2", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList2, + groupByTimeParameter, + false) + .columnInject("3", groupByTimeParameter) + .getRoot())) + .getRoot(), + new TestPlanBuilder() + .deviceView( + "4", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1, + groupByTimeParameter, + true) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "2", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList2, + groupByTimeParameter, + true) + .getRoot())) + .getRoot()); + } + + @Test + public void testPushDownSlidingWindowAlignByDevice() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 5), true); + + List aggregationDescriptorList1_1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.PARTIAL, "root.sg.d1.s1")); + List aggregationDescriptorList1_2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.PARTIAL, "root.sg.d2.a.s1")); + + List aggregationDescriptorList2_1 = + Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL, "root.sg.d1.s1")); + List aggregationDescriptorList2_2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.FINAL, "root.sg.d2.a.s1")); + + List outputColumnNames = Arrays.asList("Device", "__endTime", "count(s1)"); + List devices = Arrays.asList("root.sg.d1", "root.sg.d2.a"); + Map> deviceToMeasurementIndexesMap = new HashMap<>(); + deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2)); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by ([0, 100), 10ms, 5ms) align by device;", + new TestPlanBuilder() + .deviceView( + "6", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1_1, + groupByTimeParameter, + false) + .slidingWindow( + "1", aggregationDescriptorList2_1, groupByTimeParameter, false) + .columnInject("2", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "3", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList1_2, + groupByTimeParameter, + false) + .slidingWindow( + "4", aggregationDescriptorList2_2, groupByTimeParameter, false) + .columnInject("5", groupByTimeParameter) + .getRoot())) + .getRoot(), + new TestPlanBuilder() + .deviceView( + "6", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationScan( + "0", + schemaMap.get("root.sg.d1.s1"), + aggregationDescriptorList1_1, + groupByTimeParameter, + false) + .slidingWindow( + "1", aggregationDescriptorList2_1, groupByTimeParameter, true) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "3", + schemaMap.get("aligned_root.sg.d2.a.s1"), + aggregationDescriptorList1_2, + groupByTimeParameter, + false) + .slidingWindow( + "4", aggregationDescriptorList2_2, groupByTimeParameter, true) + .getRoot())) + .getRoot()); + } + + @Test + public void testPushDownRawDataAggregationAlignByDevice() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + + List aggregationDescriptorList1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + List aggregationDescriptorList2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d2.a.s1")); + + List outputColumnNames = Arrays.asList("Device", "__endTime", "count(s1)"); + List devices = Arrays.asList("root.sg.d1", "root.sg.d2.a"); + Map> deviceToMeasurementIndexesMap = new HashMap<>(); + deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2)); + + checkPushDown( + "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a where s1 > 10 group by ([0, 100), 10ms) align by device;", + new TestPlanBuilder() + .deviceView( + "8", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .filter( + "1", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation( + "2", aggregationDescriptorList1, groupByTimeParameter, true) + .columnInject("3", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .scanAligned("4", schemaMap.get("aligned_root.sg.d2.a.s1")) + .filter( + "5", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation( + "6", aggregationDescriptorList2, groupByTimeParameter, true) + .columnInject("7", groupByTimeParameter) + .getRoot())) + .getRoot(), + new TestPlanBuilder() + .deviceView( + "8", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .filter( + "1", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation( + "2", aggregationDescriptorList1, groupByTimeParameter, true) + .getRoot(), + new TestPlanBuilder() + .scanAligned("4", schemaMap.get("aligned_root.sg.d2.a.s1")) + .filter( + "5", + Collections.singletonList( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))), + ExpressionFactory.gt( + ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")), + ExpressionFactory.intValue("10")), + true) + .rawDataAggregation( + "6", aggregationDescriptorList2, groupByTimeParameter, true) + .getRoot())) + .getRoot()); + } + + @Test + public void testPartialPushDownTimeJoinAlignByDevice() { + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new TimeDuration(0, 10), true); + List aggregationDescriptorList1_1 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1")); + List aggregationDescriptorList1_2 = + Collections.singletonList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2")); + + List aggregationDescriptorList2 = + Arrays.asList( + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d2.a.s2"), + getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d2.a.s1")); + + List outputColumnNames = Arrays.asList("Device", "__endTime", "count(s1)", "count(s2)"); + List devices = Arrays.asList("root.sg.d1", "root.sg.d2.a"); + Map> deviceToMeasurementIndexesMap = new LinkedHashMap<>(); + deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 3, 2)); + + checkPushDown( + "select __endTime, count(s1), count(s2) from root.sg.d1, root.sg.d2.a group by ([0, 100), 10ms) align by device;", + new TestPlanBuilder() + .deviceView( + "6", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationTimeJoin( + 0, + Arrays.asList( + schemaMap.get("root.sg.d1.s2"), schemaMap.get("root.sg.d1.s1")), + Arrays.asList( + aggregationDescriptorList1_2, aggregationDescriptorList1_1), + groupByTimeParameter) + .columnInject("3", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "4", + schemaMap.get("root.sg.d2.a"), + aggregationDescriptorList2, + groupByTimeParameter, + false) + .columnInject("5", groupByTimeParameter) + .getRoot())) + .getRoot(), + new TestPlanBuilder() + .deviceView( + "6", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .aggregationTimeJoin( + 0, + Arrays.asList( + schemaMap.get("root.sg.d1.s2"), schemaMap.get("root.sg.d1.s1")), + Arrays.asList( + aggregationDescriptorList1_2, aggregationDescriptorList1_1), + groupByTimeParameter) + .columnInject("3", groupByTimeParameter) + .getRoot(), + new TestPlanBuilder() + .alignedAggregationScan( + "4", + schemaMap.get("root.sg.d2.a"), + aggregationDescriptorList2, + groupByTimeParameter, + true) + .getRoot())) + .getRoot()); + } + + private AggregationDescriptor getAggregationDescriptor(AggregationStep step, String path) { + return new AggregationDescriptor( + TAggregationType.COUNT.name().toLowerCase(), + step, + Collections.singletonList(new TimeSeriesOperand(schemaMap.get(path))), + new HashMap<>()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java index 717cf9116dbf..7f086ad0be3a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java @@ -31,10 +31,8 @@ import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; -import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -57,6 +55,8 @@ import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.gt; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.intValue; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries; +import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkCannotPushDown; +import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkPushDown; public class LimitOffsetPushDownTest { @@ -262,42 +262,19 @@ public void testCannotPushDown() { .filter( "1", Collections.singletonList(timeSeries(schemaMap.get("root.sg.d1.s1"))), - gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10"))) + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + false) .offset("2", 100) .limit("3", 100) .getRoot()); } private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) { - Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); - - MPPQueryContext context = new MPPQueryContext(new QueryId("test_query")); - Analyzer analyzer = - new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl()); - Analysis analysis = analyzer.analyze(statement); - - LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>()); - PlanNode actualPlan = planner.plan(analysis).getRootNode(); - Assert.assertEquals(rawPlan, actualPlan); - - PlanNode actualOptPlan = new LimitOffsetPushDown().optimize(actualPlan, analysis, context); - Assert.assertEquals(optPlan, actualOptPlan); + OptimizationTestUtil.checkPushDown(new LimitOffsetPushDown(), sql, rawPlan, optPlan); } private void checkCannotPushDown(String sql, PlanNode rawPlan) { - Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); - - MPPQueryContext context = new MPPQueryContext(new QueryId("test_query")); - Analyzer analyzer = - new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl()); - Analysis analysis = analyzer.analyze(statement); - - LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>()); - PlanNode actualPlan = planner.plan(analysis).getRootNode(); - - Assert.assertEquals(rawPlan, actualPlan); - Assert.assertEquals( - actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis, context)); + OptimizationTestUtil.checkCannotPushDown(new LimitOffsetPushDown(), sql, rawPlan); } // test for limit/offset push down in group by time diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java new file mode 100644 index 000000000000..73560bd21123 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer; +import org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl; +import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl; +import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; +import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; + +import org.junit.Assert; + +import java.time.ZonedDateTime; +import java.util.ArrayList; + +public class OptimizationTestUtil { + + private OptimizationTestUtil() { + // util class + } + + public static void checkPushDown( + PlanOptimizer optimizer, String sql, PlanNode rawPlan, PlanNode optPlan) { + Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); + + MPPQueryContext context = new MPPQueryContext(new QueryId("test_query")); + Analyzer analyzer = + new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl()); + Analysis analysis = analyzer.analyze(statement); + + LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>()); + PlanNode actualPlan = planner.plan(analysis).getRootNode(); + Assert.assertEquals(rawPlan, actualPlan); + + PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context); + Assert.assertEquals(optPlan, actualOptPlan); + } + + public static void checkCannotPushDown(PlanOptimizer optimizer, String sql, PlanNode rawPlan) { + Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); + + MPPQueryContext context = new MPPQueryContext(new QueryId("test_query")); + Analyzer analyzer = + new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl()); + Analysis analysis = analyzer.analyze(statement); + + LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>()); + PlanNode actualPlan = planner.plan(analysis).getRootNode(); + + Assert.assertEquals(rawPlan, actualPlan); + Assert.assertEquals(actualPlan, optimizer.optimize(actualPlan, analysis, context)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java index 699e7b265be0..c3684bc91403 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java @@ -25,17 +25,24 @@ import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; @@ -43,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.queryengine.plan.statement.literal.LongLiteral; +import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter; import java.time.ZonedDateTime; import java.util.ArrayList; @@ -90,6 +98,111 @@ public TestPlanBuilder scanAligned(String id, PartialPath path, int limit, int o return this; } + public TestPlanBuilder aggregationScan( + String id, + PartialPath path, + List aggregationDescriptors, + GroupByTimeParameter groupByTimeParameter, + boolean outputEndTime) { + SeriesAggregationScanNode aggregationScanNode = + new SeriesAggregationScanNode( + new PlanNodeId(id), + (MeasurementPath) path, + aggregationDescriptors, + Ordering.ASC, + groupByTimeParameter); + aggregationScanNode.setOutputEndTime(outputEndTime); + this.root = aggregationScanNode; + return this; + } + + public TestPlanBuilder alignedAggregationScan( + String id, + PartialPath path, + List aggregationDescriptors, + GroupByTimeParameter groupByTimeParameter, + boolean outputEndTime) { + AlignedSeriesAggregationScanNode aggregationScanNode = + new AlignedSeriesAggregationScanNode( + new PlanNodeId(id), + (AlignedPath) path, + aggregationDescriptors, + Ordering.ASC, + groupByTimeParameter); + aggregationScanNode.setOutputEndTime(outputEndTime); + this.root = aggregationScanNode; + return this; + } + + public TestPlanBuilder rawDataAggregation( + String id, + List aggregationDescriptors, + GroupByTimeParameter groupByTimeParameter, + boolean outputEndTime) { + AggregationNode aggregationNode = + new AggregationNode( + new PlanNodeId(String.valueOf(id)), + Collections.singletonList(getRoot()), + aggregationDescriptors, + groupByTimeParameter, + Ordering.ASC); + aggregationNode.setOutputEndTime(outputEndTime); + this.root = aggregationNode; + return this; + } + + public TestPlanBuilder slidingWindow( + String id, + List aggregationDescriptors, + GroupByTimeParameter groupByTimeParameter, + boolean outputEndTime) { + SlidingWindowAggregationNode slidingWindowAggregationNode = + new SlidingWindowAggregationNode( + new PlanNodeId(id), + getRoot(), + aggregationDescriptors, + groupByTimeParameter, + Ordering.ASC); + slidingWindowAggregationNode.setOutputEndTime(outputEndTime); + this.root = slidingWindowAggregationNode; + return this; + } + + public TestPlanBuilder aggregationTimeJoin( + int startId, + List paths, + List> aggregationDescriptorsList, + GroupByTimeParameter groupByTimeParameter) { + int planId = startId; + + List seriesSourceNodes = new ArrayList<>(); + for (int i = 0; i < paths.size(); i++) { + PartialPath path = paths.get(i); + if (path instanceof MeasurementPath) { + seriesSourceNodes.add( + new SeriesAggregationScanNode( + new PlanNodeId(String.valueOf(planId)), + (MeasurementPath) paths.get(i), + aggregationDescriptorsList.get(i), + Ordering.ASC, + groupByTimeParameter)); + } else { + seriesSourceNodes.add( + new AlignedSeriesAggregationScanNode( + new PlanNodeId(String.valueOf(planId)), + (AlignedPath) paths.get(i), + aggregationDescriptorsList.get(i), + Ordering.ASC, + groupByTimeParameter)); + } + planId++; + } + + this.root = + new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes); + return this; + } + public TestPlanBuilder timeJoin(List paths) { int planId = 0; @@ -163,14 +276,15 @@ public TestPlanBuilder singleDeviceView(String id, String device, String measure return this; } - public TestPlanBuilder filter(String id, List expressions, Expression predicate) { + public TestPlanBuilder filter( + String id, List expressions, Expression predicate, boolean isGroupByTime) { this.root = new FilterNode( new PlanNodeId(id), getRoot(), expressions.toArray(new Expression[0]), predicate, - false, + isGroupByTime, ZonedDateTime.now().getOffset(), Ordering.ASC); return this; @@ -185,4 +299,35 @@ public TestPlanBuilder into(String id, PartialPath sourcePath, PartialPath intoP this.root = new IntoNode(new PlanNodeId(id), getRoot(), intoPathDescriptor); return this; } + + public TestPlanBuilder columnInject(String id, GroupByTimeParameter groupByTimeParameter) { + this.root = + new ColumnInjectNode( + new PlanNodeId(id), + getRoot(), + 0, + new SlidingTimeColumnGeneratorParameter(groupByTimeParameter, true)); + return this; + } + + public TestPlanBuilder deviceView( + String id, + List outputColumnNames, + List devices, + Map> deviceToMeasurementIndexesMap, + List children) { + DeviceViewNode deviceViewNode = + new DeviceViewNode( + new PlanNodeId(id), + new OrderByParameter( + Arrays.asList( + new SortItem(OrderByKey.DEVICE, Ordering.ASC), + new SortItem(OrderByKey.TIME, Ordering.ASC))), + outputColumnNames, + devices, + deviceToMeasurementIndexesMap); + deviceViewNode.setChildren(children); + this.root = deviceViewNode; + return this; + } }