diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 6618b6abf5dc..2e91eda1394e 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -1,6 +1,7 @@ add_subdirectory(runtime) file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc" - "${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc") + "${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc") add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES}) diff --git a/flex/engines/graph_db/app/builtin/count_vertices.cc b/flex/engines/graph_db/app/builtin/count_vertices.cc new file mode 100644 index 000000000000..e36e6a1b1398 --- /dev/null +++ b/flex/engines/graph_db/app/builtin/count_vertices.cc @@ -0,0 +1,54 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flex/engines/graph_db/app/builtin/count_vertices.h" + +namespace gs { + +bool CountVertices::DoQuery(GraphDBSession& sess, Decoder& input, + Encoder& output) { + // First get the read transaction. + auto txn = sess.GetReadTransaction(); + // We expect one param of type string from decoder. + if (input.empty()) { + return false; + } + std::string label_name{input.get_string()}; + const auto& schema = txn.schema(); + if (!schema.has_vertex_label(label_name)) { + output.put_string_view("The requested label doesn't exits."); + return false; // The requested label doesn't exits. + } + auto label_id = schema.get_vertex_label_id(label_name); + // The vertices are labeled internally from 0 ~ vertex_label_num, accumulate + auto vertex_num = txn.GetVertexNum(label_id); + // the count. + results::CollectiveResults results; + auto result = results.add_results(); + result->mutable_record() + ->add_columns() + ->mutable_entry() + ->mutable_element() + ->mutable_object() + ->set_i32(vertex_num); + + output.put_string_view(results.SerializeAsString()); + txn.Commit(); + return true; +} + +AppWrapper CountVerticesFactory::CreateApp(const GraphDB& db) { + return AppWrapper(new CountVertices(), NULL); +} +} // namespace gs diff --git a/flex/engines/graph_db/app/builtin/count_vertices.h b/flex/engines/graph_db/app/builtin/count_vertices.h new file mode 100644 index 000000000000..76d7e1bb403d --- /dev/null +++ b/flex/engines/graph_db/app/builtin/count_vertices.h @@ -0,0 +1,39 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_ +#define ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_ +#include "flex/engines/graph_db/database/graph_db_session.h" +#include "flex/engines/hqps_db/app/interactive_app_base.h" + +namespace gs { +// A simple app to count the number of vertices of a given label. +class CountVertices : public CypherInternalPbWriteAppBase { + public: + CountVertices() {} + bool DoQuery(GraphDBSession& sess, Decoder& input, Encoder& output) override; +}; + +class CountVerticesFactory : public AppFactoryBase { + public: + CountVerticesFactory() = default; + ~CountVerticesFactory() = default; + + AppWrapper CreateApp(const GraphDB& db) override; +}; + +} // namespace gs + +#endif // ENGINES_GRAPH_DB_APP_BUILDIN_COUNT_VERTICES_H_ \ No newline at end of file diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index 2f934e232aee..0aa50c6b0624 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -15,6 +15,7 @@ #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/app/adhoc_app.h" +#include "flex/engines/graph_db/app/builtin/count_vertices.h" #include "flex/engines/graph_db/app/hqps_app.h" #include "flex/engines/graph_db/app/server_app.h" #include "flex/engines/graph_db/database/graph_db_session.h" @@ -405,6 +406,8 @@ void GraphDB::initApps( } // Builtin apps app_factories_[0] = std::make_shared(); + app_factories_[Schema::BUILTIN_COUNT_VERTICES_PLUGIN_ID] = + std::make_shared(); app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] = std::make_shared(); app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] = diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 744206c72bad..05f31700f85d 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -284,6 +284,13 @@ GraphDBSession::parse_query_type_from_cypher_internal( gs::Status(StatusCode::NOT_FOUND, "Query name is empty")); } const auto& app_name_to_path_index = schema().GetPlugins(); + // First check whether the query name is builtin query + for (int i = 0; i < Schema::BUILTIN_PLUGIN_NUM; ++i) { + std::string builtin_query_name = Schema::BUILTIN_PLUGIN_NAMES[i]; + if (query_name == builtin_query_name) { + return std::make_pair(Schema::BUILTIN_PLUGIN_IDS[i], str_view); + } + } if (app_name_to_path_index.count(query_name) <= 0) { LOG(ERROR) << "Query name is not registered: " << query_name; return Result>(gs::Status( diff --git a/flex/engines/hqps_db/app/interactive_app_base.h b/flex/engines/hqps_db/app/interactive_app_base.h index cadf3a04c1d0..78b29a719c70 100644 --- a/flex/engines/hqps_db/app/interactive_app_base.h +++ b/flex/engines/hqps_db/app/interactive_app_base.h @@ -25,7 +25,8 @@ namespace gs { -void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) { +inline void put_argument(gs::Encoder& encoder, + const procedure::Argument& argument) { auto& value = argument.value(); auto item_case = value.item_case(); switch (item_case) { @@ -46,8 +47,8 @@ void put_argument(gs::Encoder& encoder, const procedure::Argument& argument) { } } -bool parse_input_argument(gs::Decoder& raw_input, - gs::Encoder& argument_encoder) { +inline bool parse_input_argument(gs::Decoder& raw_input, + gs::Encoder& argument_encoder) { if (raw_input.size() == 0) { VLOG(10) << "No arguments found in input"; return true; diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index f67e27e9dbde..081e70a3d89a 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -225,6 +225,12 @@ seastar::future invoke_creating_procedure( if (json.HasMember("name")) { // Currently we need id== name rapidjson::Value& name = json["name"]; + if (gs::Schema::IsBuiltinPlugin(name.GetString())) { + return seastar::make_exception_future( + std::string( + "The plugin name is a builtin plugin, cannot be created: ") + + name.GetString()); + } rapidjson::Value name_copy(name, json.GetAllocator()); json.AddMember("id", name_copy, json.GetAllocator()); } @@ -516,6 +522,9 @@ seastar::future admin_actor::run_get_graph_meta( // There can also be procedures that builtin in the graph meta. for (auto& plugin_meta : graph_meta.plugin_metas) { add_runnable_info(plugin_meta); + if (plugin_meta.bound_graph.empty()) { + plugin_meta.bound_graph = query_param.content; + } } graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(), all_plugin_metas.begin(), @@ -697,6 +706,15 @@ admin_actor::get_procedure_by_procedure_name( auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); + auto builtin_plugins = gs::get_builtin_plugin_metas(); + for (auto& builtin_plugin : builtin_plugins) { + if (builtin_plugin.id == procedure_id.c_str()) { + add_runnable_info(builtin_plugin); + return seastar::make_ready_future( + gs::Result(builtin_plugin.ToJson())); + } + } + if (get_procedure_res.ok()) { VLOG(10) << "Successfully get procedure procedures"; auto& proc_meta = get_procedure_res.value(); @@ -803,6 +821,16 @@ seastar::future admin_actor::delete_procedure( gs::Result(graph_meta_res.status())); } + if (gs::Schema::IsBuiltinPlugin(procedure_id)) { + LOG(ERROR) << "The plugin name is a builtin plugin, cannot be deleted: " + << procedure_id; + return seastar::make_ready_future( + gs::Result(gs::Status( + gs::StatusCode::ILLEGAL_OPERATION, + "The plugin name is a builtin plugin, cannot be deleted: " + + procedure_id))); + } + auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); @@ -859,6 +887,16 @@ seastar::future admin_actor::update_procedure( gs::Result(graph_meta_res.status())); } + if (gs::Schema::IsBuiltinPlugin(procedure_id)) { + LOG(ERROR) << "The plugin name is a builtin plugin, cannot be updated: " + << procedure_id; + return seastar::make_ready_future( + gs::Result(gs::Status( + gs::StatusCode::ILLEGAL_OPERATION, + "The plugin name is a builtin plugin, cannot be updated: " + + procedure_id))); + } + auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 7770279083f8..b4402cbae0bd 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -395,8 +395,8 @@ def run_cypher_test_suite(neo4j_sess : Neo4jSession, graph_id: str, queries: lis for query in queries: submit_query_via_neo4j_endpoint(neo4j_sess, graph_id, query) -def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str): - query = "CALL " + proc_name + "()" +def call_procedure(neo4j_sess : Neo4jSession, graph_id: str, proc_name: str, *args): + query = "CALL " + proc_name + "(" + ",".join(args) + ")" result = neo4j_sess.run(query) for record in result: print(record) @@ -435,6 +435,19 @@ def create_procedure(sess: Session, graph_id: str, name: str, query: str, descri proc_id = resp.get_value().procedure_id return proc_id +def delete_procedure(sess: Session, graph_id: str, proc_id: str): + resp = sess.delete_procedure(graph_id, proc_id) + if not resp.is_ok(): + print("Failed to delete procedure: ", resp.get_status_message()) + raise Exception("Failed to delete procedure, status: ", resp.get_status_message()) + +def update_procedure(sess: Session, graph_id: str, proc_id: str, desc : str): + request = UpdateProcedureRequest( + description=desc) + resp = sess.update_procedure(graph_id, proc_id, request) + if not resp.is_ok(): + print("Failed to update procedure: ", resp.get_status_message()) + raise Exception("Failed to update procedure, status: ", resp.get_status_message()) def start_service_on_graph(interactive_session, graph_id : str): resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id)) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index 11f3d40e528d..093dc03b0aca 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -28,7 +28,7 @@ from gs_interactive.tests.conftest import create_vertex_only_modern_graph, start_service_on_graph,interactive_driver -from gs_interactive.tests.conftest import create_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure +from gs_interactive.tests.conftest import create_procedure,delete_procedure,update_procedure, delete_running_graph, create_modern_graph, create_partial_modern_graph,run_cypher_test_suite, call_procedure from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph, import_data_to_partial_modern_graph, import_data_to_full_modern_graph @@ -121,3 +121,18 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr with pytest.raises(Exception): create_procedure(interactive_session, create_modern_graph, "test_proc2", "MATCH(n: IDONTKOWN) return count(n)") +def test_builtin_procedure(interactive_session,neo4j_session, create_modern_graph): + print("[Test builtin procedure]") + # Delete the builtin procedure should fail + with pytest.raises(Exception): + delete_procedure(interactive_session, create_modern_graph, "count_vertices") + # Create a procedure with the same name as builtin procedure should fail + with pytest.raises(Exception): + create_procedure(interactive_session, create_modern_graph, "count_vertices", "MATCH(n: software) return count(n);") + # Update the builtin procedure should fail + with pytest.raises(Exception): + update_procedure(interactive_session, create_modern_graph, "count_vertices", "A updated description") + # Call the builtin procedure + start_service_on_graph(interactive_session, create_modern_graph) + call_procedure(neo4j_session, create_modern_graph, "count_vertices", '"person"') + diff --git a/flex/openapi/openapi_coordinator.yaml b/flex/openapi/openapi_coordinator.yaml index d6403e6decea..9b134259fd93 100644 --- a/flex/openapi/openapi_coordinator.yaml +++ b/flex/openapi/openapi_coordinator.yaml @@ -533,6 +533,9 @@ components: type: array items: $ref: '#/components/schemas/Parameter' + option: + type: object + additionalProperties: true GetStoredProcResponse: allOf: diff --git a/flex/openapi/openapi_interactive.yaml b/flex/openapi/openapi_interactive.yaml index 0c077ba44fd6..763b54fd1763 100644 --- a/flex/openapi/openapi_interactive.yaml +++ b/flex/openapi/openapi_interactive.yaml @@ -1582,6 +1582,9 @@ components: enable: type: boolean example : true + option: + type: object + additionalProperties: true GetProcedureResponse: x-body-name: get_procedure_response allOf: diff --git a/flex/storages/metadata/graph_meta_store.cc b/flex/storages/metadata/graph_meta_store.cc index 9c21da1461bc..4cd0ccaf26cc 100644 --- a/flex/storages/metadata/graph_meta_store.cc +++ b/flex/storages/metadata/graph_meta_store.cc @@ -52,6 +52,32 @@ std::string read_file_to_string(const std::string& file_path) { return ""; } } +const std::vector& get_builtin_plugin_metas() { + static std::vector builtin_plugins; + static bool initialized = false; + if (!initialized) { + PluginMeta count_vertices; + count_vertices.id = "count_vertices"; + count_vertices.name = "count_vertices"; + count_vertices.description = "A builtin plugin to count vertices"; + count_vertices.enable = true; + count_vertices.runnable = true; + count_vertices.type = "cypher"; + count_vertices.creation_time = GetCurrentTimeStamp(); + count_vertices.update_time = GetCurrentTimeStamp(); + count_vertices.params.push_back({"labelName", PropertyType::kString}); + count_vertices.returns.push_back({"count", PropertyType::kInt32}); + initialized = true; + builtin_plugins.push_back(count_vertices); + } + return builtin_plugins; +} + +void append_builtin_plugins(std::vector& plugin_metas) { + auto builtin_plugin_metas = get_builtin_plugin_metas(); + plugin_metas.insert(plugin_metas.end(), builtin_plugin_metas.begin(), + builtin_plugin_metas.end()); +} UpdateGraphMetaRequest::UpdateGraphMetaRequest( int64_t data_update_time, const std::string& data_import_config) @@ -420,6 +446,8 @@ CreateGraphMetaRequest CreateGraphMetaRequest::FromJson( request.plugin_metas.push_back(PluginMeta::FromJson(plugin)); } } + // Add builtin plugins + append_builtin_plugins(request.plugin_metas); return request; } @@ -671,11 +699,6 @@ UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson( if (j.HasMember("enable")) { request.enable = j["enable"].GetBool(); } - // } catch (const std::exception& e) { - // LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << e.what() << - // " " - // << json; - // } return request; } diff --git a/flex/storages/metadata/graph_meta_store.h b/flex/storages/metadata/graph_meta_store.h index 8d166a309812..ae73605804b9 100644 --- a/flex/storages/metadata/graph_meta_store.h +++ b/flex/storages/metadata/graph_meta_store.h @@ -55,6 +55,8 @@ JobStatus parseFromString(const std::string& status_string); ////////////////// MetaData /////////////////////// struct PluginMeta; +const std::vector& get_builtin_plugin_metas(); + struct GraphMeta { GraphId id; std::string name; diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 4c592f38268f..e01ec85cb4ae 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -20,6 +20,15 @@ namespace gs { +bool Schema::IsBuiltinPlugin(const std::string& plugin_name) { + for (uint8_t i = 0; i < BUILTIN_PLUGIN_NUM; i++) { + if (plugin_name == BUILTIN_PLUGIN_NAMES[i]) { + return true; + } + } + return false; +} + Schema::Schema() : has_multi_props_edge_(false){}; Schema::~Schema() = default; @@ -1235,6 +1244,11 @@ bool Schema::EmplacePlugins( LOG(ERROR) << "Too many plugins, max plugin id is " << MAX_PLUGIN_ID; return false; } + if (Schema::IsBuiltinPlugin(name_path.first)) { + LOG(WARNING) << "Plugin name " << name_path.first + << " is a built-in plugin, skipped"; + continue; + } if (name_path.second.empty()) { // if the path is empty, try to find from plugin_dir. plugin_names.insert(name_path.first); @@ -1275,6 +1289,11 @@ bool Schema::EmplacePlugins( } if (root["name"] && root["library"]) { std::string name = root["name"].as(); + if (Schema::IsBuiltinPlugin(name)) { + LOG(WARNING) << "Plugin name " << name + << " is a built-in plugin, skipped"; + continue; + } std::string path = root["library"].as(); if (plugin_names.find(name) != plugin_names.end()) { if (plugin_name_to_path_and_id_.find(name) != @@ -1302,6 +1321,7 @@ bool Schema::EmplacePlugins( << ", name or library not found."; } } + LOG(INFO) << "Load " << plugin_name_to_path_and_id_.size() << " plugins"; return true; } diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h index bca189dd603a..21ef5a68f62e 100644 --- a/flex/storages/rt_mutable_graph/schema.h +++ b/flex/storages/rt_mutable_graph/schema.h @@ -31,10 +31,10 @@ class Schema { // How many built-in plugins are there. // Currently only one builtin plugin, SERVER_APP is supported. static constexpr uint8_t RESERVED_PLUGIN_NUM = 1; - static constexpr uint8_t MAX_PLUGIN_ID = 252; + static constexpr uint8_t MAX_PLUGIN_ID = 251; + static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253; static constexpr uint8_t HQPS_ADHOC_READ_PLUGIN_ID = 254; static constexpr uint8_t HQPS_ADHOC_WRITE_PLUGIN_ID = 255; - static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253; static constexpr const char* HQPS_ADHOC_READ_PLUGIN_ID_STR = "\xFE"; static constexpr const char* HQPS_ADHOC_WRITE_PLUGIN_ID_STR = "\xFF"; static constexpr const char* ADHOC_READ_PLUGIN_ID_STR = "\xFD"; @@ -43,10 +43,22 @@ class Schema { static constexpr const char* MAX_LENGTH_KEY = "max_length"; static constexpr const uint16_t STRING_DEFAULT_MAX_LENGTH = 256; + // The builtin plugins are reserved for the system. + static constexpr uint8_t BUILTIN_PLUGIN_NUM = 1; + static constexpr uint8_t BUILTIN_COUNT_VERTICES_PLUGIN_ID = 252; + static constexpr const char* BUILTIN_COUNT_VERTICES_PLUGIN_NAME = + "count_vertices"; + static constexpr const char* BUILTIN_PLUGIN_NAMES[BUILTIN_PLUGIN_NUM] = { + BUILTIN_COUNT_VERTICES_PLUGIN_NAME}; + static constexpr uint8_t BUILTIN_PLUGIN_IDS[BUILTIN_PLUGIN_NUM] = { + BUILTIN_COUNT_VERTICES_PLUGIN_ID}; + // An array containing all compatible versions of schema. static const std::vector COMPATIBLE_VERSIONS; static constexpr const char* DEFAULT_SCHEMA_VERSION = "v0.0"; + static bool IsBuiltinPlugin(const std::string& plugin_name); + using label_type = label_t; Schema(); ~Schema(); diff --git a/python/setup_gsctl.py b/python/setup_gsctl.py index 7a1d1e1425ca..82e2dc633eea 100644 --- a/python/setup_gsctl.py +++ b/python/setup_gsctl.py @@ -51,7 +51,7 @@ def parse_version(root, **kwargs): if platform.system() == "Linux" and platform.machine() == "aarch64" else "click >= 8.1.6" ), - "graphscope-flex >= 0.28.1", + "graphscope-flex >= 0.28.0", "treelib", "packaging", "pyyaml",