Skip to content

Commit

Permalink
make run with portal
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <[email protected]>
  • Loading branch information
doudoubobo committed Oct 17, 2024
1 parent f67800b commit a66aa55
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 15 deletions.
28 changes: 27 additions & 1 deletion coordinator/flex/server/controllers/data_source_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import requests
import json
import etcd3

def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
"""bind_datasource_in_batch
Expand All @@ -23,6 +24,11 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
with open("/tmp/graph_id.txt", "r") as f:
existing_graph_id = f.read()
if graph_id != existing_graph_id:
return (f"Graph id {graph_id} not founded", 500)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
Expand All @@ -47,7 +53,27 @@ def get_datasource_by_id(graph_id): # noqa: E501
:rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]]
"""
return 'do some magic!'
with open("/tmp/graph_id.txt", "r") as f:
existing_graph_id = f.read()
if graph_id != existing_graph_id:
return (f"Graph id {graph_id} not founded", 500)

etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")
etcd_host = etcd_server.split("://")[1].split(":")[0]
etcd_port = etcd_server.split(":")[2]
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)

try:
data_source_config, _ = etcd_client.get(etcd_prefix + "gart_data_source_json")
except Exception as e:
return "Failed to get data source: " + str(e), 500

data_source_config = json.loads(data_source_config.decode("utf-8"))

return (SchemaMapping.from_dict(data_source_config), 200)


def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type): # noqa: E501
Expand Down
11 changes: 10 additions & 1 deletion coordinator/flex/server/controllers/deployment_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ def get_deployment_info(): # noqa: E501
:rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]]
"""
return 'do some magic!'
result_dict = {}
result_dict["cluster_type"] = "KUBERNETES"
with open ("/tmp/graph_schema_create_time.txt", "r") as f:
result_dict["creation_time"] = f.read()
result_dict["instance_name"] = "gart"
result_dict["frontend"] = "Cypher/Gremlin"
result_dict["engine"] = "Gaia"
result_dict["storage"] = "MutableCSR"
result_dict["version"] = "0.1.0"
return (RunningDeploymentInfo.from_dict(result_dict), 200)


def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501
Expand Down
90 changes: 83 additions & 7 deletions coordinator/flex/server/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import time
import sys
import requests
from datetime import datetime

GRAPH_ID = None
SCHEMA_CREATE_TIME = None

def get_graph_schema():
property_data_type_mapping = {}
Expand Down Expand Up @@ -62,10 +64,10 @@ def get_graph_schema():
rg_mapping_str = rg_mapping_str.decode("utf-8")
break
try_times += 1
time.sleep(2)
time.sleep(1)
except Exception as e:
try_times += 1
time.sleep(2)
time.sleep(1)

if try_times == try_max_times:
return result_dict
Expand All @@ -81,10 +83,10 @@ def get_graph_schema():
table_schema_str = table_schema_str.decode("utf-8")
break
try_times += 1
time.sleep(2)
time.sleep(1)
except Exception as e:
try_times += 1
time.sleep(2)
time.sleep(1)

if try_times == try_max_times:
return result_dict
Expand All @@ -103,6 +105,7 @@ def get_graph_schema():
for idx in range(vertex_type_number):
vertex_type_dict = {}
vertex_type_dict["type_name"] = vertex_types[idx]["type_name"]
vertex_type_dict["primary_keys"] = vertex_types[idx]["primary_keys"]
table_name = vertex_types[idx]["dataSourceName"]
property_mappings = vertex_types[idx]["mappings"]
properties_array = []
Expand Down Expand Up @@ -211,8 +214,19 @@ def create_graph(create_graph_request): # noqa: E501
result_dict["graph_id"] = create_graph_request["name"]
global GRAPH_ID
GRAPH_ID = result_dict["graph_id"]

if os.path.exists("/tmp/graph_id.txt"):
return ("Graph schema already exists, do not submit again", 500)

with open("/tmp/graph_id.txt", "w") as f:
f.write(GRAPH_ID)
current_time = datetime.now()
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
global SCHEMA_CREATE_TIME
SCHEMA_CREATE_TIME = formatted_time
with open("/tmp/graph_schema_create_time.txt", "w") as f:
f.write(SCHEMA_CREATE_TIME)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
Expand All @@ -222,6 +236,10 @@ def create_graph(create_graph_request): # noqa: E501
headers={"Content-Type": "application/json"},
data=json.dumps({"schema": json.dumps(create_graph_request)}),
)

if response.status_code != 200:
return (response.text, response.status_code)

return (CreateGraphResponse.from_dict(result_dict), response.status_code)


Expand All @@ -243,6 +261,10 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501
result_dict["graph_id"] = create_graph_by_pgql_request["name"]
global GRAPH_ID
GRAPH_ID = result_dict["graph_id"]

if os.path.exists("/tmp/graph_id.txt"):
return ("Graph PGQL config already exists, do not submit again", 500)

with open("/tmp/graph_id.txt", "w") as f:
f.write(GRAPH_ID)
create_graph_by_pgql_request = create_graph_by_pgql_request["schema"]
Expand All @@ -253,6 +275,10 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501
f"{gart_controller_server}/submit-pgql-config",
data={"schema": create_graph_by_pgql_request},
)

if response.status_code != 200:
return (response.text, response.status_code)

return (CreateGraphResponse.from_dict(result_dict), response.status_code)


Expand All @@ -274,6 +300,10 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501
result_dict["graph_id"] = create_graph_by_yaml_request["name"]
global GRAPH_ID
GRAPH_ID = result_dict["graph_id"]

if os.path.exists("/tmp/graph_id.txt"):
return ("GART config already exists, do not submit again", 500)

with open("/tmp/graph_id.txt", "w") as f:
f.write(GRAPH_ID)
create_graph_request_yaml = create_graph_by_yaml_request["schema"]
Expand All @@ -284,6 +314,10 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501
f"{gart_controller_server}/submit-config",
data={"schema": create_graph_request_yaml},
)

if response.status_code != 200:
return (response.text, response.status_code)

return (CreateGraphResponse.from_dict(result_dict), response.status_code)


Expand Down Expand Up @@ -361,12 +395,20 @@ def get_graph_all_available_versions(graph_id): # noqa: E501
:rtype: Union[List[GetGraphVersionResponse], Tuple[List[GetGraphVersionResponse], int], Tuple[List[GetGraphVersionResponse], int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
response = requests.get(f"{gart_controller_server}/get-all-available-read-epochs")

if response.status_code != 200:
return (response.text, response.status_code)

result = []
all_versions = response.json()

for idx in range(len(all_versions)):
result_dict = {}
result_dict["version_id"] = str(all_versions[idx][0])
Expand All @@ -388,11 +430,23 @@ def get_graph_by_id(graph_id): # noqa: E501
:rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

result_dict = get_graph_schema()
if not result_dict:
return (GetGraphResponse.from_dict(result_dict), 500)
return (f"Get graph by id {graph_id} failed...", 500)

result_dict["id"] = graph_id
result_dict["name"] = graph_id

with open("/tmp/graph_schema_create_time.txt", "r") as f:
result_dict["creation_time"] = f.read()
result_dict["schema_update_time"] = result_dict["creation_time"]

with open("/tmp/data_loading_job_created_time.txt", "r") as f:
result_dict["data_update_time"] = f.read()

return (GetGraphResponse.from_dict(result_dict), 200)


Expand All @@ -408,13 +462,20 @@ def get_graph_version_by_timestamp(graph_id, timestamp): # noqa: E501
:rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
response = requests.post(
f"{gart_controller_server}/get-read-epoch-by-timestamp",
data={"timestamp": timestamp},
)

if response.status_code != 200:
return (response.text, response.status_code)

return (GetGraphVersionResponse.from_dict(response.json()), 200)


Expand All @@ -428,12 +489,15 @@ def get_schema_by_id(graph_id): # noqa: E501
:rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

result_dict = get_graph_schema()

if result_dict:
return (GetGraphSchemaResponse.from_dict(result_dict["schema"]), 200)
else:
return (GetGraphSchemaResponse.from_dict(result_dict["schema"]), 500)
return ("Get graph schema failed...", 500)


def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501
Expand Down Expand Up @@ -462,8 +526,17 @@ def list_graphs(): # noqa: E501
:rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]]
"""
result_dict = get_graph_schema()

