Skip to content

Commit

Permalink
refine coordinator by new APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Lei Wang <[email protected]>
  • Loading branch information
doudoubobo committed Sep 24, 2024
1 parent 6544c9f commit 94a1033
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 101 deletions.
101 changes: 41 additions & 60 deletions coordinator/flex/server/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,20 @@
from typing import Tuple
from typing import Union

from flex.server.models.change_graph_version_request import (
ChangeGraphVersionRequest,
) # noqa: E501
from flex.server.models.change_graph_version_request import ChangeGraphVersionRequest # noqa: E501
from flex.server.models.create_edge_type import CreateEdgeType # noqa: E501
from flex.server.models.create_graph_by_pgql_request import (
CreateGraphByPgqlRequest,
) # noqa: E501
from flex.server.models.create_graph_by_yaml_request import (
CreateGraphByYamlRequest,
) # noqa: E501
from flex.server.models.create_graph_by_pgql_request import CreateGraphByPgqlRequest # noqa: E501
from flex.server.models.create_graph_by_yaml_request import CreateGraphByYamlRequest # noqa: E501
from flex.server.models.create_graph_request import CreateGraphRequest # noqa: E501
from flex.server.models.create_graph_response import CreateGraphResponse # noqa: E501
from flex.server.models.create_graph_schema_request import (
CreateGraphSchemaRequest,
) # noqa: E501
from flex.server.models.create_graph_schema_request import CreateGraphSchemaRequest # noqa: E501
from flex.server.models.create_vertex_type import CreateVertexType # noqa: E501
from flex.server.models.error import Error # noqa: E501
from flex.server.models.get_graph_response import GetGraphResponse # noqa: E501
from flex.server.models.get_graph_schema_response import (
GetGraphSchemaResponse,
) # noqa: E501
from flex.server.models.get_graph_version_response import (
GetGraphVersionResponse,
) # noqa: E501
from flex.server.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501
from flex.server.models.get_graph_version_response import GetGraphVersionResponse # noqa: E501
from flex.server import util


import etcd3
from urllib.parse import urlparse
import json
Expand Down Expand Up @@ -194,26 +181,24 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501
Create a edge type # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param create_edge_type:
:param create_edge_type:
:type create_edge_type: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
create_edge_type = CreateEdgeType.from_dict(
connexion.request.get_json()
) # noqa: E501
return "do some magic!"
create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'


def create_graph(create_graph_request): # noqa: E501
"""create_graph
Create a new graph # noqa: E501
:param create_graph_request:
:param create_graph_request:
:type create_graph_request: dict | bytes
:rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]]
Expand Down Expand Up @@ -243,7 +228,7 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501
Create a new graph by providing pgql # noqa: E501
:param create_graph_by_pgql_request:
:param create_graph_by_pgql_request:
:type create_graph_by_pgql_request: dict | bytes
:rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]]
Expand Down Expand Up @@ -272,7 +257,7 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501
Create a new graph by providing yaml # noqa: E501
:param create_graph_by_yaml_request:
:param create_graph_by_yaml_request:
:type create_graph_by_yaml_request: dict | bytes
:rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]]
Expand Down Expand Up @@ -301,75 +286,71 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501
Create a vertex type # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param create_vertex_type:
:param create_vertex_type:
:type create_vertex_type: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
create_vertex_type = CreateVertexType.from_dict(
connexion.request.get_json()
) # noqa: E501
return "do some magic!"
create_vertex_type = CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'


def delete_edge_type_by_name(
graph_id, type_name, source_vertex_type, destination_vertex_type
): # noqa: E501
def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destination_vertex_type): # noqa: E501
"""delete_edge_type_by_name
Delete edge type by name # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param type_name:
:param type_name:
:type type_name: str
:param source_vertex_type:
:param source_vertex_type:
:type source_vertex_type: str
:param destination_vertex_type:
:param destination_vertex_type:
:type destination_vertex_type: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return "do some magic!"
return 'do some magic!'


def delete_graph_by_id(graph_id): # noqa: E501
"""delete_graph_by_id
Delete graph by ID # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return "do some magic!"
return 'do some magic!'


def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501
"""delete_vertex_type_by_name
Delete vertex type by name # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param type_name:
:param type_name:
:type type_name: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return "do some magic!"
return 'do some magic!'


