diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index a5f5dcf9c..872cbd1cf 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -297,7 +297,8 @@ # Q8 最近点查询 select time, v1... where device = ? and time = max(time) # Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc # Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc -# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0 +# Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1 +# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0 # 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作 # WRITE_OPERATION_TIMEOUT_MS=120000 @@ -386,6 +387,16 @@ # 查询语句的随机数种子 # QUERY_SEED=151658 +# 行数限制, -1表示不使用limit语句 +# 目前仅支持iotdb、influxdb v1、tdengine-3 +# RESULT_ROW_LIMIT=-1 + +# 是否align by device +# 目前仅支持iotdb、influxdb v1、tdengine-3 +# iotdb支持OPERATION_PROPORTION=1:1:1:0:0:0:0:0:0:1:0:0 +# tdengine支持1:1:1:1:1:1:1:1:1:1:1:1 +# ALIGN_BY_DEVICE=false + ################ Workload:相关参数 ###################### # workload的缓冲区的大小 # WORKLOAD_BUFFER_SIZE=100 diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java index a8dffcfe7..edf91bdd9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java @@ -95,6 +95,9 @@ protected void doTest() { case VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC: dbWrapper.valueRangeQueryOrderByDesc(queryWorkLoad.getValueRangeQuery()); break; + case GROUP_BY_QUERY_ORDER_BY_TIME_DESC: + dbWrapper.groupByQueryOrderByDesc(queryWorkLoad.getGroupByQuery()); + break; default: LOGGER.error("Unsupported operation sensorType {}", operation); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java index b0090ab7d..10134ac4c 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/Operation.java @@ -34,6 +34,7 @@ public enum Operation { LATEST_POINT_QUERY("LATEST_POINT"), RANGE_QUERY_ORDER_BY_TIME_DESC("RANGE_QUERY_DESC"), VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC("VALUE_RANGE_QUERY_DESC"), + GROUP_BY_QUERY_ORDER_BY_TIME_DESC("GROUP_BY_DESC"), VERIFICATION_QUERY("VERIFICATION_QUERY"), DEVICE_QUERY("DEVICE_QUERY"); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/OperationController.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/OperationController.java index 4e761b337..dd6696311 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/OperationController.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/operation/OperationController.java @@ -23,6 +23,7 @@ import cn.edu.tsinghua.iot.benchmark.conf.Config; import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -30,7 +31,7 @@ public class OperationController { - private static final Logger LOGGER = null; + private static final Logger LOGGER = LoggerFactory.getLogger(OperationController.class); private static Config config = ConfigDescriptor.getInstance().getConfig(); private List proportion = new ArrayList<>(); private List operations = Operation.getNormalOperation(); @@ -108,6 +109,8 @@ public Operation getNextOperationType() { return Operation.RANGE_QUERY_ORDER_BY_TIME_DESC; case 11: return Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC; + case 12: + return Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC; default: LOGGER.error("Unsupported operation {}, use default operation: INGESTION.", i); return Operation.INGESTION; diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java index c9b4832b4..2df8af854 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java @@ -319,7 +319,9 @@ public class Config { * value filtering in reverse order, Eg. select v1... from data where time > ? and time < ? and v1 * > ? and device in ? order by time desc */ - private String OPERATION_PROPORTION = "1:0:0:0:0:0:0:0:0:0:0"; + private String OPERATION_PROPORTION = "1:0:0:0:0:0:0:0:0:0:0:0"; + + private final int OPERATION_PROPORTION_LEN = 12; /** The number of sensors involved in each query */ private int QUERY_SENSOR_NUM = 1; /** The number of devices involved in each query */ @@ -338,6 +340,9 @@ public class Config { /** Query random seed */ private long QUERY_SEED = 151658L; + private long RESULT_ROW_LIMIT = -1; + private boolean ALIGN_BY_DEVICE = false; + // workload 相关部分 /** The size of workload buffer size */ private int WORKLOAD_BUFFER_SIZE = 100; @@ -1198,6 +1203,10 @@ public void setOPERATION_PROPORTION(String OPERATION_PROPORTION) { this.OPERATION_PROPORTION = OPERATION_PROPORTION; } + public int getOPERATION_PROPORTION_LEN() { + return this.OPERATION_PROPORTION_LEN; + } + public int getQUERY_SENSOR_NUM() { return QUERY_SENSOR_NUM; } @@ -1254,6 +1263,22 @@ public void setQUERY_SEED(long QUERY_SEED) { this.QUERY_SEED = QUERY_SEED; } + public long getRESULT_ROW_LIMIT() { + return RESULT_ROW_LIMIT; + } + + public void setRESULT_ROW_LIMIT(long RESULT_ROW_LIMIT) { + this.RESULT_ROW_LIMIT = RESULT_ROW_LIMIT; + } + + public boolean isALIGN_BY_DEVICE() { + return ALIGN_BY_DEVICE; + } + + public void setALIGN_BY_DEVICE(boolean ALIGN_BY_DEVICE) { + this.ALIGN_BY_DEVICE = ALIGN_BY_DEVICE; + } + public int getWORKLOAD_BUFFER_SIZE() { return WORKLOAD_BUFFER_SIZE; } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 5ac46a79d..07fbc60df 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -32,6 +32,7 @@ import java.io.*; import java.lang.reflect.Field; import java.util.Arrays; +import java.util.Objects; import java.util.Properties; import java.util.stream.Collectors; @@ -436,6 +437,12 @@ private void loadProps() { properties.getProperty("GROUP_BY_TIME_UNIT", config.getGROUP_BY_TIME_UNIT() + ""))); config.setQUERY_SEED( Long.parseLong(properties.getProperty("QUERY_SEED", config.getQUERY_SEED() + ""))); + config.setRESULT_ROW_LIMIT( + Long.parseLong( + properties.getProperty("RESULT_ROW_LIMIT", config.getRESULT_ROW_LIMIT() + ""))); + config.setALIGN_BY_DEVICE( + Boolean.parseBoolean( + properties.getProperty("ALIGN_BY_DEVICE", config.isALIGN_BY_DEVICE() + ""))); config.setWORKLOAD_BUFFER_SIZE( Integer.parseInt( @@ -588,10 +595,32 @@ private boolean checkConfig() { } result &= checkDeviceNumPerWrite(); result &= checkTag(); + if (!commonlyUseDB()) { + if (config.isALIGN_BY_DEVICE()) { + result = false; + LOGGER.error("ALIGN_BY_DEVICE is not supported for this database"); + } + if (config.getRESULT_ROW_LIMIT() >= 0) { + result = false; + LOGGER.error("RESULT_ROW_LIMIT is not supported for this database"); + } + } return result; } + private boolean commonlyUseDB() { + DBType dbType = config.getDbConfig().getDB_SWITCH().getType(); + DBVersion dbVersion = config.getDbConfig().getDB_SWITCH().getVersion(); + return Objects.equals(dbVersion, DBVersion.IOTDB_110) + || Objects.equals(dbVersion, DBVersion.TDengine_3) + || (Objects.equals(dbType, DBType.InfluxDB) && dbVersion == null); + } + private boolean checkOperationProportion() { + while (config.getOPERATION_PROPORTION().split(":").length + != config.getOPERATION_PROPORTION_LEN()) { + config.setOPERATION_PROPORTION(config.getOPERATION_PROPORTION() + ":0"); + } String[] op = config.getOPERATION_PROPORTION().split(":"); int minOps = 0; for (String s : op) { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java index efa398dbd..3bb7f4ab6 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java @@ -317,6 +317,31 @@ public Status groupByQuery(GroupByQuery groupByQuery) { return status; } + @Override + public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { + Status status = null; + Operation operation = Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC; + String device = "No Device"; + if (groupByQuery.getDeviceSchema().size() > 0) { + device = groupByQuery.getDeviceSchema().get(0).getDevice(); + } + try { + List statuses = new ArrayList<>(); + for (IDatabase database : databases) { + long start = System.nanoTime(); + status = database.groupByQueryOrderByDesc(groupByQuery); + long end = System.nanoTime(); + status.setTimeCost(end - start); + handleQueryOperation(status, operation, device); + statuses.add(status); + } + doComparisonByRecord(groupByQuery, operation, statuses); + } catch (Exception e) { + handleUnexpectedQueryException(operation, e, device); + } + return status; + } + @Override public Status latestPointQuery(LatestPointQuery latestPointQuery) { Status status = null; diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java index c525163d6..028ca8a37 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/IDatabase.java @@ -164,6 +164,10 @@ default Status insertOneBatchWithCheck(IBatch batch) throws Exception { /** similar to rangeQuery, but order by time desc. */ Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery); + default Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { + throw new UnsupportedOperationException("This operation is not supported for this database"); + } + /** Using in verification */ default Status verificationQuery(VerificationQuery verificationQuery) { WorkloadException workloadException = new WorkloadException("Not Supported Verification Query"); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/ValueRangeFilter.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/ValueRangeFilter.java new file mode 100644 index 000000000..967f663ae --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/ValueRangeFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.workload; + +import cn.edu.tsinghua.iot.benchmark.entity.Sensor; + +import java.util.List; + +public class ValueRangeFilter { + private final Double minValue; + private final List sensors; + private final Double maxValue; + + public ValueRangeFilter(double minValue, List sensors, double maxValue) { + this.sensors = sensors; + this.minValue = minValue; + this.maxValue = maxValue; + } + + public ValueRangeFilter(double minValue, List sensors) { + this.minValue = minValue; + this.sensors = sensors; + this.maxValue = null; + } + + public ValueRangeFilter(List sensors, double maxValue) { + this.minValue = null; + this.sensors = sensors; + this.maxValue = maxValue; + } + + public List getSensors() { + return sensors; + } + + public Double getMinValue() { + return minValue; + } + + public Double getMaxValue() { + return maxValue; + } +} diff --git a/influxdb/src/main/java/cn/edu/tsinghua/iot/benchmark/influxdb/InfluxDB.java b/influxdb/src/main/java/cn/edu/tsinghua/iot/benchmark/influxdb/InfluxDB.java index 64d594f7e..42aade911 100644 --- a/influxdb/src/main/java/cn/edu/tsinghua/iot/benchmark/influxdb/InfluxDB.java +++ b/influxdb/src/main/java/cn/edu/tsinghua/iot/benchmark/influxdb/InfluxDB.java @@ -140,7 +140,7 @@ public Status insertOneBatch(IBatch batch) { @Override public Status preciseQuery(PreciseQuery preciseQuery) { String sql = getPreciseQuerySql(preciseQuery); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -151,7 +151,7 @@ public Status preciseQuery(PreciseQuery preciseQuery) { public Status rangeQuery(RangeQuery rangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(rangeQuery.getDeviceSchema()); String sql = addWhereTimeClause(rangeQueryHead, rangeQuery); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -167,7 +167,7 @@ public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { valueRangeQuery.getDeviceSchema(), sqlWithTimeFilter, valueRangeQuery.getValueThreshold()); - return executeQueryAndGetStatus(sqlWithValueFilter); + return addTailClausesAndExecuteQueryAndGetStatus(sqlWithValueFilter); } /** @@ -179,7 +179,7 @@ public Status aggRangeQuery(AggRangeQuery aggRangeQuery) { String aggQuerySqlHead = getAggQuerySqlHead(aggRangeQuery.getDeviceSchema(), aggRangeQuery.getAggFun()); String sql = addWhereTimeClause(aggQuerySqlHead, aggRangeQuery); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** eg. SELECT count(s_3) FROM group_3 WHERE ( device = 'd_12' ) AND s_3 > -5.0. */ @@ -190,7 +190,7 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) { String sql = addWhereValueClause( aggValueQuery.getDeviceSchema(), aggQuerySqlHead, aggValueQuery.getValueThreshold()); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -207,7 +207,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { aggRangeValueQuery.getDeviceSchema(), sqlWithTimeFilter, aggRangeValueQuery.getValueThreshold()); - return executeQueryAndGetStatus(sqlWithValueFilter); + return addTailClausesAndExecuteQueryAndGetStatus(sqlWithValueFilter); } /** @@ -219,14 +219,14 @@ public Status groupByQuery(GroupByQuery groupByQuery) { String sqlHeader = getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); String sqlWithTimeFilter = addWhereTimeClause(sqlHeader, groupByQuery); String sqlWithGroupBy = addGroupByClause(sqlWithTimeFilter, groupByQuery.getGranularity()); - return executeQueryAndGetStatus(sqlWithGroupBy); + return addTailClausesAndExecuteQueryAndGetStatus(sqlWithGroupBy); } /** eg. SELECT last(s_2) FROM group_2 WHERE ( device = 'd_8' ). */ @Override public Status latestPointQuery(LatestPointQuery latestPointQuery) { String sql = getAggQuerySqlHead(latestPointQuery.getDeviceSchema(), "last"); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } @Override @@ -234,7 +234,7 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(rangeQuery.getDeviceSchema()); String sql = addWhereTimeClause(rangeQueryHead, rangeQuery); sql = addDescClause(sql); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } @Override @@ -247,7 +247,16 @@ public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) { sqlWithTimeFilter, valueRangeQuery.getValueThreshold()); sqlWithValueFilter = addDescClause(sqlWithValueFilter); - return executeQueryAndGetStatus(sqlWithValueFilter); + return addTailClausesAndExecuteQueryAndGetStatus(sqlWithValueFilter); + } + + @Override + public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { + String sql = getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); + sql = addWhereTimeClause(sql, groupByQuery); + sql = addGroupByClause(sql, groupByQuery.getGranularity()); + sql = addDescClause(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } private String addDescClause(String sql) { @@ -289,9 +298,12 @@ private InfluxDataModel createDataModel( return model; } - private Status executeQueryAndGetStatus(String sql) { + private Status addTailClausesAndExecuteQueryAndGetStatus(String sql) { + if (config.getRESULT_ROW_LIMIT() >= 0) { + sql += " limit " + config.getRESULT_ROW_LIMIT(); + } if (!config.isIS_QUIET_MODE()) { - LOGGER.debug("{} query SQL: {}", Thread.currentThread().getName(), sql); + LOGGER.info("{} query SQL: {}", Thread.currentThread().getName(), sql); } QueryResult results = influxDbInstance.query(new Query(sql, influxDbName)); @@ -370,8 +382,10 @@ private static String addGroupByClause(String sqlHeader, long timeGranularity) { private static String getSimpleQuerySqlHead(List devices) { StringBuilder builder = new StringBuilder(); builder.append("SELECT "); + if (config.isALIGN_BY_DEVICE()) { + builder.append("device, "); + } List querySensors = devices.get(0).getSensors(); - builder.append(querySensors.get(0).getName()); for (int i = 1; i < querySensors.size(); i++) { builder.append(", ").append(querySensors.get(i).getName()); @@ -391,8 +405,10 @@ private static String getSimpleQuerySqlHead(List devices) { private static String getAggQuerySqlHead(List devices, String method) { StringBuilder builder = new StringBuilder(); builder.append("SELECT "); + if (config.isALIGN_BY_DEVICE()) { + builder.append("device, "); + } List querySensors = devices.get(0).getSensors(); - builder.append(method).append("(").append(querySensors.get(0).getName()).append(")"); for (int i = 1; i < querySensors.size(); i++) { builder diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java index da770de36..b1bae6a4c 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java @@ -89,6 +89,7 @@ public class IoTDB implements IDatabase { private static final AtomicBoolean templateInit = new AtomicBoolean(false); private static final int ACTIVATE_TEMPLATE_THRESHOLD = 1000; protected final String DELETE_SERIES_SQL; + private final String ORDER_BY_TIME_DESC = " order by time desc "; protected SingleNodeJDBCConnection ioTDBConnection; protected static final Config config = ConfigDescriptor.getInstance().getConfig(); @@ -424,7 +425,7 @@ public Status insertOneBatch(IBatch batch) throws DBConnectException { public Status preciseQuery(PreciseQuery preciseQuery) { String strTime = preciseQuery.getTimestamp() + ""; String sql = getSimpleQuerySqlHead(preciseQuery.getDeviceSchema()) + " WHERE time = " + strTime; - return executeQueryAndGetStatus(sql, Operation.PRECISE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.PRECISE_QUERY); } /** @@ -441,7 +442,7 @@ public Status rangeQuery(RangeQuery rangeQuery) { rangeQuery.getDeviceSchema(), rangeQuery.getStartTimestamp(), rangeQuery.getEndTimestamp()); - return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.RANGE_QUERY); } /** @@ -454,7 +455,7 @@ public Status rangeQuery(RangeQuery rangeQuery) { @Override public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { String sql = getValueRangeQuerySql(valueRangeQuery); - return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY); } /** @@ -471,7 +472,7 @@ public Status aggRangeQuery(AggRangeQuery aggRangeQuery) { String sql = addWhereTimeClause( aggQuerySqlHead, aggRangeQuery.getStartTimestamp(), aggRangeQuery.getEndTimestamp()); - return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.AGG_RANGE_QUERY); } /** @@ -490,7 +491,7 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) { + getValueFilterClause( aggValueQuery.getDeviceSchema(), (int) aggValueQuery.getValueThreshold()) .substring(4); - return executeQueryAndGetStatus(sql, Operation.AGG_VALUE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.AGG_VALUE_QUERY); } /** @@ -513,7 +514,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { sql += getValueFilterClause( aggRangeValueQuery.getDeviceSchema(), (int) aggRangeValueQuery.getValueThreshold()); - return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_VALUE_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.AGG_RANGE_VALUE_QUERY); } /** @@ -533,7 +534,7 @@ public Status groupByQuery(GroupByQuery groupByQuery) { groupByQuery.getStartTimestamp(), groupByQuery.getEndTimestamp(), groupByQuery.getGranularity()); - return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY); } /** @@ -545,7 +546,7 @@ public Status groupByQuery(GroupByQuery groupByQuery) { @Override public Status latestPointQuery(LatestPointQuery latestPointQuery) { String aggQuerySqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema()); - return executeQueryAndGetStatus(aggQuerySqlHead, Operation.LATEST_POINT_QUERY); + return addTailClausesAndExecuteQueryAndGetStatus(aggQuerySqlHead, Operation.LATEST_POINT_QUERY); } /** @@ -562,8 +563,8 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { rangeQuery.getDeviceSchema(), rangeQuery.getStartTimestamp(), rangeQuery.getEndTimestamp()) - + " order by time desc"; - return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY_ORDER_BY_TIME_DESC); + + ORDER_BY_TIME_DESC; + return addTailClausesAndExecuteQueryAndGetStatus(sql, Operation.RANGE_QUERY_ORDER_BY_TIME_DESC); } /** @@ -575,8 +576,25 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { */ @Override public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) { - String sql = getValueRangeQuerySql(valueRangeQuery) + " order by time desc"; - return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC); + String sql = getValueRangeQuerySql(valueRangeQuery) + ORDER_BY_TIME_DESC; + return addTailClausesAndExecuteQueryAndGetStatus( + sql, Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC); + } + + /** Q11: Q7 order by time desc */ + @Override + public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { + String aggQuerySqlHead = + getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); + String sql = + addGroupByClause( + aggQuerySqlHead, + groupByQuery.getStartTimestamp(), + groupByQuery.getEndTimestamp(), + groupByQuery.getGranularity()); + sql += ORDER_BY_TIME_DESC; + return addTailClausesAndExecuteQueryAndGetStatus( + sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC); } /** @@ -696,7 +714,13 @@ protected String getDevicePath(DeviceSchema deviceSchema) { return name.toString(); } - protected Status executeQueryAndGetStatus(String sql, Operation operation) { + protected Status addTailClausesAndExecuteQueryAndGetStatus(String sql, Operation operation) { + if (config.getRESULT_ROW_LIMIT() >= 0) { + sql += " limit " + config.getRESULT_ROW_LIMIT(); + } + if (config.isALIGN_BY_DEVICE()) { + sql += " align by device"; + } String executeSQL; if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) { executeSQL = "debug " + sql; @@ -741,7 +765,8 @@ protected Status executeQueryAndGetStatus(String sql, Operation operation) { isOk.set(false); } long resultPointNum = line.get(); - if (!Operation.LATEST_POINT_QUERY.equals(operation)) { + if (!Operation.LATEST_POINT_QUERY.equals(operation) + || !config.isALIGN_BY_DEVICE()) { resultPointNum *= config.getQUERY_SENSOR_NUM(); resultPointNum *= config.getQUERY_DEVICE_NUM(); } @@ -899,7 +924,7 @@ protected String getDeviceQuerySql( sql.append(getSimpleQuerySqlHead(deviceSchemas)); sql.append(" where time >= ").append(startTimeStamp); sql.append(" and time <").append(endTimeStamp); - sql.append(" order by time desc"); + sql.append(ORDER_BY_TIME_DESC); return sql.toString(); } @@ -933,7 +958,7 @@ protected String getMinTimeStampSql(DeviceSchema deviceSchema) { } protected String getMaxTimeStampSql(DeviceSchema deviceSchema) { - return "select * from " + getDevicePath(deviceSchema) + " order by time desc limit 1"; + return "select * from " + getDevicePath(deviceSchema) + ORDER_BY_TIME_DESC + " limit 1"; } String getEncodingType(SensorType dataSensorType) { diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java index b085c2fdd..e02113a79 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java @@ -169,7 +169,13 @@ public Status insertOneBatchByRecords(IBatch batch) { } @Override - protected Status executeQueryAndGetStatus(String sql, Operation operation) { + protected Status addTailClausesAndExecuteQueryAndGetStatus(String sql, Operation operation) { + if (config.getRESULT_ROW_LIMIT() >= 0) { + sql += " limit " + config.getRESULT_ROW_LIMIT(); + } + if (config.isALIGN_BY_DEVICE()) { + sql += " align by device"; + } String executeSQL; if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) { executeSQL = "debug " + sql; @@ -230,7 +236,8 @@ protected Status executeQueryAndGetStatus(String sql, Operation operation) { isOk.set(false); } long resultPointNum = line.get(); - if (!Operation.LATEST_POINT_QUERY.equals(operation)) { + if (!Operation.LATEST_POINT_QUERY.equals(operation) + || !config.isALIGN_BY_DEVICE()) { resultPointNum *= config.getQUERY_SENSOR_NUM(); resultPointNum *= config.getQUERY_DEVICE_NUM(); } diff --git a/tdengine-3.0/src/main/java/cn/edu/tsinghua/iot/benchmark/tdengine3/TDengine.java b/tdengine-3.0/src/main/java/cn/edu/tsinghua/iot/benchmark/tdengine3/TDengine.java index c79fe8669..85b4cd6f7 100644 --- a/tdengine-3.0/src/main/java/cn/edu/tsinghua/iot/benchmark/tdengine3/TDengine.java +++ b/tdengine-3.0/src/main/java/cn/edu/tsinghua/iot/benchmark/tdengine3/TDengine.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iot.benchmark.tsdb.IDatabase; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils; +import cn.edu.tsinghua.iot.benchmark.workload.ValueRangeFilter; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeQuery; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeValueQuery; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggValueQuery; @@ -49,6 +50,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -64,6 +66,8 @@ public class TDengine implements IDatabase { new CyclicBarrier(config.getCLIENT_NUMBER()); private static final String USE_DB = "use %s"; private static final String SUPER_TABLE_NAME = "device"; + private static final String ORDER_BY_TIME_DESC = " order by time desc "; + private static final String ORDER_BY_WSTART_DESC = " order by _wstart desc "; private static final AtomicBoolean isInit = new AtomicBoolean(false); private final String CREATE_STABLE; @@ -265,7 +269,7 @@ private String getInsertOneRecordSql( @Override public Status preciseQuery(PreciseQuery preciseQuery) { String sql = getPreciseQuerySql(preciseQuery); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -275,8 +279,10 @@ public Status preciseQuery(PreciseQuery preciseQuery) { @Override public Status rangeQuery(RangeQuery rangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(rangeQuery.getDeviceSchema()); - String sql = addWhereTimeClause(rangeQueryHead, rangeQuery); - return executeQueryAndGetStatus(sql); + String sql = + addWhereClause( + rangeQueryHead, rangeQuery, null, getTableNameFilterForAlignByDevice(rangeQuery)); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -286,13 +292,15 @@ public Status rangeQuery(RangeQuery rangeQuery) { @Override public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(valueRangeQuery.getDeviceSchema()); - String sqlWithTimeFilter = addWhereTimeClause(rangeQueryHead, valueRangeQuery); - String sqlWithValueFilter = - addWhereValueClause( - valueRangeQuery.getDeviceSchema(), - sqlWithTimeFilter, - valueRangeQuery.getValueThreshold()); - return executeQueryAndGetStatus(sqlWithValueFilter); + String sql = + addWhereClause( + rangeQueryHead, + valueRangeQuery, + new ValueRangeFilter( + valueRangeQuery.getValueThreshold(), + valueRangeQuery.getDeviceSchema().get(0).getSensors()), + getTableNameFilterForAlignByDevice(valueRangeQuery)); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -303,8 +311,13 @@ public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) { public Status aggRangeQuery(AggRangeQuery aggRangeQuery) { String aggQuerySqlHead = getAggQuerySqlHead(aggRangeQuery.getDeviceSchema(), aggRangeQuery.getAggFun()); - String sql = addWhereTimeClause(aggQuerySqlHead, aggRangeQuery); - return executeQueryAndGetStatus(sql); + String sql = + addWhereClause( + aggQuerySqlHead, + aggRangeQuery, + null, + getTableNameFilterForAlignByDevice(aggRangeQuery)); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** eg. SELECT count(s_3) FROM group_3 WHERE ( device = 'd_12' ) AND s_3 > -5.0. */ @@ -313,9 +326,14 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) { String aggQuerySqlHead = getAggQuerySqlHead(aggValueQuery.getDeviceSchema(), aggValueQuery.getAggFun()); String sql = - addWhereValueWithoutTimeClause( - aggValueQuery.getDeviceSchema(), aggQuerySqlHead, aggValueQuery.getValueThreshold()); - return executeQueryAndGetStatus(sql); + addWhereClause( + aggQuerySqlHead, + null, + new ValueRangeFilter( + aggValueQuery.getValueThreshold(), + aggValueQuery.getDeviceSchema().get(0).getSensors()), + getTableNameFilterForAlignByDevice(aggValueQuery)); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -326,13 +344,15 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) { public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { String rangeQueryHead = getAggQuerySqlHead(aggRangeValueQuery.getDeviceSchema(), aggRangeValueQuery.getAggFun()); - String sqlWithTimeFilter = addWhereTimeClause(rangeQueryHead, aggRangeValueQuery); - String sqlWithValueFilter = - addWhereValueClause( - aggRangeValueQuery.getDeviceSchema(), - sqlWithTimeFilter, - aggRangeValueQuery.getValueThreshold()); - return executeQueryAndGetStatus(sqlWithValueFilter); + String sql = + addWhereClause( + rangeQueryHead, + aggRangeValueQuery, + new ValueRangeFilter( + aggRangeValueQuery.getValueThreshold(), + aggRangeValueQuery.getDeviceSchema().get(0).getSensors()), + getTableNameFilterForAlignByDevice(aggRangeValueQuery)); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** @@ -342,36 +362,54 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) { @Override public Status groupByQuery(GroupByQuery groupByQuery) { String sqlHeader = getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); - String sqlWithTimeFilter = addWhereTimeClause(sqlHeader, groupByQuery); - String sqlWithGroupBy = addGroupByClause(sqlWithTimeFilter, groupByQuery.getGranularity()); - return executeQueryAndGetStatus(sqlWithGroupBy); + String sql = + addWhereClause( + sqlHeader, groupByQuery, null, getTableNameFilterForAlignByDevice(groupByQuery)); + sql = addGroupByClause(sql, groupByQuery.getGranularity()); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } /** eg. SELECT last(s_2) FROM group_2 WHERE ( device = 'd_8' ). */ @Override public Status latestPointQuery(LatestPointQuery latestPointQuery) { String sql = getAggQuerySqlHead(latestPointQuery.getDeviceSchema(), "last"); - return executeQueryAndGetStatus(sql); + return addTailClausesAndExecuteQueryAndGetStatus(sql); } @Override public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(rangeQuery.getDeviceSchema()); - String sql = addWhereTimeClause(rangeQueryHead, rangeQuery) + " order by time desc"; - return executeQueryAndGetStatus(sql); + String sql = + addWhereClause( + rangeQueryHead, rangeQuery, null, getTableNameFilterForAlignByDevice(rangeQuery)); + sql += ORDER_BY_TIME_DESC; + return addTailClausesAndExecuteQueryAndGetStatus(sql); } @Override public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) { String rangeQueryHead = getSimpleQuerySqlHead(valueRangeQuery.getDeviceSchema()); - String sqlWithTimeFilter = addWhereTimeClause(rangeQueryHead, valueRangeQuery); - String sqlWithValueFilter = - addWhereValueClause( - valueRangeQuery.getDeviceSchema(), - sqlWithTimeFilter, - valueRangeQuery.getValueThreshold()) - + " order by time desc"; - return executeQueryAndGetStatus(sqlWithValueFilter); + String sql = + addWhereClause( + rangeQueryHead, + valueRangeQuery, + new ValueRangeFilter( + valueRangeQuery.getValueThreshold(), + valueRangeQuery.getDeviceSchema().get(0).getSensors()), + getTableNameFilterForAlignByDevice(valueRangeQuery)); + sql += ORDER_BY_TIME_DESC; + return addTailClausesAndExecuteQueryAndGetStatus(sql); + } + + @Override + public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) { + String sqlHeader = getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun()); + String sql = + addWhereClause( + sqlHeader, groupByQuery, null, getTableNameFilterForAlignByDevice(groupByQuery)); + sql = addGroupByClause(sql, groupByQuery.getGranularity()); + sql += ORDER_BY_WSTART_DESC; + return addTailClausesAndExecuteQueryAndGetStatus(sql); } private String getPreciseQuerySql(PreciseQuery preciseQuery) { @@ -408,18 +446,18 @@ private static String getSimpleQuerySqlHead(List devices) { */ private static String generateConstrainForDevices(List devices) { StringBuilder builder = new StringBuilder(); - builder.append(" FROM ").append(devices.get(0).getDevice()); - // builder.append(" WHERE "); - /*for (DeviceSchema d : devices) { - builder.append(" device = '").append(d.getDevice()).append("' OR"); + if (config.isALIGN_BY_DEVICE()) { + builder.append(" FROM ").append(SUPER_TABLE_NAME); + } else { + builder.append(" FROM ").append(devices.get(0).getDevice()); } - builder.delete(builder.lastIndexOf("OR"), builder.length()); - builder.append(")"); - */ return builder.toString(); } - private Status executeQueryAndGetStatus(String sql) { + private Status addTailClausesAndExecuteQueryAndGetStatus(String sql) { + if (config.getRESULT_ROW_LIMIT() >= 0) { + sql += " limit " + config.getRESULT_ROW_LIMIT(); + } if (!config.isIS_QUIET_MODE()) { LOGGER.info("{} query SQL: {}", Thread.currentThread().getName(), sql); } @@ -443,52 +481,60 @@ private Status executeQueryAndGetStatus(String sql) { } /** - * add time filter for query statements. + * Add WHERE clause. Include time filter, value filter, table name filter * - * @param sql sql header - * @param rangeQuery range query - * @return sql with time filter + * @param sql The original sql + * @param timeRangeQuery + * @param valueRangeFilter + * @param alignByDeviceTableNameFilter + * @return New sql, with WHERE clause */ - private static String addWhereTimeClause(String sql, RangeQuery rangeQuery) { - String startTime = "" + rangeQuery.getStartTimestamp(); - String endTime = "" + rangeQuery.getEndTimestamp(); - return sql + " Where time >= " + startTime + " AND time <= " + endTime; - } - - /** - * add value filter for query statements. - * - * @param devices query device schema - * @param sqlHeader sql header - * @param valueThreshold lower bound of query value filter - * @return sql with value filter - */ - private static String addWhereValueClause( - List devices, String sqlHeader, double valueThreshold) { - StringBuilder builder = new StringBuilder(sqlHeader); - for (Sensor sensor : devices.get(0).getSensors()) { - builder.append(" AND ").append(sensor.getName()).append(" > ").append(valueThreshold); + private static String addWhereClause( + String sql, + RangeQuery timeRangeQuery, + ValueRangeFilter valueRangeFilter, + List alignByDeviceTableNameFilter) { + if (timeRangeQuery == null + && valueRangeFilter == null + && !alignByDeviceTableNameFilter.isEmpty()) { + return sql; } - return builder.toString(); + StringBuilder sqlBuilder = new StringBuilder(sql); + sqlBuilder.append(" WHERE "); + if (timeRangeQuery != null) { + String startTime = "" + timeRangeQuery.getStartTimestamp(); + String endTime = "" + timeRangeQuery.getEndTimestamp(); + sqlBuilder.append(" time >= ").append(startTime).append(" AND time <= ").append(endTime); + } + if (valueRangeFilter != null) { + if (!sqlBuilder.toString().endsWith("WHERE ")) { + sqlBuilder.append(" AND "); + } + double valueThreshold = valueRangeFilter.getMinValue(); + for (Sensor sensor : valueRangeFilter.getSensors()) { + sqlBuilder.append(sensor.getName()).append(" > ").append(valueThreshold).append(" AND "); + } + sqlBuilder.delete(sqlBuilder.length() - 4, sqlBuilder.length()); + } + if (!alignByDeviceTableNameFilter.isEmpty()) { + if (!sqlBuilder.toString().endsWith("WHERE ")) { + sqlBuilder.append(" AND "); + } + sqlBuilder.append(" tbname in ("); + for (DeviceSchema deviceSchema : alignByDeviceTableNameFilter) { + sqlBuilder.append('"').append(deviceSchema.getDevice()).append('"').append(','); + } + sqlBuilder.deleteCharAt(sqlBuilder.length() - 1); + sqlBuilder.append(')'); + } + return sqlBuilder.toString(); } - /** - * add value filter without time filter for query statements. - * - * @param devices query device schema - * @param sqlHeader sql header - * @param valueThreshold lower bound of query value filter - * @return sql with value filter - */ - private static String addWhereValueWithoutTimeClause( - List devices, String sqlHeader, double valueThreshold) { - StringBuilder builder = new StringBuilder(sqlHeader); - builder.append(" Where "); - for (Sensor sensor : devices.get(0).getSensors()) { - builder.append(sensor.getName()).append(" > ").append(valueThreshold).append(" AND "); + private static List getTableNameFilterForAlignByDevice(RangeQuery rangeQuery) { + if (config.isALIGN_BY_DEVICE()) { + return rangeQuery.getDeviceSchema(); } - builder.delete(builder.lastIndexOf("AND"), builder.length()); - return builder.toString(); + return Collections.emptyList(); } /**