Skip to content

Commit

Permalink
Support iotdb table model Q4-Q8,Q11 (#455)
Browse files Browse the repository at this point in the history
* Support iotdb table model Q4-Q8,Q11

* Remove logs used by tests

* Reduce code duplication

* addGroupByClause is changed to private
  • Loading branch information
YangYumings authored Oct 16, 2024
1 parent 14691fd commit 3bcf778
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 34 deletions.
1 change: 0 additions & 1 deletion configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@
# Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc
# Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc
# Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1
# IoTDB-2.0 表模型支持 Q1 Q2 Q3 Q9 Q10
# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0

# 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ private boolean checkDatabaseVerification(DBConfig dbConfig) {
boolean result = false;
if (dbConfig.getDB_SWITCH().getType() == DBType.IoTDB) {
// support after iotdb 1.0
if (dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_130
if (dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_200
|| dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_130
|| dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_110
|| dbConfig.getDB_SWITCH().getVersion() == DBVersion.IOTDB_100) {
result = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public Status rangeQuery(RangeQuery rangeQuery) {
rangeQuery.getDeviceSchema(),
rangeQuery.getStartTimestamp(),
rangeQuery.getEndTimestamp());
sql = modelStrategy.addDeviceIDColumnIfNecessary(rangeQuery.getDeviceSchema(), sql);
return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY);
}

Expand All @@ -254,6 +255,7 @@ public Status rangeQuery(RangeQuery rangeQuery) {
@Override
public Status valueRangeQuery(ValueRangeQuery valueRangeQuery) {
String sql = getValueRangeQuerySql(valueRangeQuery);
sql = modelStrategy.addDeviceIDColumnIfNecessary(valueRangeQuery.getDeviceSchema(), sql);
return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY);
}

Expand All @@ -270,6 +272,7 @@ public Status aggRangeQuery(AggRangeQuery aggRangeQuery) {
String sql =
addWhereTimeClause(
aggQuerySqlHead, aggRangeQuery.getStartTimestamp(), aggRangeQuery.getEndTimestamp());
sql = modelStrategy.addDeviceIDColumnIfNecessary(aggRangeQuery.getDeviceSchema(), sql);
return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_QUERY);
}

Expand All @@ -288,6 +291,7 @@ public Status aggValueQuery(AggValueQuery aggValueQuery) {
+ getValueFilterClause(
aggValueQuery.getDeviceSchema(), (int) aggValueQuery.getValueThreshold())
.substring(4);
sql = modelStrategy.addDeviceIDColumnIfNecessary(aggValueQuery.getDeviceSchema(), sql);
return executeQueryAndGetStatus(sql, Operation.AGG_VALUE_QUERY);
}

Expand All @@ -310,6 +314,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) {
sql +=
getValueFilterClause(
aggRangeValueQuery.getDeviceSchema(), (int) aggRangeValueQuery.getValueThreshold());
sql = modelStrategy.addDeviceIDColumnIfNecessary(aggRangeValueQuery.getDeviceSchema(), sql);
return executeQueryAndGetStatus(sql, Operation.AGG_RANGE_VALUE_QUERY);
}

Expand All @@ -321,14 +326,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) {
*/
@Override
public Status groupByQuery(GroupByQuery groupByQuery) {
String aggQuerySqlHead =
getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun());
String sql =
addGroupByClause(
aggQuerySqlHead,
groupByQuery.getStartTimestamp(),
groupByQuery.getEndTimestamp(),
groupByQuery.getGranularity());
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery);
return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY);
}

Expand All @@ -339,8 +337,8 @@ public Status groupByQuery(GroupByQuery groupByQuery) {
*/
@Override
public Status latestPointQuery(LatestPointQuery latestPointQuery) {
String aggQuerySqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema());
return executeQueryAndGetStatus(aggQuerySqlHead, Operation.LATEST_POINT_QUERY);
String latestPointSqlHead = getLatestPointQuerySql(latestPointQuery.getDeviceSchema());
return executeQueryAndGetStatus(latestPointSqlHead, Operation.LATEST_POINT_QUERY);
}

/**
Expand All @@ -353,9 +351,11 @@ public Status latestPointQuery(LatestPointQuery latestPointQuery) {
public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) {
String sql =
getRangeQuerySql(
rangeQuery.getDeviceSchema(),
rangeQuery.getStartTimestamp(),
rangeQuery.getEndTimestamp())
rangeQuery.getDeviceSchema(),
rangeQuery.getStartTimestamp(),
rangeQuery.getEndTimestamp());
sql =
modelStrategy.addDeviceIDColumnIfNecessary(rangeQuery.getDeviceSchema(), sql)
+ " order by time desc";
return executeQueryAndGetStatus(sql, Operation.RANGE_QUERY_ORDER_BY_TIME_DESC);
}
Expand All @@ -368,20 +368,23 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) {
*/
@Override
public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) {
String sql = getValueRangeQuerySql(valueRangeQuery) + " order by time desc";
String sql = getValueRangeQuerySql(valueRangeQuery);
sql =
modelStrategy.addDeviceIDColumnIfNecessary(valueRangeQuery.getDeviceSchema(), sql)
+ " order by time desc";
return executeQueryAndGetStatus(sql, Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC);
}