if not result_dict:
return ([GetGraphResponse.from_dict(result_dict)], 500)
return ([GetGraphResponse.from_dict({})], 200)

with open("/tmp/graph_schema_create_time.txt", "r") as f:
result_dict["creation_time"] = f.read()
result_dict["schema_update_time"] = result_dict["creation_time"]

with open("/tmp/data_loading_job_created_time.txt", "r") as f:
result_dict["data_update_time"] = f.read()

return ([GetGraphResponse.from_dict(result_dict)], 200)


Expand All @@ -479,6 +552,9 @@ def read_graph_by_a_given_version(graph_id, change_graph_version_request): # no
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if graph_id != GRAPH_ID:
return (f"Graph {graph_id} does not exist...", 500)

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
Expand Down
24 changes: 24 additions & 0 deletions coordinator/flex/server/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import etcd3
from urllib.parse import urlparse
import json
from datetime import datetime

RUNNING = None
DATA_LOADING = None
DATA_LOADING_JOB_CREATED_TIME = None

def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501
"""delete_job_by_id
Expand Down Expand Up @@ -177,12 +180,33 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501
if not isinstance(dataloading_job_config, dict):
dataloading_job_config = json.loads(dataloading_job_config)

with open("/tmp/graph_id.txt", "r") as f:
existing_graph_id = f.read()
if graph_id != existing_graph_id:
return (f"Graph id {graph_id} not founded", 500)

global DATA_LOADING

if DATA_LOADING is not None:
return ("Data loading is already in progress", 500)

current_time = datetime.now()
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
global DATA_LOADING_JOB_CREATED_TIME
DATA_LOADING_JOB_CREATED_TIME = formatted_time
with open("/tmp/data_loading_job_created_time.txt", "w") as f:
f.write(DATA_LOADING_JOB_CREATED_TIME)

response = requests.post(
f"{gart_controller_server}/submit-data-loading",
headers={"Content-Type": "application/json"},
data=json.dumps({"schema": json.dumps(dataloading_job_config)}),
)

if response.status_code != 200:
return (response.text, response.status_code)

result_dict = {}
result_dict["job_id"] = "0"
DATA_LOADING = "RUNNING"
return (CreateDataloadingJobResponse.from_dict(result_dict), response.status_code)
Loading

0 comments on commit a66aa55

Please sign in to comment.