From a66aa55dc9743985f1528c06481ecc9ef68f9890 Mon Sep 17 00:00:00 2001 From: Lei Wang Date: Thu, 17 Oct 2024 17:49:59 +0800 Subject: [PATCH] make run with portal Signed-off-by: Lei Wang --- .../controllers/data_source_controller.py | 28 +++++- .../controllers/deployment_controller.py | 11 ++- .../server/controllers/graph_controller.py | 90 +++++++++++++++++-- .../flex/server/controllers/job_controller.py | 24 +++++ scripts/controller.py | 30 +++++-- vegito/test/schema/ldbc-graph-schema.json | 23 +++++ 6 files changed, 191 insertions(+), 15 deletions(-) diff --git a/coordinator/flex/server/controllers/data_source_controller.py b/coordinator/flex/server/controllers/data_source_controller.py index b02473d..dffd502 100644 --- a/coordinator/flex/server/controllers/data_source_controller.py +++ b/coordinator/flex/server/controllers/data_source_controller.py @@ -10,6 +10,7 @@ import os import requests import json +import etcd3 def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501 """bind_datasource_in_batch @@ -23,6 +24,11 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ + with open("/tmp/graph_id.txt", "r") as f: + existing_graph_id = f.read() + if graph_id != existing_graph_id: + return (f"Graph id {graph_id} not founded", 500) + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") if not gart_controller_server.startswith(("http://", "https://")): gart_controller_server = f"http://{gart_controller_server}" @@ -47,7 +53,27 @@ def get_datasource_by_id(graph_id): # noqa: E501 :rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]] """ - return 'do some magic!' + with open("/tmp/graph_id.txt", "r") as f: + existing_graph_id = f.read() + if graph_id != existing_graph_id: + return (f"Graph id {graph_id} not founded", 500) + + etcd_server = os.getenv("ETCD_SERVICE", "etcd") + if not etcd_server.startswith(("http://", "https://")): + etcd_server = f"http://{etcd_server}" + etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_") + etcd_host = etcd_server.split("://")[1].split(":")[0] + etcd_port = etcd_server.split(":")[2] + etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + + try: + data_source_config, _ = etcd_client.get(etcd_prefix + "gart_data_source_json") + except Exception as e: + return "Failed to get data source: " + str(e), 500 + + data_source_config = json.loads(data_source_config.decode("utf-8")) + + return (SchemaMapping.from_dict(data_source_config), 200) def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type): # noqa: E501 diff --git a/coordinator/flex/server/controllers/deployment_controller.py b/coordinator/flex/server/controllers/deployment_controller.py index cfbfeca..ead2939 100644 --- a/coordinator/flex/server/controllers/deployment_controller.py +++ b/coordinator/flex/server/controllers/deployment_controller.py @@ -20,7 +20,16 @@ def get_deployment_info(): # noqa: E501 :rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]] """ - return 'do some magic!' + result_dict = {} + result_dict["cluster_type"] = "KUBERNETES" + with open ("/tmp/graph_schema_create_time.txt", "r") as f: + result_dict["creation_time"] = f.read() + result_dict["instance_name"] = "gart" + result_dict["frontend"] = "Cypher/Gremlin" + result_dict["engine"] = "Gaia" + result_dict["storage"] = "MutableCSR" + result_dict["version"] = "0.1.0" + return (RunningDeploymentInfo.from_dict(result_dict), 200) def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501 diff --git a/coordinator/flex/server/controllers/graph_controller.py b/coordinator/flex/server/controllers/graph_controller.py index d159151..66e416b 100644 --- a/coordinator/flex/server/controllers/graph_controller.py +++ b/coordinator/flex/server/controllers/graph_controller.py @@ -25,8 +25,10 @@ import time import sys import requests +from datetime import datetime GRAPH_ID = None +SCHEMA_CREATE_TIME = None def get_graph_schema(): property_data_type_mapping = {} @@ -62,10 +64,10 @@ def get_graph_schema(): rg_mapping_str = rg_mapping_str.decode("utf-8") break try_times += 1 - time.sleep(2) + time.sleep(1) except Exception as e: try_times += 1 - time.sleep(2) + time.sleep(1) if try_times == try_max_times: return result_dict @@ -81,10 +83,10 @@ def get_graph_schema(): table_schema_str = table_schema_str.decode("utf-8") break try_times += 1 - time.sleep(2) + time.sleep(1) except Exception as e: try_times += 1 - time.sleep(2) + time.sleep(1) if try_times == try_max_times: return result_dict @@ -103,6 +105,7 @@ def get_graph_schema(): for idx in range(vertex_type_number): vertex_type_dict = {} vertex_type_dict["type_name"] = vertex_types[idx]["type_name"] + vertex_type_dict["primary_keys"] = vertex_types[idx]["primary_keys"] table_name = vertex_types[idx]["dataSourceName"] property_mappings = vertex_types[idx]["mappings"] properties_array = [] @@ -211,8 +214,19 @@ def create_graph(create_graph_request): # noqa: E501 result_dict["graph_id"] = create_graph_request["name"] global GRAPH_ID GRAPH_ID = result_dict["graph_id"] + + if os.path.exists("/tmp/graph_id.txt"): + return ("Graph schema already exists, do not submit again", 500) + with open("/tmp/graph_id.txt", "w") as f: f.write(GRAPH_ID) + current_time = datetime.now() + formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + global SCHEMA_CREATE_TIME + SCHEMA_CREATE_TIME = formatted_time + with open("/tmp/graph_schema_create_time.txt", "w") as f: + f.write(SCHEMA_CREATE_TIME) + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") if not gart_controller_server.startswith(("http://", "https://")): gart_controller_server = f"http://{gart_controller_server}" @@ -222,6 +236,10 @@ def create_graph(create_graph_request): # noqa: E501 headers={"Content-Type": "application/json"}, data=json.dumps({"schema": json.dumps(create_graph_request)}), ) + + if response.status_code != 200: + return (response.text, response.status_code) + return (CreateGraphResponse.from_dict(result_dict), response.status_code) @@ -243,6 +261,10 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501 result_dict["graph_id"] = create_graph_by_pgql_request["name"] global GRAPH_ID GRAPH_ID = result_dict["graph_id"] + + if os.path.exists("/tmp/graph_id.txt"): + return ("Graph PGQL config already exists, do not submit again", 500) + with open("/tmp/graph_id.txt", "w") as f: f.write(GRAPH_ID) create_graph_by_pgql_request = create_graph_by_pgql_request["schema"] @@ -253,6 +275,10 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501 f"{gart_controller_server}/submit-pgql-config", data={"schema": create_graph_by_pgql_request}, ) + + if response.status_code != 200: + return (response.text, response.status_code) + return (CreateGraphResponse.from_dict(result_dict), response.status_code) @@ -274,6 +300,10 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501 result_dict["graph_id"] = create_graph_by_yaml_request["name"] global GRAPH_ID GRAPH_ID = result_dict["graph_id"] + + if os.path.exists("/tmp/graph_id.txt"): + return ("GART config already exists, do not submit again", 500) + with open("/tmp/graph_id.txt", "w") as f: f.write(GRAPH_ID) create_graph_request_yaml = create_graph_by_yaml_request["schema"] @@ -284,6 +314,10 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501 f"{gart_controller_server}/submit-config", data={"schema": create_graph_request_yaml}, ) + + if response.status_code != 200: + return (response.text, response.status_code) + return (CreateGraphResponse.from_dict(result_dict), response.status_code) @@ -361,12 +395,20 @@ def get_graph_all_available_versions(graph_id): # noqa: E501 :rtype: Union[List[GetGraphVersionResponse], Tuple[List[GetGraphVersionResponse], int], Tuple[List[GetGraphVersionResponse], int, Dict[str, str]] """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") if not gart_controller_server.startswith(("http://", "https://")): gart_controller_server = f"http://{gart_controller_server}" response = requests.get(f"{gart_controller_server}/get-all-available-read-epochs") + + if response.status_code != 200: + return (response.text, response.status_code) + result = [] all_versions = response.json() + for idx in range(len(all_versions)): result_dict = {} result_dict["version_id"] = str(all_versions[idx][0]) @@ -388,11 +430,23 @@ def get_graph_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]] """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + result_dict = get_graph_schema() if not result_dict: - return (GetGraphResponse.from_dict(result_dict), 500) + return (f"Get graph by id {graph_id} failed...", 500) + result_dict["id"] = graph_id result_dict["name"] = graph_id + + with open("/tmp/graph_schema_create_time.txt", "r") as f: + result_dict["creation_time"] = f.read() + result_dict["schema_update_time"] = result_dict["creation_time"] + + with open("/tmp/data_loading_job_created_time.txt", "r") as f: + result_dict["data_update_time"] = f.read() + return (GetGraphResponse.from_dict(result_dict), 200) @@ -408,6 +462,9 @@ def get_graph_version_by_timestamp(graph_id, timestamp): # noqa: E501 :rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]] """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") if not gart_controller_server.startswith(("http://", "https://")): gart_controller_server = f"http://{gart_controller_server}" @@ -415,6 +472,10 @@ def get_graph_version_by_timestamp(graph_id, timestamp): # noqa: E501 f"{gart_controller_server}/get-read-epoch-by-timestamp", data={"timestamp": timestamp}, ) + + if response.status_code != 200: + return (response.text, response.status_code) + return (GetGraphVersionResponse.from_dict(response.json()), 200) @@ -428,12 +489,15 @@ def get_schema_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]] """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + result_dict = get_graph_schema() if result_dict: return (GetGraphSchemaResponse.from_dict(result_dict["schema"]), 200) else: - return (GetGraphSchemaResponse.from_dict(result_dict["schema"]), 500) + return ("Get graph schema failed...", 500) def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501 @@ -462,8 +526,17 @@ def list_graphs(): # noqa: E501 :rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]] """ result_dict = get_graph_schema() + if not result_dict: - return ([GetGraphResponse.from_dict(result_dict)], 500) + return ([GetGraphResponse.from_dict({})], 200) + + with open("/tmp/graph_schema_create_time.txt", "r") as f: + result_dict["creation_time"] = f.read() + result_dict["schema_update_time"] = result_dict["creation_time"] + + with open("/tmp/data_loading_job_created_time.txt", "r") as f: + result_dict["data_update_time"] = f.read() + return ([GetGraphResponse.from_dict(result_dict)], 200) @@ -479,6 +552,9 @@ def read_graph_by_a_given_version(graph_id, change_graph_version_request): # no :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ + if graph_id != GRAPH_ID: + return (f"Graph {graph_id} does not exist...", 500) + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") if not gart_controller_server.startswith(("http://", "https://")): gart_controller_server = f"http://{gart_controller_server}" diff --git a/coordinator/flex/server/controllers/job_controller.py b/coordinator/flex/server/controllers/job_controller.py index 8ddcb2a..4f46e9c 100644 --- a/coordinator/flex/server/controllers/job_controller.py +++ b/coordinator/flex/server/controllers/job_controller.py @@ -15,8 +15,11 @@ import etcd3 from urllib.parse import urlparse import json +from datetime import datetime RUNNING = None +DATA_LOADING = None +DATA_LOADING_JOB_CREATED_TIME = None def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501 """delete_job_by_id @@ -177,12 +180,33 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501 if not isinstance(dataloading_job_config, dict): dataloading_job_config = json.loads(dataloading_job_config) + with open("/tmp/graph_id.txt", "r") as f: + existing_graph_id = f.read() + if graph_id != existing_graph_id: + return (f"Graph id {graph_id} not founded", 500) + + global DATA_LOADING + + if DATA_LOADING is not None: + return ("Data loading is already in progress", 500) + + current_time = datetime.now() + formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + global DATA_LOADING_JOB_CREATED_TIME + DATA_LOADING_JOB_CREATED_TIME = formatted_time + with open("/tmp/data_loading_job_created_time.txt", "w") as f: + f.write(DATA_LOADING_JOB_CREATED_TIME) + response = requests.post( f"{gart_controller_server}/submit-data-loading", headers={"Content-Type": "application/json"}, data=json.dumps({"schema": json.dumps(dataloading_job_config)}), ) + if response.status_code != 200: + return (response.text, response.status_code) + result_dict = {} result_dict["job_id"] = "0" + DATA_LOADING = "RUNNING" return (CreateDataloadingJobResponse.from_dict(result_dict), response.status_code) diff --git a/scripts/controller.py b/scripts/controller.py index 11fdbad..3336f8b 100755 --- a/scripts/controller.py +++ b/scripts/controller.py @@ -41,6 +41,10 @@ def submit_config(): etcd_host = etcd_server.split("://")[1].split(":")[0] etcd_port = etcd_server.split(":")[2] etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + + if etcd_client.get(etcd_prefix + "gart_rg_mapping_yaml")[0] is not None: + return "Config already exists", 500 + while True: try: etcd_client.put(etcd_prefix + "gart_rg_mapping_yaml", content) @@ -84,6 +88,10 @@ def submit_pgql_config(): etcd_host = etcd_server.split("://")[1].split(":")[0] etcd_port = etcd_server.split(":")[2] etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + + if etcd_client.get(etcd_prefix + "gart_rg_mapping_yaml")[0] is not None: + return "PGQL config already exists", 500 + while True: try: etcd_client.put(etcd_prefix + "gart_rg_mapping_yaml", yaml_content) @@ -104,8 +112,10 @@ def submit_graph_schema(): etcd_host = etcd_server.split("://")[1].split(":")[0] etcd_port = etcd_server.split(":")[2] etcd_client = etcd3.client(host=etcd_host, port=etcd_port) - with open("/tmp/graph_schema.json", "wb") as f: - f.write(graph_schema) + + if etcd_client.get(etcd_prefix + "gart_graph_schema_json")[0] is not None: + return "Graph schema already exists", 500 + try: etcd_client.put(etcd_prefix + "gart_graph_schema_json", graph_schema) return "Graph schema submitted", 200 @@ -125,6 +135,9 @@ def submit_data_source(): etcd_port = etcd_server.split(":")[2] etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + if etcd_client.get(etcd_prefix + "gart_data_source_json")[0] is not None: + return "Data source already exists", 500 + try: etcd_client.put(etcd_prefix + "gart_data_source_json", data_source_config) return "Data source submitted", 200 @@ -141,6 +154,7 @@ def submit_data_loading(): etcd_host = etcd_server.split("://")[1].split(":")[0] etcd_port = etcd_server.split(":")[2] etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + try: graph_schema, _ = etcd_client.get(etcd_prefix + "gart_graph_schema_json") except Exception as e: @@ -169,6 +183,9 @@ def submit_data_loading(): for vertex_id in range(len(vertex_types_info)): vertex_type_element = {} vertex_type_element["type_name"] = vertex_types_info[vertex_id]["type_name"] + vertex_type_element["primary_keys"] = vertex_types_info[vertex_id][ + "primary_keys" + ] mappings_list = [] for table_id in range(len(vertex_types_info)): if ( @@ -207,6 +224,9 @@ def submit_data_loading(): edge_types_info = graph_schema["edge_types"] for edge_id in range(len(edge_types_info)): edge_type_element = {} + edge_type_element["undirected"] = not edge_types_info[edge_id].get( + "directed", True + ) edge_type_element["type_pair"] = {} edge_type_element["type_pair"]["edge"] = edge_types_info[edge_id]["type_name"] edge_type_element["type_pair"]["source_vertex"] = edge_types_info[edge_id][ @@ -318,8 +338,6 @@ def get_graph_schema(): def get_all_available_read_epochs(): all_epochs = get_all_available_read_epochs_internal()[0] all_epochs.reverse() - if len(all_epochs) == 0: - return "No available read epochs", 200 return json.dumps(all_epochs), 200 @@ -366,7 +384,7 @@ def get_read_epoch_by_timestamp(): "num_edges": num_edges, } return json.dumps(result), 200 - return "No read epoch found", 200 + return json.dumps({}), 200 @app.route("/run-gae-task", methods=["POST"]) @@ -470,7 +488,7 @@ def change_read_epoch(): ): break time.sleep(0.5) - return "Read epoch changed", 200 + return f"Read version changed to version {read_epoch}", 200 def get_pod_ips(namespace, label_selector): diff --git a/vegito/test/schema/ldbc-graph-schema.json b/vegito/test/schema/ldbc-graph-schema.json index ad86587..f107adc 100644 --- a/vegito/test/schema/ldbc-graph-schema.json +++ b/vegito/test/schema/ldbc-graph-schema.json @@ -321,6 +321,7 @@ "edge_types": [ { "type_name": "org_islocationin", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "organisation", @@ -337,6 +338,7 @@ }, { "type_name": "ispartof", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "place", @@ -353,6 +355,7 @@ }, { "type_name": "issubclassof", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "tagclass", @@ -369,6 +372,7 @@ }, { "type_name": "hastype", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "tag", @@ -385,6 +389,7 @@ }, { "type_name": "comment_hascreator", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "comment", @@ -401,6 +406,7 @@ }, { "type_name": "comment_hastag", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "comment", @@ -417,6 +423,7 @@ }, { "type_name": "comment_islocationin", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "comment", @@ -433,6 +440,7 @@ }, { "type_name": "replyof_comment", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "comment", @@ -449,6 +457,7 @@ }, { "type_name": "replyof_post", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "comment", @@ -465,6 +474,7 @@ }, { "type_name": "post_hascreator", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "post", @@ -481,6 +491,7 @@ }, { "type_name": "post_hastag", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "post", @@ -497,6 +508,7 @@ }, { "type_name": "post_islocationin", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "post", @@ -513,6 +525,7 @@ }, { "type_name": "forum_containerof", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "forum", @@ -529,6 +542,7 @@ }, { "type_name": "forum_hasmoderator", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "forum", @@ -545,6 +559,7 @@ }, { "type_name": "forum_hastag", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "forum", @@ -561,6 +576,7 @@ }, { "type_name": "person_hasinterest", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -577,6 +593,7 @@ }, { "type_name": "person_islocationin", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -593,6 +610,7 @@ }, { "type_name": "forum_hasmember", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "forum", @@ -614,6 +632,7 @@ }, { "type_name": "knows", + "directed": false, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -635,6 +654,7 @@ }, { "type_name": "likes_comment", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -656,6 +676,7 @@ }, { "type_name": "likes_post", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -677,6 +698,7 @@ }, { "type_name": "studyat", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person", @@ -698,6 +720,7 @@ }, { "type_name": "workat", + "directed": true, "vertex_type_pair_relations": [ { "source_vertex": "person",