Skip to content

Commit

Permalink
fix(interactive): Return the correct port in coordinator's API (#4312)
Browse files Browse the repository at this point in the history
Maintain the port mapping info in coordinator, if deployed with docker.
  • Loading branch information
zhanglei1949 authored Nov 6, 2024
1 parent 25fa9e9 commit 2cd1144
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 62 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/flex-interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
# install gsctl
python3 -m pip install ${GITHUB_WORKSPACE}/python/dist/*.whl
# launch service: 8080 for coordinator http port; 7687 for cypher port;
gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --cypher-port 7688 --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
sleep 20
# test
python3 -m pip install --no-cache-dir pytest pytest-cov pytest-timeout pytest-xdist
Expand All @@ -66,6 +66,10 @@ jobs:
--exitfirst \
$(dirname $(python3 -c "import graphscope.gsctl as gsctl; print(gsctl.__file__)"))/tests/test_interactive.py
# test coordinator
res=`curl http://127.0.0.1:8080/api/v1/service`
echo $res | grep 7688 || exit 1
# destroy instance
gsctl instance destroy --type interactive -y
Expand Down
2 changes: 2 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from graphscope.config import Config
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.flex.core.client_wrapper import initialize_client_wrapper
from gscoordinator.flex.encoder import JSONEncoder
from gscoordinator.monitor import Monitor
from gscoordinator.servicer import init_graphscope_one_service_servicer
Expand Down Expand Up @@ -125,6 +126,7 @@ def get_servicer(config: Config):


def start_http_service(config):
initialize_client_wrapper(config)
app = connexion.App(__name__, specification_dir="./flex/openapi/")
app.app.json_encoder = JSONEncoder
app.add_api(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from gscoordinator.flex.models.schema_mapping import SchemaMapping # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -26,7 +26,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
"""
if connexion.request.is_json:
schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.bind_datasource_in_batch(graph_id, schema_mapping)
return get_client_wrapper().bind_datasource_in_batch(graph_id, schema_mapping)


@handle_api_exception()
Expand All @@ -40,7 +40,7 @@ def get_datasource_by_id(graph_id): # noqa: E501
:rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]]
"""
return client_wrapper.get_datasource_by_id(graph_id)
return get_client_wrapper().get_datasource_by_id(graph_id)


@handle_api_exception()
Expand All @@ -60,7 +60,7 @@ def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type)
return get_client_wrapper().unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type)


@handle_api_exception()
Expand All @@ -76,4 +76,4 @@ def unbind_vertex_datasource(graph_id, type_name): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_vertex_datasource(graph_id, type_name)
return get_client_wrapper().unbind_vertex_datasource(graph_id, type_name)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from gscoordinator.flex.models.running_deployment_status import RunningDeploymentStatus # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -24,7 +24,7 @@ def get_deployment_info(): # noqa: E501
:rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_info()
return get_client_wrapper().get_deployment_info()


@handle_api_exception()
Expand All @@ -42,7 +42,7 @@ def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501
:rtype: Union[GetPodLogResponse, Tuple[GetPodLogResponse, int], Tuple[GetPodLogResponse, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_pod_log(pod_name, component, from_cache)
return get_client_wrapper().get_deployment_pod_log(pod_name, component, from_cache)


@handle_api_exception()
Expand All @@ -54,7 +54,7 @@ def get_deployment_resource_usage(): # noqa: E501
:rtype: Union[GetResourceUsageResponse, Tuple[GetResourceUsageResponse, int], Tuple[GetResourceUsageResponse, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_resource_usage()
return get_client_wrapper().get_deployment_resource_usage()


@handle_api_exception()
Expand All @@ -66,7 +66,7 @@ def get_deployment_status(): # noqa: E501
:rtype: Union[RunningDeploymentStatus, Tuple[RunningDeploymentStatus, int], Tuple[RunningDeploymentStatus, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_status()
return get_client_wrapper().get_deployment_status()


@handle_api_exception()
Expand All @@ -78,4 +78,4 @@ def get_storage_usage(): # noqa: E501
:rtype: Union[GetStorageUsageResponse, Tuple[GetStorageUsageResponse, int], Tuple[GetStorageUsageResponse, int, Dict[str, str]]
"""
return client_wrapper.get_storage_usage()
return get_client_wrapper().get_storage_usage()
22 changes: 11 additions & 11 deletions coordinator/gscoordinator/flex/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from gscoordinator.flex.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -32,7 +32,7 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501
"""
if connexion.request.is_json:
create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_edge_type(graph_id, create_edge_type)
return get_client_wrapper().create_edge_type(graph_id, create_edge_type)


@handle_api_exception()
Expand All @@ -48,7 +48,7 @@ def create_graph(create_graph_request): # noqa: E501
"""
if connexion.request.is_json:
create_graph_request = CreateGraphRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_graph(create_graph_request)
return get_client_wrapper().create_graph(create_graph_request)


@handle_api_exception()
Expand All @@ -66,7 +66,7 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501
"""
if connexion.request.is_json:
create_vertex_type = CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_vertex_type(graph_id, create_vertex_type)
return get_client_wrapper().create_vertex_type(graph_id, create_vertex_type)


@handle_api_exception()
Expand All @@ -86,7 +86,7 @@ def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destinatio
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_edge_type_by_name(
return get_client_wrapper().delete_edge_type_by_name(
graph_id, type_name, source_vertex_type, destination_vertex_type
)

Expand All @@ -102,7 +102,7 @@ def delete_graph_by_id(graph_id): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_graph_by_id(graph_id)
return get_client_wrapper().delete_graph_by_id(graph_id)


@handle_api_exception()
Expand All @@ -118,7 +118,7 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_vertex_type_by_name(graph_id, type_name)
return get_client_wrapper().delete_vertex_type_by_name(graph_id, type_name)


@handle_api_exception()
Expand All @@ -132,7 +132,7 @@ def get_graph_by_id(graph_id): # noqa: E501
:rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]]
"""
return client_wrapper.get_graph_by_id(graph_id)
return get_client_wrapper().get_graph_by_id(graph_id)


@handle_api_exception()
Expand All @@ -146,7 +146,7 @@ def get_schema_by_id(graph_id): # noqa: E501
:rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]]
"""
return client_wrapper.get_schema_by_id(graph_id)
return get_client_wrapper().get_schema_by_id(graph_id)


@handle_api_exception()
Expand All @@ -164,7 +164,7 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501
"""
if connexion.request.is_json:
create_graph_schema_request = CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.import_schema(graph_id, create_graph_schema_request)
return get_client_wrapper().import_schema(graph_id, create_graph_schema_request)


@handle_api_exception()
Expand All @@ -176,4 +176,4 @@ def list_graphs(): # noqa: E501
:rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]]
"""
return client_wrapper.list_graphs()
return get_client_wrapper().list_graphs()
12 changes: 6 additions & 6 deletions coordinator/gscoordinator/flex/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from gscoordinator.flex.models.job_status import JobStatus # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -27,7 +27,7 @@ def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_job_by_id(job_id, delete_scheduler)
return get_client_wrapper().delete_job_by_id(job_id, delete_scheduler)


@handle_api_exception()
Expand All @@ -45,7 +45,7 @@ def get_dataloading_job_config(graph_id, dataloading_job_config): # noqa: E501
"""
if connexion.request.is_json:
dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.get_dataloading_job_config(graph_id, dataloading_job_config)
return get_client_wrapper().get_dataloading_job_config(graph_id, dataloading_job_config)


@handle_api_exception()
Expand All @@ -59,7 +59,7 @@ def get_job_by_id(job_id): # noqa: E501
:rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]]
"""
return client_wrapper.get_job_by_id(job_id)
return get_client_wrapper().get_job_by_id(job_id)


@handle_api_exception()
Expand All @@ -71,7 +71,7 @@ def list_jobs(): # noqa: E501
:rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]]
"""
return client_wrapper.list_jobs()
return get_client_wrapper().list_jobs()


@handle_api_exception()
Expand All @@ -89,4 +89,4 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501
"""
if connexion.request.is_json:
dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.submit_dataloading_job(graph_id, dataloading_job_config)
return get_client_wrapper().submit_dataloading_job(graph_id, dataloading_job_config)
12 changes: 6 additions & 6 deletions coordinator/gscoordinator/flex/controllers/service_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from gscoordinator.flex.models.start_service_request import StartServiceRequest # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -22,7 +22,7 @@ def get_service_status_by_id(graph_id): # noqa: E501
:rtype: Union[ServiceStatus, Tuple[ServiceStatus, int], Tuple[ServiceStatus, int, Dict[str, str]]
"""
return client_wrapper.get_service_status_by_id(graph_id)
return get_client_wrapper().get_service_status_by_id(graph_id)


@handle_api_exception()
Expand All @@ -34,7 +34,7 @@ def list_service_status(): # noqa: E501
:rtype: Union[List[ServiceStatus], Tuple[List[ServiceStatus], int], Tuple[List[ServiceStatus], int, Dict[str, str]]
"""
return client_wrapper.list_service_status()
return get_client_wrapper().list_service_status()


@handle_api_exception()
Expand All @@ -46,7 +46,7 @@ def restart_service(): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.restart_service()
return get_client_wrapper().restart_service()


@handle_api_exception()
Expand All @@ -62,7 +62,7 @@ def start_service(start_service_request=None): # noqa: E501
"""
if connexion.request.is_json:
start_service_request = StartServiceRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.start_service(start_service_request)
return get_client_wrapper().start_service(start_service_request)


@handle_api_exception()
Expand All @@ -74,4 +74,4 @@ def stop_service(): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.stop_service()
return get_client_wrapper().stop_service()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from gscoordinator.flex.models.update_stored_proc_request import UpdateStoredProcRequest # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -29,7 +29,7 @@ def create_stored_procedure(graph_id, create_stored_proc_request): # noqa: E501
"""
if connexion.request.is_json:
create_stored_proc_request = CreateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_stored_procedure(graph_id, create_stored_proc_request)
return get_client_wrapper().create_stored_procedure(graph_id, create_stored_proc_request)


@handle_api_exception()
Expand All @@ -45,7 +45,7 @@ def delete_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_stored_procedure_by_id(graph_id, stored_procedure_id)
return get_client_wrapper().delete_stored_procedure_by_id(graph_id, stored_procedure_id)


@handle_api_exception()
Expand All @@ -61,7 +61,7 @@ def get_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501
:rtype: Union[GetStoredProcResponse, Tuple[GetStoredProcResponse, int], Tuple[GetStoredProcResponse, int, Dict[str, str]]
"""
return client_wrapper.get_stored_procedure_by_id(graph_id, stored_procedure_id)
return get_client_wrapper().get_stored_procedure_by_id(graph_id, stored_procedure_id)


@handle_api_exception()
Expand All @@ -75,7 +75,7 @@ def list_stored_procedures(graph_id): # noqa: E501
:rtype: Union[List[GetStoredProcResponse], Tuple[List[GetStoredProcResponse], int], Tuple[List[GetStoredProcResponse], int, Dict[str, str]]
"""
return client_wrapper.list_stored_procedures(graph_id)
return get_client_wrapper().list_stored_procedures(graph_id)


@handle_api_exception()
Expand All @@ -95,4 +95,4 @@ def update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_p
"""
if connexion.request.is_json:
update_stored_proc_request = UpdateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request)
return get_client_wrapper().update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from gscoordinator.flex.models.upload_file_response import UploadFileResponse # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -22,4 +22,4 @@ def upload_file(filestorage=None): # noqa: E501
:rtype: Union[UploadFileResponse, Tuple[UploadFileResponse, int], Tuple[UploadFileResponse, int, Dict[str, str]]
"""
return client_wrapper.upload_file(filestorage)
return get_client_wrapper().upload_file(filestorage)
Loading

0 comments on commit 2cd1144

Please sign in to comment.