Skip to content

Commit

Permalink
Merge pull request #375 from Cpaulyz/support_tag
Browse files Browse the repository at this point in the history
Support variable tag in benchmark
  • Loading branch information
liyuheng55555 authored Nov 14, 2023
2 parents 2bfce9c + 199eed0 commit 220d4ef
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 67 deletions.
14 changes: 10 additions & 4 deletions configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,17 @@
# 传感器名称前缀
# SENSOR_NAME_PREFIX=s_

# 标签名的前缀
# TAG_NAME_PREFIX=tag_
# 标签名数量
# TAG_NUMBER=0

# 定义设备标签值,所有设备相同,使用逗号分开,目前支持iotdb-0.12, iotdb-0.13, influxdb-2.x, timescaledb, tdengine
# DEVICE_TAGS=
# 标签名前缀
# TAG_KEY_PREFIX=tag_

# 标签值前缀
# TAG_VALUE_PREFIX=value_

# 每个标签值的取值种数,使用逗号分开,设置个数应与标签名数量相等。例如TAG_NUMBER=2,TAG_VALUE_CARDINALITY=50,10,说明第一个标签名可能有50种值,第二个标签名可能有10种值。d_0的中间路径将为value_0.value_0.d_0。该配置不影响总点数。
# TAG_VALUE_CARDINALITY=

