Skip to content

Commit

Permalink
support graph schema and data source api in coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <[email protected]>
  • Loading branch information
doudoubobo committed Sep 27, 2024
1 parent 94a1033 commit 022727c
Show file tree
Hide file tree
Showing 7 changed files with 2,058 additions and 25 deletions.
20 changes: 16 additions & 4 deletions coordinator/flex/server/controllers/data_source_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from flex.server.models.schema_mapping import SchemaMapping # noqa: E501
from flex.server import util

import os
import requests
import json

def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
"""bind_datasource_in_batch
Expand All @@ -20,10 +23,19 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'

gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"

if not isinstance(schema_mapping, dict):
schema_mapping = json.loads(schema_mapping)

response = requests.post(
f"{gart_controller_server}/submit-data-source",
headers={"Content-Type": "application/json"},
data=json.dumps({"schema": json.dumps(schema_mapping)}),
)
return (response.text, response.status_code)

def get_datasource_by_id(graph_id): # noqa: E501
"""get_datasource_by_id
Expand Down
8 changes: 4 additions & 4 deletions coordinator/flex/server/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ def create_graph(create_graph_request): # noqa: E501
result_dict["graph_id"] = create_graph_request["name"]
global GRAPH_ID
GRAPH_ID = result_dict["graph_id"]
create_graph_request = create_graph_request["schema"]
create_graph_request_yaml = yaml.dump(create_graph_request)
gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"

response = requests.post(
f"{gart_controller_server}/submit-config",
data={"schema": create_graph_request_yaml},
f"{gart_controller_server}/submit-graph-schema",
headers={"Content-Type": "application/json"},
data=json.dumps({"schema": json.dumps(create_graph_request)}),
)
return (CreateGraphResponse.from_dict(result_dict), response.status_code)

Expand Down
20 changes: 17 additions & 3 deletions coordinator/flex/server/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import etcd3
from urllib.parse import urlparse
import json

RUNNING = None

