From 76af1a40043d90a8a08b1bff718a95514db68c5e Mon Sep 17 00:00:00 2001 From: Li Yu Heng Date: Tue, 19 Mar 2024 18:26:46 +0800 Subject: [PATCH] Periodically print test result (#402) --- configuration/conf/config.properties | 4 +- .../iot/benchmark/client/DataClient.java | 7 +- .../iot/benchmark/client/SchemaClient.java | 10 +- .../tsinghua/iot/benchmark/conf/Config.java | 10 ++ .../iot/benchmark/conf/ConfigDescriptor.java | 4 + .../benchmark/measurement/Measurement.java | 135 +++++++++++------- .../tsinghua/iot/benchmark/mode/BaseMode.java | 110 ++++++++++---- .../mode/TestWithDefaultPathMode.java | 21 +-- .../benchmark/mode/VerificationQueryMode.java | 12 +- .../benchmark/mode/VerificationWriteMode.java | 16 +-- .../iot/benchmark/tsdb/DBWrapper.java | 13 +- 11 files changed, 221 insertions(+), 121 deletions(-) diff --git a/configuration/conf/config.properties b/configuration/conf/config.properties index 91e737f66..4e3d5f42a 100644 --- a/configuration/conf/config.properties +++ b/configuration/conf/config.properties @@ -447,5 +447,7 @@ ################# 输出结果:日志参数 ###################### # 是否使用静默模式,静默模式会关闭部分日志输出 # IS_QUIET_MODE=true -# 测试过程日志的输出间隔,单位为秒 +# 测试过程中测试进度日志的输出间隔,单位为秒 # LOG_PRINT_INTERVAL=5 +# 测试过程中当前测试结果日志的输出间隔,单位为秒 +# RESULT_PRINT_INTERVAL=3600 diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java index 9dbf7a828..c66a9d18f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java @@ -63,8 +63,6 @@ public abstract class DataClient implements Runnable { protected DBWrapper dbWrapper = null; /** Related Schema */ protected final List clientDeviceSchemas; - /** Measurement */ - protected Measurement measurement; /** Total number of loop */ protected long totalLoop = 0; /** Loop Index, using for loop and log */ @@ -86,7 +84,6 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) this.clientThreadId = id; this.clientDeviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); - this.measurement = new Measurement(); this.service = Executors.newSingleThreadScheduledExecutor( new NamedThreadFactory("ShowWorkProgress-" + clientThreadId)); @@ -161,7 +158,7 @@ public void run() { } public Measurement getMeasurement() { - return measurement; + return dbWrapper.getMeasurement(); } /** Do test, Notice please use `isStop` parameters to control */ @@ -174,7 +171,7 @@ protected void initDBWrappers() { if (config.isIS_DOUBLE_WRITE()) { dbConfigs.add(config.getANOTHER_DBConfig()); } - dbWrapper = new DBWrapper(dbConfigs, measurement); + dbWrapper = new DBWrapper(dbConfigs); } /** Stop client */ diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java index 4fb8fff34..8bc9b0ba8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java @@ -48,8 +48,6 @@ public class SchemaClient implements Runnable { protected final List deviceSchemas; /** Related Schema Size */ protected final int deviceSchemasSize; - /** Measurement */ - protected Measurement measurement; /** Control the end of client */ private final CountDownLatch countDownLatch; @@ -58,14 +56,12 @@ public class SchemaClient implements Runnable { /** result of register */ private Boolean result = false; - public SchemaClient( - int id, Measurement measurement, CountDownLatch countDownLatch, CyclicBarrier barrier) { + public SchemaClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) { this.countDownLatch = countDownLatch; this.barrier = barrier; this.clientThreadId = id; this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); this.deviceSchemasSize = deviceSchemas.size(); - this.measurement = measurement; initDBWrappers(); } @@ -76,7 +72,7 @@ private void initDBWrappers() { if (config.isIS_DOUBLE_WRITE()) { dbConfigs.add(config.getANOTHER_DBConfig()); } - dbWrapper = new DBWrapper(dbConfigs, measurement); + dbWrapper = new DBWrapper(dbConfigs); } @Override @@ -113,7 +109,7 @@ public void run() { } public Measurement getMeasurement() { - return measurement; + return dbWrapper.getMeasurement(); } public Boolean getResult() { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java index 75c6eff62..42f92e7a6 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java @@ -370,6 +370,8 @@ public class Config { /** Print test progress log interval in second */ private int LOG_PRINT_INTERVAL = 5; + private int RESULT_PRINT_INTERVAL = 3600; + // 输出:数据库配置,当前支持IoTDB和MySQL /** The Ip of database */ private String TEST_DATA_STORE_IP = "127.0.0.1"; @@ -1320,6 +1322,14 @@ public void setLOG_PRINT_INTERVAL(int LOG_PRINT_INTERVAL) { this.LOG_PRINT_INTERVAL = LOG_PRINT_INTERVAL; } + public int getRESULT_PRINT_INTERVAL() { + return RESULT_PRINT_INTERVAL; + } + + public void setRESULT_PRINT_INTERVAL(int RESULT_PRINT_INTERVAL) { + this.RESULT_PRINT_INTERVAL = RESULT_PRINT_INTERVAL; + } + public String getTEST_DATA_STORE_IP() { return TEST_DATA_STORE_IP; } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java index 9d0e47bda..71145c404 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/ConfigDescriptor.java @@ -471,6 +471,10 @@ private void loadProps() { config.setLOG_PRINT_INTERVAL( Integer.parseInt( properties.getProperty("LOG_PRINT_INTERVAL", config.getLOG_PRINT_INTERVAL() + ""))); + config.setRESULT_PRINT_INTERVAL( + Integer.parseInt( + properties.getProperty( + "RESULT_PRINT_INTERVAL", config.getRESULT_PRINT_INTERVAL() + ""))); config.setTEST_DATA_STORE_IP( properties.getProperty("TEST_DATA_STORE_IP", config.getTEST_DATA_STORE_IP())); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/Measurement.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/Measurement.java index e14f16f36..84c270595 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/Measurement.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/Measurement.java @@ -30,7 +30,6 @@ import com.clearspring.analytics.stream.quantile.TDigest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.weakref.jmx.internal.guava.util.concurrent.AtomicDouble; import java.io.BufferedWriter; import java.io.File; @@ -48,7 +47,7 @@ public class Measurement { new EnumMap<>(Operation.class); private static final Map operationLatencySumAllClient = new EnumMap<>(Operation.class); - private final AtomicDouble createSchemaTime = new AtomicDouble(0.0); + private double createSchemaFinishTime = 0; private double elapseTime; private final Map operationLatencySumThisClient; private final Map okOperationNumMap; @@ -74,6 +73,10 @@ public Measurement() { okPointNumMap = new EnumMap<>(Operation.class); failPointNumMap = new EnumMap<>(Operation.class); operationLatencySumThisClient = new EnumMap<>(Operation.class); + resetMeasurementMaps(); + } + + public void resetMeasurementMaps() { for (Operation operation : Operation.values()) { okOperationNumMap.put(operation, 0L); failOperationNumMap.put(operation, 0L); @@ -83,6 +86,12 @@ public Measurement() { } } + public void mergeCreateSchemaFinishTime(Measurement m) { + if (this.createSchemaFinishTime < m.getCreateSchemaFinishTime()) { + this.createSchemaFinishTime = m.getCreateSchemaFinishTime(); + } + } + /** * Users need to call calculateMetrics() after calling mergeMeasurement() to update metrics. * @@ -147,45 +156,53 @@ public void calculateMetrics(List operations) { } /** Show measurements and record according to TEST_DATA_PERSISTENCE */ - public void showMeasurements(List operations) { + public String getMeasurementsString(List operations) { PersistenceFactory persistenceFactory = new PersistenceFactory(); TestDataPersistence recorder = persistenceFactory.getPersistence(); - System.out.println(Thread.currentThread().getName() + " measurements:"); - System.out.println( - "Create schema cost " + String.format("%.2f", createSchemaTime.get()) + " second"); - System.out.println( - "Test elapsed time (not include schema creation): " - + String.format("%.2f", elapseTime) - + " second"); + StringBuilder stringBuilder = new StringBuilder("\n"); + stringBuilder + .append("Create schema cost ") + .append(String.format("%.2f", createSchemaFinishTime)) + .append(" second") + .append('\n'); + stringBuilder + .append("Test elapsed time (not include schema creation): ") + .append(String.format("%.2f", elapseTime)) + .append(" second") + .append('\n'); recorder.saveResultAsync( - "total", TotalResult.CREATE_SCHEMA_TIME.getName(), "" + createSchemaTime.get()); + "total", TotalResult.CREATE_SCHEMA_TIME.getName(), "" + createSchemaFinishTime); recorder.saveResultAsync("total", TotalResult.ELAPSED_TIME.getName(), "" + elapseTime); - System.out.println( - "----------------------------------------------------------Result Matrix----------------------------------------------------------"); + stringBuilder + .append( + "----------------------------------------------------------Result Matrix----------------------------------------------------------") + .append('\n'); StringBuffer format = new StringBuffer(); for (int i = 0; i < 6; i++) { format.append(RESULT_ITEM); } format.append("\n"); - System.out.printf( - format.toString(), - "Operation", - "okOperation", - "okPoint", - "failOperation", - "failPoint", - "throughput(point/s)"); + stringBuilder.append( + String.format( + format.toString(), + "Operation", + "okOperation", + "okPoint", + "failOperation", + "failPoint", + "throughput(point/s)")); for (Operation operation : operations) { String throughput = String.format("%.2f", okPointNumMap.get(operation) / elapseTime); - System.out.printf( - format.toString(), - operation.getName(), - okOperationNumMap.get(operation), - okPointNumMap.get(operation), - failOperationNumMap.get(operation), - failPointNumMap.get(operation), - throughput); + stringBuilder.append( + String.format( + format.toString(), + operation.getName(), + okOperationNumMap.get(operation), + okPointNumMap.get(operation), + failOperationNumMap.get(operation), + failPointNumMap.get(operation), + throughput)); recorder.saveResultAsync( operation.toString(), @@ -206,42 +223,56 @@ public void showMeasurements(List operations) { recorder.saveResultAsync( operation.toString(), TotalOperationResult.THROUGHPUT.getName(), throughput); } - System.out.println( - "---------------------------------------------------------------------------------------------------------------------------------"); - + stringBuilder + .append( + "---------------------------------------------------------------------------------------------------------------------------------") + .append('\n'); recorder.closeAsync(); + return stringBuilder.toString(); } /** Show Config of test */ - public void showConfigs() { - System.out.println("----------------------Main Configurations----------------------"); - System.out.println(config.getShowConfigProperties().toString()); - System.out.println("---------------------------------------------------------------"); + public String getConfigsString() { + StringBuilder stringBuilder = new StringBuilder("\n"); + stringBuilder + .append("----------------------Main Configurations----------------------") + .append('\n'); + stringBuilder.append(config.getShowConfigProperties().toString()).append('\n'); + stringBuilder + .append("---------------------------------------------------------------") + .append('\n'); + return stringBuilder.toString(); } /** Show metrics of test */ - public void showMetrics(List operations) { + public String getMetricsString(List operations) { PersistenceFactory persistenceFactory = new PersistenceFactory(); TestDataPersistence recorder = persistenceFactory.getPersistence(); - System.out.println( - "--------------------------------------------------------------------------Latency (ms) Matrix--------------------------------------------------------------------------"); - System.out.printf(RESULT_ITEM, "Operation"); + StringBuilder stringBuilder = new StringBuilder("\n"); + stringBuilder + .append( + "--------------------------------------------------------------------------Latency (ms) Matrix--------------------------------------------------------------------------") + .append('\n'); + stringBuilder.append(String.format(RESULT_ITEM, "Operation")); for (Metric metric : Metric.values()) { - System.out.printf(LATENCY_ITEM, metric.name); + stringBuilder.append(String.format(LATENCY_ITEM, metric.name)); } - System.out.println(); + stringBuilder.append('\n'); for (Operation operation : operations) { - System.out.printf(RESULT_ITEM, operation.getName()); + stringBuilder.append(String.format(RESULT_ITEM, operation.getName())); for (Metric metric : Metric.values()) { String metricResult = String.format("%.2f", metric.typeValueMap.get(operation)); - System.out.printf(LATENCY_ITEM, metricResult); + stringBuilder.append(String.format(LATENCY_ITEM, metricResult)); recorder.saveResultAsync(operation.toString(), metric.name, metricResult); } - System.out.println(); + stringBuilder.append('\n'); } - System.out.println( - "-----------------------------------------------------------------------------------------------------------------------------------------------------------------------"); + stringBuilder + .append( + "-----------------------------------------------------------------------------------------------------------------------------------------------------------------------") + .append('\n'); recorder.closeAsync(); + return stringBuilder.toString(); } /** output measurement to csv */ @@ -395,7 +426,7 @@ private void outputSchemaMetricsToCSV(File csv) { try { BufferedWriter bw = new BufferedWriter(new FileWriter(csv, true)); bw.newLine(); - bw.write(String.format("Schema cost(s),%.2f", createSchemaTime.get())); + bw.write(String.format("Schema cost(s),%.2f", createSchemaFinishTime)); bw.newLine(); bw.write( String.format("Test elapsed time (not include schema creation)(s),%.2f", elapseTime)); @@ -449,12 +480,12 @@ public void addFailOperationNum(Operation operation) { failOperationNumMap.put(operation, failOperationNumMap.get(operation) + 1); } - public double getCreateSchemaTime() { - return createSchemaTime.get(); + public double getCreateSchemaFinishTime() { + return createSchemaFinishTime; } - public void setCreateSchemaTime(double createSchemaTime) { - this.createSchemaTime.set(Math.max(this.createSchemaTime.get(), createSchemaTime)); + public void setCreateSchemaFinishTime(double createSchemaFinishTime) { + this.createSchemaFinishTime = createSchemaFinishTime; } public double getElapseTime() { diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java index 4f90e593b..cf4273f58 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -40,6 +41,8 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; public abstract class BaseMode { @@ -59,8 +62,9 @@ public abstract class BaseMode { protected CyclicBarrier dataBarrier = new CyclicBarrier(config.getCLIENT_NUMBER()); protected List dataClients = new ArrayList<>(); protected List schemaClients = new ArrayList<>(); - protected Measurement measurement = new Measurement(); - protected long start = 0; + protected Measurement baseModeMeasurement = new Measurement(); + protected long startTime = 0; + private Timer middleMeasureTimer = new Timer("ShowResultPeriodically"); protected abstract boolean preCheck(); @@ -80,7 +84,8 @@ public void run() { executorService.submit(client); } setTimeLimitTask(); - start = System.nanoTime(); + setMiddleMeasureTask(); + startTime = System.nanoTime(); executorService.shutdown(); try { // wait for all dataClients finish test @@ -90,6 +95,7 @@ public void run() { Thread.currentThread().interrupt(); } postCheck(); + middleMeasureTimer.cancel(); } private void setTimeLimitTask() { @@ -104,15 +110,37 @@ public void run() { dataClients.forEach(DataClient::stopClient); } }; - new Timer().schedule(stopAllDataClient, config.getTEST_MAX_TIME()); + middleMeasureTimer.schedule(stopAllDataClient, config.getTEST_MAX_TIME()); } } + private void setMiddleMeasureTask() { + TimerTask measure = + new TimerTask() { + @Override + public void run() { + List operations; + if (config.isIS_POINT_COMPARISON()) { + operations = Collections.singletonList(Operation.DEVICE_QUERY); + } else { + operations = Operation.getNormalOperation(); + } + middleMeasure( + baseModeMeasurement, + dataClients.stream().map(DataClient::getMeasurement), + startTime, + operations); + } + }; + middleMeasureTimer.schedule( + measure, TimeUnit.SECONDS.toMillis(1), config.getRESULT_PRINT_INTERVAL() * 1000L); + } + protected abstract void postCheck(); /** Clean up data */ - protected boolean cleanUpData(List dbConfigs, Measurement measurement) { - DBWrapper dbWrapper = new DBWrapper(dbConfigs, measurement); + protected boolean cleanUpData(List dbConfigs) { + DBWrapper dbWrapper = new DBWrapper(dbConfigs); try { dbWrapper.init(); try { @@ -135,19 +163,22 @@ protected boolean cleanUpData(List dbConfigs, Measurement measurement) } /** Register schema */ - protected boolean registerSchema(Measurement measurement) { + protected boolean registerSchema() { for (int i = 0; i < config.getCLIENT_NUMBER(); i++) { - SchemaClient schemaClient = new SchemaClient(i, measurement, schemaDownLatch, schemaBarrier); + SchemaClient schemaClient = new SchemaClient(i, schemaDownLatch, schemaBarrier); schemaClients.add(schemaClient); } for (SchemaClient schemaClient : schemaClients) { schemaExecutorService.submit(schemaClient); } - start = System.nanoTime(); + startTime = System.nanoTime(); schemaExecutorService.shutdown(); try { // wait for all dataClients finish test schemaDownLatch.await(); + schemaClients.stream() + .map(SchemaClient::getMeasurement) + .forEach(baseModeMeasurement::mergeCreateSchemaFinishTime); } catch (InterruptedException e) { LOGGER.error("Exception occurred during waiting for all threads finish.", e); Thread.currentThread().interrupt(); @@ -159,37 +190,64 @@ protected boolean registerSchema(Measurement measurement) { /** Save measure */ protected static void finalMeasure( Measurement measurement, - List threadsMeasurements, - long st, - List clients, + Stream allClientsMeasurement, + long startTime, + List operations) { + measure( + measurement, + allClientsMeasurement, + startTime, + operations, + "All dataClients finished. The final test result is: ", + true); + } + + protected static void middleMeasure( + Measurement measurement, + Stream allClientsMeasurement, + long startTime, List operations) { - long en = System.nanoTime(); - LOGGER.info("All dataClients finished."); + measure( + measurement, + allClientsMeasurement, + startTime, + operations, + "The test is in progress. The current test result is: ", + false); + } + + private static void measure( + Measurement measurement, + Stream allClientsMeasurement, + long startTime, + List operations, + String prefix, + boolean needPrintConf) { + measurement.setElapseTime((System.nanoTime() - startTime) / NANO_TO_SECOND); // sum up all the measurements and calculate statistics - measurement.setElapseTime((en - st) / NANO_TO_SECOND); - for (DataClient client : clients) { - threadsMeasurements.add(client.getMeasurement()); - } - for (Measurement m : threadsMeasurements) { - measurement.mergeMeasurement(m); - } + measurement.resetMeasurementMaps(); + allClientsMeasurement.forEach(measurement::mergeMeasurement); // output results - measurement.showConfigs(); + String showMeasurement = prefix; + if (needPrintConf) { + showMeasurement += measurement.getConfigsString(); + } if (config.isUSE_MEASUREMENT()) { // must call calculateMetrics() before using the Metrics try { measurement.calculateMetrics(operations); if (!operations.isEmpty()) { - measurement.showMeasurements(operations); - measurement.showMetrics(operations); + showMeasurement += measurement.getMeasurementsString(operations); + showMeasurement += measurement.getMetricsString(operations); } } catch (IllegalArgumentException e) { LOGGER.error( - "Failed to show metric, please check the relation between LOOP and OPERATION_PROPORTION"); - e.printStackTrace(); + "Failed to show metric, please check the relation between LOOP and OPERATION_PROPORTION", + e); return; } } + LOGGER.info(showMeasurement); if (config.isCSV_OUTPUT()) { measurement.outputCSV(); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/TestWithDefaultPathMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/TestWithDefaultPathMode.java index 94592d0f6..3db50d6a3 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/TestWithDefaultPathMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/TestWithDefaultPathMode.java @@ -19,15 +19,16 @@ package cn.edu.tsinghua.iot.benchmark.mode; +import cn.edu.tsinghua.iot.benchmark.client.DataClient; import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; import cn.edu.tsinghua.iot.benchmark.conf.Config; import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; -import cn.edu.tsinghua.iot.benchmark.measurement.Measurement; import cn.edu.tsinghua.iot.benchmark.measurement.persistence.PersistenceFactory; import cn.edu.tsinghua.iot.benchmark.measurement.persistence.TestDataPersistence; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class TestWithDefaultPathMode extends BaseMode { @@ -44,10 +45,10 @@ protected boolean preCheck() { if (config.isIS_DOUBLE_WRITE()) { dbConfigs.add(config.getANOTHER_DBConfig()); } - if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs, measurement))) { + if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs))) { return false; } - if (config.isCREATE_SCHEMA() && (!registerSchema(measurement))) { + if (config.isCREATE_SCHEMA() && (!registerSchema())) { return false; } return true; @@ -55,12 +56,16 @@ protected boolean preCheck() { @Override protected void postCheck() { - List operations = Operation.getNormalOperation(); + List operations; if (config.isIS_POINT_COMPARISON()) { - operations = new ArrayList<>(); - operations.add(Operation.DEVICE_QUERY); + operations = Collections.singletonList(Operation.DEVICE_QUERY); + } else { + operations = Operation.getNormalOperation(); } - List threadsMeasurements = new ArrayList<>(); - finalMeasure(measurement, threadsMeasurements, start, dataClients, operations); + finalMeasure( + baseModeMeasurement, + dataClients.stream().map(DataClient::getMeasurement), + startTime, + operations); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationQueryMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationQueryMode.java index 7ea33fb1b..eeac8a59a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationQueryMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationQueryMode.java @@ -19,12 +19,10 @@ package cn.edu.tsinghua.iot.benchmark.mode; +import cn.edu.tsinghua.iot.benchmark.client.DataClient; import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; -import cn.edu.tsinghua.iot.benchmark.measurement.Measurement; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; public class VerificationQueryMode extends BaseMode { @@ -35,12 +33,10 @@ protected boolean preCheck() { @Override protected void postCheck() { - List threadsMeasurements = new ArrayList<>(); finalMeasure( - measurement, - threadsMeasurements, - start, - dataClients, + baseModeMeasurement, + dataClients.stream().map(DataClient::getMeasurement), + startTime, Collections.singletonList(Operation.VERIFICATION_QUERY)); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationWriteMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationWriteMode.java index 690a09157..835266548 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationWriteMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/VerificationWriteMode.java @@ -19,10 +19,10 @@ package cn.edu.tsinghua.iot.benchmark.mode; +import cn.edu.tsinghua.iot.benchmark.client.DataClient; import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; import cn.edu.tsinghua.iot.benchmark.conf.Config; import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; -import cn.edu.tsinghua.iot.benchmark.measurement.Measurement; import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import java.util.ArrayList; @@ -39,10 +39,10 @@ protected boolean preCheck() { if (config.isIS_DOUBLE_WRITE()) { dbConfigs.add(config.getANOTHER_DBConfig()); } - if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs, measurement))) { + if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs))) { return false; } - if (config.isCREATE_SCHEMA() && (!registerSchema(measurement))) { + if (config.isCREATE_SCHEMA() && (!registerSchema())) { return false; } return true; @@ -50,12 +50,10 @@ protected boolean preCheck() { @Override protected void postCheck() { - List threadsMeasurements = new ArrayList<>(); finalMeasure( - measurement, - threadsMeasurements, - start, - dataClients, - new ArrayList<>(Collections.singletonList(Operation.INGESTION))); + baseModeMeasurement, + dataClients.stream().map(DataClient::getMeasurement), + startTime, + Collections.singletonList(Operation.INGESTION)); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java index 3bb7f4ab6..6c6b401d6 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/tsdb/DBWrapper.java @@ -60,11 +60,11 @@ public class DBWrapper implements IDatabase { private static final String ERROR_LOG = "Failed to do {} because unexpected exception: "; private List databases = new ArrayList<>(); - private Measurement measurement; + private final Measurement measurement = new Measurement(); private TestDataPersistence recorder; /** Use DBFactory to get database */ - public DBWrapper(List dbConfigs, Measurement measurement) { + public DBWrapper(List dbConfigs) { DBFactory dbFactory = new DBFactory(); for (DBConfig dbConfig : dbConfigs) { try { @@ -77,11 +77,14 @@ public DBWrapper(List dbConfigs, Measurement measurement) { LOGGER.error("Failed to get database because", e); } } - this.measurement = measurement; PersistenceFactory persistenceFactory = new PersistenceFactory(); recorder = persistenceFactory.getPersistence(); } + public Measurement getMeasurement() { + return measurement; + } + @Override public Status insertOneBatch(IBatch batch) throws DBConnectException { Status status = null; @@ -543,9 +546,9 @@ public Double registerSchema(List schemaList) throws TsdbException } createSchemaTimeInSecond = Math.max(createSchemaTimeInSecond, registerTime); } - measurement.setCreateSchemaTime(createSchemaTimeInSecond); + measurement.setCreateSchemaFinishTime(createSchemaTimeInSecond); } catch (Exception e) { - measurement.setCreateSchemaTime(0.0); + measurement.setCreateSchemaFinishTime(0.0); throw new TsdbException(e); } return createSchemaTimeInSecond;