Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support variable tag in benchmark #375

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,17 @@
# 传感器名称前缀
# SENSOR_NAME_PREFIX=s_

# 标签名的前缀
# TAG_NAME_PREFIX=tag_
# 标签名数量
# TAG_NUMBER=0

# 定义设备标签值,所有设备相同,使用逗号分开,目前支持iotdb-0.12, iotdb-0.13, influxdb-2.x, timescaledb, tdengine
# DEVICE_TAGS=
# 标签名前缀
# TAG_KEY_PREFIX=tag_

# 标签值前缀
# TAG_VALUE_PREFIX=value_

# 每个标签值的取值种数,使用逗号分开,设置个数应与标签名数量相等。例如TAG_NUMBER=2,TAG_VALUE_CARDINALITY=50,10,说明第一个标签名可能有50种值,第二个标签名可能有10种值。d_0的中间路径将为value_0.value_0.d_0。该配置不影响总点数。
# TAG_VALUE_CARDINALITY=

############## 被测系统为IoTDB时扩展参数 ##################
# 是否使用thrift压缩,需要在iotdb的配置文件iotdb-common.properties中设置rpc_thrift_compression_enable=true
Expand Down
54 changes: 34 additions & 20 deletions core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ public class Config {
/** name prefix of sensor */
private String SENSOR_NAME_PREFIX = "s_";
/** name prefix of tag */
private String TAG_NAME_PREFIX = "tag_";
/** The tags of device. Each device is same. */
private Map<String, String> DEVICE_TAGS = new LinkedHashMap<>();
private int TAG_NUMBER = 0;

private String TAG_KEY_PREFIX = "tag_";
private String TAG_VALUE_PREFIX = "value_";
private List<Integer> TAG_VALUE_CARDINALITY = new ArrayList<>();

// 设备、传感器、客户端:生成数据的规律
/** 线性 默认 9个 */
Expand Down Expand Up @@ -844,27 +846,36 @@ public void setCLIENT_NUMBER(int CLIENT_NUMBER) {
this.CLIENT_NUMBER = CLIENT_NUMBER;
}

public String getTAG_NAME_PREFIX() {
return TAG_NAME_PREFIX;
public int getTAG_NUMBER() {
return TAG_NUMBER;
}

public void setTAG_NAME_PREFIX(String TAG_NAME_PREFIX) {
this.TAG_NAME_PREFIX = TAG_NAME_PREFIX;
public void setTAG_NUMBER(int TAG_NUMBER) {
this.TAG_NUMBER = TAG_NUMBER;
}

public Map<String, String> getDEVICE_TAGS() {
return new LinkedHashMap<>(DEVICE_TAGS);
public String getTAG_KEY_PREFIX() {
return TAG_KEY_PREFIX;
}

public void setDEVICE_TAGS(String DEVICE_TAGS) {
if (DEVICE_TAGS.length() != 0) {
this.DEVICE_TAGS = new LinkedHashMap<>();
int tagIndex = 0;
for (String tagValue : DEVICE_TAGS.split(",")) {
this.DEVICE_TAGS.put(TAG_NAME_PREFIX + tagIndex, tagValue);
tagIndex++;
}
}
public void setTAG_KEY_PREFIX(String TAG_KEY_PREFIX) {
this.TAG_KEY_PREFIX = TAG_KEY_PREFIX;
}

public String getTAG_VALUE_PREFIX() {
return TAG_VALUE_PREFIX;
}

public void setTAG_VALUE_PREFIX(String TAG_VALUE_PREFIX) {
this.TAG_VALUE_PREFIX = TAG_VALUE_PREFIX;
}

public List<Integer> getTAG_VALUE_CARDINALITY() {
return TAG_VALUE_CARDINALITY;
}

public void setTAG_VALUE_CARDINALITY(List<Integer> TAG_VALUE_CARDINALITY) {
this.TAG_VALUE_CARDINALITY = TAG_VALUE_CARDINALITY;
}

public String getGROUP_NAME_PREFIX() {
Expand Down Expand Up @@ -1785,8 +1796,11 @@ public ConfigProperties getAllConfigProperties() {
configProperties.addProperty("Extern Param", "GROUP_NAME_PREFIX", this.GROUP_NAME_PREFIX);
configProperties.addProperty("Extern Param", "DEVICE_NAME_PREFIX", this.DEVICE_NAME_PREFIX);
configProperties.addProperty("Extern Param", "SENSOR_NAME_PREFIX", this.SENSOR_NAME_PREFIX);
configProperties.addProperty("Extern Param", "TAG_NAME_PREFIX", this.TAG_NAME_PREFIX);
configProperties.addProperty("Extern Param", "DEVICE_TAGS", this.DEVICE_TAGS);
configProperties.addProperty("Extern Param", "TAG_NUMBER", this.TAG_NUMBER);
configProperties.addProperty("Extern Param", "TAG_KEY_PREFIX", this.TAG_KEY_PREFIX);
configProperties.addProperty("Extern Param", "TAG_VALUE_PREFIX", this.TAG_VALUE_PREFIX);
configProperties.addProperty(
"Extern Param", "TAG_VALUE_CARDINALITY", this.TAG_VALUE_CARDINALITY);

configProperties.addProperty(
"Extern Param", "ENABLE_THRIFT_COMPRESSION", this.ENABLE_THRIFT_COMPRESSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

import static cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode.INSERT_USE_SESSION_RECORDS;

Expand Down Expand Up @@ -248,8 +249,25 @@ private void loadProps() {
properties.getProperty("DEVICE_NAME_PREFIX", config.getDEVICE_NAME_PREFIX()));
config.setSENSOR_NAME_PREFIX(
properties.getProperty("SENSOR_NAME_PREFIX", config.getSENSOR_NAME_PREFIX()));
config.setDEVICE_TAGS(properties.getProperty("DEVICE_TAGS", ""));

config.setTAG_NUMBER(
Integer.parseInt(properties.getProperty("TAG_NUMBER", config.getTAG_NUMBER() + "")));
config.setTAG_KEY_PREFIX(
properties.getProperty("TAG_KEY_PREFIX", config.getTAG_KEY_PREFIX()));
config.setTAG_VALUE_PREFIX(
properties.getProperty("TAG_VALUE_PREFIX", config.getTAG_VALUE_PREFIX()));
config.setTAG_VALUE_CARDINALITY(
Arrays.stream(
properties
.getProperty(
"TAG_VALUE_CARDINALITY",
config.getTAG_VALUE_CARDINALITY().stream()
.map(Object::toString)
.collect(Collectors.joining(",")))
.split(","))
.filter(s -> !s.isEmpty())
.mapToInt(Integer::parseInt)
.boxed()
.collect(Collectors.toList()));
config.setBENCHMARK_CLUSTER(
Boolean.parseBoolean(
properties.getProperty("BENCHMARK_CLUSTER", config.isBENCHMARK_CLUSTER() + "")));
Expand Down Expand Up @@ -596,6 +614,15 @@ private boolean checkConfig() {
"The combination of \"DEVICE_NUM_PER_WRITE > 1\" and \"IS_SENSOR_TS_ALIGNMENT == false\" is not supported");
result = false;
}
// check tag
if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) {
LOGGER.error(
"TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, "
+ "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.",
config.getTAG_NUMBER(),
config.getTAG_VALUE_CARDINALITY().size());
result = false;
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,32 @@
import cn.edu.tsinghua.iot.benchmark.conf.Constants;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetaUtil {

private static Config config = ConfigDescriptor.getInstance().getConfig();
private static final String TAG_KEY_PREFIX = config.getTAG_KEY_PREFIX();
private static final String TAG_VALUE_PREFIX = config.getTAG_VALUE_PREFIX();
private static final int TAG_NUMBER = config.getTAG_NUMBER();
private static final List<Integer> TAG_VALUE_CARDINALITY = config.getTAG_VALUE_CARDINALITY();
private static final List<Long> LEVEL_CARDINALITY =
Arrays.asList(new Long[TAG_VALUE_CARDINALITY.size() + 1]);

static {
int idx = TAG_VALUE_CARDINALITY.size();
long sum = 1;
LEVEL_CARDINALITY.set(idx--, 1L);
for (; idx >= 0; idx--) {
sum *= TAG_VALUE_CARDINALITY.get(idx);
LEVEL_CARDINALITY.set(idx, sum);
}
}

private static List<List<String>> CLIENT_FILES;

/** Used under cluster mode of benchmark */
Expand Down Expand Up @@ -72,4 +93,36 @@ public static List<List<String>> getClientFiles() {
public static void setClientFiles(List<List<String>> clientFiles) {
CLIENT_FILES = clientFiles;
}

/**
* Get tags pair by deviceName. Tags are allocated based on hashCode to ensure an even number of
* devices under each tag as much as possible.
*
* @param deviceName deviceName
* @return tags pair
*/
public static Map<String, String> getTags(String deviceName) {
if (TAG_NUMBER == 0) {
return Collections.emptyMap();
}
long id = Math.abs(deviceName.hashCode());
Map<String, String> res = new HashMap<>();
for (int i = 0; i < LEVEL_CARDINALITY.size() - 1; i++) {
id = id % LEVEL_CARDINALITY.get(i);
long tagValueId = id / LEVEL_CARDINALITY.get(i + 1);
res.put(TAG_KEY_PREFIX + i, TAG_VALUE_PREFIX + tagValueId);
}
return res;
}

/**
* Get tags pair by deviceId. Tags are allocated based on hashCode to ensure an even number of
* devices under each tag as much as possible.
*
* @param deviceId deviceId
* @return tags pair
*/
public static Map<String, String> getTags(int deviceId) {
return getTags(getDeviceName(deviceId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public boolean createMetaDataSchema() {
(clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum;
List<DeviceSchema> deviceSchemaList = new ArrayList<>();
for (int d = 0; d < deviceNumber; d++) {
DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, config.getDEVICE_TAGS());
DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, MetaUtil.getTags(deviceId));
NAME_DATA_SCHEMA.put(deviceSchema.getDevice(), deviceSchema);
GROUPS.add(deviceSchema.getGroup());
deviceSchemaList.add(deviceSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected boolean createMetaDataSchema() {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
NAME_DATA_SCHEMA.put(deviceName, deviceSchema);
GROUPS.add(deviceSchema.getGroup());
deviceSchemaList.add(deviceSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public IBatch nextBatch() {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
firstLine = false;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public CopyDataReader(List<String> files) {
MetaUtil.getGroupIdFromDeviceName(deviceName),
deviceName,
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags(deviceName));
firstLine = false;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.entity.enums.SensorType;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBSwitch;
import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils;
Expand Down Expand Up @@ -165,7 +166,7 @@ public DeviceQuery getDeviceQuery() {
return null;
}
DeviceSchema deviceSchema =
new DeviceSchema(deviceId, config.getSENSORS(), config.getDEVICE_TAGS());
new DeviceSchema(deviceId, config.getSENSORS(), MetaUtil.getTags(deviceId));
return new DeviceQuery(deviceSchema);
}

Expand Down Expand Up @@ -234,7 +235,8 @@ private List<DeviceSchema> getQueryDeviceSchemaList(boolean typeAllow) throws Wo
if (querySensors.size() != config.getQUERY_SENSOR_NUM()) {
continue;
}
DeviceSchema deviceSchema = new DeviceSchema(deviceId, querySensors, config.getDEVICE_TAGS());
DeviceSchema deviceSchema =
new DeviceSchema(deviceId, querySensors, MetaUtil.getTags(deviceId));
queryDevices.add(deviceSchema);
}
if (queryDevices.size() != config.getQUERY_DEVICE_NUM()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public IBatch getOneBatch() throws WorkloadException {
new DeviceSchema(
MetaUtil.getDeviceId((int) curLoop % config.getDEVICE_NUMBER()),
sensors,
config.getDEVICE_TAGS());
MetaUtil.getTags((int) curLoop % config.getDEVICE_NUMBER()));
// create data of batch
List<Record> records = new ArrayList<>();
for (long batchOffset = 0; batchOffset < recordsNumPerDevice; batchOffset++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iot.benchmark.entity.Record;
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;

import java.util.*;
Expand All @@ -47,7 +48,9 @@ public SyntheticDataWorkLoad(List<DeviceSchema> deviceSchemas) {
for (Sensor sensor : schema.getSensors()) {
DeviceSchema deviceSchema =
new DeviceSchema(
schema.getDeviceId(), Collections.singletonList(sensor), config.getDEVICE_TAGS());
schema.getDeviceId(),
Collections.singletonList(sensor),
MetaUtil.getTags(schema.getDeviceId()));
maxTimestampIndexMap.put(deviceSchema, 0L);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ private String addGroupByClause(String prefix, long start, long end, long granul
protected String getDevicePath(DeviceSchema deviceSchema) {
StringBuilder name = new StringBuilder(ROOT_SERIES_NAME);
name.append(".").append(deviceSchema.getGroup());
for (Map.Entry<String, String> pair : config.getDEVICE_TAGS().entrySet()) {
for (Map.Entry<String, String> pair : deviceSchema.getTags().entrySet()) {
name.append(".").append(pair.getValue());
}
name.append(".").append(deviceSchema.getDevice());
Expand Down
Loading
Loading