From cd7879c5bcb85f6e489efa772edc56d3afe067e5 Mon Sep 17 00:00:00 2001 From: Xiaoli Zhou Date: Thu, 21 Nov 2024 15:31:20 +0800 Subject: [PATCH] fix(interactive): Fix Bugs of OOM in CBO (#4330) ## What do these changes do? as titled. ## Related issue number Fixes --- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 105 ++++++++++-------- .../planner/rules/JoinDecompositionRule.java | 16 +-- .../ir/planner/volcano/VolcanoPlannerX.java | 6 + 3 files changed, 75 insertions(+), 52 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 992895c4fdaa..71be66ef2d20 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.GraphConfig; import com.alibaba.graphscope.common.config.PlannerConfig; +import com.alibaba.graphscope.common.ir.meta.GraphId; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; @@ -46,26 +47,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable private volatile IrMetaStats currentState; // To manage the state changes of statistics resulting from different update operations. private volatile StatsState statsState; - private final boolean fetchStats; + private volatile Boolean statsEnabled = null; public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) { super(dataReader, tracker); this.scheduler = new ScheduledThreadPoolExecutor(1); - this.scheduler.scheduleAtFixedRate( - () -> syncMeta(), - 0, - GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); - this.fetchStats = + long schemaIntervalMS = GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs); + if (schemaIntervalMS > 0) { + logger.info("start to schedule the schema sync task per {} ms", schemaIntervalMS); + this.scheduler.scheduleAtFixedRate( + () -> syncMeta(), schemaIntervalMS, schemaIntervalMS, TimeUnit.MILLISECONDS); + } + boolean isCBOMode = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) - && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); - if (this.fetchStats) { - logger.info("start to schedule statistics fetch task"); + && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equalsIgnoreCase("CBO"); + long statsIntervalMS = GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs); + if (!isCBOMode || statsIntervalMS <= 0) { + this.statsEnabled = false; + } else { + logger.info("start to schedule the stats sync task per {} ms", statsIntervalMS); this.scheduler.scheduleAtFixedRate( - () -> syncStats(), - 2000, - GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); + () -> syncStats(), statsIntervalMS, statsIntervalMS, TimeUnit.MILLISECONDS); } } @@ -80,7 +82,6 @@ private synchronized void syncMeta() { logger.debug( "schema from remote: {}", (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); - GraphStatistics curStats; // if the graph id or schema version is changed, we need to update the statistics if (this.currentState == null || !this.currentState.getGraphId().equals(meta.getGraphId()) @@ -89,58 +90,74 @@ private synchronized void syncMeta() { .getVersion() .equals(meta.getSchema().getVersion())) { this.statsState = StatsState.INITIALIZED; - curStats = null; - } else { - curStats = this.currentState.getStatistics(); + this.currentState = + new IrMetaStats( + meta.getGraphId(), + meta.getSnapshotId(), + meta.getSchema(), + meta.getStoredProcedures(), + null); } - this.currentState = - new IrMetaStats( - meta.getGraphId(), - meta.getSnapshotId(), - meta.getSchema(), - meta.getStoredProcedures(), - curStats); - if (this.fetchStats && this.statsState != StatsState.SYNCED) { - logger.info("start to schedule statistics fetch task"); + boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); + if (statsEnabled && this.statsState != StatsState.SYNCED + || (!statsEnabled && this.statsState != StatsState.MOCKED)) { + logger.debug("start to sync stats"); syncStats(); } } catch (Throwable e) { - logger.warn("failed to read meta data", e); + logger.warn("failed to read meta data, error is {}", e); + } + } + + private boolean getStatsEnabled(GraphId graphId) { + try { + this.statsEnabled = + (this.statsEnabled == null) + ? this.reader.syncStatsEnabled(graphId) + : this.statsEnabled; + return this.statsEnabled; + } catch ( + Throwable e) { // if errors happen when reading stats enabled, we assume it is false + logger.warn("failed to read stats enabled, error is {}", e); + return false; } } private synchronized void syncStats() { try { if (this.currentState != null) { - GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); - logger.debug("statistics from remote: {}", stats); - if (stats != null && stats.getVertexCount() != 0) { - this.currentState = - new IrMetaStats( - this.currentState.getSnapshotId(), - this.currentState.getSchema(), - this.currentState.getStoredProcedures(), - stats); - if (tracker != null) { - logger.debug("start to update the glogue"); - tracker.onChanged(this.currentState); + boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); + if (statsEnabled) { + GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); + logger.debug("statistics from remote: {}", stats); + if (stats != null && stats.getVertexCount() != 0) { + this.currentState = + new IrMetaStats( + this.currentState.getSnapshotId(), + this.currentState.getSchema(), + this.currentState.getStoredProcedures(), + stats); + if (tracker != null) { + logger.info("start to update the glogue"); + tracker.onChanged(this.currentState); + } + this.statsState = StatsState.SYNCED; } - this.statsState = StatsState.SYNCED; } } } catch (Throwable e) { - logger.warn("failed to read graph statistics, error is", e); + logger.warn("failed to read graph statistics, error is {}", e); } finally { try { if (this.currentState != null && tracker != null && this.statsState == StatsState.INITIALIZED) { - logger.debug("start to mock the glogue"); + logger.info("start to mock the glogue"); tracker.onChanged(this.currentState); this.statsState = StatsState.MOCKED; } } catch (Throwable t) { - logger.warn("failed to mock the glogue, error is", t); + logger.warn("failed to mock the glogue, error is {}", t); } } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java index b49f2115deab..ea8003ac63b9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java @@ -49,10 +49,7 @@ public void onMatch(RelOptRuleCall relOptRuleCall) { // specific optimization for relational DB scenario. // 3. `JoinByEdge`: Split the pattern by edge, convert a triangle pattern to `JoinByEdge` to // support optimizations in Neo4j. - if (getMaxEdgeNum(graphPattern.getPattern()) > 2) { - (new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)) - .addDecompositions(); - } + (new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)).addDecompositions(); if (config.getForeignKeyMeta() != null) { (new JoinByForeignKey(graphPattern, mq, decompositionQueue, queueCapacity)) .addDecompositions(); @@ -311,10 +308,13 @@ public JoinByVertex( @Override public void addDecompositions() { - List queues = initDecompositions(); - while (!queues.isEmpty()) { - List nextCompositions = getDecompositions(queues.remove(0)); - queues.addAll(nextCompositions); + if (getMaxEdgeNum(graphPattern.getPattern()) > 2) { + List queues = initDecompositions(); + while (!queues.isEmpty()) { + List nextCompositions = + getDecompositions(queues.remove(0)); + queues.addAll(nextCompositions); + } } addPxdInnerVDecompositions(); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java index 4e468b60aa58..73df191c473a 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java @@ -22,6 +22,7 @@ import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.plan.volcano.VolcanoPlanner; import org.apache.calcite.rel.RelNode; @@ -59,4 +60,9 @@ protected RelOptCost upperBoundForInputs(RelNode mExpr, RelOptCost upperBound) { if (relCost == null) return null; return cost.plus(relCost); } + + @Override + public void registerSchema(RelOptSchema schema) { + // do nothing + } }