From 94856337f55f38523a870fb17afed93e5f61409c Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Thu, 10 Oct 2024 12:57:38 +0800 Subject: [PATCH] fix --- .../common/config/CoordinatorConfig.java | 2 +- .../groot/store/KafkaProcessor.java | 38 +++++++++++++------ .../graphscope/groot/store/StoreService.java | 6 ++- .../graphscope/groot/store/WriterAgent.java | 11 +++--- .../graphscope/groot/wal/LogEntry.java | 6 ++- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index 2dcbbaf704f2..f3d11169dfa4 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -17,7 +17,7 @@ public class CoordinatorConfig { public static final Config SNAPSHOT_INCREASE_INTERVAL_MS = - Config.longConfig("snapshot.increase.interval.ms", 1000L); + Config.longConfig("snapshot.increase.interval.ms", 2000L); public static final Config OFFSETS_PERSIST_INTERVAL_MS = Config.longConfig("offsets.persist.interval.ms", 1000L); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index a5a27a76396e..ac032bda0ed2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +59,7 @@ public class KafkaProcessor { BlockingQueue> writeQueue; BlockingQueue replayQueue; - boolean replayInProgress = false; + private AtomicBoolean replayInProgress; private AtomicLong latestSnapshotId; public KafkaProcessor( @@ -82,6 +83,8 @@ public KafkaProcessor( writeQueue = new ArrayBlockingQueue<>(queueSize); replayQueue = new ArrayBlockingQueue<>(queueSize); latestSnapshotId = new AtomicLong(-1); + replayInProgress = new AtomicBoolean(false); + } public void start() { @@ -275,10 +278,11 @@ public void replayWAL() throws IOException { int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) { - replayInProgress = true; +// replayInProgress.set(true); ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { - replayQueue.put(new ReadLogEntry(record.offset(), record.value())); +// writeQueue.put(new ReadLogEntry(record.offset(), record.value())); + writeQueue.put(record); replayCount++; if (replayCount % 10000 == 0) { logger.info("replayed {} records", replayCount); @@ -286,9 +290,11 @@ public void replayWAL() throws IOException { } } catch (InterruptedException e) { throw new RuntimeException(e); - } finally { - replayInProgress = false; } + +// } finally { +// replayInProgress.set(false); +// } logger.info("replayWAL finished. total replayed [{}] records", replayCount); } @@ -297,15 +303,24 @@ private void processRecords() { while (true) { long offset; LogEntry logEntry; - if (!replayQueue.isEmpty()) { + if (replayInProgress.get() || !replayQueue.isEmpty()) { + if (replayQueue.isEmpty()) { + Thread.sleep(10); + continue; + } ReadLogEntry readLogEntry = replayQueue.take(); offset = readLogEntry.getOffset(); logEntry = readLogEntry.getLogEntry(); + logEntry.setSnapshotId(latestSnapshotId.get()); +// logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId()); + } else { ConsumerRecord record = writeQueue.take(); offset = record.offset(); logEntry = record.value(); latestSnapshotId.set(logEntry.getSnapshotId()); +// logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get()); + } processRecord(offset, logEntry); } @@ -349,14 +364,13 @@ private List prepareDMLTypes() { public List replayDMLRecordsFrom(long offset, long timestamp) throws IOException { List types = prepareDMLTypes(); logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); - + // Note this clear is necessary, as those records would be a subset of record range in + // new reader + replayInProgress.set(true); + writeQueue.clear(); long batchSnapshotId; int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { - replayInProgress = true; - // Note this clear is necessary, as those records would be a subset of record range in - // new reader - writeQueue.clear(); ReadLogEntry readLogEntry; batchSnapshotId = latestSnapshotId.get(); while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { @@ -376,7 +390,7 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc } catch (InterruptedException e) { throw new RuntimeException(e); } finally { - replayInProgress = false; + replayInProgress.set(false); } logger.info("replay DML records finished. total replayed [{}] records", replayCount); return List.of(batchSnapshotId); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 136d0639ae61..861d0a4790ed 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.StoreConfig; +import com.alibaba.graphscope.groot.common.exception.ExternalStorageErrorException; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.exception.IllegalStateException; import com.alibaba.graphscope.groot.common.exception.InternalException; @@ -223,13 +224,16 @@ public boolean batchWrite(StoreDataBatch storeDataBatch) long snapshotId = storeDataBatch.getSnapshotId(); List> dataBatch = storeDataBatch.getDataBatch(); AtomicBoolean hasDdl = new AtomicBoolean(false); - int maxRetry = 10; + int maxRetry = 5; for (Map partitionToBatch : dataBatch) { while (!shouldStop && partitionToBatch.size() != 0 && maxRetry > 0) { partitionToBatch = writeStore(snapshotId, partitionToBatch, hasDdl); maxRetry--; } } + if (maxRetry == 0) { + throw new ExternalStorageErrorException("batchWrite failed after 5 attempts"); + } return hasDdl.get(); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index 8b5c63b27686..2a525be1d6cc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -162,7 +162,7 @@ private void processBatches() { continue; } long batchSI = batch.getSnapshotId(); - logger.debug("polled one batch [" + batchSI + "]"); +// logger.debug("polled one batch [" + batchSI + "]"); boolean hasDdl = writeEngineWithRetry(batch); if (this.consumeSI < batchSI) { SnapshotInfo availSInfo = this.availSnapshotInfoRef.get(); @@ -171,9 +171,10 @@ private void processBatches() { this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); - } else { // a flurry of batches with same snapshot ID - logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } + // else { // a flurry of batches with same snapshot ID + // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + //} if (hasDdl) { this.consumeDdlSnapshotId = batchSI; } @@ -204,13 +205,13 @@ private void asyncCommit() { } private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) { - while (!shouldStop) { +// while (!shouldStop) { try { return this.storeService.batchWrite(storeDataBatch); } catch (Exception e) { logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); } - } +// } return false; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java index a34bfdff5891..10a064f1e00d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java @@ -19,7 +19,7 @@ import java.util.Objects; public class LogEntry { - private final long snapshotId; + private long snapshotId; private final OperationBatch operationBatch; public LogEntry(long snapshotId, OperationBatch operationBatch) { @@ -48,6 +48,10 @@ public LogEntryPb toProto() { .build(); } + public void setSnapshotId(long snapshotId) { + this.snapshotId = snapshotId; + } + @Override public boolean equals(Object o) { if (this == o) return true;