Expand Down Expand Up @@ -169,6 +170,19 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501
:rtype: Union[CreateDataloadingJobResponse, Tuple[CreateDataloadingJobResponse, int], Tuple[CreateDataloadingJobResponse, int, Dict[str, str]]
"""
if connexion.request.is_json:
dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"

if not isinstance(dataloading_job_config, dict):
dataloading_job_config = json.loads(dataloading_job_config)

response = requests.post(
f"{gart_controller_server}/submit-data-loading",
headers={"Content-Type": "application/json"},
data=json.dumps({"schema": json.dumps(dataloading_job_config)}),
)

result_dict = {}
result_dict["job_id"] = "0"
return (CreateDataloadingJobResponse.from_dict(result_dict), response.status_code)
205 changes: 198 additions & 7 deletions scripts/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import socket
import json
from datetime import datetime, timezone
import yaml

app = Flask(__name__)
port = int(os.getenv("CONTROLLER_FLASK_PORT", 5000))
Expand All @@ -29,7 +30,7 @@ def submit_config():
except Exception as e:
return jsonify({"error": str(e)}), 400
elif "schema" in request.form:
content = request.form["schema"].encode('utf-8')
content = request.form["schema"].encode("utf-8")
else:
return jsonify({"error": "No file part or config string in the request"}), 400

Expand Down Expand Up @@ -60,7 +61,7 @@ def submit_pgql_config():
except Exception as e:
return jsonify({"error": str(e)}), 400
elif "schema" in request.form:
content = request.form["schema"].encode('utf-8')
content = request.form["schema"].encode("utf-8")
else:
return jsonify({"error": "No file part or config string in the request"}), 400

Expand Down Expand Up @@ -90,7 +91,186 @@ def submit_pgql_config():
except Exception as e:
time.sleep(5)
return "PGQL config submitted", 200



@app.route("/submit-graph-schema", methods=["POST"])
def submit_graph_schema():
graph_schema = request.json.get("schema").encode("utf-8")
# graph_schema = request.form["schema"].encode('utf-8')
etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")
etcd_host = etcd_server.split("://")[1].split(":")[0]
etcd_port = etcd_server.split(":")[2]
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
with open("/tmp/graph_schema.json", "wb") as f:
f.write(graph_schema)
try:
etcd_client.put(etcd_prefix + "gart_graph_schema_json", graph_schema)
return "Graph schema submitted", 200
except Exception as e:
return "Failed to submit graph schema: " + str(e), 500


@app.route("/submit-data-source", methods=["POST"])
def submit_data_source():
data_source_config = request.json.get("schema").encode("utf-8")

etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")
etcd_host = etcd_server.split("://")[1].split(":")[0]
etcd_port = etcd_server.split(":")[2]
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)

try:
etcd_client.put(etcd_prefix + "gart_data_source_json", data_source_config)
return "Data source submitted", 200
except Exception as e:
return "Failed to submit data source: " + str(e), 500


@app.route("/submit-data-loading", methods=["POST"])
def submit_data_loading():
etcd_server = os.getenv("ETCD_SERVICE", "etcd")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")
etcd_host = etcd_server.split("://")[1].split(":")[0]
etcd_port = etcd_server.split(":")[2]
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
try:
graph_schema, _ = etcd_client.get(etcd_prefix + "gart_graph_schema_json")
except Exception as e:
return "Failed to get graph schema: " + str(e), 500

try:
data_source_config, _ = etcd_client.get(etcd_prefix + "gart_data_source_json")
except Exception as e:
return "Failed to get data source: " + str(e), 500

graph_schema = json.loads(graph_schema.decode("utf-8"))
data_source_config = json.loads(data_source_config.decode("utf-8"))
result_dict = {}
result_dict["graph"] = graph_schema["name"]
graph_schema = graph_schema["schema"]
result_dict["loadingConfig"] = {}
result_dict["loadingConfig"]["dataSource"] = "rdbms"
result_dict["loadingConfig"]["method"] = "append"
result_dict["loadingConfig"]["enableRowStore"] = False
db_name = os.getenv("DB_NAME", "rdbms")
result_dict["loadingConfig"]["database"] = db_name

vertex_mappings_dict = {}
vertex_types_list = []
vertex_types_info = graph_schema["vertex_types"]
for vertex_id in range(len(vertex_types_info)):
vertex_type_element = {}
vertex_type_element["type_name"] = vertex_types_info[vertex_id]["type_name"]
mappings_list = []
for table_id in range(len(vertex_types_info)):
if (
vertex_type_element["type_name"]
== data_source_config["vertex_mappings"][table_id]["type_name"]
):
vertex_type_element["dataSourceName"] = data_source_config[
"vertex_mappings"
][table_id]["inputs"][0]
pk_prop_name = vertex_types_info[vertex_id]["primary_keys"][0]
column_mappings = data_source_config["vertex_mappings"][table_id][
"column_mappings"
]
for column_id in range(len(column_mappings)):
mappings_element = {}
mappings_element["property"] = column_mappings[column_id][
"property"
]
mappings_element["dataField"] = {}
mappings_element["dataField"]["name"] = column_mappings[column_id][
"column"
]["name"]
mappings_list.append(mappings_element)
if pk_prop_name == column_mappings[column_id]["property"]:
vertex_type_element["idFieldName"] = column_mappings[column_id][
"column"
]["name"]
break
vertex_type_element["mappings"] = mappings_list
vertex_types_list.append(vertex_type_element)

vertex_mappings_dict["vertex_types"] = vertex_types_list

edge_mappings_dict = {}
edge_types_list = []
edge_types_info = graph_schema["edge_types"]
for edge_id in range(len(edge_types_info)):
edge_type_element = {}
edge_type_element["type_pair"] = {}
edge_type_element["type_pair"]["edge"] = edge_types_info[edge_id]["type_name"]
edge_type_element["type_pair"]["source_vertex"] = edge_types_info[edge_id][
"vertex_type_pair_relations"
][0]["source_vertex"]
edge_type_element["type_pair"]["destination_vertex"] = edge_types_info[edge_id][
"vertex_type_pair_relations"
][0]["destination_vertex"]
for table_id in range(len(edge_types_info)):
if (
edge_type_element["type_pair"]["edge"]
== data_source_config["edge_mappings"][table_id]["type_triplet"]["edge"]
):
edge_type_element["dataSourceName"] = data_source_config[
"edge_mappings"
][table_id]["inputs"][0]
edge_type_element["sourceVertexMappings"] = [
{
"dataField": {
"name": data_source_config["edge_mappings"][table_id][
"source_vertex_mappings"
][0]["column"]["name"]
}
}
]
edge_type_element["destinationVertexMappings"] = [
{
"dataField": {
"name": data_source_config["edge_mappings"][table_id][
"destination_vertex_mappings"
][0]["column"]["name"]
}
}
]
data_field_mappings_list = []
column_mappings = data_source_config["edge_mappings"][table_id][
"column_mappings"
]
for column_id in range(len(column_mappings)):
mappings_element = {}
mappings_element["property"] = column_mappings[column_id][
"property"
]
mappings_element["dataField"] = {}
mappings_element["dataField"]["name"] = column_mappings[column_id][
"column"
]["name"]
data_field_mappings_list.append(mappings_element)
edge_type_element["dataFieldMappings"] = data_field_mappings_list
break
edge_types_list.append(edge_type_element)

edge_mappings_dict["edge_types"] = edge_types_list

result_dict["vertexMappings"] = vertex_mappings_dict
result_dict["edgeMappings"] = edge_mappings_dict

result_dict_str = yaml.dump(result_dict)

try:
etcd_client.put(etcd_prefix + "gart_rg_mapping_yaml", result_dict_str)
return "Data loading config submitted", 200
except Exception as e:
return "Failed to submit data loading config: " + str(e), 500


@app.route("/control/pause", methods=["POST"])
Expand Down Expand Up @@ -155,13 +335,22 @@ def get_read_epoch_by_timestamp():
unix_time = int(dt.timestamp())
epoch_unix_time_pairs = get_all_available_read_epochs_internal()[1]
# iterate through the list of epoch_unix_time pairs from end to start
for epoch, unix_time_epoch, num_vertices, num_edges in reversed(epoch_unix_time_pairs):
for epoch, unix_time_epoch, num_vertices, num_edges in reversed(
epoch_unix_time_pairs
):
if unix_time_epoch <= unix_time:
converted_time = datetime.fromtimestamp(unix_time_epoch)
# convert time into local time zone
converted_time = converted_time.replace(tzinfo=timezone.utc).astimezone(tz=None)
converted_time = converted_time.replace(tzinfo=timezone.utc).astimezone(
tz=None
)
formatted_time = converted_time.strftime("%Y-%m-%d %H:%M:%S")
result = {"version_id": str(epoch), "creation_time": formatted_time, "num_vertices": num_vertices, "num_edges": num_edges}
result = {
"version_id": str(epoch),
"creation_time": formatted_time,
"num_vertices": num_vertices,
"num_edges": num_edges,
}
return json.dumps(result), 200
return "No read epoch found", 200

Expand Down Expand Up @@ -385,7 +574,9 @@ def get_all_available_read_epochs_internal():
converted_time = converted_time.replace(tzinfo=timezone.utc).astimezone(tz=None)
formatted_time = converted_time.strftime("%Y-%m-%d %H:%M:%S")
available_epochs.append([epoch, formatted_time, num_vertices, num_edges])
available_epochs_internal.append([epoch, latest_timestamp, num_vertices, num_edges])
available_epochs_internal.append(
[epoch, latest_timestamp, num_vertices, num_edges]
)
return [available_epochs, available_epochs_internal]


Expand Down
16 changes: 9 additions & 7 deletions scripts/gart_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def get_version_by_timestamp(ctx, timestamp):
return
format_timestamp = timestamp.replace(" ", "%20")
response = requests.get(
f"{endpoint}/api/v1/graph/{GRAPH_ID}/version/{format_timestamp}")
f"{endpoint}/api/v1/graph/{GRAPH_ID}/version/{format_timestamp}"
)
click.echo(f"Version at {timestamp}: {response.text}")


Expand All @@ -133,13 +134,13 @@ def submit_config(ctx, config_path):
payload = {
"name": graph_name,
"description": graph_name,
"schema": yaml_content
"schema": yaml_content,
}
try:
response = requests.post(
f"{endpoint}/api/v1/graph/yaml",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
data=json.dumps(payload),
)
response.raise_for_status()
click.echo(f"Success: Server responded with {response.status_code} status")
Expand All @@ -163,7 +164,7 @@ def submit_pgql_config(ctx, config_path):

with open(config_path, "r") as file:
pgql_content = file.read()
match = re.search(r'CREATE PROPERTY GRAPH (\w+)', pgql_content)
match = re.search(r"CREATE PROPERTY GRAPH (\w+)", pgql_content)
if match:
graph_name = match.group(1)
else:
Expand All @@ -172,13 +173,13 @@ def submit_pgql_config(ctx, config_path):
payload = {
"name": graph_name,
"description": graph_name,
"schema": pgql_content
"schema": pgql_content,
}
try:
response = requests.post(
f"{endpoint}/api/v1/graph/pgql",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
data=json.dumps(payload),
)
response.raise_for_status()
click.echo(f"Success: Server responded with {response.status_code} status")
Expand Down Expand Up @@ -228,7 +229,8 @@ def change_graph_version_gie(ctx, graph_version):
return

response = requests.post(
f"{endpoint}/api/v1/graph/{GRAPH_ID}/version", data={"read_epoch": graph_version}
f"{endpoint}/api/v1/graph/{GRAPH_ID}/version",
data={"read_epoch": graph_version},
)
click.echo(f"Changed graph version to {graph_version}: {response.text}")

Expand Down
Loading

0 comments on commit 022727c

Please sign in to comment.