Skip to content

Commit

Permalink
Implement automatic conversion of aggFun under table model. (#464)
Browse files Browse the repository at this point in the history
* Implement automatic conversion of aggFun under table model.

* Replace Boolean with boolean.

* Code Optimization

* Modify table model Q4 SQL.

* Reduce code duplication.

* Fix bm not exiting

* Add comments for easier understanding.

* spotless:apply
  • Loading branch information
YangYumings authored Nov 25, 2024
1 parent e547d8b commit d49fe00
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ private void loadProps() {
config.setQUERY_DEVICE_NUM(
Integer.parseInt(
properties.getProperty("QUERY_DEVICE_NUM", config.getQUERY_DEVICE_NUM() + "")));
config.setQUERY_AGGREGATE_FUN(
properties.getProperty("QUERY_AGGREGATE_FUN", config.getQUERY_AGGREGATE_FUN()));

loadAndConvertAggregateFunction(properties);

config.setQUERY_INTERVAL(
Long.parseLong(
Expand Down Expand Up @@ -553,6 +553,38 @@ private void loadProps() {
}
}

/** for table model */
private void loadAndConvertAggregateFunction(Properties properties) {
String aggFun = properties.getProperty("QUERY_AGGREGATE_FUN", config.getQUERY_AGGREGATE_FUN());
if (config.getIoTDB_DIALECT_MODE() == SQLDialect.TABLE) {
switch (aggFun) {
case Constants.MAX_TIME:
config.setQUERY_AGGREGATE_FUN(Constants.LAST_BY);
break;
case Constants.MIN_TIME:
config.setQUERY_AGGREGATE_FUN(Constants.FIRST_BY);
break;
case Constants.MAX_VALUE:
config.setQUERY_AGGREGATE_FUN(Constants.MAX_AGG);
break;
case Constants.MIN_VALUE:
config.setQUERY_AGGREGATE_FUN(Constants.MIN_AGG);
break;
case Constants.FIRST_VALUE:
config.setQUERY_AGGREGATE_FUN(Constants.FIRST);
break;
case Constants.LAST_VALUE:
config.setQUERY_AGGREGATE_FUN(Constants.LAST);
break;
case Constants.TIME_DURATION:
config.setQUERY_AGGREGATE_FUN(Constants.TIME_DURATION);
break;
}
} else {
config.setQUERY_AGGREGATE_FUN(aggFun);
}
}

/** Check validation of config */
private boolean checkConfig() {
boolean result = true;
Expand Down Expand Up @@ -634,6 +666,18 @@ private boolean checkConfig() {
"The iotdb table model only supports INSERT_USE_SESSION_TABLET! Please modify DB_SWITCH in the configuration file.");
result = false;
}
// TODO Not supported TIME_DURATION、MAX_BY、MIN_BY. iotdb will report errors for these three
// types of aggFun.
if (config.getQUERY_AGGREGATE_FUN().equals(Constants.MAX_BY)
|| config.getQUERY_AGGREGATE_FUN().equals(Constants.MIN_BY)) {
LOGGER.error("MAX_BY or MIN_BY not yet supported !");
result = false;
}
if (config.getIoTDB_DIALECT_MODE() == SQLDialect.TABLE
&& config.getQUERY_AGGREGATE_FUN().equals(Constants.TIME_DURATION)) {
LOGGER.error("TIME_DURATION not yet supported !");
result = false;
}
result &= checkInsertDataTypeProportion();
result &= checkOperationProportion();
if (config.getSCHEMA_CLIENT_NUMBER() == 0 || config.getDATA_CLIENT_NUMBER() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,22 @@ public class Constants {
public static final String PI_ARCHIVE_CLASS = "cn.edu.tsinghua.iot.benchmark.piarchive.PIArchive";
public static final String IGINX_CLASS = "cn.edu.tsinghua.iot.benchmark.iginx.IginX";
public static final String SELF_CHECK_CLASS = "cn.edu.tsinghua.iot.benchmark.tsdb.self.SelfCheck";

public static final String MAX_TIME = "max_time";
public static final String MIN_TIME = "min_time";
public static final String TIME_DURATION = "time_duration";
public static final String MAX_VALUE = "max_value";
public static final String MIN_VALUE = "min_value";
public static final String FIRST_VALUE = "first_value";
public static final String LAST_VALUE = "last_value";

public static final String LAST_BY = "last_by";
public static final String FIRST_BY = "first_by";
public static final String MAX_AGG = "max";
public static final String MIN_AGG = "min";
public static final String LAST = "last";
public static final String FIRST = "first";
public static final String COUNT = "count";
public static final String MAX_BY = "max_by";
public static final String MIN_BY = "min_by";
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public Status aggRangeValueQuery(AggRangeValueQuery aggRangeValueQuery) {
*/
@Override
public Status groupByQuery(GroupByQuery groupByQuery) {
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery);
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery, false);
return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY);
}

Expand Down Expand Up @@ -400,7 +400,7 @@ public Status rangeQueryOrderByDesc(RangeQuery rangeQuery) {
0,
builder);
// ORDER BY
builder.append(" ORDER BY time desc");
modelStrategy.addOrderByTimeDesc(builder);
return executeQueryAndGetStatus(builder.toString(), Operation.RANGE_QUERY_ORDER_BY_TIME_DESC);
}

Expand All @@ -425,7 +425,7 @@ public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) {
(int) valueRangeQuery.getValueThreshold(),
builder);
// ORDER BY
builder.append(" ORDER BY time desc");
modelStrategy.addOrderByTimeDesc(builder);
return executeQueryAndGetStatus(
builder.toString(), Operation.VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC);
}
Expand All @@ -439,7 +439,7 @@ public Status valueRangeQueryOrderByDesc(ValueRangeQuery valueRangeQuery) {
*/
@Override
public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) {
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery);
String sql = modelStrategy.getGroupByQuerySQL(groupByQuery, true);
return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ public abstract void registerSchema(

public abstract String getAggQuerySqlHead(List<DeviceSchema> devices, String aggFun);

public abstract String getGroupByQuerySQL(GroupByQuery groupByQuery);
public abstract String getGroupByQuerySQL(GroupByQuery groupByQuery, boolean addOrderBy);

public abstract String getLatestPointQuerySql(List<DeviceSchema> devices);

public abstract void addFromClause(List<DeviceSchema> devices, StringBuilder builder);

public abstract void addOrderByTimeDesc(StringBuilder builder);

public abstract void addPreciseQueryWhereClause(
String strTime, List<DeviceSchema> deviceSchemas, StringBuilder builder);

Expand Down Expand Up @@ -162,20 +164,35 @@ public abstract void sessionCleanupImpl(Session session)

public abstract Logger getLogger();

/**
* </> DESC
*
* <p>Table model The builder has already concatenated "SELECT device_id, date_bin(20000ms, time),
* ".
*
* <p>Tree model The builder has already concatenated "SELECT ".
*
* <p>Therefore, the first loop does not need ", "
*/
protected String getAggFunForGroupByQuery(List<Sensor> querySensors, String aggFunction) {
StringBuilder builder = new StringBuilder();
builder.append(aggFunction).append("(").append(querySensors.get(0).getName()).append(")");
for (int i = 1; i < querySensors.size(); i++) {
String timeArg = getTimeArg(aggFunction);
for (int i = 0; i < querySensors.size(); i++) {
if (i > 0) {
builder.append(", ");
}
builder
.append(", ")
.append(aggFunction)
.append("(")
.append(timeArg)
.append(querySensors.get(i).getName())
.append(")");
}
return builder.toString();
}

protected abstract String getTimeArg(String aggFunction);

protected String getTimeWhereClause(long start, long end) {
StringBuilder builder = new StringBuilder();
builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;

import cn.edu.tsinghua.iot.benchmark.conf.Constants;
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
import cn.edu.tsinghua.iot.benchmark.entity.Record;
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
Expand Down Expand Up @@ -167,16 +168,25 @@ public void addFromClause(List<DeviceSchema> devices, StringBuilder builder) {
.append(devices.get(0).getTable());
}

@Override
public void addOrderByTimeDesc(StringBuilder builder) {
builder.append(" ORDER BY device_id, time desc");
}

@Override
public String getAggQuerySqlHead(List<DeviceSchema> devices, String aggFun) {
StringBuilder builder = new StringBuilder();
builder.append("SELECT device_id");
List<Sensor> querySensors = devices.get(0).getSensors();
for (int i = 1; i < querySensors.size(); i++) {
String timeArg = getTimeArg(aggFun);
for (int i = 0; i < querySensors.size(); i++) {
// The builder has already concatenated "SELECT device_id", so each subsequent loop needs to
// concatenate one", ".
builder.append(", ");
builder
.append(", ")
.append(aggFun)
.append("(")
.append(timeArg)
.append(querySensors.get(i).getName())
.append(")");
}
Expand All @@ -192,7 +202,7 @@ public String getAggQuerySqlHead(List<DeviceSchema> devices, String aggFun) {
* <p>getAggForGroupByQuery
*/
@Override
public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
public String getGroupByQuerySQL(GroupByQuery groupByQuery, boolean addOrderBy) {
StringBuilder builder = new StringBuilder();
// SELECT
builder
Expand All @@ -219,10 +229,12 @@ public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
.append(groupByQuery.getGranularity())
.append("ms, time)");
// ORDER BY
builder
.append(" order by device_id, date_bin(")
.append(groupByQuery.getGranularity())
.append("ms, time) desc");
if (addOrderBy) {
builder
.append(" order by device_id, date_bin(")
.append(groupByQuery.getGranularity())
.append("ms, time) desc");
}
return builder.toString();
}

Expand Down Expand Up @@ -509,4 +521,17 @@ public void addWhereValueClauseIfNecessary(List<DeviceSchema> devices, StringBui
public Logger getLogger() {
return LOGGER;
}

@Override
protected String getTimeArg(String aggFunction) {
switch (aggFunction) {
case Constants.FIRST_BY:
case Constants.LAST_BY:
case Constants.MAX_BY:
case Constants.MIN_BY:
return "time, ";
default:
return "";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.template.MeasurementNode;

import cn.edu.tsinghua.iot.benchmark.conf.Constants;
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
import cn.edu.tsinghua.iot.benchmark.entity.Record;
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
Expand Down Expand Up @@ -221,12 +222,18 @@ public String getAggQuerySqlHead(List<DeviceSchema> devices, String aggFun) {
StringBuilder builder = new StringBuilder();
builder.append("SELECT ");
List<Sensor> querySensors = devices.get(0).getSensors();
builder.append(aggFun).append("(").append(querySensors.get(0).getName()).append(")");
for (int i = 1; i < querySensors.size(); i++) {
String timeArg = getTimeArg(aggFun);

for (int i = 0; i < querySensors.size(); i++) {
// The builder only concatenates "SELECT ". Except for the first loop, each subsequent loop
// must concatenate a ", ".
if (i > 0) {
builder.append(", ");
}
builder
.append(", ")
.append(aggFun)
.append("(")
.append(timeArg)
.append(querySensors.get(i).getName())
.append(")");
}
Expand All @@ -243,7 +250,12 @@ public void addFromClause(List<DeviceSchema> devices, StringBuilder builder) {
}

@Override
public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
public void addOrderByTimeDesc(StringBuilder builder) {
builder.append(" ORDER BY time desc");
}

@Override
public String getGroupByQuerySQL(GroupByQuery groupByQuery, boolean addOrderBy) {
StringBuilder builder = new StringBuilder();
// SELECT
builder
Expand All @@ -260,7 +272,9 @@ public String getGroupByQuerySQL(GroupByQuery groupByQuery) {
groupByQuery.getEndTimestamp(),
groupByQuery.getGranularity());
// ORDER BY
builder.append(" ORDER BY time desc");
if (addOrderBy) {
builder.append(" ORDER BY time desc");
}
return builder.toString();
}

Expand Down Expand Up @@ -477,4 +491,15 @@ public void sessionCleanupImpl(Session session) {
public Logger getLogger() {
return LOGGER;
}

@Override
protected String getTimeArg(String aggFunction) {
switch (aggFunction) {
case Constants.MAX_BY:
case Constants.MIN_BY:
return "time, ";
default:
return "";
}
}
}

0 comments on commit d49fe00

Please sign in to comment.