From 2cd11441fb98789879d703cdcefbf63afa6870a9 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 6 Nov 2024 15:07:45 +0800 Subject: [PATCH] fix(interactive): Return the correct port in coordinator's API (#4312) Maintain the port mapping info in coordinator, if deployed with docker. --- .github/workflows/flex-interactive.yml | 6 ++- coordinator/gscoordinator/coordinator.py | 2 + .../controllers/data_source_controller.py | 10 ++--- .../flex/controllers/deployment_controller.py | 12 +++--- .../flex/controllers/graph_controller.py | 22 +++++------ .../flex/controllers/job_controller.py | 12 +++--- .../flex/controllers/service_controller.py | 12 +++--- .../stored_procedure_controller.py | 12 +++--- .../flex/controllers/utils_controller.py | 4 +- .../gscoordinator/flex/core/__init__.py | 3 +- .../flex/core/alert/builtin_rules.py | 6 +-- .../gscoordinator/flex/core/client_wrapper.py | 20 +++++++--- .../gscoordinator/flex/core/insight/groot.py | 3 +- .../flex/core/interactive/hqps.py | 21 +++++++---- k8s/dockerfiles/interactive-entrypoint.sh | 37 ++++++++++++++++++- python/graphscope/config.py | 8 ++++ python/graphscope/gsctl/commands/dev.py | 8 +++- 17 files changed, 136 insertions(+), 62 deletions(-) diff --git a/.github/workflows/flex-interactive.yml b/.github/workflows/flex-interactive.yml index 505946aa4b1d..379f5faf2780 100644 --- a/.github/workflows/flex-interactive.yml +++ b/.github/workflows/flex-interactive.yml @@ -56,7 +56,7 @@ jobs: # install gsctl python3 -m pip install ${GITHUB_WORKSPACE}/python/dist/*.whl # launch service: 8080 for coordinator http port; 7687 for cypher port; - gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml + gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --cypher-port 7688 --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml sleep 20 # test python3 -m pip install --no-cache-dir pytest pytest-cov pytest-timeout pytest-xdist @@ -66,6 +66,10 @@ jobs: --exitfirst \ $(dirname $(python3 -c "import graphscope.gsctl as gsctl; print(gsctl.__file__)"))/tests/test_interactive.py + # test coordinator + res=`curl http://127.0.0.1:8080/api/v1/service` + echo $res | grep 7688 || exit 1 + # destroy instance gsctl instance destroy --type interactive -y diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index f4955102d8bf..bcac18e8a5fe 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -32,6 +32,7 @@ from graphscope.config import Config from graphscope.proto import coordinator_service_pb2_grpc +from gscoordinator.flex.core.client_wrapper import initialize_client_wrapper from gscoordinator.flex.encoder import JSONEncoder from gscoordinator.monitor import Monitor from gscoordinator.servicer import init_graphscope_one_service_servicer @@ -125,6 +126,7 @@ def get_servicer(config: Config): def start_http_service(config): + initialize_client_wrapper(config) app = connexion.App(__name__, specification_dir="./flex/openapi/") app.app.json_encoder = JSONEncoder app.add_api( diff --git a/coordinator/gscoordinator/flex/controllers/data_source_controller.py b/coordinator/gscoordinator/flex/controllers/data_source_controller.py index 792d6e5a11df..2d016dee40df 100644 --- a/coordinator/gscoordinator/flex/controllers/data_source_controller.py +++ b/coordinator/gscoordinator/flex/controllers/data_source_controller.py @@ -7,7 +7,7 @@ from gscoordinator.flex.models.schema_mapping import SchemaMapping # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -26,7 +26,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501 """ if connexion.request.is_json: schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.bind_datasource_in_batch(graph_id, schema_mapping) + return get_client_wrapper().bind_datasource_in_batch(graph_id, schema_mapping) @handle_api_exception() @@ -40,7 +40,7 @@ def get_datasource_by_id(graph_id): # noqa: E501 :rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]] """ - return client_wrapper.get_datasource_by_id(graph_id) + return get_client_wrapper().get_datasource_by_id(graph_id) @handle_api_exception() @@ -60,7 +60,7 @@ def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_ :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type) + return get_client_wrapper().unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type) @handle_api_exception() @@ -76,4 +76,4 @@ def unbind_vertex_datasource(graph_id, type_name): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.unbind_vertex_datasource(graph_id, type_name) + return get_client_wrapper().unbind_vertex_datasource(graph_id, type_name) diff --git a/coordinator/gscoordinator/flex/controllers/deployment_controller.py b/coordinator/gscoordinator/flex/controllers/deployment_controller.py index 98ba7375f920..fc3a8d0f27c1 100644 --- a/coordinator/gscoordinator/flex/controllers/deployment_controller.py +++ b/coordinator/gscoordinator/flex/controllers/deployment_controller.py @@ -11,7 +11,7 @@ from gscoordinator.flex.models.running_deployment_status import RunningDeploymentStatus # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -24,7 +24,7 @@ def get_deployment_info(): # noqa: E501 :rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]] """ - return client_wrapper.get_deployment_info() + return get_client_wrapper().get_deployment_info() @handle_api_exception() @@ -42,7 +42,7 @@ def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501 :rtype: Union[GetPodLogResponse, Tuple[GetPodLogResponse, int], Tuple[GetPodLogResponse, int, Dict[str, str]] """ - return client_wrapper.get_deployment_pod_log(pod_name, component, from_cache) + return get_client_wrapper().get_deployment_pod_log(pod_name, component, from_cache) @handle_api_exception() @@ -54,7 +54,7 @@ def get_deployment_resource_usage(): # noqa: E501 :rtype: Union[GetResourceUsageResponse, Tuple[GetResourceUsageResponse, int], Tuple[GetResourceUsageResponse, int, Dict[str, str]] """ - return client_wrapper.get_deployment_resource_usage() + return get_client_wrapper().get_deployment_resource_usage() @handle_api_exception() @@ -66,7 +66,7 @@ def get_deployment_status(): # noqa: E501 :rtype: Union[RunningDeploymentStatus, Tuple[RunningDeploymentStatus, int], Tuple[RunningDeploymentStatus, int, Dict[str, str]] """ - return client_wrapper.get_deployment_status() + return get_client_wrapper().get_deployment_status() @handle_api_exception() @@ -78,4 +78,4 @@ def get_storage_usage(): # noqa: E501 :rtype: Union[GetStorageUsageResponse, Tuple[GetStorageUsageResponse, int], Tuple[GetStorageUsageResponse, int, Dict[str, str]] """ - return client_wrapper.get_storage_usage() + return get_client_wrapper().get_storage_usage() diff --git a/coordinator/gscoordinator/flex/controllers/graph_controller.py b/coordinator/gscoordinator/flex/controllers/graph_controller.py index ea7a6020610f..41c56c6e0897 100644 --- a/coordinator/gscoordinator/flex/controllers/graph_controller.py +++ b/coordinator/gscoordinator/flex/controllers/graph_controller.py @@ -13,7 +13,7 @@ from gscoordinator.flex.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -32,7 +32,7 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501 """ if connexion.request.is_json: create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_edge_type(graph_id, create_edge_type) + return get_client_wrapper().create_edge_type(graph_id, create_edge_type) @handle_api_exception() @@ -48,7 +48,7 @@ def create_graph(create_graph_request): # noqa: E501 """ if connexion.request.is_json: create_graph_request = CreateGraphRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_graph(create_graph_request) + return get_client_wrapper().create_graph(create_graph_request) @handle_api_exception() @@ -66,7 +66,7 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501 """ if connexion.request.is_json: create_vertex_type = CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_vertex_type(graph_id, create_vertex_type) + return get_client_wrapper().create_vertex_type(graph_id, create_vertex_type) @handle_api_exception() @@ -86,7 +86,7 @@ def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destinatio :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_edge_type_by_name( + return get_client_wrapper().delete_edge_type_by_name( graph_id, type_name, source_vertex_type, destination_vertex_type ) @@ -102,7 +102,7 @@ def delete_graph_by_id(graph_id): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_graph_by_id(graph_id) + return get_client_wrapper().delete_graph_by_id(graph_id) @handle_api_exception() @@ -118,7 +118,7 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_vertex_type_by_name(graph_id, type_name) + return get_client_wrapper().delete_vertex_type_by_name(graph_id, type_name) @handle_api_exception() @@ -132,7 +132,7 @@ def get_graph_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]] """ - return client_wrapper.get_graph_by_id(graph_id) + return get_client_wrapper().get_graph_by_id(graph_id) @handle_api_exception() @@ -146,7 +146,7 @@ def get_schema_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]] """ - return client_wrapper.get_schema_by_id(graph_id) + return get_client_wrapper().get_schema_by_id(graph_id) @handle_api_exception() @@ -164,7 +164,7 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501 """ if connexion.request.is_json: create_graph_schema_request = CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.import_schema(graph_id, create_graph_schema_request) + return get_client_wrapper().import_schema(graph_id, create_graph_schema_request) @handle_api_exception() @@ -176,4 +176,4 @@ def list_graphs(): # noqa: E501 :rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]] """ - return client_wrapper.list_graphs() + return get_client_wrapper().list_graphs() diff --git a/coordinator/gscoordinator/flex/controllers/job_controller.py b/coordinator/gscoordinator/flex/controllers/job_controller.py index ef5c58550a2c..30b218fff017 100644 --- a/coordinator/gscoordinator/flex/controllers/job_controller.py +++ b/coordinator/gscoordinator/flex/controllers/job_controller.py @@ -10,7 +10,7 @@ from gscoordinator.flex.models.job_status import JobStatus # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -27,7 +27,7 @@ def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_job_by_id(job_id, delete_scheduler) + return get_client_wrapper().delete_job_by_id(job_id, delete_scheduler) @handle_api_exception() @@ -45,7 +45,7 @@ def get_dataloading_job_config(graph_id, dataloading_job_config): # noqa: E501 """ if connexion.request.is_json: dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.get_dataloading_job_config(graph_id, dataloading_job_config) + return get_client_wrapper().get_dataloading_job_config(graph_id, dataloading_job_config) @handle_api_exception() @@ -59,7 +59,7 @@ def get_job_by_id(job_id): # noqa: E501 :rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]] """ - return client_wrapper.get_job_by_id(job_id) + return get_client_wrapper().get_job_by_id(job_id) @handle_api_exception() @@ -71,7 +71,7 @@ def list_jobs(): # noqa: E501 :rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]] """ - return client_wrapper.list_jobs() + return get_client_wrapper().list_jobs() @handle_api_exception() @@ -89,4 +89,4 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501 """ if connexion.request.is_json: dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.submit_dataloading_job(graph_id, dataloading_job_config) + return get_client_wrapper().submit_dataloading_job(graph_id, dataloading_job_config) diff --git a/coordinator/gscoordinator/flex/controllers/service_controller.py b/coordinator/gscoordinator/flex/controllers/service_controller.py index d37fae8bb9c7..dceda84a7291 100644 --- a/coordinator/gscoordinator/flex/controllers/service_controller.py +++ b/coordinator/gscoordinator/flex/controllers/service_controller.py @@ -8,7 +8,7 @@ from gscoordinator.flex.models.start_service_request import StartServiceRequest # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -22,7 +22,7 @@ def get_service_status_by_id(graph_id): # noqa: E501 :rtype: Union[ServiceStatus, Tuple[ServiceStatus, int], Tuple[ServiceStatus, int, Dict[str, str]] """ - return client_wrapper.get_service_status_by_id(graph_id) + return get_client_wrapper().get_service_status_by_id(graph_id) @handle_api_exception() @@ -34,7 +34,7 @@ def list_service_status(): # noqa: E501 :rtype: Union[List[ServiceStatus], Tuple[List[ServiceStatus], int], Tuple[List[ServiceStatus], int, Dict[str, str]] """ - return client_wrapper.list_service_status() + return get_client_wrapper().list_service_status() @handle_api_exception() @@ -46,7 +46,7 @@ def restart_service(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.restart_service() + return get_client_wrapper().restart_service() @handle_api_exception() @@ -62,7 +62,7 @@ def start_service(start_service_request=None): # noqa: E501 """ if connexion.request.is_json: start_service_request = StartServiceRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.start_service(start_service_request) + return get_client_wrapper().start_service(start_service_request) @handle_api_exception() @@ -74,4 +74,4 @@ def stop_service(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.stop_service() + return get_client_wrapper().stop_service() diff --git a/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py b/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py index 9410963119af..f0e84f820961 100644 --- a/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py +++ b/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py @@ -10,7 +10,7 @@ from gscoordinator.flex.models.update_stored_proc_request import UpdateStoredProcRequest # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -29,7 +29,7 @@ def create_stored_procedure(graph_id, create_stored_proc_request): # noqa: E501 """ if connexion.request.is_json: create_stored_proc_request = CreateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_stored_procedure(graph_id, create_stored_proc_request) + return get_client_wrapper().create_stored_procedure(graph_id, create_stored_proc_request) @handle_api_exception() @@ -45,7 +45,7 @@ def delete_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_stored_procedure_by_id(graph_id, stored_procedure_id) + return get_client_wrapper().delete_stored_procedure_by_id(graph_id, stored_procedure_id) @handle_api_exception() @@ -61,7 +61,7 @@ def get_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501 :rtype: Union[GetStoredProcResponse, Tuple[GetStoredProcResponse, int], Tuple[GetStoredProcResponse, int, Dict[str, str]] """ - return client_wrapper.get_stored_procedure_by_id(graph_id, stored_procedure_id) + return get_client_wrapper().get_stored_procedure_by_id(graph_id, stored_procedure_id) @handle_api_exception() @@ -75,7 +75,7 @@ def list_stored_procedures(graph_id): # noqa: E501 :rtype: Union[List[GetStoredProcResponse], Tuple[List[GetStoredProcResponse], int], Tuple[List[GetStoredProcResponse], int, Dict[str, str]] """ - return client_wrapper.list_stored_procedures(graph_id) + return get_client_wrapper().list_stored_procedures(graph_id) @handle_api_exception() @@ -95,4 +95,4 @@ def update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_p """ if connexion.request.is_json: update_stored_proc_request = UpdateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request) + return get_client_wrapper().update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request) diff --git a/coordinator/gscoordinator/flex/controllers/utils_controller.py b/coordinator/gscoordinator/flex/controllers/utils_controller.py index 731bdb5f8645..0f80be8ea4f3 100644 --- a/coordinator/gscoordinator/flex/controllers/utils_controller.py +++ b/coordinator/gscoordinator/flex/controllers/utils_controller.py @@ -7,7 +7,7 @@ from gscoordinator.flex.models.upload_file_response import UploadFileResponse # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -22,4 +22,4 @@ def upload_file(filestorage=None): # noqa: E501 :rtype: Union[UploadFileResponse, Tuple[UploadFileResponse, int], Tuple[UploadFileResponse, int, Dict[str, str]] """ - return client_wrapper.upload_file(filestorage) + return get_client_wrapper().upload_file(filestorage) diff --git a/coordinator/gscoordinator/flex/core/__init__.py b/coordinator/gscoordinator/flex/core/__init__.py index 606078a84fba..11501683b369 100644 --- a/coordinator/gscoordinator/flex/core/__init__.py +++ b/coordinator/gscoordinator/flex/core/__init__.py @@ -21,5 +21,6 @@ # Disable warnings warnings.filterwarnings("ignore", category=Warning) -from gscoordinator.flex.core.client_wrapper import client_wrapper # noqa: F401, E402 +from gscoordinator.flex.core.client_wrapper import \ + get_client_wrapper # noqa: F401, E402 from gscoordinator.flex.core.utils import handle_api_exception # noqa: F401, E402 diff --git a/coordinator/gscoordinator/flex/core/alert/builtin_rules.py b/coordinator/gscoordinator/flex/core/alert/builtin_rules.py index fb22b9478b3c..1344e4186fb8 100644 --- a/coordinator/gscoordinator/flex/core/alert/builtin_rules.py +++ b/coordinator/gscoordinator/flex/core/alert/builtin_rules.py @@ -23,7 +23,7 @@ import psutil from gremlin_python.driver.client import Client -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core.alert.alert_rule import AlertRule from gscoordinator.flex.core.alert.message_collector import AlertMessageCollector from gscoordinator.flex.core.config import CLUSTER_TYPE @@ -65,7 +65,7 @@ def run_alert(self): try: alert_nodes = [] disk_usages = [] - disk_utils = client_wrapper.get_storage_usage().to_dict() + disk_utils = get_client_wrapper().get_storage_usage().to_dict() for node, usage in disk_utils["storage_usage"].items(): if float(usage) > self._threshold: alert_nodes.append(node) @@ -107,7 +107,7 @@ def __init__( def run_alert(self): """This function needs to handle exception by itself""" try: - available = client_wrapper.gremlin_service_available() + available = get_client_wrapper().gremlin_service_available() if not available: message = f"Gremlin service unavailable: unknown reason" except Exception as e: diff --git a/coordinator/gscoordinator/flex/core/client_wrapper.py b/coordinator/gscoordinator/flex/core/client_wrapper.py index 7d39bd03084b..d16c41da702f 100644 --- a/coordinator/gscoordinator/flex/core/client_wrapper.py +++ b/coordinator/gscoordinator/flex/core/client_wrapper.py @@ -22,6 +22,8 @@ import threading from typing import List +from graphscope.config import Config + from gscoordinator.flex.core.config import CLUSTER_TYPE from gscoordinator.flex.core.config import DATASET_WORKSPACE from gscoordinator.flex.core.config import SOLUTION @@ -61,17 +63,17 @@ class ClientWrapper(object): """Wrapper of client that interacts with engine""" - def __init__(self): + def __init__(self, config: Config): # lock to protect the service self._lock = threading.RLock() # initialize specific client - self._client = self._initialize_client() + self._client = self._initialize_client(config) # data source management self._datasource_manager = DataSourceManager() # deployment self._deployment = initialize_deployemnt() - def _initialize_client(self): + def _initialize_client(self, config: Config): service_initializer = { "INTERACTIVE": init_hqps_client, "GRAPHSCOPE_INSIGHT": init_groot_client, @@ -80,7 +82,7 @@ def _initialize_client(self): if initializer is None: logger.warn(f"Client initializer of {SOLUTION} not found.") return None - return initializer() + return initializer(config) def list_graphs(self) -> List[GetGraphResponse]: graphs = self._client.list_graphs() @@ -368,4 +370,12 @@ def gremlin_service_available(self) -> bool: return self._client.gremlin_service_available() -client_wrapper = ClientWrapper() +client_wrapper = None + +# Interactive/Insight specific configuration +def initialize_client_wrapper(config=None): + global client_wrapper + client_wrapper = ClientWrapper(config) + +def get_client_wrapper(): + return client_wrapper diff --git a/coordinator/gscoordinator/flex/core/insight/groot.py b/coordinator/gscoordinator/flex/core/insight/groot.py index a5aaa6738daa..a1b7dede86f7 100644 --- a/coordinator/gscoordinator/flex/core/insight/groot.py +++ b/coordinator/gscoordinator/flex/core/insight/groot.py @@ -26,6 +26,7 @@ from typing import List import psutil +from graphscope.config import Config from gremlin_python.driver.client import Client from gscoordinator.flex.core.config import CLUSTER_TYPE @@ -307,5 +308,5 @@ def gremlin_service_available(self) -> bool: return True -def init_groot_client(): +def init_groot_client(config: Config): return GrootClient() diff --git a/coordinator/gscoordinator/flex/core/interactive/hqps.py b/coordinator/gscoordinator/flex/core/interactive/hqps.py index 202df40a1436..519c03ed721c 100644 --- a/coordinator/gscoordinator/flex/core/interactive/hqps.py +++ b/coordinator/gscoordinator/flex/core/interactive/hqps.py @@ -28,6 +28,7 @@ import gs_interactive import psutil import requests +from graphscope.config import Config from gs_interactive.models.create_graph_request import CreateGraphRequest from gs_interactive.models.create_procedure_request import CreateProcedureRequest from gs_interactive.models.schema_mapping import SchemaMapping @@ -49,9 +50,15 @@ class HQPSClient(object): """Class used to interact with hqps engine""" - def __init__(self): + def __init__(self, config: Config): # hqps admin service endpoint self._hqps_endpoint = self._get_hqps_service_endpoints() + self._port_mapping = config.interactive.port_mapping + + def _get_mapped_port(self, port: int) -> int: + if self._port_mapping and port in self._port_mapping: + return self._port_mapping[port] + return port def _get_hqps_service_endpoints(self): if CLUSTER_TYPE == "HOSTS": @@ -229,7 +236,7 @@ def list_service_status(self) -> List[dict]: api_instance = gs_interactive.AdminServiceServiceManagementApi(api_client) response = api_instance.get_service_status() if CLUSTER_TYPE == "HOSTS": - host = get_internal_ip() + host = 'localhost' # for interactive deployed in hosts, we could not determine the public ip in container. So we let user to replace with the public ip. if response.status == "Running" and response.graph is not None: g = response.graph.to_dict() serving_graph_id = g["id"] @@ -239,9 +246,9 @@ def list_service_status(self) -> List[dict]: status = { "status": response.status, "sdk_endpoints": { - "cypher": f"neo4j://{host}:{response.bolt_port} (internal)", - "hqps": f"http://{host}:{response.hqps_port} (internal)", - "gremlin": f"ws://{host}:{response.gremlin_port}/gremlin (internal)", + "cypher": f"neo4j://{host}:{self._get_mapped_port(response.bolt_port)} (Replace localhost with public ip if connecting from outside)", + "hqps": f"http://{host}:{self._get_mapped_port(response.hqps_port)} (Replace localhost with public ip if connecting from outside)", + "gremlin": f"ws://{host}:{self._get_mapped_port(response.gremlin_port)}/gremlin (Replace localhost with public ip if connecting from outside)", }, "start_time": service_start_time, "graph_id": g["id"], @@ -372,5 +379,5 @@ def import_schema(self, graph_id, schema: dict): raise RuntimeError("Method is not supported.") -def init_hqps_client(): - return HQPSClient() +def init_hqps_client(config: Config): + return HQPSClient(config) diff --git a/k8s/dockerfiles/interactive-entrypoint.sh b/k8s/dockerfiles/interactive-entrypoint.sh index 74eb149b3118..009323f6c31f 100644 --- a/k8s/dockerfiles/interactive-entrypoint.sh +++ b/k8s/dockerfiles/interactive-entrypoint.sh @@ -32,6 +32,9 @@ function usage() { -c, --enable-coordinator: Launch the Interactive service along with Coordinator. Must enable this option if you want to use `gsctl` command-line tool. + -p, --port-mapping: Specify the port mapping for the interactive. + The format is container_port:host_port, multiple mappings are + separated by comma. For example, 8080:8081,7777:7778 EOF } @@ -90,6 +93,16 @@ function launch_service() { } function launch_coordinator() { + local host_ports=() + local container_ports=() + if [ -n "$1" ]; then + IFS=',' read -ra port_mappings <<< "$1" + for port_mapping in "${port_mappings[@]}"; do + IFS=':' read -ra ports <<< "$port_mapping" + container_ports+=(${ports[0]}) + host_ports+=(${ports[1]}) + done + fi if $ENABLE_COORDINATOR; then coordinator_config_file="/tmp/coordinator-config.yaml" @@ -102,6 +115,19 @@ launcher_type: hosts session: instance_id: demo EOF + + if [ ${#host_ports[@]} -gt 0 ]; then + echo "interactive:" >> $coordinator_config_file + echo " port_mapping:" >> $coordinator_config_file + for i in "${!host_ports[@]}"; do + echo " ${container_ports[$i]}: ${host_ports[$i]}" >> $coordinator_config_file + done + fi + # i.e + # interactive: + # port_mapping: + # 8080: 8081 + # 7777: 7778 python3 -m gscoordinator --config-file $coordinator_config_file fi } @@ -126,6 +152,15 @@ while [[ $# -gt 0 ]]; do ENABLE_COORDINATOR=true shift ;; + -p | --port-mapping) + shift + if [[ $# -eq 0 || $1 == -* ]]; then + echo "Option -p requires an argument." >&2 + exit 1 + fi + PORT_MAPPING=$1 + shift + ;; -h | --help) usage exit 0 @@ -141,4 +176,4 @@ done prepare_workspace $WORKSPACE launch_service $WORKSPACE -launch_coordinator +launch_coordinator $PORT_MAPPING diff --git a/python/graphscope/config.py b/python/graphscope/config.py index c0294fdb3337..f075cf3a7b10 100644 --- a/python/graphscope/config.py +++ b/python/graphscope/config.py @@ -228,6 +228,12 @@ class VineyardConfig: ) +@dataclass +class InteractiveConfig: + # a map from internal port to external port + port_mapping: Union[dict, None] = None + + @dataclass class CoordinatorConfig: endpoint: Union[str, None] = None @@ -351,6 +357,8 @@ class Config(Serializable): coordinator: CoordinatorConfig = field(default_factory=CoordinatorConfig) # Vineyard configuration. vineyard: VineyardConfig = field(default_factory=VineyardConfig) + # Interactive configuration. + interactive: InteractiveConfig = field(default_factory=InteractiveConfig) # Local cluster configuration. hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig) diff --git a/python/graphscope/gsctl/commands/dev.py b/python/graphscope/gsctl/commands/dev.py index c88a7977a12c..98c09e1dcbd6 100644 --- a/python/graphscope/gsctl/commands/dev.py +++ b/python/graphscope/gsctl/commands/dev.py @@ -251,7 +251,6 @@ def deploy( ] if gremlin_port != -1: cmd.extend(["-p", f"{gremlin_port}:8182"]) - image = f"{image_registry}/{type}:{image_tag}" if interactive_config is not None: if not os.path.isfile(interactive_config): click.secho( @@ -263,7 +262,14 @@ def deploy( cmd.extend( ["-v", f"{interactive_config}:{INTERACTIVE_DOCKER_DEFAULT_CONFIG_PATH}"] ) + image = f"{image_registry}/{type}:{image_tag}" cmd.extend([image, "--enable-coordinator"]) + cmd.extend( + [ + "--port-mapping", + f"8080:{coordinator_port},7777:{admin_port},10000:{storedproc_port},7687:{cypher_port}", + ] + ) returncode = run_shell_cmd(cmd, os.getcwd()) if returncode == 0: message = f"""