Skip to content

Commit

Permalink
refactor(interactive): Support Fetching Meta By Calling Procedure in …
Browse files Browse the repository at this point in the history
…Compiler (#4215)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
To get schema, run `call graph.procedure.meta.schema();`, which returns
the schema in [FLEX
specification](https://app.swaggerhub.com/apis/GRAPHSCOPE/flex-api/1.0.0#/Graph/importSchemaById)
in json string;
To get statistiscs, run `call graph.procedure.meta.statistics();`, which
returns the statistics in [json
string](https://github.com/alibaba/GraphScope/blob/main/interactive_engine/compiler/src/test/resources/statistics/modern_statistics.json);

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
  • Loading branch information
shirly121 authored Sep 11, 2024
1 parent 405cc52 commit d13f298
Show file tree
Hide file tree
Showing 20 changed files with 495 additions and 75 deletions.
9 changes: 9 additions & 0 deletions docs/interactive_engine/neo4j/supported_cypher.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,12 @@ RETURN a, b;
| ORDER BY | | <input type="checkbox" disabled checked /> | |
| LIMIT | | <input type="checkbox" disabled checked /> | |

Additionally, we support two types of procedure call invocations in Cypher:
- We offer a set of built-in procedures that can be invoked directly within Cypher queries. These procedures are all prefixed with `gs.procedure.`.

| Keyword | Comments | Example |
|:---|---|:---:|
| CALL | Retrieve schema information in a JSON format following the FLEX specification | `call gs.procedure.meta.schema();` |
| CALL | Retrieve statistics information in a JSON format following the FLEX specification | `call gs.procedure.meta.statistics();` |

- User-defined procedures: Users can define custom procedures in GIE and invoke them within their Cypher queries.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.alibaba.graphscope.common.intermediate.process.SinkByColumns;
import com.alibaba.graphscope.common.intermediate.process.SinkGraph;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec;
import com.alibaba.graphscope.common.jna.IrCoreLibrary;
import com.alibaba.graphscope.common.jna.type.*;
import com.alibaba.graphscope.common.utils.ClassUtils;
Expand Down Expand Up @@ -825,7 +826,7 @@ public Pointer createParams(QueryParams params) {
}

public IrPlan(IrMeta meta, InterOpCollection opCollection) {
irCoreLib.setSchema(meta.getSchema().schemaJson());
irCoreLib.setSchema(meta.getSchema().getSchemaSpec(SchemaSpec.Type.IR_CORE_IN_JSON));
this.ptrPlan = irCoreLib.initLogicalPlan();
// add snapshot to QueryParams
for (InterOpBase op : opCollection.unmodifiableCollection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Objects;

/**
Expand All @@ -33,18 +31,18 @@ public class IrMeta {
protected final GraphId graphId;
protected final SnapshotId snapshotId;
protected final IrGraphSchema schema;
protected final @Nullable GraphStoredProcedures storedProcedures;
protected final GraphStoredProcedures storedProcedures;

public IrMeta(IrGraphSchema schema) {
this(SnapshotId.createEmpty(), schema);
this(GraphId.DEFAULT, SnapshotId.createEmpty(), schema, new GraphStoredProcedures());
}

public IrMeta(IrGraphSchema schema, GraphStoredProcedures storedProcedures) {
this(GraphId.DEFAULT, SnapshotId.createEmpty(), schema, storedProcedures);
}

public IrMeta(SnapshotId snapshotId, IrGraphSchema schema) {
this(GraphId.DEFAULT, snapshotId, schema, null);
this(GraphId.DEFAULT, snapshotId, schema, new GraphStoredProcedures());
}

public IrMeta(
Expand All @@ -55,7 +53,7 @@ public IrMeta(
this.graphId = graphId;
this.snapshotId = Objects.requireNonNull(snapshotId);
this.schema = Objects.requireNonNull(schema);
this.storedProcedures = storedProcedures;
this.storedProcedures = Objects.requireNonNull(storedProcedures);
}

public IrGraphSchema getSchema() {
Expand All @@ -66,7 +64,7 @@ public SnapshotId getSnapshotId() {
return snapshotId;
}

public @Nullable GraphStoredProcedures getStoredProcedures() {
public GraphStoredProcedures getStoredProcedures() {
return storedProcedures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
import com.alibaba.graphscope.common.ir.meta.IrMetaTracker;
import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader;
import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec.Type;
import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics;

import org.slf4j.Logger;
Expand Down Expand Up @@ -72,7 +73,7 @@ private synchronized void syncMeta() {
IrMeta meta = this.reader.readMeta();
logger.debug(
"schema from remote: {}",
(meta == null) ? null : meta.getSchema().schemaJson());
(meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON));
GraphStatistics curStats;
// if the graph id or schema version is changed, we need to update the statistics
if (this.currentState == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.lang3.ObjectUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -34,10 +38,17 @@
import java.util.stream.Collectors;

public class GraphStoredProcedures {
public static final String META_PROCEDURE_PREFIX = "gs.procedure.meta.";
private static final Logger logger = LoggerFactory.getLogger(GraphStoredProcedures.class);
private final Map<String, StoredProcedureMeta> storedProcedureMetaMap;
private final IrMetaReader metaReader;

public GraphStoredProcedures() {
this.metaReader = null;
this.storedProcedureMetaMap = Maps.newLinkedHashMap();
registerBuiltInProcedures();
}

public GraphStoredProcedures(InputStream metaStream, IrMetaReader metaReader) {
Yaml yaml = new Yaml();
Map<String, Object> yamlAsMap = yaml.load(metaStream);
Expand All @@ -61,6 +72,38 @@ public GraphStoredProcedures(InputStream metaStream, IrMetaReader metaReader) {
.collect(Collectors.toMap(StoredProcedureMeta::getName, k -> k));
}
this.metaReader = metaReader;
registerBuiltInProcedures();
}

private void registerBuiltInProcedures() {
// register system-built-in procedures
String schemaProcedure = META_PROCEDURE_PREFIX + "schema";
RelDataTypeFactory typeFactory = StoredProcedureMeta.typeFactory;
this.storedProcedureMetaMap.put(
schemaProcedure,
new StoredProcedureMeta(
schemaProcedure,
StoredProcedureMeta.Mode.SCHEMA,
"",
"",
typeFactory.createStructType(
ImmutableList.of(typeFactory.createSqlType(SqlTypeName.CHAR)),
ImmutableList.of("schema")),
ImmutableList.of(),
ImmutableMap.of()));
String statsProcedure = META_PROCEDURE_PREFIX + "statistics";
this.storedProcedureMetaMap.put(
statsProcedure,
new StoredProcedureMeta(
statsProcedure,
StoredProcedureMeta.Mode.SCHEMA,
"",
"",
typeFactory.createStructType(
ImmutableList.of(typeFactory.createSqlType(SqlTypeName.CHAR)),
ImmutableList.of("statistics")),
ImmutableList.of(),
ImmutableMap.of()));
}

public @Nullable StoredProcedureMeta getStoredProcedure(String procedureName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,22 @@

package com.alibaba.graphscope.common.ir.meta.procedure;

import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
import com.alibaba.graphscope.common.ir.meta.schema.GSDataTypeConvertor;
import com.alibaba.graphscope.common.ir.meta.schema.GSDataTypeDesc;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics;
import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec;
import com.alibaba.graphscope.common.ir.rex.RexProcedureCall;
import com.alibaba.graphscope.common.ir.tools.GraphPlanExecutor;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.gaia.proto.Common;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

Expand All @@ -34,8 +47,7 @@
import java.util.stream.Collectors;

public class StoredProcedureMeta {
private static final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

public static final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
private final String name;
private final RelDataType returnType;
private final List<Parameter> parameters;
Expand All @@ -44,7 +56,7 @@ public class StoredProcedureMeta {
private final String extension;
private final Map<String, Object> options;

protected StoredProcedureMeta(
public StoredProcedureMeta(
String name,
Mode mode,
String description,
Expand Down Expand Up @@ -109,6 +121,10 @@ public String toString() {
+ '}';
}

public Mode getMode() {
return this.mode;
}

public static class Parameter {
private final String name;
private final RelDataType dataType;
Expand Down Expand Up @@ -265,10 +281,56 @@ private static List<StoredProcedureMeta.Parameter> createParameters(
}
}

public enum Mode {
public enum Mode implements GraphPlanExecutor {
READ,
WRITE,
SCHEMA
SCHEMA {
@Override
public void execute(
GraphPlanner.Summary summary, IrMeta irMeta, ExecutionResponseListener listener)
throws Exception {
RexProcedureCall procedureCall =
(RexProcedureCall) summary.getLogicalPlan().getProcedureCall();
String metaProcedure = procedureCall.op.getName();
String metaInJson;
// call gs.procedure.meta.schema();
if (metaProcedure.endsWith("schema")) {
metaInJson = irMeta.getSchema().getSchemaSpec(SchemaSpec.Type.FLEX_IN_JSON);
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(metaInJson);
metaInJson = mapper.writeValueAsString(rootNode.get("schema"));
} else if (metaProcedure.endsWith(
"statistics")) { // call gs.procedure.meta.statistics();
Preconditions.checkArgument(
irMeta instanceof IrMetaStats,
"cannot get statistics from ir meta, should be instance"
+ " of %s, but is %s",
IrMetaStats.class,
irMeta.getClass());
metaInJson =
((IrGraphStatistics) ((IrMetaStats) irMeta).getStatistics())
.getStatsJson();
} else {
throw new IllegalArgumentException("invalid meta procedure: " + metaProcedure);
}
IrResult.Entry metaEntry =
IrResult.Entry.newBuilder()
.setElement(
IrResult.Element.newBuilder()
.setObject(
Common.Value.newBuilder()
.setStr(metaInJson)
.build())
.build())
.build();
listener.onNext(
IrResult.Record.newBuilder()
.addColumns(
IrResult.Column.newBuilder().setEntry(metaEntry).build())
.build());
listener.onCompleted();
}
}
}

public static class Config {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.SnapshotId;
import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures;
import com.alibaba.graphscope.common.ir.meta.schema.FileFormatType;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics;
import com.alibaba.graphscope.common.ir.meta.schema.SchemaInputStream;
import com.alibaba.graphscope.common.ir.meta.schema.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -76,7 +73,7 @@ public IrMeta readMeta() throws IOException {
new SchemaInputStream(
new ByteArrayInputStream(
metaInYaml.getBytes(StandardCharsets.UTF_8)),
FileFormatType.YAML)),
SchemaSpec.Type.FLEX_IN_YAML)),
new GraphStoredProcedures(
new ByteArrayInputStream(metaInYaml.getBytes(StandardCharsets.UTF_8)),
this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import com.alibaba.graphscope.common.ir.meta.GraphId;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures;
import com.alibaba.graphscope.common.ir.meta.schema.FileFormatType;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema;
import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics;
import com.alibaba.graphscope.common.ir.meta.schema.SchemaInputStream;
import com.alibaba.graphscope.common.ir.meta.schema.*;
import com.alibaba.graphscope.common.utils.FileUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -56,10 +53,15 @@ public IrMeta readMeta() throws IOException {
Path schemaPath =
(schemaURI.getScheme() == null) ? Path.of(schemaURI.getPath()) : Path.of(schemaURI);
FileFormatType formatType = FileUtils.getFormatType(schemaUri);
// hack way to determine schema specification from the file format
SchemaSpec.Type schemaSpec =
formatType == FileFormatType.YAML
? SchemaSpec.Type.FLEX_IN_YAML
: SchemaSpec.Type.IR_CORE_IN_JSON;
IrGraphSchema graphSchema =
new IrGraphSchema(
new SchemaInputStream(
new FileInputStream(schemaPath.toFile()), formatType));
new FileInputStream(schemaPath.toFile()), schemaSpec));
IrMeta irMeta =
(formatType == FileFormatType.YAML)
? new IrMeta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,43 @@
import com.alibaba.graphscope.groot.common.exception.PropertyNotFoundException;
import com.alibaba.graphscope.groot.common.exception.TypeNotFoundException;
import com.alibaba.graphscope.groot.common.schema.api.*;
import com.alibaba.graphscope.groot.common.util.IrSchemaParser;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
* Maintain Graph schema meta for IR and add two extra interfaces : {@link #schemaJson()} and {@link #isColumnId()}
* Maintain Graph schema meta for IR and add two extra interfaces : {@link #getSchemaSpec(SchemaSpec.Type)} ()} and {@link #isColumnId()}
*/
public class IrGraphSchema implements GraphSchema {
private final GraphSchema graphSchema;
private final String schemeJson;
private final boolean isColumnId;
private final SchemaSpecManager specManager;

public IrGraphSchema(SchemaInputStream schemaInputStream) throws IOException {
this.isColumnId = false;
String content =
new String(
schemaInputStream.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
schemaInputStream.getInputStream().close();
switch (schemaInputStream.getFormatType()) {
case YAML:
this.graphSchema = Utils.buildSchemaFromYaml(content);
this.schemeJson =
IrSchemaParser.getInstance().parse(this.graphSchema, this.isColumnId);
break;
case JSON:
default:
this.graphSchema = Utils.buildSchemaFromJson(content);
this.schemeJson = content;
}
SchemaSpec spec = new SchemaSpec(schemaInputStream.getType(), content);
this.graphSchema = spec.convert();
this.specManager = new SchemaSpecManager(this, spec);
}

public IrGraphSchema(GraphSchema graphSchema, boolean isColumnId) {
this.graphSchema = graphSchema;
this.schemeJson = IrSchemaParser.getInstance().parse(graphSchema, isColumnId);
this.isColumnId = isColumnId;
this.specManager = new SchemaSpecManager(this);
}

public boolean isColumnId() {
return this.isColumnId;
}

public String schemaJson() {
return this.schemeJson;
public String getSchemaSpec(SchemaSpec.Type type) {
return this.specManager.getSpec(type).getContent();
}

@Override
Expand Down Expand Up @@ -111,4 +102,8 @@ public Map<GraphElement, GraphProperty> getPropertyList(int i) {
public String getVersion() {
return this.graphSchema.getVersion();
}

protected GraphSchema getGraphSchema() {
return this.graphSchema;
}
}
Loading

0 comments on commit d13f298

Please sign in to comment.