Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
liyuheng55555 committed Mar 15, 2024
1 parent 4ea9f5b commit 1c0094b
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 119 deletions.
6 changes: 4 additions & 2 deletions configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@

############## 数据:设备、传感器、客户端 ##################
# 设备总数
# DEVICE_NUMBER=6000
DEVICE_NUMBER=60

# 实际写入设备比例,(0, 1]
# REAL_INSERT_RATE=1.0
Expand Down Expand Up @@ -447,5 +447,7 @@
################# 输出结果:日志参数 ######################
# 是否使用静默模式,静默模式会关闭部分日志输出
# IS_QUIET_MODE=true
# 测试过程日志的输出间隔,单位为秒
# 测试过程中测试进度日志的输出间隔,单位为秒
# LOG_PRINT_INTERVAL=5
# 测试过程中当前测试结果日志的输出间隔,单位为秒
# RESULT_PRINT_INTERVAL=3600
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void run() {
}

public Measurement getMeasurement() {
return measurement;
return dbWrapper.getMeasurement();
}

/** Do test, Notice please use `isStop` parameters to control */
Expand All @@ -174,7 +174,7 @@ protected void initDBWrappers() {
if (config.isIS_DOUBLE_WRITE()) {
dbConfigs.add(config.getANOTHER_DBConfig());
}
dbWrapper = new DBWrapper(dbConfigs, measurement);
dbWrapper = new DBWrapper(dbConfigs);
}