/**
* Q11: GroupByQuery SQL: select {AggFun}({sensors}) from {devices} group by ([{start}, {end}], *
* {Granularity}ms) order by time desc
*
* @param groupByQuery
* @return
*/
@Override
public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) {
String aggQuerySqlHead =
getAggQuerySqlHead(groupByQuery.getDeviceSchema(), groupByQuery.getAggFun());
String sql =
addGroupByClause(
aggQuerySqlHead,
groupByQuery.getStartTimestamp(),
groupByQuery.getEndTimestamp(),
groupByQuery.getGranularity());
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery);
sql += ORDER_BY_TIME_DESC;
return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC);
}
Expand Down Expand Up @@ -455,24 +458,21 @@ private String getLatestPointQuerySql(List<DeviceSchema> devices) {
for (int i = 1; i < querySensors.size(); i++) {
builder.append(", ").append(querySensors.get(i).getName());
}
return addFromClause(devices, builder);
String sql = addFromClause(devices, builder);
sql = modelStrategy.addWhereValueClauseIfNecessary(devices, sql);
return sql;
}

private String getRangeQuerySql(List<DeviceSchema> deviceSchemas, long start, long end) {
return modelStrategy.addDeviceIDColumnIfNecessary(
deviceSchemas, addWhereTimeClause(getSimpleQuerySqlHead(deviceSchemas), start, end));
return addWhereTimeClause(getSimpleQuerySqlHead(deviceSchemas), start, end);
}

private String addWhereTimeClause(String prefix, long start, long end) {
public static String addWhereTimeClause(String prefix, long start, long end) {
String startTime = start + "";
String endTime = end + "";
return prefix + " WHERE time >= " + startTime + " AND time <= " + endTime;
}

private String addGroupByClause(String prefix, long start, long end, long granularity) {
return prefix + " group by ([" + start + "," + end + ")," + granularity + "ms) ";
}

protected Status executeQueryAndGetStatus(String sql, Operation operation) {
String executeSQL;
if (config.isIOTDB_USE_DEBUG() && random.nextDouble() < config.getIOTDB_USE_DEBUG_RATIO()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -131,6 +132,28 @@ public abstract void sessionCleanupImpl(Session session)

// endregion

// region query

public abstract String getGroupByQuerySQL(GroupByQuery groupByQuery);

public abstract String addWhereValueClauseIfNecessary(List<DeviceSchema> devices, String prefix);

protected String getAggFunForGroupByQuery(List<Sensor> querySensors, String aggFunction) {
StringBuilder builder = new StringBuilder();
builder.append(aggFunction).append("(").append(querySensors.get(0).getName()).append(")");
for (int i = 1; i < querySensors.size(); i++) {
builder
.append(", ")
.append(aggFunction)
.append("(")
.append(querySensors.get(i).getName())
.append(")");
}
return builder.toString();
}

// endregion

public abstract Logger getLogger();

public void handleRegisterException(Exception e) throws TsdbException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -163,11 +164,14 @@ public String addFromClause(List<DeviceSchema> devices, StringBuilder builder) {

@Override
public String addDeviceIDColumnIfNecessary(List<DeviceSchema> deviceSchemas, String sql) {
return sql + " AND" + getDeviceIDColumn(deviceSchemas);
}

public String getDeviceIDColumn(List<DeviceSchema> deviceSchemas) {
Set<String> deviceIds = new HashSet<>();
StringBuilder builder = new StringBuilder();
builder
.append(sql)
.append(" AND (")
.append(" (")
.append("device_id")
.append(" = '")
.append(deviceSchemas.get(0).getDevice())
Expand Down Expand Up @@ -357,6 +361,48 @@ public void sessionCleanupImpl(Session session)
}
}

/**
* eg. SELECT data_bin(20000ms, time), count(s_1), count(s_3), count(s_4) FROM test_g_0.table_0
* WHERE time >= 1640966400000 AND time < 1640966650000 AND (device_id='d_0' OR device_id='d_3')
* group by time
*
* <p>getAggForGroupByQuery
*/
@Override
public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
StringBuilder builder = new StringBuilder();
// SELECT
builder
.append("SELECT ")
.append("data_bin(")
.append(groupByQuery.getGranularity())
.append("ms, ")
.append("time), ")
.append(
getAggFunForGroupByQuery(
groupByQuery.getDeviceSchema().get(0).getSensors(), groupByQuery.getAggFun()));
// FROM
String sql = addFromClause(groupByQuery.getDeviceSchema(), builder);
// WHERE
sql =
IoTDB.addWhereTimeClause(
sql, groupByQuery.getStartTimestamp(), groupByQuery.getEndTimestamp());
sql = addDeviceIDColumnIfNecessary(groupByQuery.getDeviceSchema(), sql);
// GROUP BY
sql = addGroupByClause(sql);
return sql;
}

@Override
public String addWhereValueClauseIfNecessary(List<DeviceSchema> devices, String prefix) {
String sql = prefix + " WHERE" + getDeviceIDColumn(devices);
return sql;
}

private String addGroupByClause(String prefix) {
return prefix + " group by time";
}

// endregion

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.GroupByQuery;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
Expand Down Expand Up @@ -355,6 +356,36 @@ public void sessionCleanupImpl(Session session) {
}
}

@Override
public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
StringBuilder builder = new StringBuilder();
// SELECT
builder
.append("SELECT ")
.append(
getAggFunForGroupByQuery(
groupByQuery.getDeviceSchema().get(0).getSensors(), groupByQuery.getAggFun()));
// FROM
String sql = addFromClause(groupByQuery.getDeviceSchema(), builder);
// GROUP BY
sql =
addGroupByClause(
sql,
groupByQuery.getStartTimestamp(),
groupByQuery.getEndTimestamp(),
groupByQuery.getGranularity());
return sql;
}

@Override
public String addWhereValueClauseIfNecessary(List<DeviceSchema> devices, String prefix) {
return prefix;
}

private String addGroupByClause(String prefix, long start, long end, long granularity) {
return prefix + " group by ([" + start + "," + end + ")," + granularity + "ms) ";
}

// endregion

@Override
Expand Down

0 comments on commit 3bcf778

Please sign in to comment.