def get_graph_all_available_versions(graph_id): # noqa: E501
"""get_graph_all_available_versions
Get all available versions for a specific graph # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:rtype: Union[List[GetGraphVersionResponse], Tuple[List[GetGraphVersionResponse], int], Tuple[List[GetGraphVersionResponse], int, Dict[str, str]]
Expand All @@ -384,6 +365,8 @@ def get_graph_all_available_versions(graph_id): # noqa: E501
result_dict = {}
result_dict["version_id"] = str(all_versions[idx][0])
result_dict["creation_time"] = str(all_versions[idx][1])
result_dict["num_vertices"] = str(all_versions[idx][2])
result_dict["num_edges"] = str(all_versions[idx][3])
result.append(GetGraphVersionResponse.from_dict(result_dict))
return (result, 200)

Expand All @@ -393,7 +376,7 @@ def get_graph_by_id(graph_id): # noqa: E501
Get graph by ID # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]]
Expand All @@ -411,9 +394,9 @@ def get_graph_version_by_timestamp(graph_id, timestamp): # noqa: E501
Get the latest version by providing a timestamp # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param timestamp:
:param timestamp:
:type timestamp: str
:rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]]
Expand All @@ -433,7 +416,7 @@ def get_schema_by_id(graph_id): # noqa: E501
Get graph schema by ID # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]]
Expand All @@ -451,18 +434,16 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501
Import graph schema # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param create_graph_schema_request:
:param create_graph_schema_request:
:type create_graph_schema_request: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
create_graph_schema_request = CreateGraphSchemaRequest.from_dict(
connexion.request.get_json()
) # noqa: E501
return "do some magic!"
create_graph_schema_request = CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'


def list_graphs(): # noqa: E501
Expand All @@ -484,9 +465,9 @@ def read_graph_by_a_given_version(graph_id, change_graph_version_request): # no
Read graph by a given version # noqa: E501
:param graph_id:
:param graph_id:
:type graph_id: str
:param change_graph_version_request:
:param change_graph_version_request:
:type change_graph_version_request: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
Expand Down
98 changes: 94 additions & 4 deletions coordinator/flex/server/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
from flex.server.models.job_status import JobStatus # noqa: E501
from flex.server import util

import requests
import os
import etcd3
from urllib.parse import urlparse

RUNNING = None

def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501
"""delete_job_by_id
Expand Down Expand Up @@ -53,9 +59,32 @@ def get_job_by_id(job_id): # noqa: E501
:rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]]
"""
return 'do some magic!'


global RUNNING
result_dict = {}
etcd_server = os.getenv("ETCD_SERVICE", "127.0.0.1:23790")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
parsed_url = urlparse(etcd_server)
etcd_host = parsed_url.netloc.split(":")[0]
etcd_port = parsed_url.port
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")

debezium_status_key = f"{etcd_prefix}debezium_request_is_sent"
try:
debezium_status, _ = etcd_client.get(debezium_status_key)
if debezium_status == b"True":
if RUNNING is None:
RUNNING = "RUNNING"
result_dict["status"] = RUNNING
result_dict["id"] = "0"
result_dict["type"] = "dataloading"
except:
return (JobStatus.from_dict(result_dict), 200)

return (JobStatus.from_dict(result_dict), 200)


def list_jobs(): # noqa: E501
"""list_jobs
Expand All @@ -64,7 +93,68 @@ def list_jobs(): # noqa: E501
:rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]]
"""
return 'do some magic!'
global RUNNING
result_dict = {}
etcd_server = os.getenv("ETCD_SERVICE", "127.0.0.1:23790")
if not etcd_server.startswith(("http://", "https://")):
etcd_server = f"http://{etcd_server}"
parsed_url = urlparse(etcd_server)
etcd_host = parsed_url.netloc.split(":")[0]
etcd_port = parsed_url.port
etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_")

debezium_status_key = f"{etcd_prefix}debezium_request_is_sent"
try:
debezium_status, _ = etcd_client.get(debezium_status_key)
if debezium_status == b"True":
if RUNNING is None:
RUNNING = "RUNNING"
result_dict["status"] = RUNNING
result_dict["id"] = "0"
result_dict["type"] = "dataloading"
except:
return ([JobStatus.from_dict(result_dict)], 200)

return ([JobStatus.from_dict(result_dict)], 200)


def pause_job(job_id): # noqa: E501
"""pause_job
Pause an existing job # noqa: E501
:param job_id:
:type job_id: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
response = requests.post(f"{gart_controller_server}/control/pause")
global RUNNING
RUNNING = "PAUSED"
return (response.text, response.status_code)


def resume_job(job_id): # noqa: E501
"""resume_job
Resume an existing job # noqa: E501
:param job_id:
:type job_id: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080")
if not gart_controller_server.startswith(("http://", "https://")):
gart_controller_server = f"http://{gart_controller_server}"
response = requests.post(f"{gart_controller_server}/control/resume")
global RUNNING
RUNNING = "RUNNING"
return (response.text, response.status_code)


def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501
Expand Down
Loading

0 comments on commit 94a1033

Please sign in to comment.