diff --git a/coordinator/flex/server/controllers/graph_controller.py b/coordinator/flex/server/controllers/graph_controller.py index 72c9ac8..0340e7a 100644 --- a/coordinator/flex/server/controllers/graph_controller.py +++ b/coordinator/flex/server/controllers/graph_controller.py @@ -3,33 +3,20 @@ from typing import Tuple from typing import Union -from flex.server.models.change_graph_version_request import ( - ChangeGraphVersionRequest, -) # noqa: E501 +from flex.server.models.change_graph_version_request import ChangeGraphVersionRequest # noqa: E501 from flex.server.models.create_edge_type import CreateEdgeType # noqa: E501 -from flex.server.models.create_graph_by_pgql_request import ( - CreateGraphByPgqlRequest, -) # noqa: E501 -from flex.server.models.create_graph_by_yaml_request import ( - CreateGraphByYamlRequest, -) # noqa: E501 +from flex.server.models.create_graph_by_pgql_request import CreateGraphByPgqlRequest # noqa: E501 +from flex.server.models.create_graph_by_yaml_request import CreateGraphByYamlRequest # noqa: E501 from flex.server.models.create_graph_request import CreateGraphRequest # noqa: E501 from flex.server.models.create_graph_response import CreateGraphResponse # noqa: E501 -from flex.server.models.create_graph_schema_request import ( - CreateGraphSchemaRequest, -) # noqa: E501 +from flex.server.models.create_graph_schema_request import CreateGraphSchemaRequest # noqa: E501 from flex.server.models.create_vertex_type import CreateVertexType # noqa: E501 from flex.server.models.error import Error # noqa: E501 from flex.server.models.get_graph_response import GetGraphResponse # noqa: E501 -from flex.server.models.get_graph_schema_response import ( - GetGraphSchemaResponse, -) # noqa: E501 -from flex.server.models.get_graph_version_response import ( - GetGraphVersionResponse, -) # noqa: E501 +from flex.server.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501 +from flex.server.models.get_graph_version_response import GetGraphVersionResponse # noqa: E501 from flex.server import util - import etcd3 from urllib.parse import urlparse import json @@ -194,18 +181,16 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501 Create a edge type # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param create_edge_type: + :param create_edge_type: :type create_edge_type: dict | bytes :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ if connexion.request.is_json: - create_edge_type = CreateEdgeType.from_dict( - connexion.request.get_json() - ) # noqa: E501 - return "do some magic!" + create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501 + return 'do some magic!' def create_graph(create_graph_request): # noqa: E501 @@ -213,7 +198,7 @@ def create_graph(create_graph_request): # noqa: E501 Create a new graph # noqa: E501 - :param create_graph_request: + :param create_graph_request: :type create_graph_request: dict | bytes :rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]] @@ -243,7 +228,7 @@ def create_graph_by_pgql(create_graph_by_pgql_request): # noqa: E501 Create a new graph by providing pgql # noqa: E501 - :param create_graph_by_pgql_request: + :param create_graph_by_pgql_request: :type create_graph_by_pgql_request: dict | bytes :rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]] @@ -272,7 +257,7 @@ def create_graph_by_yaml(create_graph_by_yaml_request): # noqa: E501 Create a new graph by providing yaml # noqa: E501 - :param create_graph_by_yaml_request: + :param create_graph_by_yaml_request: :type create_graph_by_yaml_request: dict | bytes :rtype: Union[CreateGraphResponse, Tuple[CreateGraphResponse, int], Tuple[CreateGraphResponse, int, Dict[str, str]] @@ -301,39 +286,35 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501 Create a vertex type # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param create_vertex_type: + :param create_vertex_type: :type create_vertex_type: dict | bytes :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ if connexion.request.is_json: - create_vertex_type = CreateVertexType.from_dict( - connexion.request.get_json() - ) # noqa: E501 - return "do some magic!" + create_vertex_type = CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501 + return 'do some magic!' -def delete_edge_type_by_name( - graph_id, type_name, source_vertex_type, destination_vertex_type -): # noqa: E501 +def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destination_vertex_type): # noqa: E501 """delete_edge_type_by_name Delete edge type by name # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param type_name: + :param type_name: :type type_name: str - :param source_vertex_type: + :param source_vertex_type: :type source_vertex_type: str - :param destination_vertex_type: + :param destination_vertex_type: :type destination_vertex_type: str :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return "do some magic!" + return 'do some magic!' def delete_graph_by_id(graph_id): # noqa: E501 @@ -341,12 +322,12 @@ def delete_graph_by_id(graph_id): # noqa: E501 Delete graph by ID # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return "do some magic!" + return 'do some magic!' def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501 @@ -354,14 +335,14 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501 Delete vertex type by name # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param type_name: + :param type_name: :type type_name: str :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return "do some magic!" + return 'do some magic!' def get_graph_all_available_versions(graph_id): # noqa: E501 @@ -369,7 +350,7 @@ def get_graph_all_available_versions(graph_id): # noqa: E501 Get all available versions for a specific graph # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str :rtype: Union[List[GetGraphVersionResponse], Tuple[List[GetGraphVersionResponse], int], Tuple[List[GetGraphVersionResponse], int, Dict[str, str]] @@ -384,6 +365,8 @@ def get_graph_all_available_versions(graph_id): # noqa: E501 result_dict = {} result_dict["version_id"] = str(all_versions[idx][0]) result_dict["creation_time"] = str(all_versions[idx][1]) + result_dict["num_vertices"] = str(all_versions[idx][2]) + result_dict["num_edges"] = str(all_versions[idx][3]) result.append(GetGraphVersionResponse.from_dict(result_dict)) return (result, 200) @@ -393,7 +376,7 @@ def get_graph_by_id(graph_id): # noqa: E501 Get graph by ID # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str :rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]] @@ -411,9 +394,9 @@ def get_graph_version_by_timestamp(graph_id, timestamp): # noqa: E501 Get the latest version by providing a timestamp # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param timestamp: + :param timestamp: :type timestamp: str :rtype: Union[GetGraphVersionResponse, Tuple[GetGraphVersionResponse, int], Tuple[GetGraphVersionResponse, int, Dict[str, str]] @@ -433,7 +416,7 @@ def get_schema_by_id(graph_id): # noqa: E501 Get graph schema by ID # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str :rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]] @@ -451,18 +434,16 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501 Import graph schema # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param create_graph_schema_request: + :param create_graph_schema_request: :type create_graph_schema_request: dict | bytes :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ if connexion.request.is_json: - create_graph_schema_request = CreateGraphSchemaRequest.from_dict( - connexion.request.get_json() - ) # noqa: E501 - return "do some magic!" + create_graph_schema_request = CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501 + return 'do some magic!' def list_graphs(): # noqa: E501 @@ -484,9 +465,9 @@ def read_graph_by_a_given_version(graph_id, change_graph_version_request): # no Read graph by a given version # noqa: E501 - :param graph_id: + :param graph_id: :type graph_id: str - :param change_graph_version_request: + :param change_graph_version_request: :type change_graph_version_request: dict | bytes :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] diff --git a/coordinator/flex/server/controllers/job_controller.py b/coordinator/flex/server/controllers/job_controller.py index 6b1f61e..092879a 100644 --- a/coordinator/flex/server/controllers/job_controller.py +++ b/coordinator/flex/server/controllers/job_controller.py @@ -10,6 +10,12 @@ from flex.server.models.job_status import JobStatus # noqa: E501 from flex.server import util +import requests +import os +import etcd3 +from urllib.parse import urlparse + +RUNNING = None def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501 """delete_job_by_id @@ -53,9 +59,32 @@ def get_job_by_id(job_id): # noqa: E501 :rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]] """ - return 'do some magic!' - - + global RUNNING + result_dict = {} + etcd_server = os.getenv("ETCD_SERVICE", "127.0.0.1:23790") + if not etcd_server.startswith(("http://", "https://")): + etcd_server = f"http://{etcd_server}" + parsed_url = urlparse(etcd_server) + etcd_host = parsed_url.netloc.split(":")[0] + etcd_port = parsed_url.port + etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_") + + debezium_status_key = f"{etcd_prefix}debezium_request_is_sent" + try: + debezium_status, _ = etcd_client.get(debezium_status_key) + if debezium_status == b"True": + if RUNNING is None: + RUNNING = "RUNNING" + result_dict["status"] = RUNNING + result_dict["id"] = "0" + result_dict["type"] = "dataloading" + except: + return (JobStatus.from_dict(result_dict), 200) + + return (JobStatus.from_dict(result_dict), 200) + + def list_jobs(): # noqa: E501 """list_jobs @@ -64,7 +93,68 @@ def list_jobs(): # noqa: E501 :rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]] """ - return 'do some magic!' + global RUNNING + result_dict = {} + etcd_server = os.getenv("ETCD_SERVICE", "127.0.0.1:23790") + if not etcd_server.startswith(("http://", "https://")): + etcd_server = f"http://{etcd_server}" + parsed_url = urlparse(etcd_server) + etcd_host = parsed_url.netloc.split(":")[0] + etcd_port = parsed_url.port + etcd_client = etcd3.client(host=etcd_host, port=etcd_port) + etcd_prefix = os.getenv("ETCD_PREFIX", "gart_meta_") + + debezium_status_key = f"{etcd_prefix}debezium_request_is_sent" + try: + debezium_status, _ = etcd_client.get(debezium_status_key) + if debezium_status == b"True": + if RUNNING is None: + RUNNING = "RUNNING" + result_dict["status"] = RUNNING + result_dict["id"] = "0" + result_dict["type"] = "dataloading" + except: + return ([JobStatus.from_dict(result_dict)], 200) + + return ([JobStatus.from_dict(result_dict)], 200) + + +def pause_job(job_id): # noqa: E501 + """pause_job + + Pause an existing job # noqa: E501 + + :param job_id: + :type job_id: str + + :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] + """ + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") + if not gart_controller_server.startswith(("http://", "https://")): + gart_controller_server = f"http://{gart_controller_server}" + response = requests.post(f"{gart_controller_server}/control/pause") + global RUNNING + RUNNING = "PAUSED" + return (response.text, response.status_code) + + +def resume_job(job_id): # noqa: E501 + """resume_job + + Resume an existing job # noqa: E501 + + :param job_id: + :type job_id: str + + :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] + """ + gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") + if not gart_controller_server.startswith(("http://", "https://")): + gart_controller_server = f"http://{gart_controller_server}" + response = requests.post(f"{gart_controller_server}/control/resume") + global RUNNING + RUNNING = "RUNNING" + return (response.text, response.status_code) def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501 diff --git a/coordinator/flex/server/controllers/service_controller.py b/coordinator/flex/server/controllers/service_controller.py index deef9d0..8173b7d 100644 --- a/coordinator/flex/server/controllers/service_controller.py +++ b/coordinator/flex/server/controllers/service_controller.py @@ -8,8 +8,6 @@ from flex.server.models.start_service_request import StartServiceRequest # noqa: E501 from flex.server import util -import requests -import os def get_service_status_by_id(graph_id): # noqa: E501 """get_service_status_by_id @@ -43,11 +41,7 @@ def pause_data_loading(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") - if not gart_controller_server.startswith(("http://", "https://")): - gart_controller_server = f"http://{gart_controller_server}" - response = requests.post(f"{gart_controller_server}/control/pause") - return (response.text, response.status_code) + return 'do some magic!' def restart_service(): # noqa: E501 @@ -69,11 +63,7 @@ def resume_data_loading(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - gart_controller_server = os.getenv("GART_CONTROLLER_SERVER", "127.0.0.1:8080") - if not gart_controller_server.startswith(("http://", "https://")): - gart_controller_server = f"http://{gart_controller_server}" - response = requests.post(f"{gart_controller_server}/control/resume") - return (response.text, response.status_code) + return 'do some magic!' def start_service(start_service_request=None): # noqa: E501 diff --git a/coordinator/flex/server/models/get_graph_version_response.py b/coordinator/flex/server/models/get_graph_version_response.py index b7be4fc..fae6c43 100644 --- a/coordinator/flex/server/models/get_graph_version_response.py +++ b/coordinator/flex/server/models/get_graph_version_response.py @@ -12,26 +12,36 @@ class GetGraphVersionResponse(Model): Do not edit the class manually. """ - def __init__(self, creation_time=None, version_id=None): # noqa: E501 + def __init__(self, creation_time=None, version_id=None, num_vertices=None, num_edges=None): # noqa: E501 """GetGraphVersionResponse - a model defined in OpenAPI :param creation_time: The creation_time of this GetGraphVersionResponse. # noqa: E501 :type creation_time: str :param version_id: The version_id of this GetGraphVersionResponse. # noqa: E501 :type version_id: str + :param num_vertices: The num_vertices of this GetGraphVersionResponse. # noqa: E501 + :type num_vertices: str + :param num_edges: The num_edges of this GetGraphVersionResponse. # noqa: E501 + :type num_edges: str """ self.openapi_types = { 'creation_time': str, - 'version_id': str + 'version_id': str, + 'num_vertices': str, + 'num_edges': str } self.attribute_map = { 'creation_time': 'creation_time', - 'version_id': 'version_id' + 'version_id': 'version_id', + 'num_vertices': 'num_vertices', + 'num_edges': 'num_edges' } self._creation_time = creation_time self._version_id = version_id + self._num_vertices = num_vertices + self._num_edges = num_edges @classmethod def from_dict(cls, dikt) -> 'GetGraphVersionResponse': @@ -89,3 +99,49 @@ def version_id(self, version_id: str): raise ValueError("Invalid value for `version_id`, must not be `None`") # noqa: E501 self._version_id = version_id + + @property + def num_vertices(self) -> str: + """Gets the num_vertices of this GetGraphVersionResponse. + + + :return: The num_vertices of this GetGraphVersionResponse. + :rtype: str + """ + return self._num_vertices + + @num_vertices.setter + def num_vertices(self, num_vertices: str): + """Sets the num_vertices of this GetGraphVersionResponse. + + + :param num_vertices: The num_vertices of this GetGraphVersionResponse. + :type num_vertices: str + """ + if num_vertices is None: + raise ValueError("Invalid value for `num_vertices`, must not be `None`") # noqa: E501 + + self._num_vertices = num_vertices + + @property + def num_edges(self) -> str: + """Gets the num_edges of this GetGraphVersionResponse. + + + :return: The num_edges of this GetGraphVersionResponse. + :rtype: str + """ + return self._num_edges + + @num_edges.setter + def num_edges(self, num_edges: str): + """Sets the num_edges of this GetGraphVersionResponse. + + + :param num_edges: The num_edges of this GetGraphVersionResponse. + :type num_edges: str + """ + if num_edges is None: + raise ValueError("Invalid value for `num_edges`, must not be `None`") # noqa: E501 + + self._num_edges = num_edges diff --git a/coordinator/flex/server/models/job_status.py b/coordinator/flex/server/models/job_status.py index 5119551..129b396 100644 --- a/coordinator/flex/server/models/job_status.py +++ b/coordinator/flex/server/models/job_status.py @@ -133,7 +133,7 @@ def status(self, status: str): :param status: The status of this JobStatus. :type status: str """ - allowed_values = ["RUNNING", "SUCCESS", "FAILED", "CANCELLED", "WAITING"] # noqa: E501 + allowed_values = ["RUNNING", "SUCCESS", "FAILED", "CANCELLED", "WAITING","PAUSED"] # noqa: E501 if status not in allowed_values: raise ValueError( "Invalid value for `status` ({0}), must be one of {1}" diff --git a/coordinator/flex/server/openapi/openapi.yaml b/coordinator/flex/server/openapi/openapi.yaml index 414dc68..1f64dee 100644 --- a/coordinator/flex/server/openapi/openapi.yaml +++ b/coordinator/flex/server/openapi/openapi.yaml @@ -1834,7 +1834,7 @@ paths: tags: - Stored Procedure x-openapi-router-controller: flex.server.controllers.stored_procedure_controller - /api/v1/graph/{graph_id}/versions: + /api/v1/graph/{graph_id}/version: get: description: Get all available versions for a specific graph operationId: get_graph_all_available_versions @@ -1897,7 +1897,7 @@ paths: tags: - Graph x-openapi-router-controller: flex.server.controllers.graph_controller - /api/v1/graph/{graph_id}/versions/timestamp: + /api/v1/graph/{graph_id}/version/{timestamp}: get: description: Get the latest version by providing a timestamp operationId: get_graph_version_by_timestamp @@ -1909,14 +1909,14 @@ paths: schema: type: string style: simple - - explode: true - in: query + - explode: false + in: path name: timestamp required: true schema: description: The timestamp of the graph to read. Provide as a query parameter. type: string - style: form + style: simple responses: "200": content: @@ -2052,6 +2052,74 @@ paths: tags: - Job x-openapi-router-controller: flex.server.controllers.job_controller + /api/v1/job/{job_id}/pause: + post: + description: Pause an existing job + operationId: pause_job + parameters: + - explode: false + in: path + name: job_id + required: true + schema: + type: string + style: simple + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/APIResponse' + description: Successfully paused the job + "400": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Bad request + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Server error + tags: + - Job + x-openapi-router-controller: flex.server.controllers.job_controller + /api/v1/job/{job_id}/resume: + post: + description: Resume an existing job + operationId: resume_job + parameters: + - explode: false + in: path + name: job_id + required: true + schema: + type: string + style: simple + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/APIResponse' + description: Successfully resumed a job + "400": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Bad request + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Server error + tags: + - Job + x-openapi-router-controller: flex.server.controllers.job_controller /api/v1/service: get: description: List all service status @@ -3642,7 +3710,9 @@ components: GetGraphVersionResponse: example: creation_time: creation_time + num_vertices: num_vertices version_id: version_id + num_edges: num_edges properties: creation_time: title: creation_time @@ -3650,8 +3720,16 @@ components: version_id: title: version_id type: string + num_vertices: + title: num_vertices + type: string + num_edges: + title: num_edges + type: string required: - creation_time + - num_edges + - num_vertices - version_id title: GetGraphVersionResponse ChangeGraphVersionRequest: @@ -3982,6 +4060,7 @@ components: - FAILED - CANCELLED - WAITING + - PAUSED title: status type: string start_time: diff --git a/coordinator/flex/server/test/test_graph_controller.py b/coordinator/flex/server/test/test_graph_controller.py index 269c821..0eedb73 100644 --- a/coordinator/flex/server/test/test_graph_controller.py +++ b/coordinator/flex/server/test/test_graph_controller.py @@ -172,7 +172,7 @@ def test_get_graph_all_available_versions(self): 'Accept': 'application/json', } response = self.client.open( - '/api/v1/graph/{graph_id}/versions'.format(graph_id='graph_id_example'), + '/api/v1/graph/{graph_id}/version'.format(graph_id='graph_id_example'), method='GET', headers=headers) self.assert200(response, @@ -198,15 +198,13 @@ def test_get_graph_version_by_timestamp(self): """ - query_string = [('timestamp', 'timestamp_example')] headers = { 'Accept': 'application/json', } response = self.client.open( - '/api/v1/graph/{graph_id}/versions/timestamp'.format(graph_id='graph_id_example'), + '/api/v1/graph/{graph_id}/version/{timestamp}'.format(graph_id='graph_id_example', timestamp='timestamp_example'), method='GET', - headers=headers, - query_string=query_string) + headers=headers) self.assert200(response, 'Response body is : ' + response.data.decode('utf-8')) @@ -270,7 +268,7 @@ def test_read_graph_by_a_given_version(self): 'Content-Type': 'application/json', } response = self.client.open( - '/api/v1/graph/{graph_id}/versions'.format(graph_id='graph_id_example'), + '/api/v1/graph/{graph_id}/version'.format(graph_id='graph_id_example'), method='POST', headers=headers, data=json.dumps(change_graph_version_request), diff --git a/coordinator/flex/server/test/test_job_controller.py b/coordinator/flex/server/test/test_job_controller.py index c9854a1..3f31a38 100644 --- a/coordinator/flex/server/test/test_job_controller.py +++ b/coordinator/flex/server/test/test_job_controller.py @@ -79,6 +79,36 @@ def test_list_jobs(self): self.assert200(response, 'Response body is : ' + response.data.decode('utf-8')) + def test_pause_job(self): + """Test case for pause_job + + + """ + headers = { + 'Accept': 'application/json', + } + response = self.client.open( + '/api/v1/job/{job_id}/pause'.format(job_id='job_id_example'), + method='POST', + headers=headers) + self.assert200(response, + 'Response body is : ' + response.data.decode('utf-8')) + + def test_resume_job(self): + """Test case for resume_job + + + """ + headers = { + 'Accept': 'application/json', + } + response = self.client.open( + '/api/v1/job/{job_id}/resume'.format(job_id='job_id_example'), + method='POST', + headers=headers) + self.assert200(response, + 'Response body is : ' + response.data.decode('utf-8')) + def test_submit_dataloading_job(self): """Test case for submit_dataloading_job diff --git a/coordinator/requirements.txt b/coordinator/requirements.txt index 3466d7d..b07e1b9 100644 --- a/coordinator/requirements.txt +++ b/coordinator/requirements.txt @@ -11,4 +11,4 @@ swagger-ui-bundle >= 0.0.2 python_dateutil >= 2.6.0 setuptools >= 21.0.0 Flask == 2.1.1 -etcd3 +etcd3 \ No newline at end of file diff --git a/scripts/controller.py b/scripts/controller.py index 42bd350..1a735df 100755 --- a/scripts/controller.py +++ b/scripts/controller.py @@ -155,13 +155,13 @@ def get_read_epoch_by_timestamp(): unix_time = int(dt.timestamp()) epoch_unix_time_pairs = get_all_available_read_epochs_internal()[1] # iterate through the list of epoch_unix_time pairs from end to start - for epoch, unix_time_epoch in reversed(epoch_unix_time_pairs): + for epoch, unix_time_epoch, num_vertices, num_edges in reversed(epoch_unix_time_pairs): if unix_time_epoch <= unix_time: converted_time = datetime.fromtimestamp(unix_time_epoch) # convert time into local time zone converted_time = converted_time.replace(tzinfo=timezone.utc).astimezone(tz=None) formatted_time = converted_time.strftime("%Y-%m-%d %H:%M:%S") - result = {"version_id": str(epoch), "creation_time": formatted_time} + result = {"version_id": str(epoch), "creation_time": formatted_time, "num_vertices": num_vertices, "num_edges": num_edges} return json.dumps(result), 200 return "No read epoch found", 200 @@ -371,6 +371,8 @@ def get_all_available_read_epochs_internal(): available_epochs_internal = [] for epoch in range(latest_read_epoch + 1): latest_timestamp = None + num_vertices = 0 + num_edges = 0 for frag_id in range(int(num_fragment)): schema_key = etcd_prefix + "gart_blob_m0" + f"_p{frag_id}" + f"_e{epoch}" schema_str, _ = etcd_client.get(schema_key) @@ -382,8 +384,8 @@ def get_all_available_read_epochs_internal(): # convert time into local time zone converted_time = converted_time.replace(tzinfo=timezone.utc).astimezone(tz=None) formatted_time = converted_time.strftime("%Y-%m-%d %H:%M:%S") - available_epochs.append([epoch, formatted_time]) - available_epochs_internal.append([epoch, latest_timestamp]) + available_epochs.append([epoch, formatted_time, num_vertices, num_edges]) + available_epochs_internal.append([epoch, latest_timestamp, num_vertices, num_edges]) return [available_epochs, available_epochs_internal] diff --git a/scripts/gart_cli.py b/scripts/gart_cli.py index 3cbab91..42572c3 100755 --- a/scripts/gart_cli.py +++ b/scripts/gart_cli.py @@ -71,7 +71,7 @@ def resume_data_loading(ctx): click.echo('Please connect to an endpoint first using the "connect" command.') return - response = requests.post(f"{endpoint}/api/v1/service/resume") + response = requests.post(f"{endpoint}/api/v1/job/0/resume") click.echo(f"Resumed data loading: {response.text}") @@ -84,7 +84,7 @@ def pause_data_loading(ctx): click.echo('Please connect to an endpoint first using the "connect" command.') return - response = requests.post(f"{endpoint}/api/v1/service/pause") + response = requests.post(f"{endpoint}/api/v1/job/0/pause") click.echo(f"Paused data loading: {response.text}") @@ -97,7 +97,7 @@ def get_all_available_versions(ctx): click.echo('Please connect to an endpoint first using the "connect" command.') return - response = requests.get(f"{endpoint}/api/v1/graph/{GRAPH_ID}/versions") + response = requests.get(f"{endpoint}/api/v1/graph/{GRAPH_ID}/version") click.echo(f"Available versions: {response.text}") @@ -110,8 +110,9 @@ def get_version_by_timestamp(ctx, timestamp): if not endpoint: click.echo('Please connect to an endpoint first using the "connect" command.') return + format_timestamp = timestamp.replace(" ", "%20") response = requests.get( - f"{endpoint}/api/v1/graph/{GRAPH_ID}/versions/timestamp?timestamp={timestamp}") + f"{endpoint}/api/v1/graph/{GRAPH_ID}/version/{format_timestamp}") click.echo(f"Version at {timestamp}: {response.text}") @@ -227,7 +228,7 @@ def change_graph_version_gie(ctx, graph_version): return response = requests.post( - f"{endpoint}/api/v1/graph/{GRAPH_ID}/versions", data={"read_epoch": graph_version} + f"{endpoint}/api/v1/graph/{GRAPH_ID}/version", data={"read_epoch": graph_version} ) click.echo(f"Changed graph version to {graph_version}: {response.text}")