############## 被测系统为IoTDB时扩展参数 ##################
# 是否使用thrift压缩,需要在iotdb的配置文件iotdb-common.properties中设置rpc_thrift_compression_enable=true
Expand Down
54 changes: 34 additions & 20 deletions core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ public class Config {
/** name prefix of sensor */
private String SENSOR_NAME_PREFIX = "s_";
/** name prefix of tag */
private String TAG_NAME_PREFIX = "tag_";
/** The tags of device. Each device is same. */
private Map<String, String> DEVICE_TAGS = new LinkedHashMap<>();
private int TAG_NUMBER = 0;

private String TAG_KEY_PREFIX = "tag_";
private String TAG_VALUE_PREFIX = "value_";
private List<Integer> TAG_VALUE_CARDINALITY = new ArrayList<>();

// 设备、传感器、客户端:生成数据的规律
/** 线性 默认 9个 */
Expand Down Expand Up @@ -844,27 +846,36 @@ public void setCLIENT_NUMBER(int CLIENT_NUMBER) {
this.CLIENT_NUMBER = CLIENT_NUMBER;
}

public String getTAG_NAME_PREFIX() {
return TAG_NAME_PREFIX;
public int getTAG_NUMBER() {
return TAG_NUMBER;
}

public void setTAG_NAME_PREFIX(String TAG_NAME_PREFIX) {
this.TAG_NAME_PREFIX = TAG_NAME_PREFIX;
public void setTAG_NUMBER(int TAG_NUMBER) {
this.TAG_NUMBER = TAG_NUMBER;
}

public Map<String, String> getDEVICE_TAGS() {
return new LinkedHashMap<>(DEVICE_TAGS);
public String getTAG_KEY_PREFIX() {
return TAG_KEY_PREFIX;
}

public void setDEVICE_TAGS(String DEVICE_TAGS) {
if (DEVICE_TAGS.length() != 0) {
this.DEVICE_TAGS = new LinkedHashMap<>();
int tagIndex = 0;
for (String tagValue : DEVICE_TAGS.split(",")) {
this.DEVICE_TAGS.put(TAG_NAME_PREFIX + tagIndex, tagValue);
tagIndex++;
}
}
public void setTAG_KEY_PREFIX(String TAG_KEY_PREFIX) {
this.TAG_KEY_PREFIX = TAG_KEY_PREFIX;
}

public String getTAG_VALUE_PREFIX() {
return TAG_VALUE_PREFIX;
}

public void setTAG_VALUE_PREFIX(String TAG_VALUE_PREFIX) {
this.TAG_VALUE_PREFIX = TAG_VALUE_PREFIX;
}

public List<Integer> getTAG_VALUE_CARDINALITY() {
return TAG_VALUE_CARDINALITY;
}

public void setTAG_VALUE_CARDINALITY(List<Integer> TAG_VALUE_CARDINALITY) {
this.TAG_VALUE_CARDINALITY = TAG_VALUE_CARDINALITY;
}

public String getGROUP_NAME_PREFIX() {
Expand Down Expand Up @@ -1785,8 +1796,11 @@ public ConfigProperties getAllConfigProperties() {
configProperties.addProperty("Extern Param", "GROUP_NAME_PREFIX", this.GROUP_NAME_PREFIX);
configProperties.addProperty("Extern Param", "DEVICE_NAME_PREFIX", this.DEVICE_NAME_PREFIX);
configProperties.addProperty("Extern Param", "SENSOR_NAME_PREFIX", this.SENSOR_NAME_PREFIX);
configProperties.addProperty("Extern Param", "TAG_NAME_PREFIX", this.TAG_NAME_PREFIX);
configProperties.addProperty("Extern Param", "DEVICE_TAGS", this.DEVICE_TAGS);
configProperties.addProperty("Extern Param", "TAG_NUMBER", this.TAG_NUMBER);
configProperties.addProperty("Extern Param", "TAG_KEY_PREFIX", this.TAG_KEY_PREFIX);
configProperties.addProperty("Extern Param", "TAG_VALUE_PREFIX", this.TAG_VALUE_PREFIX);
configProperties.addProperty(
"Extern Param", "TAG_VALUE_CARDINALITY", this.TAG_VALUE_CARDINALITY);

configProperties.addProperty(
"Extern Param", "ENABLE_THRIFT_COMPRESSION", this.ENABLE_THRIFT_COMPRESSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

import static cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode.INSERT_USE_SESSION_RECORDS;

Expand Down Expand Up @@ -248,8 +249,25 @@ private void loadProps() {
properties.getProperty("DEVICE_NAME_PREFIX", config.getDEVICE_NAME_PREFIX()));
config.setSENSOR_NAME_PREFIX(
properties.getProperty("SENSOR_NAME_PREFIX", config.getSENSOR_NAME_PREFIX()));
config.setDEVICE_TAGS(properties.getProperty("DEVICE_TAGS", ""));

config.setTAG_NUMBER(
Integer.parseInt(properties.getProperty("TAG_NUMBER", config.getTAG_NUMBER() + "")));
config.setTAG_KEY_PREFIX(
properties.getProperty("TAG_KEY_PREFIX", config.getTAG_KEY_PREFIX()));
config.setTAG_VALUE_PREFIX(
properties.getProperty("TAG_VALUE_PREFIX", config.getTAG_VALUE_PREFIX()));
config.setTAG_VALUE_CARDINALITY(
Arrays.stream(
properties
.getProperty(
"TAG_VALUE_CARDINALITY",
config.getTAG_VALUE_CARDINALITY().stream()
.map(Object::toString)
.collect(Collectors.joining(",")))
.split(","))
.filter(s -> !s.isEmpty())
.mapToInt(Integer::parseInt)
.boxed()
.collect(Collectors.toList()));
config.setBENCHMARK_CLUSTER(
Boolean.parseBoolean(
properties.getProperty("BENCHMARK_CLUSTER", config.isBENCHMARK_CLUSTER() + "")));
Expand Down Expand Up @@ -596,6 +614,15 @@ private boolean checkConfig() {
"The combination of \"DEVICE_NUM_PER_WRITE > 1\" and \"IS_SENSOR_TS_ALIGNMENT == false\" is not supported");
result = false;
}
// check tag
if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) {
LOGGER.error(
"TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, "
+ "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.",
config.getTAG_NUMBER(),
config.getTAG_VALUE_CARDINALITY().size());
result = false;
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,32 @@
import cn.edu.tsinghua.iot.benchmark.conf.Constants;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetaUtil {

private static Config config = ConfigDescriptor.getInstance().getConfig();
private static final String TAG_KEY_PREFIX = config.getTAG_KEY_PREFIX();
private static final String TAG_VALUE_PREFIX = config.getTAG_VALUE_PREFIX();
private static final int TAG_NUMBER = config.getTAG_NUMBER();
private static final List<Integer> TAG_VALUE_CARDINALITY = config.getTAG_VALUE_CARDINALITY();
private static final List<Long> LEVEL_CARDINALITY =
Arrays.asList(new Long[TAG_VALUE_CARDINALITY.size() + 1]);

static {
int idx = TAG_VALUE_CARDINALITY.size();
long sum = 1;
LEVEL_CARDINALITY.set(idx--, 1L);
for (; idx >= 0; idx--) {
sum *= TAG_VALUE_CARDINALITY.get(idx);
LEVEL_CARDINALITY.set(idx, sum);
}
}

private static List<List<String>> CLIENT_FILES;

/** Used under cluster mode of benchmark */
Expand Down Expand Up @@ -72,4 +93,36 @@ public static List<List<String>> getClientFiles() {
public static void setClientFiles(List<List<String>> clientFiles) {
CLIENT_FILES = clientFiles;
}

/**
* Get tags pair by deviceName. Tags are allocated based on hashCode to ensure an even number of
* devices under each tag as much as possible.
*
* @param deviceName deviceName
* @return tags pair
*/
public static Map<String, String> getTags(String deviceName) {
if (TAG_NUMBER == 0) {
return Collections.emptyMap();
}
long id = Math.abs(deviceName.hashCode());
Map<String, String> res = new HashMap<>();
for (int i = 0; i < LEVEL_CARDINALITY.size() - 1; i++) {
id = id % LEVEL_CARDINALITY.get(i);
long tagValueId = id / LEVEL_CARDINALITY.get(i + 1);
res.put(TAG_KEY_PREFIX + i, TAG_VALUE_PREFIX + tagValueId);
}
return res;
}

/**
* Get tags pair by deviceId. Tags are allocated based on hashCode to ensure an even number of
* devices under each tag as much as possible.
*
* @param deviceId deviceId
* @return tags pair
*/
public static Map<String, String> getTags(int deviceId) {
return getTags(getDeviceName(deviceId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public boolean createMetaDataSchema() {
(clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum;
List<DeviceSchema> deviceSchemaList = new ArrayList<>();
for (int d = 0; d < deviceNumber; d++) {
DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, config.getDEVICE_TAGS());
DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, MetaUtil.getTags(deviceId));
NAME_DATA_SCHEMA.put(deviceSchema.getDevice(), deviceSchema);
GROUPS.add(deviceSchema.getGroup());
deviceSchemaList.add(deviceSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected boolean createMetaDataSchema() {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
NAME_DATA_SCHEMA.put(deviceName, deviceSchema);
GROUPS.add(deviceSchema.getGroup());
deviceSchemaList.add(deviceSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public IBatch nextBatch() {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
firstLine = false;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public CopyDataReader(List<String> files) {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
firstLine = false;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.entity.enums.SensorType;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBSwitch;
import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils;
Expand Down Expand Up @@ -165,7 +166,7 @@ public DeviceQuery getDeviceQuery() {
return null;
}
DeviceSchema deviceSchema =
new DeviceSchema(deviceId, config.getSENSORS(), config.getDEVICE_TAGS());
new DeviceSchema(deviceId, config.getSENSORS(), MetaUtil.getTags(deviceId));
return new DeviceQuery(deviceSchema);
}

Expand Down Expand Up @@ -234,7 +235,8 @@ private List<DeviceSchema> getQueryDeviceSchemaList(boolean typeAllow) throws Wo
if (querySensors.size() != config.getQUERY_SENSOR_NUM()) {
continue;
}
DeviceSchema deviceSchema = new DeviceSchema(deviceId, querySensors, config.getDEVICE_TAGS());
DeviceSchema deviceSchema =
new DeviceSchema(deviceId, querySensors, MetaUtil.getTags(deviceId));
queryDevices.add(deviceSchema);
}
if (queryDevices.size() != config.getQUERY_DEVICE_NUM()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public IBatch getOneBatch() throws WorkloadException {
new DeviceSchema(
MetaUtil.getDeviceId((int) curLoop % config.getDEVICE_NUMBER()),
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags((int) curLoop % config.getDEVICE_NUMBER()));
// create data of batch
List<Record> records = new ArrayList<>();
for (long batchOffset = 0; batchOffset < recordsNumPerDevice; batchOffset++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iot.benchmark.entity.Record;
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;

import java.util.*;
Expand All @@ -47,7 +48,9 @@ public SyntheticDataWorkLoad(List<DeviceSchema> deviceSchemas) {
for (Sensor sensor : schema.getSensors()) {
DeviceSchema deviceSchema =
new DeviceSchema(
schema.getDeviceId(), Collections.singletonList(sensor), config.getDEVICE_TAGS());
schema.getDeviceId(),
Collections.singletonList(sensor),
MetaUtil.getTags(schema.getDeviceId()));
maxTimestampIndexMap.put(deviceSchema, 0L);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Loading

0 comments on commit 220d4ef

Please sign in to comment.