Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Oct 10, 2024
1 parent 39f10e7 commit 9485633
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

public class CoordinatorConfig {
public static final Config<Long> SNAPSHOT_INCREASE_INTERVAL_MS =
Config.longConfig("snapshot.increase.interval.ms", 1000L);
Config.longConfig("snapshot.increase.interval.ms", 2000L);

public static final Config<Long> OFFSETS_PERSIST_INTERVAL_MS =
Config.longConfig("offsets.persist.interval.ms", 1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -58,7 +59,7 @@ public class KafkaProcessor {
BlockingQueue<ConsumerRecord<LogEntry, LogEntry>> writeQueue;
BlockingQueue<ReadLogEntry> replayQueue;

boolean replayInProgress = false;
private AtomicBoolean replayInProgress;
private AtomicLong latestSnapshotId;

public KafkaProcessor(
Expand All @@ -82,6 +83,8 @@ public KafkaProcessor(
writeQueue = new ArrayBlockingQueue<>(queueSize);
replayQueue = new ArrayBlockingQueue<>(queueSize);
latestSnapshotId = new AtomicLong(-1);
replayInProgress = new AtomicBoolean(false);

}

public void start() {
Expand Down Expand Up @@ -275,20 +278,23 @@ public void replayWAL() throws IOException {

int replayCount = 0;
try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) {
replayInProgress = true;
// replayInProgress.set(true);
ConsumerRecord<LogEntry, LogEntry> record;
while ((record = logReader.readNextRecord()) != null) {
replayQueue.put(new ReadLogEntry(record.offset(), record.value()));
// writeQueue.put(new ReadLogEntry(record.offset(), record.value()));
writeQueue.put(record);
replayCount++;
if (replayCount % 10000 == 0) {
logger.info("replayed {} records", replayCount);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
replayInProgress = false;
}

// } finally {
// replayInProgress.set(false);
// }
logger.info("replayWAL finished. total replayed [{}] records", replayCount);
}

Expand All @@ -297,15 +303,24 @@ private void processRecords() {
while (true) {
long offset;
LogEntry logEntry;
if (!replayQueue.isEmpty()) {
if (replayInProgress.get() || !replayQueue.isEmpty()) {
if (replayQueue.isEmpty()) {
Thread.sleep(10);
continue;
}
ReadLogEntry readLogEntry = replayQueue.take();
offset = readLogEntry.getOffset();
logEntry = readLogEntry.getLogEntry();
logEntry.setSnapshotId(latestSnapshotId.get());
// logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId());

} else {
ConsumerRecord<LogEntry, LogEntry> record = writeQueue.take();
offset = record.offset();
logEntry = record.value();
latestSnapshotId.set(logEntry.getSnapshotId());
// logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get());

}
processRecord(offset, logEntry);
}
Expand Down Expand Up @@ -349,14 +364,13 @@ private List<OperationType> prepareDMLTypes() {
public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOException {
List<OperationType> types = prepareDMLTypes();
logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp);

// Note this clear is necessary, as those records would be a subset of record range in
// new reader
replayInProgress.set(true);
writeQueue.clear();
long batchSnapshotId;
int replayCount = 0;
try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) {
replayInProgress = true;
// Note this clear is necessary, as those records would be a subset of record range in
// new reader
writeQueue.clear();
ReadLogEntry readLogEntry;
batchSnapshotId = latestSnapshotId.get();
while (!shouldStop && (readLogEntry = logReader.readNext()) != null) {
Expand All @@ -376,7 +390,7 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
replayInProgress = false;
replayInProgress.set(false);
}
logger.info("replay DML records finished. total replayed [{}] records", replayCount);
return List.of(batchSnapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.alibaba.graphscope.groot.common.config.CommonConfig;
import com.alibaba.graphscope.groot.common.config.Configs;
import com.alibaba.graphscope.groot.common.config.StoreConfig;
import com.alibaba.graphscope.groot.common.exception.ExternalStorageErrorException;
import com.alibaba.graphscope.groot.common.exception.GrootException;
import com.alibaba.graphscope.groot.common.exception.IllegalStateException;
import com.alibaba.graphscope.groot.common.exception.InternalException;
Expand Down Expand Up @@ -223,13 +224,16 @@ public boolean batchWrite(StoreDataBatch storeDataBatch)
long snapshotId = storeDataBatch.getSnapshotId();
List<Map<Integer, OperationBatch>> dataBatch = storeDataBatch.getDataBatch();
AtomicBoolean hasDdl = new AtomicBoolean(false);
int maxRetry = 10;
int maxRetry = 5;
for (Map<Integer, OperationBatch> partitionToBatch : dataBatch) {
while (!shouldStop && partitionToBatch.size() != 0 && maxRetry > 0) {
partitionToBatch = writeStore(snapshotId, partitionToBatch, hasDdl);
maxRetry--;
}
}
if (maxRetry == 0) {
throw new ExternalStorageErrorException("batchWrite failed after 5 attempts");
}
return hasDdl.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void processBatches() {
continue;
}
long batchSI = batch.getSnapshotId();
logger.debug("polled one batch [" + batchSI + "]");
// logger.debug("polled one batch [" + batchSI + "]");
boolean hasDdl = writeEngineWithRetry(batch);
if (this.consumeSI < batchSI) {
SnapshotInfo availSInfo = this.availSnapshotInfoRef.get();
Expand All @@ -171,9 +171,10 @@ private void processBatches() {
this.consumeSI = batchSI;
this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI));
this.commitExecutor.execute(this::asyncCommit);
} else { // a flurry of batches with same snapshot ID
logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
}
// else { // a flurry of batches with same snapshot ID
// logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
//}
if (hasDdl) {
this.consumeDdlSnapshotId = batchSI;
}
Expand Down Expand Up @@ -204,13 +205,13 @@ private void asyncCommit() {
}

private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) {
while (!shouldStop) {
// while (!shouldStop) {
try {
return this.storeService.batchWrite(storeDataBatch);
} catch (Exception e) {
logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e);
}
}
// }
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Objects;

public class LogEntry {
private final long snapshotId;
private long snapshotId;
private final OperationBatch operationBatch;

public LogEntry(long snapshotId, OperationBatch operationBatch) {
Expand Down Expand Up @@ -48,6 +48,10 @@ public LogEntryPb toProto() {
.build();
}

/**
 * Replaces the snapshot ID stored in this log entry.
 *
 * @param snapshotId the new snapshot ID to assign to this entry
 */
public void setSnapshotId(long snapshotId) {
this.snapshotId = snapshotId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down

0 comments on commit 9485633

Please sign in to comment.