Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support align by device, group query order by time desc, limit for IoTDB and TDengine and InfluxDB #387

Merged
merged 19 commits into from
Jan 3, 2024
Merged
13 changes: 12 additions & 1 deletion configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@
# Q8 最近点查询 select time, v1... where device = ? and time = max(time)
# Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc
# Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc
# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0
# Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1
# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0
liyuheng55555 marked this conversation as resolved.
Show resolved Hide resolved

# 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作
# WRITE_OPERATION_TIMEOUT_MS=120000
Expand Down Expand Up @@ -386,6 +387,16 @@
# 查询语句的随机数种子
# QUERY_SEED=151658

# 行数限制, -1表示不使用limit语句
# 目前仅支持iotdb、influxdb v1、tdengine-3
# RESULT_ROW_LIMIT=-1

# 是否align by device
# 目前仅支持iotdb、influxdb v1、tdengine-3
# iotdb支持OPERATION_PROPORTION=1:1:1:0:0:0:0:0:0:1:0:0
# tdengine支持1:1:1:1:1:1:1:1:1:1:1:1
# ALIGN_BY_DEVICE=false

################ Workload:相关参数 ######################
# workload的缓冲区的大小
# WORKLOAD_BUFFER_SIZE=100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ protected void doTest() {
case VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC:
dbWrapper.valueRangeQueryOrderByDesc(queryWorkLoad.getValueRangeQuery());
break;
case GROUP_BY_QUERY_ORDER_BY_TIME_DESC:
dbWrapper.groupByQueryOrderByDesc(queryWorkLoad.getGroupByQuery());
break;
default:
LOGGER.error("Unsupported operation sensorType {}", operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum Operation {
LATEST_POINT_QUERY("LATEST_POINT"),
RANGE_QUERY_ORDER_BY_TIME_DESC("RANGE_QUERY_DESC"),
VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC("VALUE_RANGE_QUERY_DESC"),
GROUP_BY_QUERY_ORDER_BY_TIME_DESC("GROUP_BY_DESC"),
VERIFICATION_QUERY("VERIFICATION_QUERY"),
DEVICE_QUERY("DEVICE_QUERY");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import cn.edu.tsinghua.iot.benchmark.conf.Config;
import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class OperationController {

private static final Logger LOGGER = null;
private static final Logger LOGGER = LoggerFactory.getLogger(OperationController.class);
private static Config config = ConfigDescriptor.getInstance().getConfig();
private List<Double> proportion = new ArrayList<>();
private List<Operation> operations = Operation.getNormalOperation();
Expand Down Expand Up @@ -108,6 +109,8 @@ public Operation getNextOperationType() {
return Operation.RANGE_QUERY_ORDER_BY_TIME_DESC;
case 11:
return Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC;
case 12:
return Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC;
default:
LOGGER.error("Unsupported operation {}, use default operation: INGESTION.", i);
return Operation.INGESTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ public class Config {
* value filtering in reverse order, Eg. select v1... from data where time > ? and time < ? and v1
* > ? and device in ? order by time desc
*/
private String OPERATION_PROPORTION = "1:0:0:0:0:0:0:0:0:0:0";
private String OPERATION_PROPORTION = "1:0:0:0:0:0:0:0:0:0:0:0";

private final int OPERATION_PROPORTION_LEN = 12;
/** The number of sensors involved in each query */
private int QUERY_SENSOR_NUM = 1;
/** The number of devices involved in each query */
Expand All @@ -338,6 +340,9 @@ public class Config {
/** Query random seed */
private long QUERY_SEED = 151658L;

private long RESULT_ROW_LIMIT = -1;
private boolean ALIGN_BY_DEVICE = false;

// workload 相关部分
/** The size of workload buffer size */
private int WORKLOAD_BUFFER_SIZE = 100;
Expand Down Expand Up @@ -1198,6 +1203,10 @@ public void setOPERATION_PROPORTION(String OPERATION_PROPORTION) {
this.OPERATION_PROPORTION = OPERATION_PROPORTION;
}

public int getOPERATION_PROPORTION_LEN() {
return this.OPERATION_PROPORTION_LEN;
}

public int getQUERY_SENSOR_NUM() {
return QUERY_SENSOR_NUM;
}
Expand Down Expand Up @@ -1254,6 +1263,22 @@ public void setQUERY_SEED(long QUERY_SEED) {
this.QUERY_SEED = QUERY_SEED;
}

public long getRESULT_ROW_LIMIT() {
return RESULT_ROW_LIMIT;
}

public void setRESULT_ROW_LIMIT(long RESULT_ROW_LIMIT) {
this.RESULT_ROW_LIMIT = RESULT_ROW_LIMIT;
}

public boolean isALIGN_BY_DEVICE() {
return ALIGN_BY_DEVICE;
}

public void setALIGN_BY_DEVICE(boolean ALIGN_BY_DEVICE) {
this.ALIGN_BY_DEVICE = ALIGN_BY_DEVICE;
}

public int getWORKLOAD_BUFFER_SIZE() {
return WORKLOAD_BUFFER_SIZE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.*;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -436,6 +437,12 @@ private void loadProps() {
properties.getProperty("GROUP_BY_TIME_UNIT", config.getGROUP_BY_TIME_UNIT() + "")));
config.setQUERY_SEED(
Long.parseLong(properties.getProperty("QUERY_SEED", config.getQUERY_SEED() + "")));
config.setRESULT_ROW_LIMIT(
Long.parseLong(
properties.getProperty("RESULT_ROW_LIMIT", config.getRESULT_ROW_LIMIT() + "")));
config.setALIGN_BY_DEVICE(
Boolean.parseBoolean(
properties.getProperty("ALIGN_BY_DEVICE", config.isALIGN_BY_DEVICE() + "")));

config.setWORKLOAD_BUFFER_SIZE(
Integer.parseInt(
Expand Down Expand Up @@ -588,10 +595,32 @@ private boolean checkConfig() {
}
result &= checkDeviceNumPerWrite();
result &= checkTag();
if (!commonlyUseDB()) {
if (config.isALIGN_BY_DEVICE()) {
result = false;
LOGGER.error("ALIGN_BY_DEVICE is not supported for this database");
liyuheng55555 marked this conversation as resolved.
Show resolved Hide resolved
}
if (config.getRESULT_ROW_LIMIT() >= 0) {
result = false;
LOGGER.error("RESULT_ROW_LIMIT is not supported for this database");
}
}
return result;
}

private boolean commonlyUseDB() {
DBType dbType = config.getDbConfig().getDB_SWITCH().getType();
DBVersion dbVersion = config.getDbConfig().getDB_SWITCH().getVersion();
return Objects.equals(dbVersion, DBVersion.IOTDB_110)
|| Objects.equals(dbVersion, DBVersion.TDengine_3)
|| (Objects.equals(dbType, DBType.InfluxDB) && dbVersion == null);
}

private boolean checkOperationProportion() {
while (config.getOPERATION_PROPORTION().split(":").length
!= config.getOPERATION_PROPORTION_LEN()) {
config.setOPERATION_PROPORTION(config.getOPERATION_PROPORTION() + ":0");
}
String[] op = config.getOPERATION_PROPORTION().split(":");
int minOps = 0;
for (String s : op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,31 @@ public Status groupByQuery(GroupByQuery groupByQuery) {
return status;
}

@Override
public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) {
Status status = null;
Operation operation = Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC;
String device = "No Device";
if (groupByQuery.getDeviceSchema().size() > 0) {
device = groupByQuery.getDeviceSchema().get(0).getDevice();
}
try {
List<Status> statuses = new ArrayList<>();
for (IDatabase database : databases) {
long start = System.nanoTime();
status = database.groupByQueryOrderByDesc(groupByQuery);
long end = System.nanoTime();
status.setTimeCost(end - start);
handleQueryOperation(status, operation, device);
statuses.add(status);
}
doComparisonByRecord(groupByQuery, operation, statuses);
} catch (Exception e) {
handleUnexpectedQueryException(operation, e, device);
}
return status;
}

@Override
public Status latestPointQuery(LatestPointQuery latestPointQuery) {
Status status = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ default Status insertOneBatchWithCheck(IBatch batch) throws Exception {
/** similar to rangeQuery, but order by time desc. */
Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery);

default Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) {
throw new UnsupportedOperationException("This operation is not supported for this database");
}

/** Using in verification */
default Status verificationQuery(VerificationQuery verificationQuery) {
WorkloadException workloadException = new WorkloadException("Not Supported Verification Query");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package cn.edu.tsinghua.iot.benchmark.workload;

import cn.edu.tsinghua.iot.benchmark.entity.Sensor;

import java.util.List;

public class ValueRangeFilter {
private final Double minValue;
private final List<Sensor> sensors;
private final Double maxValue;

public ValueRangeFilter(double minValue, List<Sensor> sensors, double maxValue) {
this.sensors = sensors;
this.minValue = minValue;
this.maxValue = maxValue;
}

public ValueRangeFilter(double minValue, List<Sensor> sensors) {
this.minValue = minValue;
this.sensors = sensors;
this.maxValue = null;
}

public ValueRangeFilter(List<Sensor> sensors, double maxValue) {
this.minValue = null;
this.sensors = sensors;
this.maxValue = maxValue;
}

public List<Sensor> getSensors() {
return sensors;
}

public Double getMinValue() {
return minValue;
}

public Double getMaxValue() {
return maxValue;
}
}
Loading
Loading