From 3bcf778b2f2f8a8aa3228b8c2b6ada9c1a237937 Mon Sep 17 00:00:00 2001 From: Yang Yuming <50571168+YangYumings@users.noreply.github.com> Date: Wed, 16 Oct 2024 19:26:42 +0800 Subject: [PATCH] Support iotdb table model Q4-Q8,Q11 (#455) * Support iotdb table model Q4-Q8,Q11 * Remove logs used by tests * Reduce code duplication * addGroupByClause is changed to private --- configuration/conf/config.properties | 1 - .../iot/benchmark/conf/ConfigDescriptor.java | 3 +- .../iot/benchmark/iotdb200/IoTDB.java | 60 +++++++++---------- .../ModelStrategy/IoTDBModelStrategy.java | 23 +++++++ .../iotdb200/ModelStrategy/TableStrategy.java | 50 +++++++++++++++- .../iotdb200/ModelStrategy/TreeStrategy.java | 31 ++++++++++ 6 files changed, 134 insertions(+), 34 deletions(-) diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index 622a2f633..c0cfe34a3 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -323,7 +323,6 @@ # Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc # Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc # Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1 -# IoTDB-2.0 表模型支持 Q1 Q2 Q3 Q9 Q10 # OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0 # 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作 diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 9dc5b18bc..a2ccb4695 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -797,7 +797,8 @@ private boolean checkDatabaseVerification(DBConfig dbConfig) { boolean result = false; if (dbConfig.getDB_SWITCH().getType() == DBType.IoTDB) { // support after iotdb 1.0 - if (dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_130 + if (dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_200 + || dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_130 || dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_110 || dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_100) { result = true; diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java index 38181f85b..71d78987e 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/IoTDB.java @@ -242,6 +242,7 @@ public Status rangeQuery(RangeQuery rangeQuery) { rangeQuery.getDeviceSchema(), rangeQuery.getStartTimestamp(), rangeQuery.getEndTimestamp()); + sql = modelStrategy.addDeviceIDColumnIfNecessary(rangeQuery.getDeviceSchema(), sql); return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY); } @@ -254,6 +255,7 @@ public Status rangeQuery(RangeQuery rangeQuery) { @Override public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { String sql = getValueRangeQuerySql(valueRangeQuery); + sql = modelStrategy.addDeviceIDColumnIfNecessary(valueRangeQuery.getDeviceSchema(), sql); return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY); } @@ -270,6 +272,7 @@ public Status aggRangeQuery(AggRangeQuery aggRangeQuery) { String sql = addWhereTimeClause( aggQuerySqlHead, aggRangeQuery.getStartTimestamp(), aggRangeQuery.getEndTimestamp()); + sql = modelStrategy.addDeviceIDColumnIfNecessary(aggRangeQuery.getDeviceSchema(), sql); return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_QUERY); } @@ -288,6 +291,7 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) { + getValueFilterClause( aggValueQuery.getDeviceSchema(), (int) aggValueQuery.getValueThreshold()) .substring(4); + sql = modelStrategy.addDeviceIDColumnIfNecessary(aggValueQuery.getDeviceSchema(), sql); return executeQueryAndGetStatus(sql, Operation.AGG_VALUE_QUERY); } @@ -310,6 +314,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { sql += getValueFilterClause( aggRangeValueQuery.getDeviceSchema(), (int) aggRangeValueQuery.getValueThreshold()); + sql = modelStrategy.addDeviceIDColumnIfNecessary(aggRangeValueQuery.getDeviceSchema(), sql); return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_VALUE_QUERY); } @@ -321,14 +326,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { */ @Override public Status groupByQuery(GroupByQuery groupByQuery) { - String aggQuerySqlHead = - getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); - String sql = - addGroupByClause( - aggQuerySqlHead, - groupByQuery.getStartTimestamp(), - groupByQuery.getEndTimestamp(), - groupByQuery.getGranularity()); + String sql = modelStrategy.getGroupByQuerySQL(groupByQuery); return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY); } @@ -339,8 +337,8 @@ public Status groupByQuery(GroupByQuery groupByQuery) { */ @Override public Status latestPointQuery(LatestPointQuery latestPointQuery) { - String aggQuerySqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema()); - return executeQueryAndGetStatus(aggQuerySqlHead, Operation.LATEST_POINT_QUERY); + String latestPointSqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema()); + return executeQueryAndGetStatus(latestPointSqlHead, Operation.LATEST_POINT_QUERY); } /** @@ -353,9 +351,11 @@ public Status latestPointQuery(LatestPointQuery latestPointQuery) { public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { String sql = getRangeQuerySql( - rangeQuery.getDeviceSchema(), - rangeQuery.getStartTimestamp(), - rangeQuery.getEndTimestamp()) + rangeQuery.getDeviceSchema(), + rangeQuery.getStartTimestamp(), + rangeQuery.getEndTimestamp()); + sql = + modelStrategy.addDeviceIDColumnIfNecessary(rangeQuery.getDeviceSchema(), sql) + " order by time desc"; return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY_ORDER_BY_TIME_DESC); } @@ -368,20 +368,23 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { */ @Override public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) { - String sql = getValueRangeQuerySql(valueRangeQuery) + " order by time desc"; + String sql = getValueRangeQuerySql(valueRangeQuery); + sql = + modelStrategy.addDeviceIDColumnIfNecessary(valueRangeQuery.getDeviceSchema(), sql) + + " order by time desc"; return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC); } + /** + * Q11: GroupByQuery SQL: select {AggFun}({sensors}) from {devices} group by ([{start}, {end}], * + * {Granularity}ms) order by time desc + * + * @param groupByQuery + * @return + */ @Override public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { - String aggQuerySqlHead = - getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); - String sql = - addGroupByClause( - aggQuerySqlHead, - groupByQuery.getStartTimestamp(), - groupByQuery.getEndTimestamp(), - groupByQuery.getGranularity()); + String sql = modelStrategy.getGroupByQuerySQL(groupByQuery); sql += ORDER_BY_TIME_DESC; return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC); } @@ -455,24 +458,21 @@ private String getLatestPointQuerySql(List devices) { for (int i = 1; i < querySensors.size(); i++) { builder.append(", ").append(querySensors.get(i).getName()); } - return addFromClause(devices, builder); + String sql = addFromClause(devices, builder); + sql = modelStrategy.addWhereValueClauseIfNecessary(devices, sql); + return sql; } private String getRangeQuerySql(List deviceSchemas, long start, long end) { - return modelStrategy.addDeviceIDColumnIfNecessary( - deviceSchemas, addWhereTimeClause(getSimpleQuerySqlHead(deviceSchemas), start, end)); + return addWhereTimeClause(getSimpleQuerySqlHead(deviceSchemas), start, end); } - private String addWhereTimeClause(String prefix, long start, long end) { + public static String addWhereTimeClause(String prefix, long start, long end) { String startTime = start + ""; String endTime = end + ""; return prefix + " WHERE time >= " + startTime + " AND time <= " + endTime; } - private String addGroupByClause(String prefix, long start, long end, long granularity) { - return prefix + " group by ([" + start + "," + end + ")," + granularity + "ms) "; - } - protected Status executeQueryAndGetStatus(String sql, Operation operation) { String executeSQL; if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) { diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/IoTDBModelStrategy.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/IoTDBModelStrategy.java index bdfe549f2..036b5ed42 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/IoTDBModelStrategy.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/IoTDBModelStrategy.java @@ -33,6 +33,7 @@ import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -131,6 +132,28 @@ public abstract void sessionCleanupImpl(Session session) // endregion + // region query + + public abstract String getGroupByQuerySQL(GroupByQuery groupByQuery); + + public abstract String addWhereValueClauseIfNecessary(List devices, String prefix); + + protected String getAggFunForGroupByQuery(List querySensors, String aggFunction) { + StringBuilder builder = new StringBuilder(); + builder.append(aggFunction).append("(").append(querySensors.get(0).getName()).append(")"); + for (int i = 1; i < querySensors.size(); i++) { + builder + .append(", ") + .append(aggFunction) + .append("(") + .append(querySensors.get(i).getName()) + .append(")"); + } + return builder.toString(); + } + + // endregion + public abstract Logger getLogger(); public void handleRegisterException(Exception e) throws TsdbException { diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TableStrategy.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TableStrategy.java index bf3c4b438..e91027351 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TableStrategy.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TableStrategy.java @@ -34,6 +34,7 @@ import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -163,11 +164,14 @@ public String addFromClause(List devices, StringBuilder builder) { @Override public String addDeviceIDColumnIfNecessary(List deviceSchemas, String sql) { + return sql + " AND" + getDeviceIDColumn(deviceSchemas); + } + + public String getDeviceIDColumn(List deviceSchemas) { Set deviceIds = new HashSet<>(); StringBuilder builder = new StringBuilder(); builder - .append(sql) - .append(" AND (") + .append(" (") .append("device_id") .append(" = '") .append(deviceSchemas.get(0).getDevice()) @@ -357,6 +361,48 @@ public void sessionCleanupImpl(Session session) } } + /** + * eg. SELECT data_bin(20000ms, time), count(s_1), count(s_3), count(s_4) FROM test_g_0.table_0 + * WHERE time >= 1640966400000 AND time < 1640966650000 AND (device_id='d_0' OR device_id='d_3') + * group by time + * + *

getAggForGroupByQuery + */ + @Override + public String getGroupByQuerySQL(GroupByQuery groupByQuery) { + StringBuilder builder = new StringBuilder(); + // SELECT + builder + .append("SELECT ") + .append("data_bin(") + .append(groupByQuery.getGranularity()) + .append("ms, ") + .append("time), ") + .append( + getAggFunForGroupByQuery( + groupByQuery.getDeviceSchema().get(0).getSensors(), groupByQuery.getAggFun())); + // FROM + String sql = addFromClause(groupByQuery.getDeviceSchema(), builder); + // WHERE + sql = + IoTDB.addWhereTimeClause( + sql, groupByQuery.getStartTimestamp(), groupByQuery.getEndTimestamp()); + sql = addDeviceIDColumnIfNecessary(groupByQuery.getDeviceSchema(), sql); + // GROUP BY + sql = addGroupByClause(sql); + return sql; + } + + @Override + public String addWhereValueClauseIfNecessary(List devices, String prefix) { + String sql = prefix + " WHERE" + getDeviceIDColumn(devices); + return sql; + } + + private String addGroupByClause(String prefix) { + return prefix + " group by time"; + } + // endregion @Override diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java index 27f67be65..158a2f3da 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java @@ -35,6 +35,7 @@ import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; @@ -355,6 +356,36 @@ public void sessionCleanupImpl(Session session) { } } + @Override + public String getGroupByQuerySQL(GroupByQuery groupByQuery) { + StringBuilder builder = new StringBuilder(); + // SELECT + builder + .append("SELECT ") + .append( + getAggFunForGroupByQuery( + groupByQuery.getDeviceSchema().get(0).getSensors(), groupByQuery.getAggFun())); + // FROM + String sql = addFromClause(groupByQuery.getDeviceSchema(), builder); + // GROUP BY + sql = + addGroupByClause( + sql, + groupByQuery.getStartTimestamp(), + groupByQuery.getEndTimestamp(), + groupByQuery.getGranularity()); + return sql; + } + + @Override + public String addWhereValueClauseIfNecessary(List devices, String prefix) { + return prefix; + } + + private String addGroupByClause(String prefix, long start, long end, long granularity) { + return prefix + " group by ([" + start + "," + end + ")," + granularity + "ms) "; + } + // endregion @Override