Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make thread name readable && insertTablet synchronize #373

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory;
import cn.edu.tsinghua.iot.benchmark.workload.DataWorkLoad;
import cn.edu.tsinghua.iot.benchmark.workload.QueryWorkLoad;
import cn.edu.tsinghua.iot.benchmark.workload.interfaces.IDataWorkLoad;
Expand All @@ -57,7 +58,7 @@ public abstract class DataClient implements Runnable {
/** QueryWorkload */
protected final IQueryWorkLoad queryWorkLoad;
/** Log related */
protected final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
protected final ScheduledExecutorService service;
/** Tested DataBase */
protected DBWrapper dbWrapper = null;
/** Related Schema */
Expand Down Expand Up @@ -88,6 +89,9 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier)
this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
this.deviceSchemasSize = deviceSchemas.size();
this.measurement = new Measurement();
this.service =
Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("ShowWorkProgress-" + String.valueOf(clientThreadId)));
initDBWrappers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.edu.tsinghua.iot.benchmark.conf.Config;
import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor;
import cn.edu.tsinghua.iot.benchmark.measurement.enums.SystemMetrics;
import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,7 +34,8 @@ public abstract class TestDataPersistence {
protected static final Logger LOGGER = LoggerFactory.getLogger(TestDataPersistence.class);
protected static final Config config = ConfigDescriptor.getInstance().getConfig();
protected ExecutorService service =
Executors.newFixedThreadPool(config.getTEST_DATA_MAX_CONNECTION());
Executors.newFixedThreadPool(
config.getTEST_DATA_MAX_CONNECTION(), new NamedThreadFactory("ResultPersistence"));

/**
* Store system resources metrics data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,9 +48,10 @@ public abstract class BaseMode {
private static final double NANO_TO_SECOND = 1000000000.0d;

protected ExecutorService schemaExecutorService =
Executors.newFixedThreadPool(config.getCLIENT_NUMBER());
Executors.newFixedThreadPool(
config.getCLIENT_NUMBER(), new NamedThreadFactory("SchemaClient"));
protected ExecutorService executorService =
Executors.newFixedThreadPool(config.getCLIENT_NUMBER());
Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("DataClient"));
protected CountDownLatch schemaDownLatch = new CountDownLatch(config.getCLIENT_NUMBER());
protected CyclicBarrier schemaBarrier = new CyclicBarrier(config.getCLIENT_NUMBER());
protected CountDownLatch dataDownLatch = new CountDownLatch(config.getCLIENT_NUMBER());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cn.edu.tsinghua.iot.benchmark.utils;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
private final String poolName;
private final AtomicInteger count = new AtomicInteger(1);
private final ThreadFactory threadFactory;

public NamedThreadFactory(String name) {
this.poolName = name;
this.threadFactory = Executors.defaultThreadFactory();
}

private String getThreadName() {
return poolName + "-thread-" + String.valueOf(count.getAndIncrement());
}

@Override
public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(getThreadName());
return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.IDatabase;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory;
import cn.edu.tsinghua.iot.benchmark.utils.TimeUtils;
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeQuery;
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.AggRangeValueQuery;
Expand Down Expand Up @@ -115,7 +116,8 @@ public void init() throws TsdbException {
try {
ioTDBConnection = new SingleNodeJDBCConnection(dbConfig);
ioTDBConnection.init();
this.service = Executors.newSingleThreadExecutor();
this.service =
Executors.newSingleThreadExecutor(new NamedThreadFactory("DataClientExecuteJob"));
} catch (Exception e) {
throw new TsdbException(e);
}
Expand Down
Loading