Skip to content

Commit

Permalink
Merge branch 'main' into refine-error-reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 authored Aug 14, 2024
2 parents 1f04d22 + 11a2b63 commit e028c0e
Show file tree
Hide file tree
Showing 41 changed files with 400 additions and 98 deletions.
4 changes: 3 additions & 1 deletion analytical_engine/apps/flash/flash_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ limitations under the License.
#include <memory>
#include <vector>

#include "flash/flash_ware.h"
#include "grape/grape.h"

#include "core/context/tensor_context.h"
#include "flash/flash_ware.h"

namespace gs {

/**
Expand Down
3 changes: 2 additions & 1 deletion analytical_engine/apps/flash/flash_ware.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include "grape/parallel/parallel_message_manager.h"
#include "grape/worker/comm_spec.h"

#include "core/config.h"
#include "flash/flash_bitset.h"
#include "flash/vertex_subset.h"

Expand Down Expand Up @@ -141,7 +142,7 @@ class FlashWare : public grape::Communicator, public grape::ParallelEngine {
vid_t n_;
vid_t n_loc_;
fid_t pid_;
int n_procs_;
fid_t n_procs_;
int n_threads_;
std::vector<vid_t> masters_;
std::vector<vid_t> mirrors_;
Expand Down
2 changes: 2 additions & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ data:
graph.physical.opt={{ .Values.graphPhysicalOpt }}
gremlin.script.language.name={{ .Values.gremlinScriptLanguageName }}
query.execution.timeout.ms={{ .Values.queryExecutionTimeoutMs }}
graph.planner.join.min.pattern.size={{ .Values.graphPlannerJoinMinPatternSize }}
graph.planner.cbo.glogue.size={{ .Values.graphPlannerCboGlogueSize }}

log4rs.config=LOG4RS_CONFIG
## Auth config
Expand Down
4 changes: 3 additions & 1 deletion charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ storeDataPath: "/var/lib/graphscope-store"
storeDataDownloadPath: "/var/lib/graphscope-store/download"
storeDataSecondaryPath: "/home/graphscope/secondary"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400
storeQueueBufferSize: "1024000"

storeGcIntervalMs: 5000

Expand Down Expand Up @@ -591,6 +591,8 @@ graphPlannerRules: FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule, Exp
gremlinScriptLanguageName: antlr_gremlin_traversal
graphPhysicalOpt: ffi
queryExecutionTimeoutMs: 600000
graphPlannerJoinMinPatternSize: 5
graphPlannerCboGlogueSize: 3

## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
Expand Down
1 change: 1 addition & 0 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ seastar::future<admin_query_result> admin_actor::service_status(
auto running_graph_res = metadata_store_->GetRunningGraph();
nlohmann::json res;
if (query_port != 0) {
res["statistics_enabled"] = true; // default is true
res["status"] =
graph_db_service.is_actors_running() ? "Running" : "Stopped";
res["hqps_port"] = query_port;
Expand Down
2 changes: 2 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1791,6 +1791,8 @@ components:
x-body-name: service_status
type: object
properties:
statistics_enabled: # indicate whether the graph statistics interface is enabled
type: boolean
status:
type: string
graph:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class StoreConfig {
Config.intConfig("store.write.thread.count", 1);

public static final Config<Integer> STORE_QUEUE_BUFFER_SIZE =
Config.intConfig("store.queue.buffer.size", 102400);
Config.intConfig("store.queue.buffer.size", 1024000);

public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public interface SchemaFetcher {
int getPartitionNum();

int getVersion();

boolean statisticsEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.GraphConfig;
import com.alibaba.graphscope.common.ir.meta.GraphId;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
import com.alibaba.graphscope.common.ir.meta.IrMetaTracker;
Expand All @@ -44,18 +45,19 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
private volatile IrMetaStats currentState;
// To manage the state changes of statistics resulting from different update operations.
private volatile StatsState statsState;
private volatile Boolean statsEnabled = null;

public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
super(dataReader, tracker);
this.scheduler = new ScheduledThreadPoolExecutor(2);
this.scheduler.scheduleAtFixedRate(
() -> syncMeta(),
0,
2000,
GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
this.scheduler.scheduleAtFixedRate(
() -> syncStats(),
0,
() -> syncStats(statsEnabled == null ? false : statsEnabled),
2000,
GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
}
Expand All @@ -68,10 +70,14 @@ public Optional<IrMeta> fetch() {
private synchronized void syncMeta() {
try {
IrMeta meta = this.reader.readMeta();
logger.debug(
"schema from remote: {}",
(meta == null) ? null : meta.getSchema().schemaJson());
GraphStatistics curStats;
// if the graph id is changed, we need to update the statistics
if (this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId())) {
|| !this.currentState.getGraphId().equals(meta.getGraphId())
|| this.currentState.getSnapshotId().getId() != meta.getSnapshotId().getId()) {
this.statsState = StatsState.INITIALIZED;
curStats = null;
} else {
Expand All @@ -84,39 +90,61 @@ private synchronized void syncMeta() {
meta.getSchema(),
meta.getStoredProcedures(),
curStats);
if (this.statsState != StatsState.SYNCED) {
syncStats();
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
if (statsEnabled && this.statsState != StatsState.SYNCED
|| (!statsEnabled && this.statsState != StatsState.MOCKED)) {
logger.debug("start to sync stats");
syncStats(statsEnabled);
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to read meta data, error is {}", e);
}
}

private synchronized void syncStats() {
private boolean getStatsEnabled(GraphId graphId) {
try {
if (this.currentState != null) {
return this.statsEnabled == null
? this.reader.syncStatsEnabled(graphId)
: this.statsEnabled;
} catch (
Throwable e) { // if errors happen when reading stats enabled, we assume it is false
logger.warn("failed to read stats enabled, error is {}", e);
return false;
}
}

private synchronized void syncStats(boolean statsEnabled) {
try {
if (this.currentState != null && statsEnabled) {
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
if (stats != null) {
logger.debug("statistics from remote: {}", stats);
if (stats != null && stats.getVertexCount() != 0) {
this.currentState =
new IrMetaStats(
this.currentState.getSnapshotId(),
this.currentState.getSchema(),
this.currentState.getStoredProcedures(),
stats);
if (tracker != null) {
logger.debug("start to update the glogue");
tracker.onChanged(this.currentState);
}
this.statsState = StatsState.SYNCED;
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to read graph statistics, error is {}", e);
} finally {
if (this.currentState != null
&& tracker != null
&& this.statsState == StatsState.INITIALIZED) {
tracker.onChanged(this.currentState);
this.statsState = StatsState.MOCKED;
try {
if (this.currentState != null
&& tracker != null
&& this.statsState == StatsState.INITIALIZED) {
logger.debug("start to mock the glogue");
tracker.onChanged(this.currentState);
this.statsState = StatsState.MOCKED;
}
} catch (Throwable t) {
logger.warn("failed to mock the glogue, error is {}", t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,23 @@ public IrGraphStatistics readStats(GraphId graphId) throws IOException {
}
}

@Override
public boolean syncStatsEnabled(GraphId graphId) throws IOException {
try {
HttpResponse<String> response =
sendRequest(GraphConfig.GRAPH_META_SCHEMA_URI.get(configs));
String res = response.body();
Preconditions.checkArgument(
response.statusCode() == 200,
"read service status fail, status code: %s, error message: %s",
response.statusCode(),
res);
return getStaticEnabled(res);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private HttpResponse<String> sendRequest(String requestUri)
throws IOException, InterruptedException {
HttpRequest request =
Expand All @@ -128,4 +145,14 @@ private Pair<GraphId, String> convertMetaFromJsonToYaml(String metaInJson) throw
Yaml yaml = new Yaml();
return Pair.with(graphId, yaml.dump(metaMap));
}

private boolean getStaticEnabled(String metaInJson) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(metaInJson);
Map<String, Object> rootMap = mapper.convertValue(rootNode, Map.class);
if (rootMap.containsKey("statistics_enabled")) {
return (boolean) rootMap.get("statistics_enabled");
}
return false; // default value
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ public interface IrMetaReader {

// get statistics from a graph referenced by graphId
GraphStatistics readStats(GraphId graphId) throws IOException;

// a synchronous invocation to check whether statistics functionality is enabled in the backend
boolean syncStatsEnabled(GraphId graphId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@ public IrGraphStatistics readStats(GraphId graphId) throws IOException {
(statsURI.getScheme() == null) ? Path.of(statsURI.getPath()) : Path.of(statsURI);
return new IrGraphStatistics(new FileInputStream(statsPath.toFile()));
}

@Override
public boolean syncStatsEnabled(GraphId graphId) {
String statsUri = GraphConfig.GRAPH_META_STATISTICS_URI.get(configs);
return !statsUri.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public Double getCardinality(Pattern queryPattern) {
return this.patternCardinality.get(pattern);
}
}
return 0.0;
// if not exist, return 1.0
return 1.0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.jgrapht.Graph;
import org.jgrapht.graph.DirectedPseudograph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
Expand All @@ -30,6 +32,7 @@ public class GlogueSchema {
private Graph<Integer, EdgeTypeId> schemaGraph;
private HashMap<Integer, Double> vertexTypeCardinality;
private HashMap<EdgeTypeId, Double> edgeTypeCardinality;
private static Logger logger = LoggerFactory.getLogger(GlogueSchema.class);

public GlogueSchema(
GraphSchema graphSchema,
Expand Down Expand Up @@ -69,6 +72,7 @@ public GlogueSchema(GraphSchema graphSchema) {
edgeTypeCardinality.put(edgeType, 1.0);
}
}
logger.debug("GlogueSchema created with default cardinality 1.0: {}", this);
}

public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
Expand Down Expand Up @@ -108,6 +112,7 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
}
}
}
logger.debug("GlogueSchema created with statistics: {}", this);
}

public static GlogueSchema fromMeta(IrMetaStats irMeta) {
Expand Down Expand Up @@ -139,7 +144,9 @@ public List<EdgeTypeId> getEdgeTypes(Integer source, Integer target) {
public Double getVertexTypeCardinality(Integer vertexType) {
Double cardinality = this.vertexTypeCardinality.get(vertexType);
if (cardinality == null) {
return 0.0;
logger.debug(
"Vertex type {} not found in schema, assuming cardinality 1.0", vertexType);
return 1.0;
} else {
return cardinality;
}
Expand All @@ -148,7 +155,8 @@ public Double getVertexTypeCardinality(Integer vertexType) {
public Double getEdgeTypeCardinality(EdgeTypeId edgeType) {
Double cardinality = this.edgeTypeCardinality.get(edgeType);
if (cardinality == null) {
return 0.0;
logger.debug("Edge type {} not found in schema, assuming cardinality 1.0", edgeType);
return 1.0;
} else {
return cardinality;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public RelNode visit(GraphLogicalSource source) {
queryParamsBuilder,
Utils.extractColumnsFromRelDataType(source.getRowType(), isColumnId));
}
scanBuilder.setParams(buildQueryParams(source));
scanBuilder.setParams(queryParamsBuilder);
if (source.getAliasId() != AliasInference.DEFAULT_ID) {
scanBuilder.setAlias(Utils.asAliasId(source.getAliasId()));
}
Expand Down Expand Up @@ -396,6 +396,8 @@ public RelNode visit(LogicalFilter filter) {
Map<Integer, Set<GraphNameOrId>> tagColumns =
Utils.extractTagColumnsFromRexNodes(List.of(filter.getCondition()));
if (preCacheEdgeProps) {
// Currently, we've already precache edge properties and path properties, so we
// need to remove them. So as the follows.
Utils.removeEdgeProperties(
com.alibaba.graphscope.common.ir.tools.Utils.getOutputType(
filter.getInput()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.graphscope.common.ir.tools.config.GraphOpt;
import com.alibaba.graphscope.common.ir.type.GraphLabelType;
import com.alibaba.graphscope.common.ir.type.GraphNameOrId;
import com.alibaba.graphscope.common.ir.type.GraphPathType;
import com.alibaba.graphscope.common.ir.type.GraphProperty;
import com.alibaba.graphscope.common.ir.type.GraphSchemaType;
import com.alibaba.graphscope.gaia.proto.*;
Expand Down Expand Up @@ -738,7 +739,7 @@ public static Set<GraphNameOrId> extractColumnsFromRelDataType(
return columns;
}

// remove edge properties from columns by checking if the tags refers to edge type
// remove properties from columns by checking if the tags refers to edge type or path type
public static void removeEdgeProperties(
RelDataType inputDataType, Map<Integer, Set<GraphNameOrId>> tagColumns) {
List<RelDataTypeField> fieldTypes = inputDataType.getFieldList();
Expand All @@ -750,19 +751,25 @@ public static void removeEdgeProperties(
&& GraphOpt.Source.EDGE.equals(
((GraphSchemaType) headFieldType.getType()).getScanOpt())) {
tags.remove(AliasInference.DEFAULT_ID);
} else if (headFieldType.getType() instanceof GraphPathType) {
tags.remove(AliasInference.DEFAULT_ID);
}
}

if (tags.isEmpty()) {
return;
}
// then, process other tags by checking if they are of edge type
// then, process other tags by checking if they are of edge type or path type
List<Integer> removeKeys = new ArrayList<>();
for (RelDataTypeField fieldType : fieldTypes) {
if (tags.contains(fieldType.getIndex())
&& fieldType.getType() instanceof GraphSchemaType
&& GraphOpt.Source.EDGE.equals(
((GraphSchemaType) fieldType.getType()).getScanOpt())) {
removeKeys.add(fieldType.getIndex());
if (tags.contains(fieldType.getIndex())) {
if (fieldType.getType() instanceof GraphSchemaType
&& GraphOpt.Source.EDGE.equals(
((GraphSchemaType) fieldType.getType()).getScanOpt())) {
removeKeys.add(fieldType.getIndex());
} else if (fieldType.getType() instanceof GraphPathType) {
removeKeys.add(fieldType.getIndex());
}
}
}
tagColumns.keySet().removeAll(removeKeys);
Expand Down
Loading

0 comments on commit e028c0e

Please sign in to comment.