From a7015a858c950d40b58ae12bc5de5b1ebdf014a7 Mon Sep 17 00:00:00 2001
From: siyuan0322
Date: Tue, 22 Oct 2024 14:59:39 +0800
Subject: [PATCH] format

---
 .../ir/meta/fetcher/DynamicIrMetaFetcher.java |  5 +++--
 .../cypher/executor/GraphQueryExecutor.java   |  6 +++++-
 .../groot/store/KafkaProcessor.java           | 17 ++++++++--------
 .../graphscope/groot/store/WriterAgent.java   | 20 +++++++++----------
 4 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java
index d01954dbccb7..992895c4fdaa 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java
@@ -56,8 +56,9 @@ public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTrac
                 0,
                 GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
                 TimeUnit.MILLISECONDS);
-        this.fetchStats = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
-                && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
+        this.fetchStats =
+                PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
+                        && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
         if (this.fetchStats) {
             logger.info("start to schedule statistics fetch task");
             this.scheduler.scheduleAtFixedRate(
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java
index 4d0503299780..1d6eeaaacc25 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java
@@ -185,7 +185,11 @@ public void execute(
                                     jobName,
                                     summary.getLogicalPlan(),
                                     summary.getPhysicalPlan());
-                    client.submit(request, listener, timeoutConfig, statusCallback.getQueryLogger());
+                    client.submit(
+                            request,
+                            listener,
+                            timeoutConfig,
+                            statusCallback.getQueryLogger());
                 }
             };
     }
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
index 193298f2d8fb..6394628f6bc4 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
@@ -84,7 +84,6 @@ public KafkaProcessor(
         replayQueue = new ArrayBlockingQueue<>(queueSize);
         latestSnapshotId = new AtomicLong(-1);
         replayInProgress = new AtomicBoolean(false);
-
     }

     public void start() {
@@ -278,10 +277,10 @@ public void replayWAL() throws IOException {
         int replayCount = 0;

         try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) {
-//            replayInProgress.set(true);
+            // replayInProgress.set(true);
             ConsumerRecord record;
             while ((record = logReader.readNextRecord()) != null) {
-//                writeQueue.put(new ReadLogEntry(record.offset(), record.value()));
+                // writeQueue.put(new ReadLogEntry(record.offset(), record.value()));
                 writeQueue.put(record);
                 replayCount++;
                 if (replayCount % 10000 == 0) {
@@ -292,9 +291,9 @@ public void replayWAL() throws IOException {
             throw new RuntimeException(e);
         }
-//        } finally {
-//            replayInProgress.set(false);
-//        }
+        // } finally {
+        //     replayInProgress.set(false);
+        // }

         logger.info("replayWAL finished. total replayed [{}] records", replayCount);
     }
@@ -312,14 +311,16 @@ private void processRecords() {

                     offset = readLogEntry.getOffset();
                     logEntry = readLogEntry.getLogEntry();
                     logEntry.setSnapshotId(latestSnapshotId.get());
-//                    logger.info("polled from replay queue, offset {}, id {}", offset, logEntry.getSnapshotId());
+                    // logger.info("polled from replay queue, offset {}, id {}",
+                    //  offset, logEntry.getSnapshotId());
                 } else {
                     ConsumerRecord record = writeQueue.take();
                     offset = record.offset();
                     logEntry = record.value();
                     latestSnapshotId.set(logEntry.getSnapshotId());
-//                    logger.info("polled from write queue, offset {}, id {}", offset, latestSnapshotId.get());
+                    // logger.info("polled from write queue, offset {}, id {}",
+                    //  offset, latestSnapshotId.get());
                 }

                 processRecord(offset, logEntry);
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java
index 2a525be1d6cc..aad4ffa45112 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java
@@ -162,7 +162,7 @@ private void processBatches() {
                 continue;
             }
             long batchSI = batch.getSnapshotId();
-//            logger.debug("polled one batch [" + batchSI + "]");
+            // logger.debug("polled one batch [" + batchSI + "]");
             boolean hasDdl = writeEngineWithRetry(batch);
             if (this.consumeSI < batchSI) {
                 SnapshotInfo availSInfo = this.availSnapshotInfoRef.get();
@@ -173,8 +173,8 @@ private void processBatches() {

                 this.commitExecutor.execute(this::asyncCommit);
             } // else { // a flurry of batches with same snapshot ID
-            //    logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
-            //}
+            // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
+            // }
             if (hasDdl) {
                 this.consumeDdlSnapshotId = batchSI;
             }
@@ -205,13 +205,13 @@ private void asyncCommit() {
     }

     private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) {
-//        while (!shouldStop) {
-            try {
-                return this.storeService.batchWrite(storeDataBatch);
-            } catch (Exception e) {
-                logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e);
-            }
-//        }
+        // while (!shouldStop) {
+        try {
+            return this.storeService.batchWrite(storeDataBatch);
+        } catch (Exception e) {
+            logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e);
+        }
+        // }
         return false;
     }