diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index 415548437..48886cb2f 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -307,7 +307,7 @@ # 每个Batch写入数据总点数=DEVICE_NUM_PER_WRITE * SENSOR_NUMBER * BATCH_SIZE_PER_WRITE # BATCH_SIZE_PER_WRITE=100 -# 每批写入设备数,必须能整除批写入数据行数 +# 每批写入设备数,必须能整除每个client被分配到的设备数 # DEVICE_NUM_PER_WRITE=1 # 是否在写入数据前,创建SCHEMA diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java index 73f966994..df17b7f03 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java @@ -62,9 +62,7 @@ public abstract class DataClient implements Runnable { /** Tested DataBase */ protected DBWrapper dbWrapper = null; /** Related Schema */ - protected final List deviceSchemas; - /** Related Schema Size */ - protected final int deviceSchemasSize; + protected final List clientDeviceSchemas; /** Measurement */ protected Measurement measurement; /** Total number of loop */ @@ -86,12 +84,12 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) this.dataWorkLoad = DataWorkLoad.getInstance(id); this.queryWorkLoad = QueryWorkLoad.getInstance(id); this.clientThreadId = id; - this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); - this.deviceSchemasSize = deviceSchemas.size(); + this.clientDeviceSchemas = + MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); this.measurement = new Measurement(); this.service = Executors.newSingleThreadScheduledExecutor( - new NamedThreadFactory("ShowWorkProgress-" + String.valueOf(clientThreadId))); + new NamedThreadFactory("ShowWorkProgress-" + clientThreadId)); initDBWrappers(); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java index d05bdf422..2cc764430 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java @@ -128,15 +128,15 @@ protected void doTest() { /** Do Ingestion Operation @Return when connect failed return false */ private boolean ingestionOperation() { try { - for (int i = 0; i < deviceSchemasSize; i++) { + for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) { int innerLoop = 0; if (config.isIS_SENSOR_TS_ALIGNMENT()) { innerLoop = 1; } else { if (config.isIS_CLIENT_BIND()) { - innerLoop = deviceSchemas.get(i).getSensors().size(); + innerLoop = clientDeviceSchemas.get(i).getSensors().size(); } else { - innerLoop = deviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER(); + innerLoop = clientDeviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER(); } } for (int j = 0; j < innerLoop; j++) { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java index 1d261c8f9..94fdd9351 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java @@ -36,10 +36,9 @@ public GenerateDataWriteClient(int id, CountDownLatch countDownLatch, CyclicBarr /** Do Operations */ @Override protected void doTest() { - loop: for (loopIndex = 0; loopIndex < config.getLOOP(); loopIndex++) { if (!doGenerate()) { - break loop; + break; } if (isStop.get()) { break; @@ -50,9 +49,8 @@ protected void doTest() { /** Do Ingestion Operation @Return when connect failed return false */ private boolean doGenerate() { try { - for (int i = 0; i < deviceSchemas.size(); i++) { - int innerLoop = - config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : deviceSchemas.get(i).getSensors().size(); + final int innerLoop = config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : config.getSENSOR_NUMBER(); + for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) { for (int j = 0; j < innerLoop; j++) { IBatch batch = dataWorkLoad.getOneBatch(); if (checkBatch(batch)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 1e1a99807..5ac46a79d 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -21,10 +21,10 @@ import cn.edu.tsinghua.iot.benchmark.mode.enums.BenchmarkMode; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; -import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBSwitch; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBType; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBVersion; +import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms; import cn.edu.tsinghua.iot.benchmark.workload.enums.OutOfOrderMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -577,53 +577,81 @@ private boolean checkConfig() { if (config.isIS_DOUBLE_WRITE()) { result &= checkDatabaseVerification(config.getANOTHER_DBConfig()); } + break; default: break; } - // check config + result &= checkOperationProportion(); + if (config.getCLIENT_NUMBER() == 0) { + LOGGER.error("Client number can't be zero"); + result = false; + } + result &= checkDeviceNumPerWrite(); + result &= checkTag(); + return result; + } + + private boolean checkOperationProportion() { String[] op = config.getOPERATION_PROPORTION().split(":"); int minOps = 0; - for (int i = 0; i < op.length; i++) { - if (Double.valueOf(op[i]) > 1e-7) { + for (String s : op) { + if (Double.parseDouble(s) > 1e-7) { minOps++; } } if (minOps > config.getLOOP()) { LOGGER.error("Loop is too small that can't meet the need of OPERATION_PROPORTION"); - result = false; - } - if (config.getCLIENT_NUMBER() == 0) { - LOGGER.error("Client number can't be zero"); - result = false; + return false; } - // check DEVICE_NUM_PER_WRITE - if (config.getDEVICE_NUM_PER_WRITE() <= 0) { + return true; + } + + private boolean checkDeviceNumPerWrite() { + final int dnw = config.getDEVICE_NUM_PER_WRITE(); + if (dnw <= 0) { LOGGER.error("DEVICE_NUM_PER_WRITE must be greater than 0"); - result = false; + return false; + } + if (dnw == 1) { + return true; + } + if (config.getDbConfig().getDB_SWITCH().getType() != DBType.IoTDB) { + LOGGER.error("DEVICE_NUM_PER_WRITE is only supported in IoTDB"); + return false; + } + if (config.getDbConfig().getDB_SWITCH().getInsertMode() != INSERT_USE_SESSION_RECORDS) { + LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported"); + return false; } - // check insert mode - if (config.getDbConfig().getDB_SWITCH().getType() == DBType.IoTDB) { - DBInsertMode insertMode = config.getDbConfig().getDB_SWITCH().getInsertMode(); - if (config.getDEVICE_NUM_PER_WRITE() != 1 && insertMode != INSERT_USE_SESSION_RECORDS) { - LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported"); - result = false; + for (int deviceNum : + CommonAlgorithms.distributeDevicesToClients( + config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()) + .values()) { + if (deviceNum % dnw != 0) { + LOGGER.error( + "Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n" + + "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.", + deviceNum, dnw); + return false; } } - if (config.getDEVICE_NUM_PER_WRITE() != 1 && !config.isIS_SENSOR_TS_ALIGNMENT()) { - LOGGER.error( - "The combination of \"DEVICE_NUM_PER_WRITE > 1\" and \"IS_SENSOR_TS_ALIGNMENT == false\" is not supported"); - result = false; + if (!config.isIS_SENSOR_TS_ALIGNMENT()) { + LOGGER.error("When DEVICE_NUM_PER_WRITE > 1, IS_SENSOR_TS_ALIGNMENT must be true"); + return false; } - // check tag + return true; + } + + private boolean checkTag() { if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) { LOGGER.error( "TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, " + "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.", config.getTAG_NUMBER(), config.getTAG_VALUE_CARDINALITY().size()); - result = false; + return false; } - return result; + return true; } private void checkQuery() { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java index e6374b9e5..59d6b4cb4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java @@ -24,11 +24,13 @@ import cn.edu.tsinghua.iot.benchmark.entity.Sensor; import cn.edu.tsinghua.iot.benchmark.schema.MetaDataSchema; import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil; +import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** Data Schema for generate data */ public class GenerateMetaDataSchema extends MetaDataSchema { @@ -43,13 +45,12 @@ public boolean createMetaDataSchema() { return false; } - int eachClientDeviceNum = config.getDEVICE_NUMBER() / config.getCLIENT_NUMBER(); - // The part that cannot be divided equally is given to dataClients with a smaller number - int leftClientDeviceNum = config.getDEVICE_NUMBER() % config.getCLIENT_NUMBER(); int deviceId = MetaUtil.getDeviceId(0); + Map deviceDistribution = + CommonAlgorithms.distributeDevicesToClients( + config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()); for (int clientId = 0; clientId < config.getCLIENT_NUMBER(); clientId++) { - int deviceNumber = - (clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum; + int deviceNumber = deviceDistribution.get(clientId); List deviceSchemaList = new ArrayList<>(); for (int d = 0; d < deviceNumber; d++) { DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, MetaUtil.getTags(deviceId)); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java new file mode 100644 index 000000000..4aec97097 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.utils; + +import java.util.HashMap; +import java.util.Map; + +public class CommonAlgorithms { + private CommonAlgorithms() {} + + /** + * Distribute devices to clients as balance as possible. For example, 100 devices, 8 clients, the + * result will be {13,13,13,13,12,12,12,12}. + * + * @return Map{clientID, how many devices should it take} + */ + public static Map distributeDevicesToClients( + final int deviceNumber, final int clientNumber) { + final int eachClientDeviceNum = deviceNumber / clientNumber; + final int leftClientDeviceNum = deviceNumber % clientNumber; + Map result = new HashMap<>(); + for (int clientId = 0; clientId < clientNumber; clientId++) { + int deviceNum = + (clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum; + result.put(clientId, deviceNum); + } + return result; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java index dcfc900f1..ed7cbc7e2 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java @@ -41,8 +41,8 @@ public static IDataWorkLoad getInstance(int clientId) { List files = MetaUtil.getClientFiles().get(clientId); return new RealDataWorkLoad(files); } else { - List deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId); if (config.isIS_CLIENT_BIND()) { + List deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId); return new SyntheticDataWorkLoad(deviceSchemas); } else { return SingletonWorkDataWorkLoad.getInstance(); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java index e09fe96e2..ec7ce0af5 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java @@ -74,7 +74,7 @@ public IBatch getOneBatch() throws WorkloadException { } else { batch = new MultiDeviceBatch(config.getDEVICE_NUM_PER_WRITE()); } - for (int deviceID = 0; deviceID < config.getDEVICE_NUM_PER_WRITE(); deviceID++) { + for (int i = 0; i < config.getDEVICE_NUM_PER_WRITE(); i++) { long curLoop = insertLoop.getAndIncrement(); // create schema of batch List sensors = new ArrayList<>();