From 02e8b02b552419135ad6ac37670905553ff9d80f Mon Sep 17 00:00:00 2001 From: liyuheng Date: Thu, 21 Sep 2023 12:14:46 +0800 Subject: [PATCH 1/4] use NamedThreadFactory everywhere --- .../iot/benchmark/client/DataClient.java | 5 +++- .../persistence/TestDataPersistence.java | 3 ++- .../tsinghua/iot/benchmark/mode/BaseMode.java | 5 ++-- .../benchmark/utils/NamedThreadFactory.java | 27 +++++++++++++++++++ .../iot/benchmark/iotdb110/IoTDB.java | 3 ++- 5 files changed, 38 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java index 6e09945a3..0de680f04 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import cn.edu.tsinghua.iot.benchmark.workload.DataWorkLoad; import cn.edu.tsinghua.iot.benchmark.workload.QueryWorkLoad; import cn.edu.tsinghua.iot.benchmark.workload.interfaces.IDataWorkLoad; @@ -43,6 +44,7 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public abstract class DataClient implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(DataClient.class); @@ -57,7 +59,7 @@ public abstract class DataClient implements Runnable { /** QueryWorkload */ protected final IQueryWorkLoad queryWorkLoad; /** Log related */ - protected final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + protected final ScheduledExecutorService service; /** Tested DataBase */ protected DBWrapper dbWrapper = null; /** Related Schema */ @@ -88,6 +90,7 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); this.deviceSchemasSize = deviceSchemas.size(); this.measurement = new Measurement(); + this.service = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ShowWorkProgress-"+String.valueOf(clientThreadId))); initDBWrappers(); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java index 3c07aa957..ff804b716 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java @@ -22,6 +22,7 @@ import cn.edu.tsinghua.iot.benchmark.conf.Config; import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; import cn.edu.tsinghua.iot.benchmark.measurement.enums.SystemMetrics; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ public abstract class TestDataPersistence { protected static final Logger LOGGER = LoggerFactory.getLogger(TestDataPersistence.class); protected static final Config config = ConfigDescriptor.getInstance().getConfig(); protected ExecutorService service = - Executors.newFixedThreadPool(config.getTEST_DATA_MAX_CONNECTION()); + Executors.newFixedThreadPool(config.getTEST_DATA_MAX_CONNECTION(), new NamedThreadFactory("ResultPersistence")); /** * Store system resources metrics data diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java index 78b2b8fb0..1938d730b 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java @@ -29,6 +29,7 @@ import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +48,9 @@ public abstract class BaseMode { private static final double NANO_TO_SECOND = 1000000000.0d; protected ExecutorService schemaExecutorService = - Executors.newFixedThreadPool(config.getCLIENT_NUMBER()); + Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("SchemaClient")); protected ExecutorService executorService = - Executors.newFixedThreadPool(config.getCLIENT_NUMBER()); + Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("DataClient")); protected CountDownLatch schemaDownLatch = new CountDownLatch(config.getCLIENT_NUMBER()); protected CyclicBarrier schemaBarrier = new CyclicBarrier(config.getCLIENT_NUMBER()); protected CountDownLatch dataDownLatch = new CountDownLatch(config.getCLIENT_NUMBER()); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java new file mode 100644 index 000000000..96175c9cf --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java @@ -0,0 +1,27 @@ +package cn.edu.tsinghua.iot.benchmark.utils; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + private final String poolName; + private final AtomicInteger count = new AtomicInteger(1); + private final ThreadFactory threadFactory; + + public NamedThreadFactory(String name) { + this.poolName = name; + this.threadFactory = Executors.defaultThreadFactory(); + } + + private String getThreadName() { + return poolName + "-thread-" + String.valueOf(count.getAndIncrement()); + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = threadFactory.newThread(r); + thread.setName(getThreadName()); + return thread; + } +} diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java index f64ae606e..6bbf96246 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java @@ -19,6 +19,7 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -115,7 +116,7 @@ public void init() throws TsdbException { try { ioTDBConnection = new SingleNodeJDBCConnection(dbConfig); ioTDBConnection.init(); - this.service = Executors.newSingleThreadExecutor(); + this.service = Executors.newSingleThreadExecutor(new NamedThreadFactory("DataClientExecuteJob")); } catch (Exception e) { throw new TsdbException(e); } From 24817a347f1b18497bc5a7b8c263e7545eb8a546 Mon Sep 17 00:00:00 2001 From: liyuheng Date: Thu, 21 Sep 2023 12:16:43 +0800 Subject: [PATCH 2/4] insertByTablet execute synchronize --- .../benchmark/iotdb110/IoTDBSessionBase.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java index 01c474ffd..a4a8f6d47 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java @@ -71,20 +71,16 @@ public IoTDBSessionBase(DBConfig dbConfig) { public Status insertOneBatchByTablet(IBatch batch) { Tablet tablet = genTablet(batch); - future = - service.submit( - () -> { - try { - if (config.isVECTOR()) { - sessionWrapper.insertAlignedTablet(tablet); - } else { - sessionWrapper.insertTablet(tablet); - } - } catch (IoTDBConnectionException | StatementExecutionException e) { - LOGGER.error("insert tablet failed", e); - } - }); - return waitFuture(); + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedTablet(tablet); + } else { + sessionWrapper.insertTablet(tablet); + } + return new Status(true); + } catch (IoTDBConnectionException | StatementExecutionException e) { + return new Status(false, 0, e, e.toString()); + } } public Status insertOneBatchByRecord(IBatch batch) { From de052f3ed909200c2e4bf6150ad8045de6fb70b7 Mon Sep 17 00:00:00 2001 From: liyuheng Date: Thu, 21 Sep 2023 12:23:56 +0800 Subject: [PATCH 3/4] spotless --- .../iot/benchmark/client/DataClient.java | 5 +-- .../persistence/TestDataPersistence.java | 3 +- .../tsinghua/iot/benchmark/mode/BaseMode.java | 3 +- .../benchmark/utils/NamedThreadFactory.java | 32 +++++++++---------- .../iot/benchmark/iotdb110/IoTDB.java | 5 +-- 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java index 0de680f04..73f966994 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; public abstract class DataClient implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(DataClient.class); @@ -90,7 +89,9 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId); this.deviceSchemasSize = deviceSchemas.size(); this.measurement = new Measurement(); - this.service = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ShowWorkProgress-"+String.valueOf(clientThreadId))); + this.service = + Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("ShowWorkProgress-" + String.valueOf(clientThreadId))); initDBWrappers(); } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java index ff804b716..bd8dc4f47 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/measurement/persistence/TestDataPersistence.java @@ -34,7 +34,8 @@ public abstract class TestDataPersistence { protected static final Logger LOGGER = LoggerFactory.getLogger(TestDataPersistence.class); protected static final Config config = ConfigDescriptor.getInstance().getConfig(); protected ExecutorService service = - Executors.newFixedThreadPool(config.getTEST_DATA_MAX_CONNECTION(), new NamedThreadFactory("ResultPersistence")); + Executors.newFixedThreadPool( + config.getTEST_DATA_MAX_CONNECTION(), new NamedThreadFactory("ResultPersistence")); /** * Store system resources metrics data diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java index 1938d730b..58d7b785d 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java @@ -48,7 +48,8 @@ public abstract class BaseMode { private static final double NANO_TO_SECOND = 1000000000.0d; protected ExecutorService schemaExecutorService = - Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("SchemaClient")); + Executors.newFixedThreadPool( + config.getCLIENT_NUMBER(), new NamedThreadFactory("SchemaClient")); protected ExecutorService executorService = Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("DataClient")); protected CountDownLatch schemaDownLatch = new CountDownLatch(config.getCLIENT_NUMBER()); diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java index 96175c9cf..ed9c1587a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/utils/NamedThreadFactory.java @@ -5,23 +5,23 @@ import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { - private final String poolName; - private final AtomicInteger count = new AtomicInteger(1); - private final ThreadFactory threadFactory; + private final String poolName; + private final AtomicInteger count = new AtomicInteger(1); + private final ThreadFactory threadFactory; - public NamedThreadFactory(String name) { - this.poolName = name; - this.threadFactory = Executors.defaultThreadFactory(); - } + public NamedThreadFactory(String name) { + this.poolName = name; + this.threadFactory = Executors.defaultThreadFactory(); + } - private String getThreadName() { - return poolName + "-thread-" + String.valueOf(count.getAndIncrement()); - } + private String getThreadName() { + return poolName + "-thread-" + String.valueOf(count.getAndIncrement()); + } - @Override - public Thread newThread(Runnable r) { - Thread thread = threadFactory.newThread(r); - thread.setName(getThreadName()); - return thread; - } + @Override + public Thread newThread(Runnable r) { + Thread thread = threadFactory.newThread(r); + thread.setName(getThreadName()); + return thread; + } } diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java index 6bbf96246..f31dfd584 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDB.java @@ -19,7 +19,6 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; -import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -44,6 +43,7 @@ import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; import cn.edu.tsinghua.iot.benchmark.tsdb.IDatabase; import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; +import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory; import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeQuery; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeValueQuery; @@ -116,7 +116,8 @@ public void init() throws TsdbException { try { ioTDBConnection = new SingleNodeJDBCConnection(dbConfig); ioTDBConnection.init(); - this.service = Executors.newSingleThreadExecutor(new NamedThreadFactory("DataClientExecuteJob")); + this.service = + Executors.newSingleThreadExecutor(new NamedThreadFactory("DataClientExecuteJob")); } catch (Exception e) { throw new TsdbException(e); } From 4a98b5ddb6804b7c21e983e142c0f414d76c4070 Mon Sep 17 00:00:00 2001 From: liyuheng Date: Thu, 21 Sep 2023 14:24:03 +0800 Subject: [PATCH 4/4] Revert "insertByTablet execute synchronize" This reverts commit 24817a347f1b18497bc5a7b8c263e7545eb8a546. --- .../benchmark/iotdb110/IoTDBSessionBase.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java index a4a8f6d47..01c474ffd 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java @@ -71,16 +71,20 @@ public IoTDBSessionBase(DBConfig dbConfig) { public Status insertOneBatchByTablet(IBatch batch) { Tablet tablet = genTablet(batch); - try { - if (config.isVECTOR()) { - sessionWrapper.insertAlignedTablet(tablet); - } else { - sessionWrapper.insertTablet(tablet); - } - return new Status(true); - } catch (IoTDBConnectionException | StatementExecutionException e) { - return new Status(false, 0, e, e.toString()); - } + future = + service.submit( + () -> { + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedTablet(tablet); + } else { + sessionWrapper.insertTablet(tablet); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("insert tablet failed", e); + } + }); + return waitFuture(); } public Status insertOneBatchByRecord(IBatch batch) {