Skip to content

Commit

Permalink
Support "register schema only" by set LOOP=0 (#398)
Browse files Browse the repository at this point in the history
* fix code smell in App.java

* fix some code smell in mode

* delete some unuseful para in workLoad to fix one oom error while device number is too big

* make create template more efficient

* spotless

* fix by reviewed

* when loop=0 bm can run

* spotless

* fixed by review
  • Loading branch information
l2280212 authored Jan 15, 2024
1 parent 3f83a27 commit e9e0120
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 74 deletions.
10 changes: 3 additions & 7 deletions core/src/main/java/cn/edu/tsinghua/iot/benchmark/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
import java.sql.SQLException;

public class App {
private static Logger LOGGER = LoggerFactory.getLogger(Config.class);
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);

public static void main(String[] args) throws SQLException {
long initialHeapSize = Runtime.getRuntime().totalMemory();
long maxHeapSize = Runtime.getRuntime().maxMemory();
LOGGER.info(
"Initial Heap Size: "
+ initialHeapSize
+ "bytes, Max Heap Size: "
+ maxHeapSize
+ "bytes.");
"Initial Heap Size: {} bytes, Max Heap Size: {} bytes. ", initialHeapSize, maxHeapSize);

if (args == null || args.length == 0) {
args = new String[] {"-cf", "configuration/conf"};
Expand All @@ -54,7 +50,7 @@ public static void main(String[] args) throws SQLException {
}
Runtime.getRuntime().addShutdownHook(new CSVShutdownHook());
Config config = ConfigDescriptor.getInstance().getConfig();
BaseMode baseMode = null;
BaseMode baseMode;
switch (config.getBENCHMARK_WORK_MODE()) {
case TEST_WITH_DEFAULT_PATH:
baseMode = new TestWithDefaultPathMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ private boolean checkOperationProportion() {
minOps++;
}
}
if (minOps > config.getLOOP()) {
if (config.getLOOP() != 0 && minOps > config.getLOOP()) {
LOGGER.error("Loop is too small that can't meet the need of OPERATION_PROPORTION");
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ protected static void finalMeasure(
// must call calculateMetrics() before using the Metrics
try {
measurement.calculateMetrics(operations);
if (operations.size() != 0) {
if (!operations.isEmpty()) {
measurement.showMeasurements(operations);
measurement.showMetrics(operations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ protected boolean preCheck() {

@Override
protected void postCheck() {
LOGGER.info("Data Location: " + config.getFILE_PATH());
LOGGER.info("Schema Location: " + FileUtils.union(config.getFILE_PATH(), "schema.txt"));
LOGGER.info("Generate Info Location: " + FileUtils.union(config.getFILE_PATH(), "info.txt"));
LOGGER.info("Data Location: {}", config.getFILE_PATH());
LOGGER.info("Schema Location: {}", FileUtils.union(config.getFILE_PATH(), "schema.txt"));
LOGGER.info("Generate Info Location: {}", FileUtils.union(config.getFILE_PATH(), "info.txt"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,16 @@ protected boolean preCheck() {
PersistenceFactory persistenceFactory = new PersistenceFactory();
TestDataPersistence recorder = persistenceFactory.getPersistence();
recorder.saveTestConfig();

List<DBConfig> dbConfigs = new ArrayList<>();
dbConfigs.add(config.getDbConfig());
if (config.isIS_DOUBLE_WRITE()) {
dbConfigs.add(config.getANOTHER_DBConfig());
}
if (config.isIS_DELETE_DATA()) {
if (!cleanUpData(dbConfigs, measurement)) {
return false;
}
if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs, measurement))) {
return false;
}
if (config.isCREATE_SCHEMA()) {
if (!registerSchema(measurement)) {
return false;
}
if (config.isCREATE_SCHEMA() && (!registerSchema(measurement))) {
return false;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@
package cn.edu.tsinghua.iot.benchmark.mode;

import cn.edu.tsinghua.iot.benchmark.client.operation.Operation;
import cn.edu.tsinghua.iot.benchmark.conf.Config;
import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor;
import cn.edu.tsinghua.iot.benchmark.measurement.Measurement;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class VerificationQueryMode extends BaseMode {

private static final Config config = ConfigDescriptor.getInstance().getConfig();

@Override
protected boolean preCheck() {
return true;
Expand All @@ -45,6 +41,6 @@ protected void postCheck() {
threadsMeasurements,
start,
dataClients,
Arrays.asList(Operation.VERIFICATION_QUERY));
Collections.singletonList(Operation.VERIFICATION_QUERY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class VerificationWriteMode extends BaseMode {

private static final Config config = ConfigDescriptor.getInstance().getConfig();

@Override
Expand All @@ -40,15 +39,11 @@ protected boolean preCheck() {
if (config.isIS_DOUBLE_WRITE()) {
dbConfigs.add(config.getANOTHER_DBConfig());
}
if (config.isIS_DELETE_DATA()) {
if (!cleanUpData(dbConfigs, measurement)) {
return false;
}
if (config.isIS_DELETE_DATA() && (!cleanUpData(dbConfigs, measurement))) {
return false;
}
if (config.isCREATE_SCHEMA()) {
if (!registerSchema(measurement)) {
return false;
}
if (config.isCREATE_SCHEMA() && (!registerSchema(measurement))) {
return false;
}
return true;
}
Expand All @@ -61,6 +56,6 @@ protected void postCheck() {
threadsMeasurements,
start,
dataClients,
new ArrayList<>(Arrays.asList(Operation.INGESTION)));
new ArrayList<>(Collections.singletonList(Operation.INGESTION)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class SingletonWorkDataWorkLoad extends GenerateDataWorkLoad {
private static final List<Sensor> SENSORS = Collections.synchronizedList(config.getSENSORS());
private ConcurrentHashMap<Integer, AtomicLong> deviceMaxTimeIndexMap;
private static SingletonWorkDataWorkLoad singletonWorkDataWorkLoad = null;
private static final AtomicInteger sensorIndex = new AtomicInteger();
private final AtomicLong insertLoop = new AtomicLong(0);
Expand All @@ -47,10 +45,6 @@ private SingletonWorkDataWorkLoad() {
long startIndex = (long) (config.getLOOP() * config.getOUT_OF_ORDER_RATIO());
this.insertLoop.set(startIndex);
}
deviceMaxTimeIndexMap = new ConcurrentHashMap<>();
for (int i = 0; i < config.getDEVICE_NUMBER(); i++) {
deviceMaxTimeIndexMap.put(MetaUtil.getDeviceId(i), new AtomicLong(0));
}
}

public static SingletonWorkDataWorkLoad getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,18 @@
import cn.edu.tsinghua.iot.benchmark.entity.Record;
import cn.edu.tsinghua.iot.benchmark.entity.Sensor;
import cn.edu.tsinghua.iot.benchmark.exception.WorkloadException;
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
import cn.edu.tsinghua.iot.benchmark.schema.schemaImpl.DeviceSchema;

import java.util.*;

public class SyntheticDataWorkLoad extends GenerateDataWorkLoad {

private final Map<DeviceSchema, Long> maxTimestampIndexMap;
private long insertLoop = 0;
private int deviceIndex = 0;
private int sensorIndex = 0;
private final List<DeviceSchema> deviceSchemas;

public SyntheticDataWorkLoad(List<DeviceSchema> deviceSchemas) {
this.deviceSchemas = deviceSchemas;
maxTimestampIndexMap = new HashMap<>();
for (DeviceSchema schema : deviceSchemas) {
if (config.isIS_SENSOR_TS_ALIGNMENT()) {
maxTimestampIndexMap.put(schema, 0L);
} else {
for (Sensor sensor : schema.getSensors()) {
DeviceSchema deviceSchema =
new DeviceSchema(
schema.getDeviceId(),
Collections.singletonList(sensor),
MetaUtil.getTags(schema.getDeviceId()));
maxTimestampIndexMap.put(deviceSchema, 0L);
}
}
}
this.deviceSchemaSize = deviceSchemas.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,14 @@ private void registerStorageGroups(Session metaSession, List<TimeseriesSchema> s
private void activateTemplate(Session metaSession, List<TimeseriesSchema> schemaList) {
List<String> someDevicePaths = new ArrayList<>();
AtomicLong activatedDeviceCount = new AtomicLong();
schemaList.stream()
.map(schema -> ROOT_SERIES_NAME + "." + schema.getDeviceSchema().getDevicePath())
.forEach(
path -> {
someDevicePaths.add(path);
if (someDevicePaths.size() >= ACTIVATE_TEMPLATE_THRESHOLD) {
activateTemplateForSomeDevices(
metaSession, someDevicePaths, activatedDeviceCount.get());
activatedDeviceCount.addAndGet(someDevicePaths.size());
someDevicePaths.clear();
}
});
for (TimeseriesSchema timeseriesSchema : schemaList) {
someDevicePaths.add(timeseriesSchema.getDeviceId());
if (someDevicePaths.size() >= ACTIVATE_TEMPLATE_THRESHOLD) {
activateTemplateForSomeDevices(metaSession, someDevicePaths, activatedDeviceCount.get());
activatedDeviceCount.addAndGet(someDevicePaths.size());
someDevicePaths.clear();
}
}
if (!someDevicePaths.isEmpty()) {
activateTemplateForSomeDevices(metaSession, someDevicePaths, activatedDeviceCount.get());
}
Expand Down

0 comments on commit e9e0120

Please sign in to comment.