diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java index 84cb89b0d..409ee466e 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/SchemaClient.java @@ -32,10 +32,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; -public class SchemaClient implements Runnable { +public class SchemaClient implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(SchemaClient.class); protected static Config config = ConfigDescriptor.getInstance().getConfig(); @@ -77,7 +78,7 @@ private void initDBWrappers() { } @Override - public void run() { + public Boolean call() { try { try { if (dbWrapper != null) { @@ -88,7 +89,8 @@ public void run() { // register try { - result = (null == dbWrapper.registerSchema(deviceSchemas)); + // When the return value is not null, the registerSchema is successful. + result = (null != dbWrapper.registerSchema(deviceSchemas)); } catch (TsdbException e) { LOGGER.error("Register {} schema failed because ", config.getNET_DEVICE(), e); result = false; @@ -106,6 +108,7 @@ public void run() { } } finally { countDownLatch.countDown(); + return result; } } diff --git a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java index cdfbab85f..84b58f52f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java +++ b/core/src/main/java/cn/edu/tsinghua/iot/benchmark/mode/BaseMode.java @@ -38,8 +38,10 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -176,20 +178,32 @@ protected boolean registerSchema() { SchemaClient schemaClient = new SchemaClient(i, schemaDownLatch, schemaBarrier); schemaClients.add(schemaClient); } + List> futures = new ArrayList<>(); for (SchemaClient schemaClient : schemaClients) { - schemaExecutorService.submit(schemaClient); + Future future = schemaExecutorService.submit(schemaClient); + futures.add(future); } startTime = System.nanoTime(); schemaExecutorService.shutdown(); try { // wait for all dataClients finish test schemaDownLatch.await(); + for (Future future : futures) { + Boolean result = future.get(); + if (!result) { + LOGGER.error("Registering schema failed!"); + return false; + } + } schemaClients.stream() .map(SchemaClient::getMeasurement) .forEach(baseModeMeasurement::mergeCreateSchemaFinishTime); } catch (InterruptedException e) { LOGGER.error("Exception occurred during waiting for all threads finish.", e); Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + LOGGER.error("Exception occurred during getting result of tasks.", e); + Thread.currentThread().interrupt(); } LOGGER.info("Registering schema successful!"); MetaDataSchema.clearSchemaClientDataSchema(); diff --git a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java index d5a4fe4b9..5af0fc95e 100644 --- a/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java +++ b/iotdb-2.0/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb200/ModelStrategy/TreeStrategy.java @@ -53,6 +53,7 @@ import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -104,7 +105,17 @@ public void registerSchema( registerTimeSeries(pair.getKey(), pair.getValue()); } } + } catch (BrokenBarrierException exception) { + LOGGER.error("Barrier was broken", exception); + throw new TsdbException(exception); + } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + LOGGER.warn("Thread was interrupted", exception); + throw new TsdbException(exception); } catch (Exception e) { + templateBarrier.reset(); + schemaBarrier.reset(); + activateTemplateBarrier.reset(); throw new TsdbException(e); } }