From 76b03fbeb929c1368bea630b041f246155aa3900 Mon Sep 17 00:00:00 2001 From: liyuheng Date: Wed, 15 Nov 2023 14:44:11 +0800 Subject: [PATCH 1/3] done --- configuration/conf/config.properties | 2 +- .../iot/benchmark/client/DataClient.java | 10 ++-- .../generate/GenerateDataMixClient.java | 6 +-- .../generate/GenerateDataWriteClient.java | 8 ++- .../iot/benchmark/conf/ConfigDescriptor.java | 49 +++++++++++++------ .../schemaImpl/GenerateMetaDataSchema.java | 11 +++-- .../iot/benchmark/utils/CommonAlgorithms.java | 46 +++++++++++++++++ .../iot/benchmark/workload/DataWorkLoad.java | 2 +- .../workload/SingletonWorkDataWorkLoad.java | 2 +- 9 files changed, 99 insertions(+), 37 deletions(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index 877e9c3ae..56353f1d7 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -301,7 +301,7 @@ # 每个Batch写入数据总点数=DEVICE_NUM_PER_WRITE * SENSOR_NUMBER * BATCH_SIZE_PER_WRITE # BATCH_SIZE_PER_WRITE=100 -# 每批写入设备数,必须能整除批写入数据行数 +# 每批写入设备数,必须能整除每个client被分配到的设备数 # DEVICE_NUM_PER_WRITE=1 # 是否在写入数据前,创建SCHEMA diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java index 73f966994..df17b7f03 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java @@ -62,9 +62,7 @@ public abstract class DataClient implements Runnable { /** Tested DataBase */ protected DBWrapper dbWrapper = null; /** Related Schema */ - protected final List deviceSchemas; - /** Related Schema Size */ - protected final int deviceSchemasSize; + protected final List clientDeviceSchemas; /** Measurement */ protected Measurement measurement; /** Total number of loop */ @@ -86,12 +84,12 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) this.dataWorkLoad = DataWorkLoad.getInstance(id); this.queryWorkLoad = QueryWorkLoad.getInstance(id); this.clientThreadId = id; - this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); - this.deviceSchemasSize = deviceSchemas.size(); + this.clientDeviceSchemas = + MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); this.measurement = new Measurement(); this.service = Executors.newSingleThreadScheduledExecutor( - new NamedThreadFactory("ShowWorkProgress-" + String.valueOf(clientThreadId))); + new NamedThreadFactory("ShowWorkProgress-" + clientThreadId)); initDBWrappers(); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java index d05bdf422..2cc764430 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java @@ -128,15 +128,15 @@ protected void doTest() { /** Do Ingestion Operation @Return when connect failed return false */ private boolean ingestionOperation() { try { - for (int i = 0; i < deviceSchemasSize; i++) { + for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) { int innerLoop = 0; if (config.isIS_SENSOR_TS_ALIGNMENT()) { innerLoop = 1; } else { if (config.isIS_CLIENT_BIND()) { - innerLoop = deviceSchemas.get(i).getSensors().size(); + innerLoop = clientDeviceSchemas.get(i).getSensors().size(); } else { - innerLoop = deviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER(); + innerLoop = clientDeviceSchemas.get(i).getSensors().size() * config.getDEVICE_NUMBER(); } } for (int j = 0; j < innerLoop; j++) { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java index 1d261c8f9..94fdd9351 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java @@ -36,10 +36,9 @@ public GenerateDataWriteClient(int id, CountDownLatch countDownLatch, CyclicBarr /** Do Operations */ @Override protected void doTest() { - loop: for (loopIndex = 0; loopIndex < config.getLOOP(); loopIndex++) { if (!doGenerate()) { - break loop; + break; } if (isStop.get()) { break; @@ -50,9 +49,8 @@ protected void doTest() { /** Do Ingestion Operation @Return when connect failed return false */ private boolean doGenerate() { try { - for (int i = 0; i < deviceSchemas.size(); i++) { - int innerLoop = - config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : deviceSchemas.get(i).getSensors().size(); + final int innerLoop = config.isIS_SENSOR_TS_ALIGNMENT() ? 1 : config.getSENSOR_NUMBER(); + for (int i = 0; i < clientDeviceSchemas.size(); i += config.getDEVICE_NUM_PER_WRITE()) { for (int j = 0; j < innerLoop; j++) { IBatch batch = dataWorkLoad.getOneBatch(); if (checkBatch(batch)) { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index e9bfb523c..e495640f4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -21,10 +21,10 @@ import cn.edu.tsinghua.iot.benchmark.mode.enums.BenchmarkMode; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; -import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBSwitch; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBType; import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBVersion; +import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms; import cn.edu.tsinghua.iot.benchmark.workload.enums.OutOfOrderMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -578,25 +578,44 @@ private boolean checkConfig() { LOGGER.error("Client number can't be zero"); result = false; } - // check DEVICE_NUM_PER_WRITE - if (config.getDEVICE_NUM_PER_WRITE() <= 0) { + result &= checkDeviceNumPerWrite(); + return result; + } + + private boolean checkDeviceNumPerWrite() { + final int dnw = config.getDEVICE_NUM_PER_WRITE(); + if (dnw <= 0) { LOGGER.error("DEVICE_NUM_PER_WRITE must be greater than 0"); - result = false; + return false; + } + if (dnw == 1) { + return true; + } + if (config.getDbConfig().getDB_SWITCH().getType() != DBType.IoTDB) { + LOGGER.error("DEVICE_NUM_PER_WRITE is only supported in IoTDB"); + return false; + } + if (config.getDbConfig().getDB_SWITCH().getInsertMode() != INSERT_USE_SESSION_RECORDS) { + LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported"); + return false; } - // check insert mode - if (config.getDbConfig().getDB_SWITCH().getType() == DBType.IoTDB) { - DBInsertMode insertMode = config.getDbConfig().getDB_SWITCH().getInsertMode(); - if (config.getDEVICE_NUM_PER_WRITE() != 1 && insertMode != INSERT_USE_SESSION_RECORDS) { - LOGGER.error("The combination of DEVICE_NUM_PER_WRITE and insert-mode is not supported"); - result = false; + for (int deviceNum : + CommonAlgorithms.distributeDevicesToClients( + config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()) + .values()) { + if (deviceNum % dnw != 0) { + LOGGER.error( + "Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n" + + "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.", + deviceNum, dnw); + return false; } } - if (config.getDEVICE_NUM_PER_WRITE() != 1 && !config.isIS_SENSOR_TS_ALIGNMENT()) { - LOGGER.error( - "The combination of \"DEVICE_NUM_PER_WRITE > 1\" and \"IS_SENSOR_TS_ALIGNMENT == false\" is not supported"); - result = false; + if (!config.isIS_SENSOR_TS_ALIGNMENT()) { + LOGGER.error("When DEVICE_NUM_PER_WRITE > 1, IS_SENSOR_TS_ALIGNMENT must be true"); + return false; } - return result; + return true; } private void checkQuery() { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java index c5658603b..7481ac455 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/schema/schemaImpl/GenerateMetaDataSchema.java @@ -24,11 +24,13 @@ import cn.edu.tsinghua.iot.benchmark.entity.Sensor; import cn.edu.tsinghua.iot.benchmark.schema.MetaDataSchema; import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil; +import cn.edu.tsinghua.iot.benchmark.utils.CommonAlgorithms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** Data Schema for generate data */ public class GenerateMetaDataSchema extends MetaDataSchema { @@ -43,13 +45,12 @@ public boolean createMetaDataSchema() { return false; } - int eachClientDeviceNum = config.getDEVICE_NUMBER() / config.getCLIENT_NUMBER(); - // The part that cannot be divided equally is given to dataClients with a smaller number - int leftClientDeviceNum = config.getDEVICE_NUMBER() % config.getCLIENT_NUMBER(); int deviceId = MetaUtil.getDeviceId(0); + Map deviceDistribution = + CommonAlgorithms.distributeDevicesToClients( + config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()); for (int clientId = 0; clientId < config.getCLIENT_NUMBER(); clientId++) { - int deviceNumber = - (clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum; + int deviceNumber = deviceDistribution.get(clientId); List deviceSchemaList = new ArrayList<>(); for (int d = 0; d < deviceNumber; d++) { DeviceSchema deviceSchema = new DeviceSchema(deviceId, sensors, config.getDEVICE_TAGS()); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java new file mode 100644 index 000000000..4aec97097 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/CommonAlgorithms.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cn.edu.tsinghua.iot.benchmark.utils; + +import java.util.HashMap; +import java.util.Map; + +public class CommonAlgorithms { + private CommonAlgorithms() {} + + /** + * Distribute devices to clients as balance as possible. For example, 100 devices, 8 clients, the + * result will be {13,13,13,13,12,12,12,12}. + * + * @return Map{clientID, how many devices should it take} + */ + public static Map distributeDevicesToClients( + final int deviceNumber, final int clientNumber) { + final int eachClientDeviceNum = deviceNumber / clientNumber; + final int leftClientDeviceNum = deviceNumber % clientNumber; + Map result = new HashMap<>(); + for (int clientId = 0; clientId < clientNumber; clientId++) { + int deviceNum = + (clientId < leftClientDeviceNum) ? eachClientDeviceNum + 1 : eachClientDeviceNum; + result.put(clientId, deviceNum); + } + return result; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java index dcfc900f1..ed7cbc7e2 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/DataWorkLoad.java @@ -41,8 +41,8 @@ public static IDataWorkLoad getInstance(int clientId) { List files = MetaUtil.getClientFiles().get(clientId); return new RealDataWorkLoad(files); } else { - List deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId); if (config.isIS_CLIENT_BIND()) { + List deviceSchemas = metaDataSchema.getDeviceSchemaByClientId(clientId); return new SyntheticDataWorkLoad(deviceSchemas); } else { return SingletonWorkDataWorkLoad.getInstance(); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java index 700dc415f..1a8ed0566 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java @@ -74,7 +74,7 @@ public IBatch getOneBatch() throws WorkloadException { } else { batch = new MultiDeviceBatch(config.getDEVICE_NUM_PER_WRITE()); } - for (int deviceID = 0; deviceID < config.getDEVICE_NUM_PER_WRITE(); deviceID++) { + for (int i = 0; i < config.getDEVICE_NUM_PER_WRITE(); i++) { long curLoop = insertLoop.getAndIncrement(); // create schema of batch List sensors = new ArrayList<>(); From ec8a7951675803d81d6046b4f68dd56d3a66239f Mon Sep 17 00:00:00 2001 From: liyuheng Date: Wed, 15 Nov 2023 14:58:07 +0800 Subject: [PATCH 2/3] extract some check methods --- .../iot/benchmark/conf/ConfigDescriptor.java | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 11812fe3b..7fd33ba1c 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -580,33 +580,29 @@ private boolean checkConfig() { default: break; } - // check config + result &= checkOperationProportion(); + if (config.getCLIENT_NUMBER() == 0) { + LOGGER.error("Client number can't be zero"); + result = false; + } + result &= checkDeviceNumPerWrite(); + result &= checkTag(); + return result; + } + + private boolean checkOperationProportion() { String[] op = config.getOPERATION_PROPORTION().split(":"); int minOps = 0; - for (int i = 0; i < op.length; i++) { - if (Double.valueOf(op[i]) > 1e-7) { + for (String s : op) { + if (Double.parseDouble(s) > 1e-7) { minOps++; } } if (minOps > config.getLOOP()) { LOGGER.error("Loop is too small that can't meet the need of OPERATION_PROPORTION"); - result = false; - } - if (config.getCLIENT_NUMBER() == 0) { - LOGGER.error("Client number can't be zero"); - result = false; - } - result &= checkDeviceNumPerWrite(); - // check tag - if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) { - LOGGER.error( - "TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, " - + "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.", - config.getTAG_NUMBER(), - config.getTAG_VALUE_CARDINALITY().size()); - result = false; + return false; } - return result; + return true; } private boolean checkDeviceNumPerWrite() { @@ -627,14 +623,14 @@ private boolean checkDeviceNumPerWrite() { return false; } for (int deviceNum : - CommonAlgorithms.distributeDevicesToClients( - config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()) - .values()) { + CommonAlgorithms.distributeDevicesToClients( + config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER()) + .values()) { if (deviceNum % dnw != 0) { LOGGER.error( - "Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n" - + "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.", - deviceNum, dnw); + "Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n" + + "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.", + deviceNum, dnw); return false; } } @@ -645,6 +641,18 @@ private boolean checkDeviceNumPerWrite() { return true; } + private boolean checkTag() { + if (config.getTAG_NUMBER() != config.getTAG_VALUE_CARDINALITY().size()) { + LOGGER.error( + "TAG_NUMBER must be equal to TAG_VALUE_CARDINALITY's size. Currently, " + + "TAG_NUMBER = {}, TAG_VALUE_CARDINALITY = {}.", + config.getTAG_NUMBER(), + config.getTAG_VALUE_CARDINALITY().size()); + return false; + } + return true; + } + private void checkQuery() { if (config.isIS_DELETE_DATA()) { LOGGER.warn("Benchmark is doing query, no need to delete data."); From e320c749208ff3ad8cd2707b77a81320bedc4a2f Mon Sep 17 00:00:00 2001 From: liyuheng Date: Wed, 15 Nov 2023 14:59:40 +0800 Subject: [PATCH 3/3] break --- .../cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 7fd33ba1c..5ac46a79d 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -577,6 +577,7 @@ private boolean checkConfig() { if (config.isIS_DOUBLE_WRITE()) { result &= checkDatabaseVerification(config.getANOTHER_DBConfig()); } + break; default: break; }