From 6992665b46fa12f69830a5869c449204412f08b0 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 8 Oct 2024 20:28:26 +0800 Subject: [PATCH 1/9] try enhanced replay --- .../graphscope/groot/sdk/GrootClient.java | 10 ++ .../groot/frontend/ClientService.java | 46 ++++++++ .../groot/frontend/FrontendStoreClient.java | 20 ++++ .../groot/store/FrontendStoreService.java | 16 +++ .../groot/store/KafkaProcessor.java | 109 ++++++++++++++---- .../groot/store/SnapshotSortQueue.java | 18 ++- .../graphscope/groot/store/StoreService.java | 2 + .../graphscope/groot/store/WriterAgent.java | 20 ++-- proto/groot/frontend_store_service.proto | 1 + proto/groot/sdk/client_service.proto | 1 + proto/groot/sdk/model.proto | 9 ++ python/graphscope/client/connection.py | 9 ++ 12 files changed, 224 insertions(+), 37 deletions(-) diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 971dea16d561..ed2e2da1997f 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -101,6 +101,16 @@ public List replayRecords(long offset, long timestamp) { return resp.getSnapshotIdList(); } + public boolean replayRecordsV2(long offset, long timestamp) { + ReplayRecordsRequestV2 req = + ReplayRecordsRequestV2.newBuilder() + .setOffset(offset) + .setTimestamp(timestamp) + .build(); + ReplayRecordsResponseV2 resp = this.clientStub.replayRecordsV2(req); + return resp.getSuccess(); + } + private long modifyVertex(Vertex vertex, WriteTypePb writeType) { WriteRequestPb request = vertex.toWriteRequest(writeType); return submit(request); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 48f64d721751..ac5cfc05843d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -535,4 +535,50 @@ private void finish(Throwable t) { }); } } + + @Override + public void replayRecordsV2( + ReplayRecordsRequestV2 request, + StreamObserver responseObserver) { + ReplayRecordsResponseV2.Builder response = ReplayRecordsResponseV2.newBuilder().setSuccess(true); + logger.info("replay records v2"); + int storeCount = this.metaService.getStoreCount(); + AtomicInteger counter = new AtomicInteger(storeCount); + AtomicBoolean finished = new AtomicBoolean(false); + + for (int i = 0; i < storeCount; i++) { + this.frontendStoreClients + .getClient(i) + .replayRecordsV2( + request, + new CompletionCallback() { + @Override + public void onCompleted(ReplayRecordsResponseV2 res) { + response.mergeFrom(res); + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed to replay records", t); + response.setSuccess(false); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + if (t != null) { + responseObserver.onError(t); + } else { + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + } + }); + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java index 0fa6aad2ca80..fbaa9babb2b1 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java @@ -135,4 +135,24 @@ public void onError(Throwable t) { public void onCompleted() {} }); } + + public void replayRecordsV2(ReplayRecordsRequestV2 request, CompletionCallback callback) { + getStub() + .replayRecordsV2( + request, + new StreamObserver() { + @Override + public void onNext(ReplayRecordsResponseV2 value) { + callback.onCompleted(value); + } + + @Override + public void onError(Throwable t) { + callback.onError(t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java index 621ad6faccdb..2c4c26eada26 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java @@ -127,4 +127,20 @@ public void getState( responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } + + @Override + public void replayRecordsV2( + ReplayRecordsRequestV2 request, + StreamObserver responseObserver) { + try { + long offset = request.getOffset(); + long ts = request.getTimestamp(); + this.storeService.replayRecordsV2(offset, ts); + responseObserver.onNext(ReplayRecordsResponseV2.newBuilder().setSuccess(true).build()); + responseObserver.onCompleted(); + } catch (IOException e) { + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()); + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index d99e97c2a31b..61dbb58d2414 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -16,9 +16,7 @@ import com.alibaba.graphscope.groot.operation.OperationBlob; import com.alibaba.graphscope.groot.operation.OperationType; import com.alibaba.graphscope.groot.operation.StoreDataBatch; -import com.alibaba.graphscope.groot.wal.LogEntry; -import com.alibaba.graphscope.groot.wal.LogReader; -import com.alibaba.graphscope.groot.wal.LogService; +import com.alibaba.graphscope.groot.wal.*; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -56,7 +54,10 @@ public class KafkaProcessor { private volatile boolean shouldStop = true; List typesDDL; - BlockingQueue> queue; + BlockingQueue> writeQueue; + BlockingQueue replayQueue; + + boolean replayInProgress = false; public KafkaProcessor( Configs configs, @@ -76,7 +77,8 @@ public KafkaProcessor( offsetsPersistIntervalMs = CoordinatorConfig.OFFSETS_PERSIST_INTERVAL_MS.get(configs); int queueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); - queue = new ArrayBlockingQueue<>(queueSize); + writeQueue = new ArrayBlockingQueue<>(queueSize); + replayQueue = new ArrayBlockingQueue<>(queueSize); } public void start() { @@ -203,20 +205,24 @@ public void pollBatches() { // -1 stands for poll from latest try (LogReader reader = logService.createReader(storeId, -1)) { while (!shouldStop) { - ConsumerRecords records = reader.getLatestUpdates(); - for (ConsumerRecord record : records) { - queue.add(record); + try { + ConsumerRecords records = reader.getLatestUpdates(); + for (ConsumerRecord record : records) { + writeQueue.put(record); + } + } catch (InterruptedException e) { + throw new InternalException(e); + } catch (Exception ex) { + ex.printStackTrace(); } } - } catch (IOException e) { - throw new InternalException(e); + } catch (IOException ex) { + ex.printStackTrace(); } } - private void processRecord(ConsumerRecord record) { + private void processRecord(long offset, LogEntry logEntry) { int partitionCount = metaService.getPartitionCount(); - long offset = record.offset(); - LogEntry logEntry = record.value(); OperationBatch operationBatch = logEntry.getOperationBatch(); if (isSecondary) { // only catch up the schema updates operationBatch = Utils.extractOperations(operationBatch, typesDDL); @@ -228,7 +234,7 @@ private void processRecord(ConsumerRecord record) { StoreDataBatch.Builder builder = StoreDataBatch.newBuilder() .requestId("") - .queueId(storeId) + .queueId(0) .snapshotId(snapshotId) .traceId(operationBatch.getTraceId()) .offset(offset); @@ -263,11 +269,13 @@ public void replayWAL() throws IOException { logger.warn("It may not be expected to replay from the 0 offset, skipped"); return; } + int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) { + replayInProgress = true; ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { - queue.put(record); + writeQueue.put(record); replayCount++; if (replayCount % 10000 == 0) { logger.info("replayed {} records", replayCount); @@ -275,18 +283,31 @@ public void replayWAL() throws IOException { } } catch (InterruptedException e) { throw new RuntimeException(e); + } finally { + replayInProgress = false; } logger.info("replayWAL finished. total replayed [{}] records", replayCount); } private void processRecords() { - while (true) { - try { - ConsumerRecord record = queue.take(); - processRecord(record); - } catch (InterruptedException e) { - e.printStackTrace(); + try { + while (true) { + long offset; + LogEntry logEntry; + if (replayInProgress || !replayQueue.isEmpty()) { + ReadLogEntry readLogEntry = replayQueue.take(); + offset = readLogEntry.getOffset(); + logEntry = readLogEntry.getLogEntry(); + } else { + ConsumerRecord record = writeQueue.take(); + offset = record.offset(); + logEntry = record.value(); + } + processRecord(offset, logEntry); } + } catch (InterruptedException ex) { + ex.printStackTrace(); + throw new RuntimeException(ex); } } @@ -305,4 +326,50 @@ private List prepareDDLTypes() { types.add(OperationType.MARKER); // For advance ID return types; } + + private List prepareDMLTypes() { + List types = new ArrayList<>(); + types.add(OperationType.OVERWRITE_VERTEX); + types.add(OperationType.UPDATE_VERTEX); + types.add(OperationType.DELETE_VERTEX); + types.add(OperationType.OVERWRITE_EDGE); + types.add(OperationType.UPDATE_EDGE); + types.add(OperationType.DELETE_EDGE); + types.add(OperationType.CLEAR_VERTEX_PROPERTIES); + types.add(OperationType.CLEAR_EDGE_PROPERTIES); + types.add(OperationType.ADD_VERTEX_TYPE_PROPERTIES); + types.add(OperationType.ADD_EDGE_TYPE_PROPERTIES); + return types; + } + + public List replayDMLRecordsFrom(long offset, long timestamp) throws IOException { + List types = prepareDMLTypes(); + logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); + + long batchSnapshotId = 0; + int replayCount = 0; + try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { + replayInProgress = true; + // Note this clear is necessary, as those records would be a subset of record range in + // new reader + writeQueue.clear(); + ReadLogEntry readLogEntry; + while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { + LogEntry logEntry = readLogEntry.getLogEntry(); + OperationBatch batch = Utils.extractOperations(logEntry.getOperationBatch(), types); + if (batch.getOperationCount() == 0) { + continue; + } + ReadLogEntry entry = new ReadLogEntry(readLogEntry.getOffset(), 0, batch); + replayQueue.put(entry); + replayCount++; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + replayInProgress = false; + } + logger.info("replay DML records finished. total replayed [{}] records", replayCount); + return List.of(batchSnapshotId); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java index cb86eaf5ec11..4f9547d5ce7e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java @@ -68,11 +68,14 @@ public boolean offerQueue(int queueId, StoreDataBatch entry) throws InterruptedE if (innerQueue == null) { throw new InvalidArgumentException("invalid queueId [" + queueId + "]"); } - boolean res = innerQueue.offer(entry, this.queueWaitMs, TimeUnit.MILLISECONDS); - if (res) { - this.size.incrementAndGet(); - } - return res; + // boolean res = innerQueue.offer(entry, this.queueWaitMs, TimeUnit.MILLISECONDS); + // if (res) {x + // this.size.incrementAndGet(); + // } + // return res; + innerQueue.put(entry); + this.size.incrementAndGet(); + return true; } public StoreDataBatch poll() throws InterruptedException { @@ -108,7 +111,10 @@ public StoreDataBatch poll() throws InterruptedException { } long snapshotId = entry.getSnapshotId(); - if (snapshotId == this.currentPollSnapshotId) { + // allow for a short duration inconsistent, due to different frontend may have minor + // difference in timing + if (snapshotId == this.currentPollSnapshotId + || currentPollSnapshotId - snapshotId < 10) { this.size.decrementAndGet(); return entry; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 136d0639ae61..6968db935a14 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -591,4 +591,6 @@ public long[] getDiskStatus() { long usable = file.getUsableSpace(); return new long[] {total, usable}; } + + public void replayRecordsV2(long offset, long timestamp) throws IOException {} } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index edde700c84d9..8b5c63b27686 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -15,6 +15,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.common.util.ThreadFactoryUtils; import com.alibaba.graphscope.groot.coordinator.SnapshotInfo; import com.alibaba.graphscope.groot.meta.MetaService; @@ -47,7 +48,8 @@ public class WriterAgent { private final RoleClients snapshotCommitter; private volatile boolean shouldStop = true; - private SnapshotSortQueue bufferQueue; + // private SnapshotSortQueue bufferQueue; + private BlockingQueue bufferQueue; private volatile long lastCommitSI; private volatile long consumeSI; private volatile long consumeDdlSnapshotId; @@ -68,6 +70,9 @@ public WriterAgent( this.metaService = metaService; this.snapshotCommitter = snapshotCommitter; this.availSnapshotInfoRef = new AtomicReference<>(); + // this.bufferQueue = new SnapshotSortQueue(this.configs, this.metaService); + int queueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); + this.bufferQueue = new ArrayBlockingQueue<>(queueSize); initMetrics(); } @@ -78,7 +83,6 @@ public void start() { this.availSnapshotInfoRef.set(new SnapshotInfo(0, 0)); this.shouldStop = false; - this.bufferQueue = new SnapshotSortQueue(this.configs, this.metaService); this.consumedQueueOffsets = new ArrayList<>(this.queueCount); for (int i = 0; i < this.queueCount; i++) { this.consumedQueueOffsets.add(-1L); @@ -135,15 +139,15 @@ public void stop() { public boolean writeStore(StoreDataBatch storeDataBatch) throws InterruptedException { // logger.info("writeStore {}", storeDataBatch.toProto()); // int queueId = storeDataBatch.getQueueId(); - boolean suc = this.bufferQueue.offerQueue(0, storeDataBatch); + // boolean suc = this.bufferQueue.offerQueue(0, storeDataBatch); // logger.debug("Buffer queue: {}, {}", suc, this.bufferQueue.innerQueueSizes()); - return suc; + this.bufferQueue.put(storeDataBatch); + return true; } public boolean writeStore2(List storeDataBatches) throws InterruptedException { for (StoreDataBatch storeDataBatch : storeDataBatches) { - int queueId = storeDataBatch.getQueueId(); - if (!this.bufferQueue.offerQueue(queueId, storeDataBatch)) { + if (!writeStore(storeDataBatch)) { return false; } } @@ -175,8 +179,6 @@ private void processBatches() { } // this.consumedQueueOffsets.set(batch.getQueueId(), batch.getOffset()); this.consumedQueueOffsets.set(0, batch.getOffset()); - } catch (InterruptedException e) { - logger.error("processBatches interrupted"); } catch (Exception e) { logger.error("error in processBatches, ignore", e); } @@ -198,8 +200,6 @@ private void asyncCommit() { } catch (Exception e) { logger.warn("commit failed. SI {}, offset {}. ignored", curSI, queueOffsets, e); } - } else { - logger.warn("curSI {} <= lastCommitSI {}, ignored", curSI, lastCommitSI); } } diff --git a/proto/groot/frontend_store_service.proto b/proto/groot/frontend_store_service.proto index 0280a60d9b00..0f07fd4982d9 100644 --- a/proto/groot/frontend_store_service.proto +++ b/proto/groot/frontend_store_service.proto @@ -27,4 +27,5 @@ service FrontendStoreService { rpc compactDB(CompactDBRequest) returns(CompactDBResponse); rpc reopenSecondary(ReopenSecondaryRequest) returns (ReopenSecondaryResponse); rpc GetState(GetStoreStateRequest) returns (GetStoreStateResponse); + rpc replayRecordsV2(ReplayRecordsRequestV2) returns(ReplayRecordsResponseV2); } diff --git a/proto/groot/sdk/client_service.proto b/proto/groot/sdk/client_service.proto index 3591054a2282..ed9d2a276e15 100644 --- a/proto/groot/sdk/client_service.proto +++ b/proto/groot/sdk/client_service.proto @@ -35,6 +35,7 @@ service Client { rpc getStoreState(GetStoreStateRequest) returns (GetStoreStateResponse); rpc compactDB(CompactDBRequest) returns (CompactDBResponse); rpc reopenSecondary(ReopenSecondaryRequest) returns (ReopenSecondaryResponse); + rpc replayRecordsV2(ReplayRecordsRequestV2) returns(ReplayRecordsResponseV2); } message GetSchemaRequest { diff --git a/proto/groot/sdk/model.proto b/proto/groot/sdk/model.proto index 81ddf100c245..96762cdf50de 100644 --- a/proto/groot/sdk/model.proto +++ b/proto/groot/sdk/model.proto @@ -250,3 +250,12 @@ message ReopenSecondaryRequest { message ReopenSecondaryResponse { bool success = 1; } + +message ReplayRecordsRequestV2 { + int64 offset = 1; + int64 timestamp = 2; +} + +message ReplayRecordsResponseV2 { + bool success = 1; +} \ No newline at end of file diff --git a/python/graphscope/client/connection.py b/python/graphscope/client/connection.py index 751540c6f9ef..b88a8c89bf9c 100644 --- a/python/graphscope/client/connection.py +++ b/python/graphscope/client/connection.py @@ -213,6 +213,15 @@ def reopen_secondary(self): ) return response.success + def replay_records_v2(self, offset: int, timestamp: int): + request = model_pb2.ReplayRecordsRequestV2() + request.offset = offset + request.timestamp = timestamp + response = self._client_service_stub.replayRecordsV2( + request, metadata=self._metadata + ) + return response.success + def _encode_metadata(self, username, password): if not (username and password): return None From e86eae2e9a9c72e9c73b480509b4b42fca816826 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Wed, 9 Oct 2024 12:01:04 +0800 Subject: [PATCH 2/9] pass replay --- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 2 +- .../gremlin/plugin/QueryStatusCallback.java | 2 -- .../graphscope/groot/sdk/GrootClient.java | 4 ++-- .../groot/frontend/ClientService.java | 3 +-- .../groot/frontend/FrontendStoreClient.java | 3 ++- .../groot/frontend/write/KafkaAppender.java | 8 +++++--- .../groot/store/FrontendStoreService.java | 10 +++++++--- .../graphscope/groot/store/KafkaProcessor.java | 17 +++++++++++++---- .../graphscope/groot/store/StoreService.java | 2 -- .../alibaba/graphscope/groot/servers/Store.java | 8 +++++--- proto/groot/sdk/model.proto | 2 +- python/graphscope/client/connection.py | 2 +- 12 files changed, 38 insertions(+), 25 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 931e78aae887..00e667db86ac 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -101,7 +101,7 @@ private synchronized void syncMeta() { syncStats(statsEnabled); } } catch (Throwable e) { - logger.warn("failed to read meta data, error is {}", e); + logger.warn("failed to read meta data", e); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java index 2f45930159a1..ba63ffdedcd9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java @@ -114,8 +114,6 @@ private JsonObject buildSimpleLog(boolean isSucceed, long elapsedMillis) { private void fillLogDetail(JsonObject logJson, String errorMsg) { try { if (this.metricsCollector.getElapsedMillis() > this.printThreshold) { - // todo(siyuan): the invocation of the function can cause Exception when serializing - // a gremlin vertex to json format fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis()); } } catch (Throwable t) { diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index ed2e2da1997f..dbec657b1174 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -101,14 +101,14 @@ public List replayRecords(long offset, long timestamp) { return resp.getSnapshotIdList(); } - public boolean replayRecordsV2(long offset, long timestamp) { + public List replayRecordsV2(long offset, long timestamp) { ReplayRecordsRequestV2 req = ReplayRecordsRequestV2.newBuilder() .setOffset(offset) .setTimestamp(timestamp) .build(); ReplayRecordsResponseV2 resp = this.clientStub.replayRecordsV2(req); - return resp.getSuccess(); + return resp.getSnapshotIdList(); } private long modifyVertex(Vertex vertex, WriteTypePb writeType) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index ac5cfc05843d..b616ddc64df5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -540,7 +540,7 @@ private void finish(Throwable t) { public void replayRecordsV2( ReplayRecordsRequestV2 request, StreamObserver responseObserver) { - ReplayRecordsResponseV2.Builder response = ReplayRecordsResponseV2.newBuilder().setSuccess(true); + ReplayRecordsResponseV2.Builder response = ReplayRecordsResponseV2.newBuilder(); logger.info("replay records v2"); int storeCount = this.metaService.getStoreCount(); AtomicInteger counter = new AtomicInteger(storeCount); @@ -563,7 +563,6 @@ public void onCompleted(ReplayRecordsResponseV2 res) { @Override public void onError(Throwable t) { logger.error("failed to replay records", t); - response.setSuccess(false); finish(t); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java index fbaa9babb2b1..8d5e3fd75990 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java @@ -136,7 +136,8 @@ public void onCompleted() {} }); } - public void replayRecordsV2(ReplayRecordsRequestV2 request, CompletionCallback callback) { + public void replayRecordsV2( + ReplayRecordsRequestV2 request, CompletionCallback callback) { getStub() .replayRecordsV2( request, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java index bb2f848843b5..27e4b7b837e3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java @@ -240,13 +240,14 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc types.add(OperationType.ADD_EDGE_TYPE_PROPERTIES); logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); - long batchSnapshotId = 0; + List ids = new ArrayList<>(); int replayCount = 0; try (LogWriter logWriter = this.logService.createWriter()) { for (int storeId = 0; storeId < storeCount; ++storeId) { try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { + long batchSnapshotId = ingestSnapshotId.get(); ReadLogEntry readLogEntry; while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { LogEntry logEntry = readLogEntry.getLogEntry(); @@ -255,15 +256,16 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc if (batch.getOperationCount() == 0) { continue; } - logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch)); + logWriter.append(storeId, new LogEntry(batchSnapshotId, batch)); replayCount++; } + ids.add(batchSnapshotId); } } } logger.info("replay DML records finished. total replayed [{}] records", replayCount); - return List.of(batchSnapshotId); + return ids; } /** diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java index 2c4c26eada26..3f1005e0e31b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java @@ -20,14 +20,17 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.List; import java.util.Map; public class FrontendStoreService extends FrontendStoreServiceGrpc.FrontendStoreServiceImplBase { private final StoreService storeService; + private final KafkaProcessor processor; - public FrontendStoreService(StoreService storeService) { + public FrontendStoreService(StoreService storeService, KafkaProcessor processor) { this.storeService = storeService; + this.processor = processor; } @Override @@ -135,8 +138,9 @@ public void replayRecordsV2( try { long offset = request.getOffset(); long ts = request.getTimestamp(); - this.storeService.replayRecordsV2(offset, ts); - responseObserver.onNext(ReplayRecordsResponseV2.newBuilder().setSuccess(true).build()); + List si = this.processor.replayDMLRecordsFrom(offset, ts); + responseObserver.onNext( + ReplayRecordsResponseV2.newBuilder().addAllSnapshotId(si).build()); responseObserver.onCompleted(); } catch (IOException e) { responseObserver.onError( diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index 61dbb58d2414..a5a27a76396e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class KafkaProcessor { @@ -58,6 +59,7 @@ public class KafkaProcessor { BlockingQueue replayQueue; boolean replayInProgress = false; + private AtomicLong latestSnapshotId; public KafkaProcessor( Configs configs, @@ -79,6 +81,7 @@ public KafkaProcessor( int queueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); writeQueue = new ArrayBlockingQueue<>(queueSize); replayQueue = new ArrayBlockingQueue<>(queueSize); + latestSnapshotId = new AtomicLong(-1); } public void start() { @@ -275,7 +278,7 @@ public void replayWAL() throws IOException { replayInProgress = true; ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { - writeQueue.put(record); + replayQueue.put(new ReadLogEntry(record.offset(), record.value())); replayCount++; if (replayCount % 10000 == 0) { logger.info("replayed {} records", replayCount); @@ -294,7 +297,7 @@ private void processRecords() { while (true) { long offset; LogEntry logEntry; - if (replayInProgress || !replayQueue.isEmpty()) { + if (!replayQueue.isEmpty()) { ReadLogEntry readLogEntry = replayQueue.take(); offset = readLogEntry.getOffset(); logEntry = readLogEntry.getLogEntry(); @@ -302,6 +305,7 @@ private void processRecords() { ConsumerRecord record = writeQueue.take(); offset = record.offset(); logEntry = record.value(); + latestSnapshotId.set(logEntry.getSnapshotId()); } processRecord(offset, logEntry); } @@ -346,7 +350,7 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc List types = prepareDMLTypes(); logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); - long batchSnapshotId = 0; + long batchSnapshotId; int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { replayInProgress = true; @@ -354,15 +358,20 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc // new reader writeQueue.clear(); ReadLogEntry readLogEntry; + batchSnapshotId = latestSnapshotId.get(); while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { LogEntry logEntry = readLogEntry.getLogEntry(); OperationBatch batch = Utils.extractOperations(logEntry.getOperationBatch(), types); if (batch.getOperationCount() == 0) { continue; } - ReadLogEntry entry = new ReadLogEntry(readLogEntry.getOffset(), 0, batch); + ReadLogEntry entry = + new ReadLogEntry(readLogEntry.getOffset(), batchSnapshotId, batch); replayQueue.put(entry); replayCount++; + if (replayCount % 10000 == 0) { + logger.info("replayed {} records", replayCount); + } } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 6968db935a14..136d0639ae61 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -591,6 +591,4 @@ public long[] getDiskStatus() { long usable = file.getUsableSpace(); return new long[] {total, usable}; } - - public void replayRecordsV2(long offset, long timestamp) throws IOException {} } diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java index ae86914a8aac..5882be772147 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java @@ -65,10 +65,13 @@ public Store(Configs configs) { this.writerAgent = new WriterAgent(configs, this.storeService, this.metaService, snapshotCommitter); + this.processor = new KafkaProcessor(configs, metaService, writerAgent, logService); + this.backupAgent = new BackupAgent(configs, this.storeService); StoreBackupService storeBackupService = new StoreBackupService(this.backupAgent); StoreSchemaService storeSchemaService = new StoreSchemaService(this.storeService); - FrontendStoreService storeIngestService = new FrontendStoreService(this.storeService); + FrontendStoreService frontendStoreService = + new FrontendStoreService(this.storeService, this.processor); StoreSnapshotService storeSnapshotService = new StoreSnapshotService(this.storeService); this.rpcServer = new RpcServer( @@ -76,13 +79,12 @@ public Store(Configs configs) { localNodeProvider, storeBackupService, storeSchemaService, - storeIngestService, + frontendStoreService, storeSnapshotService); IrServiceProducer serviceProducer = new IrServiceProducer(configs); this.executorService = serviceProducer.makeExecutorService(storeService, metaService, discoveryFactory); this.partitionService = new PartitionService(configs, storeService); - this.processor = new KafkaProcessor(configs, metaService, writerAgent, logService); } @Override diff --git a/proto/groot/sdk/model.proto b/proto/groot/sdk/model.proto index 96762cdf50de..8c6eb31a4e02 100644 --- a/proto/groot/sdk/model.proto +++ b/proto/groot/sdk/model.proto @@ -257,5 +257,5 @@ message ReplayRecordsRequestV2 { } message ReplayRecordsResponseV2 { - bool success = 1; + repeated int64 snapshot_id = 1; } \ No newline at end of file diff --git a/python/graphscope/client/connection.py b/python/graphscope/client/connection.py index b88a8c89bf9c..814789571073 100644 --- a/python/graphscope/client/connection.py +++ b/python/graphscope/client/connection.py @@ -220,7 +220,7 @@ def replay_records_v2(self, offset: int, timestamp: int): response = self._client_service_stub.replayRecordsV2( request, metadata=self._metadata ) - return response.success + return response.snapshot_id def _encode_metadata(self, username, password): if not (username and password): From 4bc947b7677c4a43788824ccd400642e171f6f16 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Wed, 9 Oct 2024 12:23:53 +0800 Subject: [PATCH 3/9] more logs --- .../alibaba/graphscope/common/client/ExecutionClient.java | 4 +++- .../graphscope/common/client/HttpExecutionClient.java | 4 +++- .../graphscope/common/client/RpcExecutionClient.java | 7 +++++-- .../gremlin/integration/processor/IrTestOpProcessor.java | 3 ++- .../gremlin/plugin/processor/LifeCycleSupplier.java | 3 ++- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java index 2fe515709b8d..089726d7fe0f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; /** * client to submit request to remote engine service @@ -37,7 +38,8 @@ public ExecutionClient(ChannelFetcher channelFetcher) { public abstract void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception; public abstract void close() throws Exception; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index b8971bba6335..bb5157653f85 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -25,6 +25,7 @@ import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gaia.proto.StoredProcedure; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.graphscope.interactive.client.Session; import com.alibaba.graphscope.interactive.client.common.Result; import com.alibaba.graphscope.interactive.client.impl.DefaultSession; @@ -56,7 +57,8 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetch public void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception { List responseFutures = Lists.newArrayList(); for (URI httpURI : channelFetcher.fetch()) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java index 2920d2d5accd..1ef68e17a1a0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -23,6 +23,7 @@ import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.pegasus.RpcChannel; import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; @@ -54,7 +55,8 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher channe public void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception { if (rpcClientRef.get() == null) { rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch())); @@ -97,12 +99,13 @@ public void process(PegasusClient.JobResponse jobResponse) { @Override public void finish() { listener.onCompleted(); - logger.info("[compile]: received results from engine"); + queryLogger.info("[compile]: received results from engine"); } @Override public void error(Status status) { listener.onError(status.asException()); + queryLogger.error("[compile]: fail to receive results from engine"); } }, timeoutConfig.getChannelTimeoutMS()); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java index 6a3eee75c803..da476cdf85c4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java @@ -187,7 +187,8 @@ public ThrowingConsumer select(Context ctx) { summary.getLogicalPlan(), summary.getPhysicalPlan()), listener, - timeoutConfig); + timeoutConfig, + statusCallback.getQueryLogger()); } // request results from remote engine in a blocking way listener.request(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java index 2b21ef46d244..1289c9caaef9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java @@ -118,7 +118,8 @@ public GremlinExecutor.LifeCycle get() { summary.getLogicalPlan(), summary.getPhysicalPlan()), listener, - timeoutConfig); + timeoutConfig, + statusCallback.getQueryLogger()); } // request results from remote engine in a blocking way listener.request(); From 111a8988a2290a5daa199d55c8b685a4064e19af Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Wed, 9 Oct 2024 12:24:10 +0800 Subject: [PATCH 4/9] switch fetch stats by config --- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 00e667db86ac..d01954dbccb7 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -20,7 +20,7 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.GraphConfig; -import com.alibaba.graphscope.common.ir.meta.GraphId; +import com.alibaba.graphscope.common.config.PlannerConfig; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; @@ -46,21 +46,26 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable private volatile IrMetaStats currentState; // To manage the state changes of statistics resulting from different update operations. private volatile StatsState statsState; - private volatile Boolean statsEnabled = null; + private final boolean fetchStats; public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) { super(dataReader, tracker); - this.scheduler = new ScheduledThreadPoolExecutor(2); + this.scheduler = new ScheduledThreadPoolExecutor(1); this.scheduler.scheduleAtFixedRate( () -> syncMeta(), - 2000, + 0, GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), TimeUnit.MILLISECONDS); - this.scheduler.scheduleAtFixedRate( - () -> syncStats(statsEnabled == null ? false : statsEnabled), - 2000, - GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); + this.fetchStats = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) + && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); + if (this.fetchStats) { + logger.info("start to schedule statistics fetch task"); + this.scheduler.scheduleAtFixedRate( + () -> syncStats(), + 2000, + GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), + TimeUnit.MILLISECONDS); + } } @Override @@ -94,32 +99,18 @@ private synchronized void syncMeta() { meta.getSchema(), meta.getStoredProcedures(), curStats); - boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); - if (statsEnabled && this.statsState != StatsState.SYNCED - || (!statsEnabled && this.statsState != StatsState.MOCKED)) { - logger.debug("start to sync stats"); - syncStats(statsEnabled); + if (this.fetchStats && this.statsState != StatsState.SYNCED) { + logger.info("start to schedule statistics fetch task"); + syncStats(); } } catch (Throwable e) { logger.warn("failed to read meta data", e); } } - private boolean getStatsEnabled(GraphId graphId) { - try { - return this.statsEnabled == null - ? this.reader.syncStatsEnabled(graphId) - : this.statsEnabled; - } catch ( - Throwable e) { // if errors happen when reading stats enabled, we assume it is false - logger.warn("failed to read stats enabled, error is {}", e); - return false; - } - } - - private synchronized void syncStats(boolean statsEnabled) { + private synchronized void syncStats() { try { - if (this.currentState != null && statsEnabled) { + if (this.currentState != null) { GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); logger.debug("statistics from remote: {}", stats); if (stats != null && stats.getVertexCount() != 0) { @@ -137,7 +128,7 @@ private synchronized void syncStats(boolean statsEnabled) { } } } catch (Throwable e) { - logger.warn("failed to read graph statistics, error is {}", e); + logger.warn("failed to read graph statistics, error is", e); } finally { try { if (this.currentState != null @@ -148,7 +139,7 @@ private synchronized void syncStats(boolean statsEnabled) { this.statsState = StatsState.MOCKED; } } catch (Throwable t) { - logger.warn("failed to mock the glogue, error is {}", t); + logger.warn("failed to mock the glogue, error is", t); } } } From 5b8e1505fe3aebbcdfc204c174ddf13eb78a9de5 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Wed, 9 Oct 2024 13:51:40 +0800 Subject: [PATCH 5/9] fix --- .../graphscope/cypher/executor/GraphQueryExecutor.java | 2 +- .../gremlin/result/processor/AbstractResultProcessor.java | 5 ++--- .../graphscope/gremlin/resultx/GremlinResultProcessor.java | 5 ++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index 4771cc39d3a0..4d0503299780 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -185,7 +185,7 @@ public void execute( jobName, summary.getLogicalPlan(), summary.getPhysicalPlan()); - client.submit(request, listener, timeoutConfig); + client.submit(request, listener, timeoutConfig, statusCallback.getQueryLogger()); } }; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index 419286a5532d..b75f83d89166 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -73,9 +73,8 @@ protected AbstractResultProcessor( msg.optionalArgs(Tokens.ARGS_BATCH_SIZE) .orElse(settings.resultIterationBatchSize); this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize); - this.responseStreamIterator = - new StreamIterator<>( - FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs)); + int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs); + this.responseStreamIterator = new StreamIterator<>(capacity); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java index b1fe008adced..0f6524cab81c 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java @@ -59,9 +59,8 @@ public GremlinResultProcessor( this.statusCallback = statusCallback; this.timeoutConfig = timeoutConfig; this.reducer = Maps.newLinkedHashMap(); - this.recordStreamIterator = - new StreamIterator<>( - FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs)); + int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs); + this.recordStreamIterator = new StreamIterator<>(capacity); } @Override From 3b6adce070708ec93b4c379c0e7bd4f59b0b3de9 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Thu, 10 Oct 2024 12:57:38 +0800 Subject: [PATCH 6/9] fix --- .../common/config/CoordinatorConfig.java | 2 +- .../groot/store/KafkaProcessor.java | 38 +++++++++++++------ .../graphscope/groot/store/StoreService.java | 6 ++- .../graphscope/groot/store/WriterAgent.java | 11 +++--- .../graphscope/groot/wal/LogEntry.java | 6 ++- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index 2dcbbaf704f2..f3d11169dfa4 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -17,7 +17,7 @@ public class CoordinatorConfig { public static final Config SNAPSHOT_INCREASE_INTERVAL_MS = - Config.longConfig("snapshot.increase.interval.ms", 1000L); + Config.longConfig("snapshot.increase.interval.ms", 2000L); public static final Config OFFSETS_PERSIST_INTERVAL_MS = Config.longConfig("offsets.persist.interval.ms", 1000L); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index a5a27a76396e..ac032bda0ed2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +59,7 @@ public class KafkaProcessor { BlockingQueue> writeQueue; BlockingQueue replayQueue; - boolean replayInProgress = false; + private AtomicBoolean replayInProgress; private AtomicLong latestSnapshotId; public KafkaProcessor( @@ -82,6 +83,8 @@ public KafkaProcessor( writeQueue = new ArrayBlockingQueue<>(queueSize); replayQueue = new ArrayBlockingQueue<>(queueSize); latestSnapshotId = new AtomicLong(-1); + replayInProgress = new AtomicBoolean(false); + } public void start() { @@ -275,10 +278,11 @@ public void replayWAL() throws IOException { int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) { - replayInProgress = true; +// replayInProgress.set(true); ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { - replayQueue.put(new ReadLogEntry(record.offset(), record.value())); +// writeQueue.put(new ReadLogEntry(record.offset(), record.value())); + writeQueue.put(record); replayCount++; if (replayCount % 10000 == 0) { logger.info("replayed {} records", replayCount); @@ -286,9 +290,11 @@ public void replayWAL() throws IOException { } } catch (InterruptedException e) { throw new RuntimeException(e); - } finally { - replayInProgress = false; } + +// } finally { +// replayInProgress.set(false); +// } logger.info("replayWAL finished. total replayed [{}] records", replayCount); } @@ -297,15 +303,24 @@ private void processRecords() { while (true) { long offset; LogEntry logEntry; - if (!replayQueue.isEmpty()) { + if (replayInProgress.get() || !replayQueue.isEmpty()) { + if (replayQueue.isEmpty()) { + Thread.sleep(10); + continue; + } ReadLogEntry readLogEntry = replayQueue.take(); offset = readLogEntry.getOffset(); logEntry = readLogEntry.getLogEntry(); + logEntry.setSnapshotId(latestSnapshotId.get()); +// logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId()); + } else { ConsumerRecord record = writeQueue.take(); offset = record.offset(); logEntry = record.value(); latestSnapshotId.set(logEntry.getSnapshotId()); +// logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get()); + } processRecord(offset, logEntry); } @@ -349,14 +364,13 @@ private List prepareDMLTypes() { public List replayDMLRecordsFrom(long offset, long timestamp) throws IOException { List types = prepareDMLTypes(); logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); - + // Note this clear is necessary, as those records would be a subset of record range in + // new reader + replayInProgress.set(true); + writeQueue.clear(); long batchSnapshotId; int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { - replayInProgress = true; - // Note this clear is necessary, as those records would be a subset of record range in - // new reader - writeQueue.clear(); ReadLogEntry readLogEntry; batchSnapshotId = latestSnapshotId.get(); while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { @@ -376,7 +390,7 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc } catch (InterruptedException e) { throw new RuntimeException(e); } finally { - replayInProgress = false; + replayInProgress.set(false); } logger.info("replay DML records finished. total replayed [{}] records", replayCount); return List.of(batchSnapshotId); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 136d0639ae61..861d0a4790ed 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.StoreConfig; +import com.alibaba.graphscope.groot.common.exception.ExternalStorageErrorException; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.exception.IllegalStateException; import com.alibaba.graphscope.groot.common.exception.InternalException; @@ -223,13 +224,16 @@ public boolean batchWrite(StoreDataBatch storeDataBatch) long snapshotId = storeDataBatch.getSnapshotId(); List> dataBatch = storeDataBatch.getDataBatch(); AtomicBoolean hasDdl = new AtomicBoolean(false); - int maxRetry = 10; + int maxRetry = 5; for (Map partitionToBatch : dataBatch) { while (!shouldStop && partitionToBatch.size() != 0 && maxRetry > 0) { partitionToBatch = writeStore(snapshotId, partitionToBatch, hasDdl); maxRetry--; } } + if (maxRetry == 0) { + throw new ExternalStorageErrorException("batchWrite failed after 5 attempts"); + } return hasDdl.get(); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index 8b5c63b27686..2a525be1d6cc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -162,7 +162,7 @@ private void processBatches() { continue; } long batchSI = batch.getSnapshotId(); - logger.debug("polled one batch [" + batchSI + "]"); +// logger.debug("polled one batch [" + batchSI + "]"); boolean hasDdl = writeEngineWithRetry(batch); if (this.consumeSI < batchSI) { SnapshotInfo availSInfo = this.availSnapshotInfoRef.get(); @@ -171,9 +171,10 @@ private void processBatches() { this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); - } else { // a flurry of batches with same snapshot ID - logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } + // else { // a flurry of batches with same snapshot ID + // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + //} if (hasDdl) { this.consumeDdlSnapshotId = batchSI; } @@ -204,13 +205,13 @@ private void asyncCommit() { } private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) { - while (!shouldStop) { +// while (!shouldStop) { try { return this.storeService.batchWrite(storeDataBatch); } catch (Exception e) { logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); } - } +// } return false; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java index a34bfdff5891..10a064f1e00d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java @@ -19,7 +19,7 @@ import java.util.Objects; public class LogEntry { - private final long snapshotId; + private long snapshotId; private final OperationBatch operationBatch; public LogEntry(long snapshotId, OperationBatch operationBatch) { @@ -48,6 +48,10 @@ public LogEntryPb toProto() { .build(); } + public void setSnapshotId(long snapshotId) { + this.snapshotId = snapshotId; + } + @Override public boolean equals(Object o) { if (this == o) return true; From 59fec6aa2d9f95689594120effdbd75da9186cd9 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Thu, 10 Oct 2024 14:51:43 +0800 Subject: [PATCH 7/9] snapshot id + 1 --- .../executor/store/mcsr/src/graph_partitioner.rs | 16 +++------------- .../groot/frontend/write/KafkaAppender.java | 2 +- .../graphscope/groot/store/KafkaProcessor.java | 2 +- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs index 7d6519193767..f1fefb71d31d 100644 --- a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs +++ b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs @@ -214,19 +214,16 @@ impl GraphPartitioner { { let input_path = vertex_file .as_os_str() - .clone() .to_str() .unwrap(); let input_dir_path = self .input_dir .as_os_str() - .clone() .to_str() .unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -260,13 +257,11 @@ impl GraphPartitioner { { let input_path = vertex_file .as_os_str() - .clone() .to_str() .unwrap(); let input_dir_path = self .input_dir .as_os_str() - .clone() .to_str() .unwrap(); let gz_loc = input_path.find(".gz").unwrap(); @@ -274,7 +269,6 @@ impl GraphPartitioner { let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -327,18 +321,16 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv") { - info!("{}", edge_file.as_os_str().clone().to_str().unwrap()); - let input_path = edge_file.as_os_str().clone().to_str().unwrap(); + info!("{}", edge_file.as_os_str().to_str().unwrap()); + let input_path = edge_file.as_os_str().to_str().unwrap(); let input_dir_path = self .input_dir .as_os_str() - .clone() .to_str() .unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -373,11 +365,10 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv.gz") { - let input_path = edge_file.as_os_str().clone().to_str().unwrap(); + let input_path = edge_file.as_os_str().to_str().unwrap(); let input_dir_path = self .input_dir .as_os_str() - .clone() .to_str() .unwrap(); let gz_loc = input_path.find(".gz").unwrap(); @@ -385,7 +376,6 @@ impl GraphPartitioner { let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java index 27e4b7b837e3..24a6c708ed3f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java @@ -259,7 +259,7 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc logWriter.append(storeId, new LogEntry(batchSnapshotId, batch)); replayCount++; } - ids.add(batchSnapshotId); + ids.add(batchSnapshotId + 1); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index ac032bda0ed2..193298f2d8fb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -393,6 +393,6 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc replayInProgress.set(false); } logger.info("replay DML records finished. total replayed [{}] records", replayCount); - return List.of(batchSnapshotId); + return List.of(batchSnapshotId + 1); } } From a7015a858c950d40b58ae12bc5de5b1ebdf014a7 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 22 Oct 2024 14:59:39 +0800 Subject: [PATCH 8/9] format --- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 5 +++-- .../cypher/executor/GraphQueryExecutor.java | 6 +++++- .../groot/store/KafkaProcessor.java | 17 ++++++++-------- .../graphscope/groot/store/WriterAgent.java | 20 +++++++++---------- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index d01954dbccb7..992895c4fdaa 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -56,8 +56,9 @@ public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTrac 0, GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), TimeUnit.MILLISECONDS); - this.fetchStats = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) - && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); + this.fetchStats = + PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) + && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); if (this.fetchStats) { logger.info("start to schedule statistics fetch task"); this.scheduler.scheduleAtFixedRate( diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index 4d0503299780..1d6eeaaacc25 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -185,7 +185,11 @@ public void execute( jobName, summary.getLogicalPlan(), summary.getPhysicalPlan()); - client.submit(request, listener, timeoutConfig, statusCallback.getQueryLogger()); + client.submit( + request, + listener, + timeoutConfig, + statusCallback.getQueryLogger()); } }; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index 193298f2d8fb..6394628f6bc4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -84,7 +84,6 @@ public KafkaProcessor( replayQueue = new ArrayBlockingQueue<>(queueSize); latestSnapshotId = new AtomicLong(-1); replayInProgress = new AtomicBoolean(false); - } public void start() { @@ -278,10 +277,10 @@ public void replayWAL() throws IOException { int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) { -// replayInProgress.set(true); + // replayInProgress.set(true); ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { -// writeQueue.put(new ReadLogEntry(record.offset(), record.value())); + // writeQueue.put(new ReadLogEntry(record.offset(), record.value())); writeQueue.put(record); replayCount++; if (replayCount % 10000 == 0) { @@ -292,9 +291,9 @@ public void replayWAL() throws IOException { throw new RuntimeException(e); } -// } finally { -// replayInProgress.set(false); -// } + // } finally { + // replayInProgress.set(false); + // } logger.info("replayWAL finished. total replayed [{}] records", replayCount); } @@ -312,14 +311,16 @@ private void processRecords() { offset = readLogEntry.getOffset(); logEntry = readLogEntry.getLogEntry(); logEntry.setSnapshotId(latestSnapshotId.get()); -// logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId()); + // logger.info("polled from replay queue, offset {}, id {}", + // offset, logEntry.getSnapshotId()); } else { ConsumerRecord record = writeQueue.take(); offset = record.offset(); logEntry = record.value(); latestSnapshotId.set(logEntry.getSnapshotId()); -// logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get()); + // logger.info("polled from write queue, offset {}, id {}", + // offset, latestSnapshotId.get()); } processRecord(offset, logEntry); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index 2a525be1d6cc..aad4ffa45112 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -162,7 +162,7 @@ private void processBatches() { continue; } long batchSI = batch.getSnapshotId(); -// logger.debug("polled one batch [" + batchSI + "]"); + // logger.debug("polled one batch [" + batchSI + "]"); boolean hasDdl = writeEngineWithRetry(batch); if (this.consumeSI < batchSI) { SnapshotInfo availSInfo = this.availSnapshotInfoRef.get(); @@ -173,8 +173,8 @@ private void processBatches() { this.commitExecutor.execute(this::asyncCommit); } // else { // a flurry of batches with same snapshot ID - // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); - //} + // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + // } if (hasDdl) { this.consumeDdlSnapshotId = batchSI; } @@ -205,13 +205,13 @@ private void asyncCommit() { } private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) { -// while (!shouldStop) { - try { - return this.storeService.batchWrite(storeDataBatch); - } catch (Exception e) { - logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); - } -// } + // while (!shouldStop) { + try { + return this.storeService.batchWrite(storeDataBatch); + } catch (Exception e) { + logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); + } + // } return false; } From 3f35b27e84366db0e8fa5a45c0c7423003828b68 Mon Sep 17 00:00:00 2001 From: siyuan0322 Date: Tue, 29 Oct 2024 11:51:41 +0800 Subject: [PATCH 9/9] rust format --- .../store/mcsr/src/graph_partitioner.rs | 34 ++++--------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs index f1fefb71d31d..3e1b39ce51f9 100644 --- a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs +++ b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs @@ -212,15 +212,8 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv") { - let input_path = vertex_file - .as_os_str() - .to_str() - .unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .to_str() - .unwrap(); + let input_path = vertex_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path @@ -255,15 +248,8 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv.gz") { - let input_path = vertex_file - .as_os_str() - .to_str() - .unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .to_str() - .unwrap(); + let input_path = vertex_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let gz_loc = input_path.find(".gz").unwrap(); let input_path = input_path.split_at(gz_loc).0; let output_path = if let Some(pos) = input_path.find(input_dir_path) { @@ -323,11 +309,7 @@ impl GraphPartitioner { { info!("{}", edge_file.as_os_str().to_str().unwrap()); let input_path = edge_file.as_os_str().to_str().unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .to_str() - .unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path @@ -366,11 +348,7 @@ impl GraphPartitioner { .ends_with(".csv.gz") { let input_path = edge_file.as_os_str().to_str().unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .to_str() - .unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let gz_loc = input_path.find(".gz").unwrap(); let input_path = input_path.split_at(gz_loc).0; let output_path = if let Some(pos) = input_path.find(input_dir_path) {