Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The system shuts down automatically if the template creation fails. #463

Merged
merged 7 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class SchemaClient implements Runnable {
public class SchemaClient implements Callable<Boolean> {
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaClient.class);

protected static Config config = ConfigDescriptor.getInstance().getConfig();
Expand Down Expand Up @@ -77,7 +78,7 @@ private void initDBWrappers() {
}

@Override
public void run() {
public Boolean call() {
try {
try {
if (dbWrapper != null) {
Expand All @@ -88,7 +89,8 @@ public void run() {

// register
try {
result = (null == dbWrapper.registerSchema(deviceSchemas));
// When the return value is not null, the registerSchema is successful.
result = (null != dbWrapper.registerSchema(deviceSchemas));
} catch (TsdbException e) {
LOGGER.error("Register {} schema failed because ", config.getNET_DEVICE(), e);
result = false;
Expand All @@ -106,6 +108,7 @@ public void run() {
}
} finally {
countDownLatch.countDown();
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
Expand Down Expand Up @@ -176,20 +178,32 @@ protected boolean registerSchema() {
SchemaClient schemaClient = new SchemaClient(i, schemaDownLatch, schemaBarrier);
schemaClients.add(schemaClient);
}
List<Future<Boolean>> futures = new ArrayList<>();
for (SchemaClient schemaClient : schemaClients) {
schemaExecutorService.submit(schemaClient);
Future<Boolean> future = schemaExecutorService.submit(schemaClient);
futures.add(future);
}
startTime = System.nanoTime();
schemaExecutorService.shutdown();
try {
// wait for all dataClients finish test
schemaDownLatch.await();
for (Future<Boolean> future : futures) {
Boolean result = future.get();
if (!result) {
LOGGER.error("Registering schema failed!");
return false;
}
}
schemaClients.stream()
.map(SchemaClient::getMeasurement)
.forEach(baseModeMeasurement::mergeCreateSchemaFinishTime);
} catch (InterruptedException e) {
LOGGER.error("Exception occurred during waiting for all threads finish.", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOGGER.error("Exception occurred during getting result of tasks.", e);
Thread.currentThread().interrupt();
}
LOGGER.info("Registering schema successful!");
MetaDataSchema.clearSchemaClientDataSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -104,7 +105,17 @@ public void registerSchema(
registerTimeSeries(pair.getKey(), pair.getValue());
}
}
} catch (BrokenBarrierException exception) {
LOGGER.error("Barrier was broken", exception);
throw new TsdbException(exception);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
LOGGER.warn("Thread was interrupted", exception);
throw new TsdbException(exception);
} catch (Exception e) {
templateBarrier.reset();
schemaBarrier.reset();
activateTemplateBarrier.reset();
throw new TsdbException(e);
}
}
Expand Down