Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEVICE_NUM_PER_WRITE will no longer influence writing points number #380

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@
# 每个Batch写入数据总点数=DEVICE_NUM_PER_WRITE * SENSOR_NUMBER * BATCH_SIZE_PER_WRITE
# BATCH_SIZE_PER_WRITE=100

# 每批写入设备数,必须能整除批写入数据行数
# 每批写入设备数,必须能整除每个client被分配到的设备数
# DEVICE_NUM_PER_WRITE=1

# 是否在写入数据前,创建SCHEMA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public abstract class DataClient implements Runnable {
/** Tested DataBase */
protected DBWrapper dbWrapper = null;
/** Related Schema */
protected final List<DeviceSchema> deviceSchemas;
/** Related Schema Size */
protected final int deviceSchemasSize;
protected final List<DeviceSchema> clientDeviceSchemas;
/** Measurement */
protected Measurement measurement;
/** Total number of loop */
Expand All @@ -86,12 +84,12 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier)
this.dataWorkLoad = DataWorkLoad.getInstance(id);
this.queryWorkLoad = QueryWorkLoad.getInstance(id);
this.clientThreadId = id;
this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
this.deviceSchemasSize = deviceSchemas.size();
this.clientDeviceSchemas =
MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
this.measurement = new Measurement();
this.service =
Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("ShowWorkProgress-" + String.valueOf(clientThreadId)));
new NamedThreadFactory("ShowWorkProgress-" + clientThreadId));
initDBWrappers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ protected void doTest() {
/** Do Ingestion Operation @Return when connect failed return false */
private boolean ingestionOperation() {
try {
for (int i = 0; i < deviceSchemasSize; i++) {
for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) {
int innerLoop = 0;
if (config.isIS_SENSOR_TS_ALIGNMENT()) {
innerLoop = 1;
} else {
if (config.isIS_CLIENT_BIND()) {
innerLoop = deviceSchemas.get(i).getSensors().size();
innerLoop = clientDeviceSchemas.get(i).getSensors().size();
} else {
innerLoop = deviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER();
innerLoop = clientDeviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER();
}
}
for (int j = 0; j < innerLoop; j++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public GenerateDataWriteClient(int id, CountDownLatch countDownLatch, CyclicBarr
/** Do Operations */
@Override
protected void doTest() {
loop:
for (loopIndex = 0; loopIndex < config.getLOOP(); loopIndex++) {
if (!doGenerate()) {
break loop;
break;
}
if (isStop.get()) {
break;
Expand All @@ -50,9 +49,8 @@ protected void doTest() {
/** Do Ingestion Operation @Return when connect failed return false */
private boolean doGenerate() {
try {
for (int i = 0; i < deviceSchemas.size(); i++) {
int innerLoop =
config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : deviceSchemas.get(i).getSensors().size();
final int innerLoop = config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : config.getSENSOR_NUMBER();
for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) {
for (int j = 0; j < innerLoop; j++) {
IBatch batch = dataWorkLoad.getOneBatch();
if (checkBatch(batch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import cn.edu.tsinghua.iot.benchmark.mode.enums.BenchmarkMode;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBSwitch;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBType;
import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBVersion;
import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms;
import cn.edu.tsinghua.iot.benchmark.workload.enums.OutOfOrderMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -577,53 +577,81 @@ private boolean checkConfig() {
if (config.isIS_DOUBLE_WRITE()) {
result &= checkDatabaseVerification(config.getANOTHER_DBConfig());
}
break;
default:
break;
}
// check config
result &= checkOperationProportion();
if (config.getCLIENT_NUMBER() == 0) {
LOGGER.error("Client number can't be zero");
result = false;
}
result &= checkDeviceNumPerWrite();
result &= checkTag();
return result;
}

private boolean checkOperationProportion() {
String[] op = config.getOPERATION_PROPORTION().split(":");
int minOps = 0;
for (int i = 0; i < op.length; i++) {
if (Double.valueOf(op[i]) > 1e-7) {
for (String s : op) {
if (Double.parseDouble(s) > 1e-7) {
minOps++;
}
}
if (minOps > config.getLOOP()) {
LOGGER.error("Loop is too small that can't meet the need of OPERATION_PROPORTION");
result = false;
}
if (config.getCLIENT_NUMBER() == 0) {
LOGGER.error("Client number can't be zero");
result = false;
return false;
}
// check DEVICE_NUM_PER_WRITE
if (config.getDEVICE_NUM_PER_WRITE() <= 0) {
return true;
}

private boolean checkDeviceNumPerWrite() {
final int dnw = config.getDEVICE_NUM_PER_WRITE();
if (dnw <= 0) {
LOGGER.error("DEVICE_NUM_PER_WRITE must be greater than 0");
result = false;
return false;
}
if (dnw == 1) {
return true;
}
if (config.getDbConfig().getDB_SWITCH().getType() != DBType.IoTDB) {
LOGGER.error("DEVICE_NUM_PER_WRITE is only supported in IoTDB");
return false;
}
if (config.getDbConfig().getDB_SWITCH().getInsertMode() != INSERT_USE_SESSION_RECORDS) {
LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported");
return false;
}
// check insert mode
if (config.getDbConfig().getDB_SWITCH().getType() == DBType.IoTDB) {
DBInsertMode insertMode = config.getDbConfig().getDB_SWITCH().getInsertMode();
if (config.getDEVICE_NUM_PER_WRITE() != 1 && insertMode != INSERT_USE_SESSION_RECORDS) {
LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported");
result = false;
for (int deviceNum :
CommonAlgorithms.distributeDevicesToClients(
config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER())
.values()) {
if (deviceNum % dnw != 0) {
LOGGER.error(
"Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n"
+ "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.",
deviceNum, dnw);
return false;
}
}
if (config.getDEVICE_NUM_PER_WRITE() != 1 && !config.isIS_SENSOR_TS_ALIGNMENT()) {
LOGGER.error(
"The combination of \"DEVICE_NUM_PER_WRITE > 1\" and \"IS_SENSOR_TS_ALIGNMENT == false\" is not supported");
result = false;
if (!config.isIS_SENSOR_TS_ALIGNMENT()) {
LOGGER.error("When DEVICE_NUM_PER_WRITE > 1, IS_SENSOR_TS_ALIGNMENT must be true");
return false;
}
// check tag
return true;
}

private boolean checkTag() {
if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) {
LOGGER.error(
"TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, "
+ "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.",
config.getTAG_NUMBER(),
config.getTAG_VALUE_CARDINALITY().size());
result = false;
return false;
}
return result;
return true;
}

private void checkQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.schema.MetaDataSchema;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Data Schema for generate data */
public class GenerateMetaDataSchema extends MetaDataSchema {
Expand All @@ -43,13 +45,12 @@ public boolean createMetaDataSchema() {
return false;
}

int eachClientDeviceNum = config.getDEVICE_NUMBER() / config.getCLIENT_NUMBER();
// The part that cannot be divided equally is given to dataClients with a smaller number
int leftClientDeviceNum = config.getDEVICE_NUMBER() % config.getCLIENT_NUMBER();
int deviceId = MetaUtil.getDeviceId(0);
Map<Integer, Integer> deviceDistribution =
CommonAlgorithms.distributeDevicesToClients(
config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER());
for (int clientId = 0; clientId < config.getCLIENT_NUMBER(); clientId++) {
int deviceNumber =
(clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum;
int deviceNumber = deviceDistribution.get(clientId);
List<DeviceSchema> deviceSchemaList = new ArrayList<>();
for (int d = 0; d < deviceNumber; d++) {
DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, MetaUtil.getTags(deviceId));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package cn.edu.tsinghua.iot.benchmark.utils;

import java.util.HashMap;
import java.util.Map;

public class CommonAlgorithms {
private CommonAlgorithms() {}

/**
* Distribute devices to clients as balance as possible. For example, 100 devices, 8 clients, the
* result will be {13,13,13,13,12,12,12,12}.
*
* @return Map{clientID, how many devices should it take}
*/
public static Map<Integer, Integer> distributeDevicesToClients(
final int deviceNumber, final int clientNumber) {
final int eachClientDeviceNum = deviceNumber / clientNumber;
final int leftClientDeviceNum = deviceNumber % clientNumber;
Map<Integer, Integer> result = new HashMap<>();
for (int clientId = 0; clientId < clientNumber; clientId++) {
int deviceNum =
(clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum;
result.put(clientId, deviceNum);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public static IDataWorkLoad getInstance(int clientId) {
List<String> files = MetaUtil.getClientFiles().get(clientId);
return new RealDataWorkLoad(files);
} else {
List<DeviceSchema> deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId);
if (config.isIS_CLIENT_BIND()) {
List<DeviceSchema> deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId);
return new SyntheticDataWorkLoad(deviceSchemas);
} else {
return SingletonWorkDataWorkLoad.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public IBatch getOneBatch() throws WorkloadException {
} else {
batch = new MultiDeviceBatch(config.getDEVICE_NUM_PER_WRITE());
}
for (int deviceID = 0; deviceID < config.getDEVICE_NUM_PER_WRITE(); deviceID++) {
for (int i = 0; i < config.getDEVICE_NUM_PER_WRITE(); i++) {
long curLoop = insertLoop.getAndIncrement();
// create schema of batch
List<Sensor> sensors = new ArrayList<>();
Expand Down
Loading