/** Stop client */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public class SchemaClient implements Runnable {
protected final List<DeviceSchema> deviceSchemas;
/** Related Schema Size */
protected final int deviceSchemasSize;
/** Measurement */
protected Measurement measurement;

/** Control the end of client */
private final CountDownLatch countDownLatch;
Expand All @@ -58,14 +56,12 @@ public class SchemaClient implements Runnable {
/** result of register */
private Boolean result = false;

public SchemaClient(
int id, Measurement measurement, CountDownLatch countDownLatch, CyclicBarrier barrier) {
public SchemaClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
this.countDownLatch = countDownLatch;
this.barrier = barrier;
this.clientThreadId = id;
this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
this.deviceSchemasSize = deviceSchemas.size();
this.measurement = measurement;
initDBWrappers();
}

Expand All @@ -76,7 +72,7 @@ private void initDBWrappers() {
if (config.isIS_DOUBLE_WRITE()) {
dbConfigs.add(config.getANOTHER_DBConfig());
}
dbWrapper = new DBWrapper(dbConfigs, measurement);
dbWrapper = new DBWrapper(dbConfigs);
}

@Override
Expand Down Expand Up @@ -113,7 +109,7 @@ public void run() {
}

public Measurement getMeasurement() {
return measurement;
return dbWrapper.getMeasurement();
}

public Boolean getResult() {
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ public class Config {
/** Print test progress log interval in second */
private int LOG_PRINT_INTERVAL = 5;

private int RESULT_PRINT_INTERVAL = 3600;

// 输出:数据库配置,当前支持IoTDB和MySQL
/** The Ip of database */
private String TEST_DATA_STORE_IP = "127.0.0.1";
Expand Down Expand Up @@ -1320,6 +1322,14 @@ public void setLOG_PRINT_INTERVAL(int LOG_PRINT_INTERVAL) {
this.LOG_PRINT_INTERVAL = LOG_PRINT_INTERVAL;
}

public int getRESULT_PRINT_INTERVAL() {
return RESULT_PRINT_INTERVAL;
}

public void setRESULT_PRINT_INTERVAL(int RESULT_PRINT_INTERVAL) {
this.RESULT_PRINT_INTERVAL = RESULT_PRINT_INTERVAL;
}

public String getTEST_DATA_STORE_IP() {
return TEST_DATA_STORE_IP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ private void loadProps() {
config.setLOG_PRINT_INTERVAL(
Integer.parseInt(
properties.getProperty("LOG_PRINT_INTERVAL", config.getLOG_PRINT_INTERVAL() + "")));
config.setRESULT_PRINT_INTERVAL(
Integer.parseInt(
properties.getProperty(
"RESULT_PRINT_INTERVAL", config.getRESULT_PRINT_INTERVAL() + "")));

config.setTEST_DATA_STORE_IP(
properties.getProperty("TEST_DATA_STORE_IP", config.getTEST_DATA_STORE_IP()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.clearspring.analytics.stream.quantile.TDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.weakref.jmx.internal.guava.util.concurrent.AtomicDouble;

import java.io.BufferedWriter;
import java.io.File;
Expand All @@ -48,7 +47,7 @@ public class Measurement {
new EnumMap<>(Operation.class);
private static final Map<Operation, Double> operationLatencySumAllClient =
new EnumMap<>(Operation.class);
private final AtomicDouble createSchemaTime = new AtomicDouble(0.0);
private double createSchemaFinishTime = 0;
private double elapseTime;
private final Map<Operation, Double> operationLatencySumThisClient;
private final Map<Operation, Long> okOperationNumMap;
Expand All @@ -74,6 +73,10 @@ public Measurement() {
okPointNumMap = new EnumMap<>(Operation.class);
failPointNumMap = new EnumMap<>(Operation.class);
operationLatencySumThisClient = new EnumMap<>(Operation.class);
resetMeasurementMaps();
}

public void resetMeasurementMaps() {
for (Operation operation : Operation.values()) {
okOperationNumMap.put(operation, 0L);
failOperationNumMap.put(operation, 0L);
Expand All @@ -83,6 +86,12 @@ public Measurement() {
}
}

public void mergeCreateSchemaFinishTime(Measurement m) {
if (this.createSchemaFinishTime < m.getCreateSchemaFinishTime()) {
this.createSchemaFinishTime = m.getCreateSchemaFinishTime();
}
}

/**
* Users need to call calculateMetrics() after calling mergeMeasurement() to update metrics.
*
Expand Down Expand Up @@ -147,45 +156,54 @@ public void calculateMetrics(List<Operation> operations) {
}

/** Show measurements and record according to TEST_DATA_PERSISTENCE */
public void showMeasurements(List<Operation> operations) {
public String getMeasurementsString(List<Operation> operations) {
PersistenceFactory persistenceFactory = new PersistenceFactory();
TestDataPersistence recorder = persistenceFactory.getPersistence();
System.out.println(Thread.currentThread().getName() + " measurements:");
System.out.println(
"Create schema cost " + String.format("%.2f", createSchemaTime.get()) + " second");
System.out.println(
"Test elapsed time (not include schema creation): "
+ String.format("%.2f", elapseTime)
+ " second");
StringBuilder stringBuilder = new StringBuilder("\n");
stringBuilder.append(Thread.currentThread().getName()).append(" measurements:").append('\n');
stringBuilder
.append("Create schema cost ")
.append(String.format("%.2f", createSchemaFinishTime))
.append(" second")
.append('\n');
stringBuilder
.append("Test elapsed time (not include schema creation): ")
.append(String.format("%.2f", elapseTime))
.append(" second")
.append('\n');
recorder.saveResultAsync(
"total", TotalResult.CREATE_SCHEMA_TIME.getName(), "" + createSchemaTime.get());
"total", TotalResult.CREATE_SCHEMA_TIME.getName(), "" + createSchemaFinishTime);
recorder.saveResultAsync("total", TotalResult.ELAPSED_TIME.getName(), "" + elapseTime);

System.out.println(
"----------------------------------------------------------Result Matrix----------------------------------------------------------");
stringBuilder
.append(
"----------------------------------------------------------Result Matrix----------------------------------------------------------")
.append('\n');
StringBuffer format = new StringBuffer();
for (int i = 0; i < 6; i++) {
format.append(RESULT_ITEM);
}
format.append("\n");
System.out.printf(
format.toString(),
"Operation",
"okOperation",
"okPoint",
"failOperation",
"failPoint",
"throughput(point/s)");
stringBuilder.append(
String.format(
format.toString(),
"Operation",
"okOperation",
"okPoint",
"failOperation",
"failPoint",
"throughput(point/s)"));
for (Operation operation : operations) {
String throughput = String.format("%.2f", okPointNumMap.get(operation) / elapseTime);
System.out.printf(
format.toString(),
operation.getName(),
okOperationNumMap.get(operation),
okPointNumMap.get(operation),
failOperationNumMap.get(operation),
failPointNumMap.get(operation),
throughput);
stringBuilder.append(
String.format(
format.toString(),
operation.getName(),
okOperationNumMap.get(operation),
okPointNumMap.get(operation),
failOperationNumMap.get(operation),
failPointNumMap.get(operation),
throughput));

recorder.saveResultAsync(
operation.toString(),
Expand All @@ -206,42 +224,56 @@ public void showMeasurements(List<Operation> operations) {
recorder.saveResultAsync(
operation.toString(), TotalOperationResult.THROUGHPUT.getName(), throughput);
}
System.out.println(
"---------------------------------------------------------------------------------------------------------------------------------");

stringBuilder
.append(
"---------------------------------------------------------------------------------------------------------------------------------")
.append('\n');
recorder.closeAsync();
return stringBuilder.toString();
}

/** Show Config of test */
public void showConfigs() {
System.out.println("----------------------Main Configurations----------------------");
System.out.println(config.getShowConfigProperties().toString());
System.out.println("---------------------------------------------------------------");
public String getConfigsString() {
StringBuilder stringBuilder = new StringBuilder("\n");
stringBuilder
.append("----------------------Main Configurations----------------------")
.append('\n');
stringBuilder.append(config.getShowConfigProperties().toString()).append('\n');
stringBuilder
.append("---------------------------------------------------------------")
.append('\n');
return stringBuilder.toString();
}

/** Show metrics of test */
public void showMetrics(List<Operation> operations) {
public String getMetricsString(List<Operation> operations) {
PersistenceFactory persistenceFactory = new PersistenceFactory();
TestDataPersistence recorder = persistenceFactory.getPersistence();
System.out.println(
"--------------------------------------------------------------------------Latency (ms) Matrix--------------------------------------------------------------------------");
System.out.printf(RESULT_ITEM, "Operation");
StringBuilder stringBuilder = new StringBuilder("\n");
stringBuilder
.append(
"--------------------------------------------------------------------------Latency (ms) Matrix--------------------------------------------------------------------------")
.append('\n');
stringBuilder.append(String.format(RESULT_ITEM, "Operation"));
for (Metric metric : Metric.values()) {
System.out.printf(LATENCY_ITEM, metric.name);
stringBuilder.append(String.format(LATENCY_ITEM, metric.name));
}
System.out.println();
stringBuilder.append('\n');
for (Operation operation : operations) {
System.out.printf(RESULT_ITEM, operation.getName());
stringBuilder.append(String.format(RESULT_ITEM, operation.getName()));
for (Metric metric : Metric.values()) {
String metricResult = String.format("%.2f", metric.typeValueMap.get(operation));
System.out.printf(LATENCY_ITEM, metricResult);
stringBuilder.append(String.format(LATENCY_ITEM, metricResult));
recorder.saveResultAsync(operation.toString(), metric.name, metricResult);
}
System.out.println();
stringBuilder.append('\n');
}
System.out.println(
"-----------------------------------------------------------------------------------------------------------------------------------------------------------------------");
stringBuilder
.append(
"-----------------------------------------------------------------------------------------------------------------------------------------------------------------------")
.append('\n');
recorder.closeAsync();
return stringBuilder.toString();
}

/** output measurement to csv */
Expand Down Expand Up @@ -395,7 +427,7 @@ private void outputSchemaMetricsToCSV(File csv) {
try {
BufferedWriter bw = new BufferedWriter(new FileWriter(csv, true));
bw.newLine();
bw.write(String.format("Schema cost(s),%.2f", createSchemaTime.get()));
bw.write(String.format("Schema cost(s),%.2f", createSchemaFinishTime));
bw.newLine();
bw.write(
String.format("Test elapsed time (not include schema creation)(s),%.2f", elapseTime));
Expand Down Expand Up @@ -449,12 +481,12 @@ public void addFailOperationNum(Operation operation) {
failOperationNumMap.put(operation, failOperationNumMap.get(operation) + 1);
}

public double getCreateSchemaTime() {
return createSchemaTime.get();
public double getCreateSchemaFinishTime() {
return createSchemaFinishTime;
}

public void setCreateSchemaTime(double createSchemaTime) {
this.createSchemaTime.set(Math.max(this.createSchemaTime.get(), createSchemaTime));
public void setCreateSchemaFinishTime(double createSchemaFinishTime) {
this.createSchemaFinishTime = createSchemaFinishTime;
}

public double getElapseTime() {
Expand Down
Loading

0 comments on commit 1c0094b

Please sign in to comment.