diff --git a/docs/interactive_engine/neo4j/supported_cypher.md b/docs/interactive_engine/neo4j/supported_cypher.md index 0a00bfbe9c9d..faf0c0319c9f 100644 --- a/docs/interactive_engine/neo4j/supported_cypher.md +++ b/docs/interactive_engine/neo4j/supported_cypher.md @@ -133,3 +133,12 @@ RETURN a, b; | ORDER BY | | | | | LIMIT | | | | +Additionally, we support two types of procedure call invocations in Cypher: +- We offer a set of built-in procedures that can be invoked directly within Cypher queries. These procedures are all prefixed with `gs.procedure.`. + + | Keyword | Comments | Example | + |:---|---|:---:| + | CALL | Retrieve schema information in a JSON format following the FLEX specification | `call gs.procedure.meta.schema();` | + | CALL | Retrieve statistics information in a JSON format following the FLEX specification | `call gs.procedure.meta.statistics();` | + +- User-defined procedures: Users can define custom procedures in GIE and invoke them within their Cypher queries. diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java index 600cedab8f31..965dd4750945 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java @@ -28,6 +28,7 @@ import com.alibaba.graphscope.common.intermediate.process.SinkByColumns; import com.alibaba.graphscope.common.intermediate.process.SinkGraph; import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec; import com.alibaba.graphscope.common.jna.IrCoreLibrary; import com.alibaba.graphscope.common.jna.type.*; import com.alibaba.graphscope.common.utils.ClassUtils; @@ -825,7 +826,7 @@ public Pointer createParams(QueryParams params) { } public IrPlan(IrMeta meta, InterOpCollection opCollection) { - irCoreLib.setSchema(meta.getSchema().schemaJson()); + irCoreLib.setSchema(meta.getSchema().getSchemaSpec(SchemaSpec.Type.IR_CORE_IN_JSON)); this.ptrPlan = irCoreLib.initLogicalPlan(); // add snapshot to QueryParams for (InterOpBase op : opCollection.unmodifiableCollection()) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/IrMeta.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/IrMeta.java index 6035e96490cb..e8b5c331b5ed 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/IrMeta.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/IrMeta.java @@ -21,8 +21,6 @@ import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures; import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; -import org.checkerframework.checker.nullness.qual.Nullable; - import java.util.Objects; /** @@ -33,10 +31,10 @@ public class IrMeta { protected final GraphId graphId; protected final SnapshotId snapshotId; protected final IrGraphSchema schema; - protected final @Nullable GraphStoredProcedures storedProcedures; + protected final GraphStoredProcedures storedProcedures; public IrMeta(IrGraphSchema schema) { - this(SnapshotId.createEmpty(), schema); + this(GraphId.DEFAULT, SnapshotId.createEmpty(), schema, new GraphStoredProcedures()); } public IrMeta(IrGraphSchema schema, GraphStoredProcedures storedProcedures) { @@ -44,7 +42,7 @@ public IrMeta(IrGraphSchema schema, GraphStoredProcedures storedProcedures) { } public IrMeta(SnapshotId snapshotId, IrGraphSchema schema) { - this(GraphId.DEFAULT, snapshotId, schema, null); + this(GraphId.DEFAULT, snapshotId, schema, new GraphStoredProcedures()); } public IrMeta( @@ -55,7 +53,7 @@ public IrMeta( this.graphId = graphId; this.snapshotId = Objects.requireNonNull(snapshotId); this.schema = Objects.requireNonNull(schema); - this.storedProcedures = storedProcedures; + this.storedProcedures = Objects.requireNonNull(storedProcedures); } public IrGraphSchema getSchema() { @@ -66,7 +64,7 @@ public SnapshotId getSnapshotId() { return snapshotId; } - public @Nullable GraphStoredProcedures getStoredProcedures() { + public GraphStoredProcedures getStoredProcedures() { return storedProcedures; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 599fb4fcbe0c..931e78aae887 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -25,6 +25,7 @@ import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader; +import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec.Type; import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; import org.slf4j.Logger; @@ -72,7 +73,7 @@ private synchronized void syncMeta() { IrMeta meta = this.reader.readMeta(); logger.debug( "schema from remote: {}", - (meta == null) ? null : meta.getSchema().schemaJson()); + (meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); GraphStatistics curStats; // if the graph id or schema version is changed, we need to update the statistics if (this.currentState == null diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/GraphStoredProcedures.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/GraphStoredProcedures.java index 068b9a6e0e8d..ef181abc912d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/GraphStoredProcedures.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/GraphStoredProcedures.java @@ -18,8 +18,12 @@ import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.reader.IrMetaReader; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.commons.lang3.ObjectUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -34,10 +38,17 @@ import java.util.stream.Collectors; public class GraphStoredProcedures { + public static final String META_PROCEDURE_PREFIX = "gs.procedure.meta."; private static final Logger logger = LoggerFactory.getLogger(GraphStoredProcedures.class); private final Map storedProcedureMetaMap; private final IrMetaReader metaReader; + public GraphStoredProcedures() { + this.metaReader = null; + this.storedProcedureMetaMap = Maps.newLinkedHashMap(); + registerBuiltInProcedures(); + } + public GraphStoredProcedures(InputStream metaStream, IrMetaReader metaReader) { Yaml yaml = new Yaml(); Map yamlAsMap = yaml.load(metaStream); @@ -61,6 +72,38 @@ public GraphStoredProcedures(InputStream metaStream, IrMetaReader metaReader) { .collect(Collectors.toMap(StoredProcedureMeta::getName, k -> k)); } this.metaReader = metaReader; + registerBuiltInProcedures(); + } + + private void registerBuiltInProcedures() { + // register system-built-in procedures + String schemaProcedure = META_PROCEDURE_PREFIX + "schema"; + RelDataTypeFactory typeFactory = StoredProcedureMeta.typeFactory; + this.storedProcedureMetaMap.put( + schemaProcedure, + new StoredProcedureMeta( + schemaProcedure, + StoredProcedureMeta.Mode.SCHEMA, + "", + "", + typeFactory.createStructType( + ImmutableList.of(typeFactory.createSqlType(SqlTypeName.CHAR)), + ImmutableList.of("schema")), + ImmutableList.of(), + ImmutableMap.of())); + String statsProcedure = META_PROCEDURE_PREFIX + "statistics"; + this.storedProcedureMetaMap.put( + statsProcedure, + new StoredProcedureMeta( + statsProcedure, + StoredProcedureMeta.Mode.SCHEMA, + "", + "", + typeFactory.createStructType( + ImmutableList.of(typeFactory.createSqlType(SqlTypeName.CHAR)), + ImmutableList.of("statistics")), + ImmutableList.of(), + ImmutableMap.of())); } public @Nullable StoredProcedureMeta getStoredProcedure(String procedureName) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/StoredProcedureMeta.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/StoredProcedureMeta.java index 753e169393da..c0e00ddde078 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/StoredProcedureMeta.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/procedure/StoredProcedureMeta.java @@ -16,9 +16,22 @@ package com.alibaba.graphscope.common.ir.meta.procedure; +import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.schema.GSDataTypeConvertor; import com.alibaba.graphscope.common.ir.meta.schema.GSDataTypeDesc; +import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics; +import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec; +import com.alibaba.graphscope.common.ir.rex.RexProcedureCall; +import com.alibaba.graphscope.common.ir.tools.GraphPlanExecutor; +import com.alibaba.graphscope.common.ir.tools.GraphPlanner; +import com.alibaba.graphscope.gaia.proto.Common; +import com.alibaba.graphscope.gaia.proto.IrResult; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -34,8 +47,7 @@ import java.util.stream.Collectors; public class StoredProcedureMeta { - private static final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); - + public static final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); private final String name; private final RelDataType returnType; private final List parameters; @@ -44,7 +56,7 @@ public class StoredProcedureMeta { private final String extension; private final Map options; - protected StoredProcedureMeta( + public StoredProcedureMeta( String name, Mode mode, String description, @@ -109,6 +121,10 @@ public String toString() { + '}'; } + public Mode getMode() { + return this.mode; + } + public static class Parameter { private final String name; private final RelDataType dataType; @@ -265,10 +281,56 @@ private static List createParameters( } } - public enum Mode { + public enum Mode implements GraphPlanExecutor { READ, WRITE, - SCHEMA + SCHEMA { + @Override + public void execute( + GraphPlanner.Summary summary, IrMeta irMeta, ExecutionResponseListener listener) + throws Exception { + RexProcedureCall procedureCall = + (RexProcedureCall) summary.getLogicalPlan().getProcedureCall(); + String metaProcedure = procedureCall.op.getName(); + String metaInJson; + // call gs.procedure.meta.schema(); + if (metaProcedure.endsWith("schema")) { + metaInJson = irMeta.getSchema().getSchemaSpec(SchemaSpec.Type.FLEX_IN_JSON); + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(metaInJson); + metaInJson = mapper.writeValueAsString(rootNode.get("schema")); + } else if (metaProcedure.endsWith( + "statistics")) { // call gs.procedure.meta.statistics(); + Preconditions.checkArgument( + irMeta instanceof IrMetaStats, + "cannot get statistics from ir meta, should be instance" + + " of %s, but is %s", + IrMetaStats.class, + irMeta.getClass()); + metaInJson = + ((IrGraphStatistics) ((IrMetaStats) irMeta).getStatistics()) + .getStatsJson(); + } else { + throw new IllegalArgumentException("invalid meta procedure: " + metaProcedure); + } + IrResult.Entry metaEntry = + IrResult.Entry.newBuilder() + .setElement( + IrResult.Element.newBuilder() + .setObject( + Common.Value.newBuilder() + .setStr(metaInJson) + .build()) + .build()) + .build(); + listener.onNext( + IrResult.Record.newBuilder() + .addColumns( + IrResult.Column.newBuilder().setEntry(metaEntry).build()) + .build()); + listener.onCompleted(); + } + } } public static class Config { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java index c67949c56124..2e12792d7bf1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/HttpIrMetaReader.java @@ -24,10 +24,7 @@ import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.SnapshotId; import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures; -import com.alibaba.graphscope.common.ir.meta.schema.FileFormatType; -import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; -import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics; -import com.alibaba.graphscope.common.ir.meta.schema.SchemaInputStream; +import com.alibaba.graphscope.common.ir.meta.schema.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -76,7 +73,7 @@ public IrMeta readMeta() throws IOException { new SchemaInputStream( new ByteArrayInputStream( metaInYaml.getBytes(StandardCharsets.UTF_8)), - FileFormatType.YAML)), + SchemaSpec.Type.FLEX_IN_YAML)), new GraphStoredProcedures( new ByteArrayInputStream(metaInYaml.getBytes(StandardCharsets.UTF_8)), this)); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java index 8c8c96458036..3b859755e4f8 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalIrMetaReader.java @@ -21,10 +21,7 @@ import com.alibaba.graphscope.common.ir.meta.GraphId; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.procedure.GraphStoredProcedures; -import com.alibaba.graphscope.common.ir.meta.schema.FileFormatType; -import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; -import com.alibaba.graphscope.common.ir.meta.schema.IrGraphStatistics; -import com.alibaba.graphscope.common.ir.meta.schema.SchemaInputStream; +import com.alibaba.graphscope.common.ir.meta.schema.*; import com.alibaba.graphscope.common.utils.FileUtils; import org.slf4j.Logger; @@ -56,10 +53,15 @@ public IrMeta readMeta() throws IOException { Path schemaPath = (schemaURI.getScheme() == null) ? Path.of(schemaURI.getPath()) : Path.of(schemaURI); FileFormatType formatType = FileUtils.getFormatType(schemaUri); + // hack way to determine schema specification from the file format + SchemaSpec.Type schemaSpec = + formatType == FileFormatType.YAML + ? SchemaSpec.Type.FLEX_IN_YAML + : SchemaSpec.Type.IR_CORE_IN_JSON; IrGraphSchema graphSchema = new IrGraphSchema( new SchemaInputStream( - new FileInputStream(schemaPath.toFile()), formatType)); + new FileInputStream(schemaPath.toFile()), schemaSpec)); IrMeta irMeta = (formatType == FileFormatType.YAML) ? new IrMeta( diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java index 351ad63a4c6a..b8f5ce184096 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java @@ -19,7 +19,6 @@ import com.alibaba.graphscope.groot.common.exception.PropertyNotFoundException; import com.alibaba.graphscope.groot.common.exception.TypeNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.*; -import com.alibaba.graphscope.groot.common.util.IrSchemaParser; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -27,12 +26,12 @@ import java.util.Map; /** - * Maintain Graph schema meta for IR and add two extra interfaces : {@link #schemaJson()} and {@link #isColumnId()} + * Maintain Graph schema meta for IR and add two extra interfaces : {@link #getSchemaSpec(SchemaSpec.Type)} ()} and {@link #isColumnId()} */ public class IrGraphSchema implements GraphSchema { private final GraphSchema graphSchema; - private final String schemeJson; private final boolean isColumnId; + private final SchemaSpecManager specManager; public IrGraphSchema(SchemaInputStream schemaInputStream) throws IOException { this.isColumnId = false; @@ -40,31 +39,23 @@ public IrGraphSchema(SchemaInputStream schemaInputStream) throws IOException { new String( schemaInputStream.getInputStream().readAllBytes(), StandardCharsets.UTF_8); schemaInputStream.getInputStream().close(); - switch (schemaInputStream.getFormatType()) { - case YAML: - this.graphSchema = Utils.buildSchemaFromYaml(content); - this.schemeJson = - IrSchemaParser.getInstance().parse(this.graphSchema, this.isColumnId); - break; - case JSON: - default: - this.graphSchema = Utils.buildSchemaFromJson(content); - this.schemeJson = content; - } + SchemaSpec spec = new SchemaSpec(schemaInputStream.getType(), content); + this.graphSchema = spec.convert(); + this.specManager = new SchemaSpecManager(this, spec); } public IrGraphSchema(GraphSchema graphSchema, boolean isColumnId) { this.graphSchema = graphSchema; - this.schemeJson = IrSchemaParser.getInstance().parse(graphSchema, isColumnId); this.isColumnId = isColumnId; + this.specManager = new SchemaSpecManager(this); } public boolean isColumnId() { return this.isColumnId; } - public String schemaJson() { - return this.schemeJson; + public String getSchemaSpec(SchemaSpec.Type type) { + return this.specManager.getSpec(type).getContent(); } @Override @@ -111,4 +102,8 @@ public Map getPropertyList(int i) { public String getVersion() { return this.graphSchema.getVersion(); } + + protected GraphSchema getGraphSchema() { + return this.graphSchema; + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphStatistics.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphStatistics.java index fb71cc745170..c01a7c48f940 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphStatistics.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphStatistics.java @@ -17,6 +17,9 @@ package com.alibaba.graphscope.common.ir.meta.schema; import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; @@ -27,16 +30,19 @@ * Maintain Graph statistics meta for IR */ public class IrGraphStatistics implements GraphStatistics { + private final String statsJson; private final GraphStatistics graphStatistics; public IrGraphStatistics(InputStream statisticsStream) throws IOException { String content = new String(statisticsStream.readAllBytes(), StandardCharsets.UTF_8); statisticsStream.close(); + this.statsJson = content; this.graphStatistics = Utils.buildStatisticsFromJson(content); } public IrGraphStatistics(GraphStatistics statistics) { this.graphStatistics = statistics; + this.statsJson = null; } @Override @@ -66,4 +72,14 @@ public Long getEdgeTypeCount( public String getVersion() { return this.graphStatistics.getVersion(); } + + public String getStatsJson() throws Exception { + // todo: conversion from 'GraphStatistics' to json need to be implemented in groot + Preconditions.checkArgument( + statsJson != null, "conversion from 'GraphStatistics' to json is unsupported yet"); + // print json in indent mode + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.INDENT_OUTPUT, false); + return mapper.writeValueAsString(mapper.readTree(statsJson)); + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaInputStream.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaInputStream.java index 91c6a2847f34..f1aa54b96c57 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaInputStream.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaInputStream.java @@ -22,18 +22,18 @@ public class SchemaInputStream { private final InputStream inputStream; - private final FileFormatType formatType; + private final SchemaSpec.Type type; - public SchemaInputStream(InputStream inputStream, FileFormatType formatType) { + public SchemaInputStream(InputStream inputStream, SchemaSpec.Type type) { this.inputStream = inputStream; - this.formatType = formatType; + this.type = type; } public InputStream getInputStream() { return inputStream; } - public FileFormatType getFormatType() { - return formatType; + public SchemaSpec.Type getType() { + return type; } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpec.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpec.java new file mode 100644 index 000000000000..dc35c7069760 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpec.java @@ -0,0 +1,68 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.meta.schema; + +import com.alibaba.graphscope.groot.common.schema.api.GraphSchema; +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.yaml.snakeyaml.Yaml; + +import java.util.Map; + +public class SchemaSpec { + private final Type type; + private final String content; + + public SchemaSpec(Type type, String content) { + this.type = type; + this.content = content; + } + + public GraphSchema convert() throws JacksonException { + switch (type) { + case IR_CORE_IN_JSON: + return Utils.buildSchemaFromJson(content); + case FLEX_IN_YAML: + return Utils.buildSchemaFromYaml(content); + case FLEX_IN_JSON: + default: + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(content); + Map rootMap = mapper.convertValue(rootNode, Map.class); + Yaml yaml = new Yaml(); + return Utils.buildSchemaFromYaml(yaml.dump(rootMap)); + } + } + + public String getContent() { + return content; + } + + public Type getType() { + return type; + } + + public enum Type { + IR_CORE_IN_JSON, + FLEX_IN_JSON, + FLEX_IN_YAML; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpecManager.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpecManager.java new file mode 100644 index 000000000000..80079cbff9eb --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/SchemaSpecManager.java @@ -0,0 +1,113 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.meta.schema; + +import com.alibaba.graphscope.groot.common.util.IrSchemaParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.util.List; +import java.util.Map; + +public class SchemaSpecManager { + private static final Logger logger = LoggerFactory.getLogger(SchemaSpecManager.class); + private final IrGraphSchema parent; + private final List specifications; + + public SchemaSpecManager(IrGraphSchema parent) { + this.parent = parent; + this.specifications = Lists.newArrayList(convert(null, SchemaSpec.Type.IR_CORE_IN_JSON)); + } + + public SchemaSpecManager(IrGraphSchema parent, SchemaSpec input) { + this.parent = parent; + this.specifications = Lists.newArrayList(input); + } + + public SchemaSpec getSpec(SchemaSpec.Type type) { + for (SchemaSpec spec : specifications) { + if (spec.getType() == type) { + return spec; + } + } + // if not exist, try to create a new JsonSpecification with content converted from others + SchemaSpec newSpec; + List existing = Lists.newArrayList(); + for (SchemaSpec spec : specifications) { + if ((newSpec = convert(spec, type)) != null) { + specifications.add(newSpec); + return newSpec; + } + existing.add(spec.getType()); + } + throw new IllegalArgumentException( + "spec type [" + + type + + "] cannot be converted from any existing spec types " + + existing); + } + + private @Nullable SchemaSpec convert(@Nullable SchemaSpec source, SchemaSpec.Type target) { + try { + if (source != null && source.getType() == target) { + return source; + } + switch (target) { + case IR_CORE_IN_JSON: + return new SchemaSpec( + target, + IrSchemaParser.getInstance() + .parse(parent.getGraphSchema(), parent.isColumnId())); + case FLEX_IN_JSON: + if (source.getType() == SchemaSpec.Type.FLEX_IN_YAML) { + Yaml yaml = new Yaml(); + Map rootMap = yaml.load(source.getContent()); + ObjectMapper mapper = new ObjectMapper(); + return new SchemaSpec(target, mapper.writeValueAsString(rootMap)); + } + // todo: convert from JSON in IR_CORE to JSON in FLEX + return null; + case FLEX_IN_YAML: + default: + if (source.getType() == SchemaSpec.Type.FLEX_IN_JSON) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(source.getContent()); + Map rootMap = mapper.convertValue(rootNode, Map.class); + Yaml yaml = new Yaml(); + return new SchemaSpec(target, yaml.dump(rootMap)); + } + // todo: convert from JSON in IR_CORE to YAML in FlEX + return null; + } + } catch (Exception e) { + logger.warn( + "can not convert from {} to {} due to some unexpected exception:", + source.getType(), + target, + e); + return null; + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rex/RexProcedureCall.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rex/RexProcedureCall.java new file mode 100644 index 000000000000..229c778af07f --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/rex/RexProcedureCall.java @@ -0,0 +1,50 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.rex; + +import com.alibaba.graphscope.common.ir.meta.procedure.StoredProcedureMeta; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +import java.util.List; +import java.util.Objects; + +public class RexProcedureCall extends RexCall { + private final StoredProcedureMeta procedureMeta; + + public RexProcedureCall( + RelDataType type, + SqlOperator operator, + List operands, + StoredProcedureMeta procedureMeta) { + super(type, operator, operands); + this.procedureMeta = Objects.requireNonNull(procedureMeta); + } + + public StoredProcedureMeta getProcedureMeta() { + return procedureMeta; + } + + public StoredProcedureMeta.Mode getMode() { + return this.procedureMeta.getMode(); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/FfiPhysicalBuilder.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/FfiPhysicalBuilder.java index d9f08d30f458..daf51c992a60 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/FfiPhysicalBuilder.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/ffi/FfiPhysicalBuilder.java @@ -20,6 +20,7 @@ import com.alibaba.graphscope.common.config.FrontendConfig; import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.meta.schema.SchemaSpec.Type; import com.alibaba.graphscope.common.ir.rel.*; import com.alibaba.graphscope.common.ir.rel.GraphLogicalAggregate; import com.alibaba.graphscope.common.ir.rel.GraphLogicalProject; @@ -80,7 +81,7 @@ public FfiPhysicalBuilder( } private static PlanPointer createDefaultPlanPointer(IrMeta irMeta) { - checkFfiResult(LIB.setSchema(irMeta.getSchema().schemaJson())); + checkFfiResult(LIB.setSchema(irMeta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON))); return new PlanPointer(LIB.initLogicalPlan()); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphBuilder.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphBuilder.java index 85b5002befda..41900d4a1cb4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphBuilder.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphBuilder.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.FrontendConfig; import com.alibaba.graphscope.common.exception.FrontendException; +import com.alibaba.graphscope.common.ir.meta.procedure.StoredProcedureMeta; import com.alibaba.graphscope.common.ir.meta.schema.GraphOptSchema; import com.alibaba.graphscope.common.ir.meta.schema.IrGraphSchema; import com.alibaba.graphscope.common.ir.rel.*; @@ -752,6 +753,15 @@ public RexNode call(SqlOperator operator, RexNode... operands) { return call_(operator, ImmutableList.copyOf(operands)); } + public RexNode procedureCall( + SqlOperator operator, + Iterable operands, + StoredProcedureMeta procedureMeta) { + RexCall call = (RexCall) call(operator, operands); + return new RexProcedureCall( + call.getType(), call.getOperator(), call.getOperands(), procedureMeta); + } + @Override public RexNode call(SqlOperator operator, Iterable operands) { return call_(operator, ImmutableList.copyOf(operands)); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanExecutor.java new file mode 100644 index 000000000000..68cd72707a45 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanExecutor.java @@ -0,0 +1,28 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.ir.tools; + +import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; +import com.alibaba.graphscope.common.ir.meta.IrMeta; + +public interface GraphPlanExecutor { + default void execute( + GraphPlanner.Summary summary, IrMeta irMeta, ExecutionResponseListener listener) + throws Exception {} +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ProcedureCallVisitor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ProcedureCallVisitor.java index ffaf9e4f2221..b5d690d0751b 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ProcedureCallVisitor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ProcedureCallVisitor.java @@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.util.Pair; import java.util.List; import java.util.stream.Collectors; @@ -60,23 +61,26 @@ public RexNode visitOC_StandaloneCall(CypherGSParser.OC_StandaloneCallContext ct @Override public RexNode visitOC_ExplicitProcedureInvocation( CypherGSParser.OC_ExplicitProcedureInvocationContext ctx) { - SqlOperator operator = visitOC_ProcedureNameAsOperator(ctx.oC_ProcedureName()); + Pair operator = + visitOC_ProcedureNameAsOperator(ctx.oC_ProcedureName()); List operands = ctx.oC_Expression().stream() .map(this::visitOC_Expression) .collect(Collectors.toList()); - return builder.call(operator, operands); + return builder.procedureCall(operator.left, operands, operator.right); } @Override public RexNode visitOC_ImplicitProcedureInvocation( CypherGSParser.OC_ImplicitProcedureInvocationContext ctx) { - SqlOperator operator = visitOC_ProcedureNameAsOperator(ctx.oC_ProcedureName()); - return builder.call(operator, ImmutableList.of()); + Pair operator = + visitOC_ProcedureNameAsOperator(ctx.oC_ProcedureName()); + return builder.procedureCall(operator.left, ImmutableList.of(), operator.right); } // visit procedure name - public SqlOperator visitOC_ProcedureNameAsOperator(CypherGSParser.OC_ProcedureNameContext ctx) { + public Pair visitOC_ProcedureNameAsOperator( + CypherGSParser.OC_ProcedureNameContext ctx) { String procedureName = ctx.getText(); StoredProcedureMeta meta = null; GraphStoredProcedures procedures = irMeta.getStoredProcedures(); @@ -84,7 +88,7 @@ public SqlOperator visitOC_ProcedureNameAsOperator(CypherGSParser.OC_ProcedureNa procedures != null && (meta = procedures.getStoredProcedure(procedureName)) != null, "procedure %s not found", procedureName); - return GraphStdOperatorTable.USER_DEFINED_PROCEDURE(meta); + return Pair.of(GraphStdOperatorTable.USER_DEFINED_PROCEDURE(meta), meta); } // visit procedure parameters diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/AbstractPlanExecution.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/CypherPlanExecution.java similarity index 78% rename from interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/AbstractPlanExecution.java rename to interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/CypherPlanExecution.java index 9b3845cb6a7e..4bb1413ed277 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/AbstractPlanExecution.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/CypherPlanExecution.java @@ -16,8 +16,9 @@ package com.alibaba.graphscope.cypher.executor; -import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.common.ir.meta.IrMeta; +import com.alibaba.graphscope.common.ir.tools.GraphPlanExecutor; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.cypher.result.CypherRecordParser; import com.alibaba.graphscope.cypher.result.CypherRecordProcessor; @@ -27,18 +28,24 @@ import org.neo4j.kernel.impl.query.QueryExecution; import org.neo4j.kernel.impl.query.QuerySubscriber; -public abstract class AbstractPlanExecution implements StatementResults.SubscribableExecution { +public class CypherPlanExecution implements StatementResults.SubscribableExecution { private final GraphPlanner.Summary planSummary; private final QueryTimeoutConfig timeoutConfig; private final QueryStatusCallback statusCallback; + private final IrMeta irMeta; + private final GraphPlanExecutor innerExecutor; - public AbstractPlanExecution( + public CypherPlanExecution( GraphPlanner.Summary planSummary, QueryTimeoutConfig timeoutConfig, - QueryStatusCallback statusCallback) { + QueryStatusCallback statusCallback, + IrMeta irMeta, + GraphPlanExecutor innerExecutor) { this.planSummary = planSummary; this.timeoutConfig = timeoutConfig; this.statusCallback = statusCallback; + this.irMeta = irMeta; + this.innerExecutor = innerExecutor; } @Override @@ -50,12 +57,10 @@ public QueryExecution subscribe(QuerySubscriber querySubscriber) { querySubscriber, timeoutConfig, statusCallback); - execute(recordProcessor); + innerExecutor.execute(planSummary, irMeta, recordProcessor); return recordProcessor; } catch (Exception e) { throw new RuntimeException(e); } } - - protected abstract void execute(ExecutionResponseListener listener) throws Exception; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index f3b07b1f8780..4771cc39d3a0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -23,9 +23,9 @@ import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.common.exception.FrontendException; import com.alibaba.graphscope.common.ir.meta.IrMeta; -import com.alibaba.graphscope.common.ir.tools.GraphPlanner; -import com.alibaba.graphscope.common.ir.tools.QueryCache; -import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator; +import com.alibaba.graphscope.common.ir.meta.procedure.StoredProcedureMeta; +import com.alibaba.graphscope.common.ir.rex.RexProcedureCall; +import com.alibaba.graphscope.common.ir.tools.*; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.utils.ClassUtils; import com.alibaba.graphscope.gaia.proto.IrResult; @@ -56,7 +56,7 @@ public class GraphQueryExecutor extends FabricExecutor { private static final Logger logger = LoggerFactory.getLogger(GraphQueryExecutor.class); private static final String GET_ROUTING_TABLE_STATEMENT = "CALL dbms.routing.getRoutingTable($routingContext, $databaseName)"; - private static String PING_STATEMENT = "CALL db.ping()"; + private static final String PING_STATEMENT = "CALL db.ping()"; private final Configs graphConfig; private final IrMetaQueryCallback metaQueryCallback; private final ExecutionClient client; @@ -120,7 +120,6 @@ public StatementResult run( new MetricsCollector.Cypher(System.currentTimeMillis()), null, graphConfig); - ; try { // hack ways to execute routing table or ping statement before executing the real query if (statement.equals(GET_ROUTING_TABLE_STATEMENT) || statement.equals(PING_STATEMENT)) { @@ -154,34 +153,46 @@ public StatementResult run( jobName, planSummary.getPhysicalPlan().explain()); QueryTimeoutConfig timeoutConfig = getQueryTimeoutConfig(); - StatementResults.SubscribableExecution execution; + GraphPlanExecutor executor; if (cacheValue.result != null && cacheValue.result.isCompleted) { - execution = - new AbstractPlanExecution(planSummary, timeoutConfig, statusCallback) { + executor = + new GraphPlanExecutor() { @Override - protected void execute(ExecutionResponseListener listener) { + public void execute( + GraphPlanner.Summary summary, + IrMeta irMeta, + ExecutionResponseListener listener) + throws Exception { List records = cacheValue.result.records; records.forEach(k -> listener.onNext(k.getRecord())); listener.onCompleted(); } }; + } else if (metaProcedureCall(planSummary.getLogicalPlan())) { + executor = StoredProcedureMeta.Mode.SCHEMA; } else { - execution = - new AbstractPlanExecution(planSummary, timeoutConfig, statusCallback) { + executor = + new GraphPlanExecutor() { @Override - protected void execute(ExecutionResponseListener listener) + public void execute( + GraphPlanner.Summary summary, + IrMeta meta, + ExecutionResponseListener listener) throws Exception { ExecutionRequest request = new ExecutionRequest( jobId, jobName, - planSummary.getLogicalPlan(), - planSummary.getPhysicalPlan()); + summary.getLogicalPlan(), + summary.getPhysicalPlan()); client.submit(request, listener, timeoutConfig); } }; } - return StatementResults.connectVia(execution, new QuerySubject.BasicQuerySubject()); + return StatementResults.connectVia( + new CypherPlanExecution( + planSummary, timeoutConfig, statusCallback, irMeta, executor), + new QuerySubject.BasicQuerySubject()); } catch (FrontendException e) { e.getDetails().put("QueryId", jobId); statusCallback.onErrorEnd(e.getMessage()); @@ -199,4 +210,10 @@ protected void execute(ExecutionResponseListener listener) private QueryTimeoutConfig getQueryTimeoutConfig() { return new QueryTimeoutConfig(fabricConfig.getTransactionTimeout().toMillis()); } + + private boolean metaProcedureCall(LogicalPlan plan) { + if (!(plan.getProcedureCall() instanceof RexProcedureCall)) return false; + RexProcedureCall procedureCall = (RexProcedureCall) plan.getProcedureCall(); + return procedureCall.getMode() == StoredProcedureMeta.Mode.SCHEMA; + } }