Skip to content

Commit

Permalink
Pipe: fix the problems that unable to start when cannot parse reboot …
Browse files Browse the repository at this point in the history
…times (#14594)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
XNX02 and SteveYurongSu authored Dec 31, 2024
1 parent f3ebf52 commit 88a5969
Showing 1 changed file with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.agent.runtime;

import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand All @@ -31,6 +30,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -55,17 +55,21 @@ public class SimpleProgressIndexAssigner {
private int rebootTimes = 0;
private final AtomicLong insertionRequestId = new AtomicLong(1);

public void start() throws StartupException {
public void start() {
isSimpleConsensusEnable =
IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS);
LOGGER.info("Start SimpleProgressIndexAssigner ...");
LOGGER.info("Starting SimpleProgressIndexAssigner ...");

try {
makeDirIfNecessary();
parseRebootTimes();
recordRebootTimes();
LOGGER.info(
"SimpleProgressIndexAssigner started successfully. isSimpleConsensusEnable: {}, rebootTimes: {}",
isSimpleConsensusEnable,
rebootTimes);
} catch (Exception e) {
throw new StartupException(e);
LOGGER.error("Cannot start SimpleProgressIndexAssigner because of {}", e.getMessage(), e);
}
}

Expand All @@ -86,15 +90,27 @@ private void parseRebootTimes() {
try {
String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
rebootTimes = Integer.parseInt(content);
} catch (IOException e) {
LOGGER.error("Cannot parse reboot times from file {}", file.getAbsolutePath(), e);
rebootTimes = 0;
} catch (final Exception e) {
rebootTimes = (int) (System.currentTimeMillis() / 1000);
LOGGER.error(
"Cannot parse reboot times from file {}, set the current time in seconds ({}) as the reboot times",
file.getAbsolutePath(),
rebootTimes);
}
}

private void recordRebootTimes() throws IOException {
File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), StandardCharsets.UTF_8);
private void recordRebootTimes() {
final File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
try (final FileOutputStream fos = new FileOutputStream(file, false)) {
fos.write(String.valueOf(rebootTimes + 1).getBytes(StandardCharsets.UTF_8));
fos.flush();
fos.getFD().sync();
} catch (final Exception e) {
LOGGER.error(
"Cannot record reboot times {} to file {}, the reboot times will not be updated",
rebootTimes,
file.getAbsolutePath());
}
}

public void assignIfNeeded(InsertNode insertNode) {
Expand Down

0 comments on commit 88a5969

Please sign in to comment.