diff --git a/coordinator/gscoordinator/cluster_builder.py b/coordinator/gscoordinator/cluster_builder.py index 2c1614fc8ba1..3553a697b8b2 100644 --- a/coordinator/gscoordinator/cluster_builder.py +++ b/coordinator/gscoordinator/cluster_builder.py @@ -66,7 +66,7 @@ def __init__( learning_start_port, ): self._instance_id = config.session.instance_id - self._glog_level = parse_as_glog_level(config.session.log_level) + self._glog_level = parse_as_glog_level(config.log_level) self._num_workers = config.session.num_workers launcher_config: KubernetesLauncherConfig = config.kubernetes_launcher diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 17f324418806..04c282c3fe36 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -19,87 +19,21 @@ """Coordinator between client and engines""" import argparse -import atexit import base64 -import functools -import json import logging import os -import pickle -import queue -import random -import re import signal import sys -import threading -import traceback from concurrent import futures -from string import ascii_letters import grpc -from packaging import version -from simple_parsing import ArgumentParser - -from gscoordinator.io_utils import StdStreamWrapper - -# capture system stdout -sys.stdout = StdStreamWrapper(sys.stdout) -sys.stderr = StdStreamWrapper(sys.stderr) - from graphscope.config import Config -from graphscope.framework.utils import PipeMerger -from graphscope.framework.utils import i_to_attr -from graphscope.framework.utils import s_to_attr from graphscope.proto import coordinator_service_pb2_grpc -from graphscope.proto import error_codes_pb2 -from graphscope.proto import message_pb2 -from graphscope.proto import types_pb2 - -from gscoordinator.dag_manager import DAGManager -from gscoordinator.dag_manager import GSEngine -from gscoordinator.kubernetes_launcher import KubernetesClusterLauncher -from gscoordinator.launcher import AbstractLauncher -from gscoordinator.local_launcher import LocalLauncher -from gscoordinator.monitor import Monitor -from gscoordinator.object_manager import InteractiveInstanceManager -from gscoordinator.object_manager import LearningInstanceManager -from gscoordinator.object_manager import ObjectManager -from gscoordinator.op_executor import OperationExecutor -from gscoordinator.operator_launcher import OperatorLauncher -from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH -from gscoordinator.utils import check_server_ready -from gscoordinator.utils import create_single_op_dag -from gscoordinator.version import __version__ - -def catch_unknown_errors(response_on_error=None, using_yield=False): - """A catcher that catches all (unknown) exceptions in gRPC handlers to ensure - the client not think the coordinator services is crashed. 
- """ +from gscoordinator.servicer import init_graphscope_one_service_servicer +from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH - def catch_exceptions(handler): - @functools.wraps(handler) - def handler_execution(self, request, context): - try: - if using_yield: - for result in handler(self, request, context): - yield result - else: - yield handler(self, request, context) - except Exception as exc: - error_message = repr(exc) - error_traceback = traceback.format_exc() - context.set_code(grpc.StatusCode.ABORTED) - context.set_details( - 'Error occurs in handler: "%s", with traceback: ' % error_message - + error_traceback - ) - if response_on_error is not None: - yield response_on_error - - return handler_execution - - return catch_exceptions +logger = logging.getLogger("graphscope") def config_logging(log_level): @@ -140,514 +74,21 @@ def config_logging(log_level): vineyard_logger.addHandler(stderr_handler) -logger = logging.getLogger("graphscope") - - -class CoordinatorServiceServicer( - coordinator_service_pb2_grpc.CoordinatorServiceServicer -): - """Provides methods that implement functionality of master service server. - Holding: - 1. launcher: the engine launcher. - 2. session_id: the handle for a particular session to engine - 3. object_manager: the object manager for the session - 4. operation_executor: the operation executor for the session - """ - - def __init__( - self, launcher: AbstractLauncher, dangling_timeout_seconds, log_level="INFO" - ): - config_logging(log_level) - - self._operator_mode = False - - self._object_manager = ObjectManager() - - # only one connection is allowed at the same time - self._connected = False - - # control log fetching - self._streaming_logs = False - self._pipe_merged = PipeMerger(sys.stdout, sys.stderr) - - self._session_id = "session_" + "".join(random.choices(ascii_letters, k=8)) - - # dangling check - self._dangling_timeout_seconds = dangling_timeout_seconds - self._dangling_detecting_timer = None - self._cleanup_instance = False - # the dangling timer should be initialized after the launcher started, - # otherwise there would be a deadlock if `self._launcher.start()` failed. - self._set_dangling_timer(cleanup_instance=True) - - # a lock that protects the coordinator - self._lock = threading.RLock() - atexit.register(self.cleanup) - - self._launcher = launcher - self._launcher.set_session_workspace(self._session_id) - if not self._launcher.start(): - raise RuntimeError("Coordinator launching instance failed.") - - self._operation_executor: OperationExecutor = OperationExecutor( - self._session_id, self._launcher, self._object_manager - ) - - def __del__(self): - self.cleanup() - - @Monitor.connectSession - def ConnectSession(self, request, context): - if self._launcher.analytical_engine_endpoint is not None: - engine_config = self._operation_executor.get_analytical_engine_config() - engine_config.update(self._launcher.get_engine_config()) - host_names = self._launcher.hosts - else: - engine_config = {} - host_names = [] - - # A session is already connected. - if self._connected: - if getattr(request, "reconnect", False): - return message_pb2.ConnectSessionResponse( - session_id=self._session_id, - cluster_type=self._launcher.type(), - num_workers=self._launcher.num_workers, - namespace=self._launcher.get_namespace(), - engine_config=json.dumps(engine_config), - host_names=host_names, - ) - else: - # connect failed, more than one connection at the same time. 
- context.set_code(grpc.StatusCode.ALREADY_EXISTS) - context.set_details( - "Cannot setup more than one connection at the same time." - ) - return message_pb2.ConnectSessionResponse() - # check version compatibility from client - sv = version.parse(__version__) - cv = version.parse(request.version) - if sv.major != cv.major or sv.minor != cv.minor: - error_msg = f"Version between client and server is inconsistent: {request.version} vs {__version__}" - logger.warning(error_msg) - context.set_code(grpc.StatusCode.FAILED_PRECONDITION) - context.set_details(error_msg) - return message_pb2.ConnectSessionResponse() - - # Connect to serving coordinator. - self._connected = True - # Cleanup after timeout seconds - self._dangling_timeout_seconds = request.dangling_timeout_seconds - # other timeout seconds - self._comm_timeout_seconds = getattr(request, "comm_timeout_seconds", 120) - # If true, also delete graphscope instance (such as pods) in closing process - self._cleanup_instance = request.cleanup_instance - - # Session connected, fetch logs via gRPC. - self._streaming_logs = True - sys.stdout.drop(False) - sys.stderr.drop(False) - - return message_pb2.ConnectSessionResponse( - session_id=self._session_id, - cluster_type=self._launcher.type(), - num_workers=self._launcher.num_workers, - namespace=self._launcher.get_namespace(), - engine_config=json.dumps(engine_config), - host_names=host_names, - ) - - @Monitor.closeSession - def CloseSession(self, request, context): - """ - Disconnect session, note that it won't clean up any resources if self._cleanup_instance is False. - """ - if request.session_id != self._session_id: - context.set_code(grpc.StatusCode.INVALID_ARGUMENT) - context.set_details( - f"Session handle not matched, {request.session_id} versus {self._session_id}" - ) - return message_pb2.CloseSessionResponse() - - self._connected = False - - self.cleanup(cleanup_instance=self._cleanup_instance, is_dangling=False) - if self._cleanup_instance: - self._session_id = None - self._operation_executor = None - - # Session closed, stop streaming logs - sys.stdout.drop(True) - sys.stderr.drop(True) - self._streaming_logs = False - return message_pb2.CloseSessionResponse() - - def HeartBeat(self, request, context): - self._reset_dangling_timer(self._connected, self._cleanup_instance) - # analytical engine - # if self._operation_executor is not None: - # return self._operation_executor.heart_beat(request) - return message_pb2.HeartBeatResponse() - - def RunStep(self, request_iterator, context): - with self._lock: - for response in self.RunStepWrapped(request_iterator, context): - yield response - - def _RunStep(self, request_iterator, context): - from gremlin_python.driver.protocol import GremlinServerError - - # split dag - dag_manager = DAGManager(request_iterator) - loader_op_bodies = {} - - # response list for stream - responses = [ - message_pb2.RunStepResponse(head=message_pb2.RunStepResponseHead()) - ] - - while not dag_manager.empty(): - run_dag_on, dag, dag_bodies = dag_manager.next_dag() - error_code = error_codes_pb2.COORDINATOR_INTERNAL_ERROR - head, bodies = None, None - try: - # run on analytical engine - if run_dag_on == GSEngine.analytical_engine: - # need dag_bodies to load graph from pandas/numpy - error_code = error_codes_pb2.ANALYTICAL_ENGINE_INTERNAL_ERROR - head, bodies = self._operation_executor.run_on_analytical_engine( - dag, dag_bodies, loader_op_bodies - ) - # run on interactive engine - elif run_dag_on == GSEngine.interactive_engine: - error_code = 
error_codes_pb2.INTERACTIVE_ENGINE_INTERNAL_ERROR - head, bodies = self._operation_executor.run_on_interactive_engine( - dag - ) - # run on learning engine - elif run_dag_on == GSEngine.learning_engine: - error_code = error_codes_pb2.LEARNING_ENGINE_INTERNAL_ERROR - head, bodies = self._operation_executor.run_on_learning_engine(dag) - # run on coordinator - elif run_dag_on == GSEngine.coordinator: - error_code = error_codes_pb2.COORDINATOR_INTERNAL_ERROR - head, bodies = self._operation_executor.run_on_coordinator( - dag, dag_bodies, loader_op_bodies - ) - # merge the responses - responses[0].head.results.extend(head.head.results) - responses.extend(bodies) - - except grpc.RpcError as exc: - # Not raised by graphscope, maybe socket closed, etc. - context.set_code(exc.code()) - context.set_details( - f"{exc.details()}. The traceback is: {traceback.format_exc()}" - ) - - # stop yield to raise the grpc errors to the client, as the - # client may consume nothing if error happens - return - - except GremlinServerError as exc: - response_head = responses[0] - response_head.head.code = error_code - response_head.head.error_msg = f"Error occurred during RunStep. The traceback is: {traceback.format_exc()}" - if hasattr(exc, "status_message"): - exc_message = exc.status_message - else: - exc_message = str(exc) - response_head.head.full_exception = pickle.dumps( - ( - GremlinServerError, - { - "code": exc.status_code, - "message": exc_message, - "attributes": exc.status_attributes, - }, - ) - ) - # stop iteration to propagate the error to client immediately - break - - except Exception as exc: - response_head = responses[0] - response_head.head.code = error_code - response_head.head.error_msg = f"Error occurred during RunStep, The traceback is: {traceback.format_exc()}" - response_head.head.full_exception = pickle.dumps(exc) - - # stop iteration to propagate the error to client immediately - break - - for response in responses: - yield response - - RunStepWrapped = catch_unknown_errors( - message_pb2.RunStepResponse(head=message_pb2.RunStepResponseHead()), True - )(_RunStep) - - def FetchLogs(self, request, context): - while self._streaming_logs: - try: - info_message, error_message = self._pipe_merged.poll(timeout=2) - except queue.Empty: - info_message, error_message = "", "" - except Exception as e: - info_message, error_message = ( - f"WARNING: failed to read log: {e}. The traceback is: {traceback.format_exc()}", - "", - ) - - if info_message or error_message: - if self._streaming_logs: - yield message_pb2.FetchLogsResponse( - info_message=info_message, error_message=error_message - ) - - def AddLib(self, request, context): - try: - self._operation_executor.add_lib(request) - except Exception as e: - context.abort(grpc.StatusCode.ABORTED, str(e)) - return message_pb2.AddLibResponse() - - def CreateAnalyticalInstance(self, request, context): - engine_config = {} - try: - # create GAE rpc service - self._launcher.create_analytical_instance() - engine_config = self._operation_executor.get_analytical_engine_config() - engine_config.update(self._launcher.get_engine_config()) - except NotImplementedError: - # TODO: This is a workaround for that we launching gae unconditionally after session connects, - # make it an error when above logic has been changed. - logger.warning("Analytical engine is not enabled.") - except grpc.RpcError as e: - context.set_code(e.code()) - context.set_details( - f"Get engine config failed: {e.details()}. 
The traceback is: {traceback.format_exc()}" - ) - return message_pb2.CreateAnalyticalInstanceResponse() - except Exception as e: - context.abort( - grpc.StatusCode.ABORTED, - f"${e}. The traceback is: {traceback.format_exc()}", - ) - return message_pb2.CreateAnalyticalInstanceResponse() - return message_pb2.CreateAnalyticalInstanceResponse( - engine_config=json.dumps(engine_config), - host_names=self._launcher.hosts, - ) - - def CreateInteractiveInstance(self, request, context): - def _match_frontend_endpoint(pattern, lines): - for line in lines.split("\n"): - rlt = re.findall(pattern, line) - if rlt: - return rlt[0].strip() - return "" - - # frontend endpoint pattern - FRONTEND_GREMLIN_PATTERN = re.compile("(?<=FRONTEND_GREMLIN_ENDPOINT:).*$") - FRONTEND_CYPHER_PATTERN = re.compile("(?<=FRONTEND_CYPHER_ENDPOINT:).*$") - # frontend external endpoint, for clients that are outside of cluster to connect - # only available in kubernetes mode, exposed by NodePort or LoadBalancer - FRONTEND_EXTERNAL_GREMLIN_PATTERN = re.compile( - "(?<=FRONTEND_EXTERNAL_GREMLIN_ENDPOINT:).*$" - ) - FRONTEND_EXTERNAL_CYPHER_PATTERN = re.compile( - "(?<=FRONTEND_EXTERNAL_CYPHER_ENDPOINT:).*$" - ) - - # create instance - object_id = request.object_id - schema_path = request.schema_path - params = request.params - with_cypher = request.with_cypher - try: - proc = self._launcher.create_interactive_instance( - object_id, schema_path, params, with_cypher - ) - gie_manager = InteractiveInstanceManager(object_id) - # Put it to object_manager to ensure it could be killed during coordinator cleanup - # If coordinator is shutdown by force when creating interactive instance - self._object_manager.put(object_id, gie_manager) - # 60 seconds is enough, see also GH#1024; try 120 - # already add errs to outs - outs, _ = proc.communicate(timeout=120) # throws TimeoutError - return_code = proc.poll() - if return_code != 0: - raise RuntimeError(f"Error code: {return_code}, message {outs}") - # match frontend endpoints and check for ready - gremlin_endpoint = _match_frontend_endpoint(FRONTEND_GREMLIN_PATTERN, outs) - cypher_endpoint = _match_frontend_endpoint(FRONTEND_CYPHER_PATTERN, outs) - logger.debug("Got endpoints: %s %s", gremlin_endpoint, cypher_endpoint) - # coordinator use internal endpoint - gie_manager.set_endpoint(gremlin_endpoint) - if check_server_ready(gremlin_endpoint, server="gremlin"): - logger.info( - "Built interactive gremlin frontend: %s for graph %ld", - gremlin_endpoint, - object_id, - ) - - if with_cypher and check_server_ready( - cypher_endpoint, server="cypher" - ): # throws TimeoutError - logger.info( - "Built interactive cypher frontend: %s for graph %ld", - cypher_endpoint, - object_id, - ) - except Exception as e: - context.set_code(grpc.StatusCode.ABORTED) - context.set_details( - f"Create interactive instance failed: ${e}. 
The traceback is: {traceback.format_exc()}" - ) - self._launcher.close_interactive_instance(object_id) - self._object_manager.pop(object_id) - return message_pb2.CreateInteractiveInstanceResponse() - external_gremlin_endpoint = _match_frontend_endpoint( - FRONTEND_EXTERNAL_GREMLIN_PATTERN, outs - ) - external_cypher_endpoint = _match_frontend_endpoint( - FRONTEND_EXTERNAL_CYPHER_PATTERN, outs - ) - logger.debug( - "Got external endpoints: %s %s", - external_gremlin_endpoint, - external_cypher_endpoint, - ) - - # client use external endpoint (k8s mode), or internal endpoint (standalone mode) - gremlin_endpoint = external_gremlin_endpoint or gremlin_endpoint - cypher_endpoint = external_cypher_endpoint or cypher_endpoint - return message_pb2.CreateInteractiveInstanceResponse( - gremlin_endpoint=gremlin_endpoint, - cypher_endpoint=cypher_endpoint, - object_id=object_id, - ) +def launch_graphscope(): + args = parse_sys_args() + if args.config: + config = base64.b64decode(args.config).decode("utf-8", errors="ignore") + config = Config.loads_json(config) + elif args.config_file: + config = Config.load(args.config_file) + else: + raise RuntimeError("Must specify a config or config-file") - def CreateLearningInstance(self, request, context): - object_id = request.object_id - logger.info("Create learning instance with object id %ld", object_id) - handle, config, learning_backend = ( - request.handle, - request.config, - request.learning_backend, - ) - try: - endpoints = self._launcher.create_learning_instance( - object_id, handle, config, learning_backend - ) - self._object_manager.put(object_id, LearningInstanceManager(object_id)) - except Exception as e: - context.set_code(grpc.StatusCode.ABORTED) - context.set_details( - f"Create learning instance failed: ${e}. The traceback is: {traceback.format_exc()}" - ) - self._launcher.close_learning_instance(object_id) - self._object_manager.pop(object_id) - return message_pb2.CreateLearningInstanceResponse() - return message_pb2.CreateLearningInstanceResponse( - object_id=object_id, handle=handle, config=config, endpoints=endpoints - ) + config_logging(config.log_level) + logger.info("Start server with args \n%s", config.dumps_yaml()) - def CloseAnalyticalInstance(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("CloseAnalyticalInstance is not implemented") - return message_pb2.CloseAnalyticalInstanceResponse() - - def CloseInteractiveInstance(self, request, context): - object_id = request.object_id - if object_id in self._object_manager: - self._object_manager.pop(object_id) - try: - self._launcher.close_interactive_instance(object_id) - except Exception as e: - context.set_code(grpc.StatusCode.ABORTED) - context.set_details( - f"Close interactive instance failed: ${e}. The traceback is: {traceback.format_exc()}" - ) - return message_pb2.CloseInteractiveInstanceResponse() - - def CloseLearningInstance(self, request, context): - object_id = request.object_id - if object_id in self._object_manager: - self._object_manager.pop(object_id) - logger.info("Close learning instance with object id %ld", object_id) - try: - self._launcher.close_learning_instance(object_id) - except Exception as e: - context.set_code(grpc.StatusCode.ABORTED) - context.set_details( - f"Close learning instance failed: ${e}. The traceback is: {traceback.format_exc()}" - ) - return message_pb2.CloseLearningInstanceResponse() - - @Monitor.cleanup - def cleanup(self, cleanup_instance=True, is_dangling=False): - # clean up session resources. 
- logger.info( - "Clean up resources, cleanup_instance: %s, is_dangling: %s", - cleanup_instance, - is_dangling, - ) - for _, obj in self._object_manager.items(): - op_type, config = None, {} - if obj.type == "app": - op_type = types_pb2.UNLOAD_APP - config[types_pb2.APP_NAME] = s_to_attr(obj.key) - elif obj.type == "graph": - op_type = types_pb2.UNLOAD_GRAPH - config[types_pb2.GRAPH_NAME] = s_to_attr(obj.key) - # dynamic graph doesn't have a object id - if obj.object_id != -1: - config[types_pb2.VINEYARD_ID] = i_to_attr(obj.object_id) - elif obj.type == "gie_manager": - self._launcher.close_interactive_instance(obj.object_id) - elif obj.type == "gle_manager": - self._launcher.close_learning_instance(obj.object_id) - - if op_type is not None: - dag_def = create_single_op_dag(op_type, config) - try: - self._operation_executor.run_step(dag_def, []) - except grpc.RpcError as e: - logger.error( - "Cleanup failed, code: %s, details: %s. The traceback is: %s", - e.code().name, - e.details(), - traceback.format_exc(), - ) - - self._object_manager.clear() - self._cancel_dangling_timer() - - if cleanup_instance: - self._launcher.stop(is_dangling=is_dangling) - - def _set_dangling_timer(self, cleanup_instance: bool): - if self._dangling_timeout_seconds > 0: - self._dangling_detecting_timer = threading.Timer( - interval=self._dangling_timeout_seconds, - function=self.cleanup, - args=( - cleanup_instance, - True, - ), - ) - self._dangling_detecting_timer.start() - - def _cancel_dangling_timer(self): - if self._dangling_detecting_timer is not None: - self._dangling_detecting_timer.cancel() - self._dangling_detecting_timer = None - - def _reset_dangling_timer(self, reset: bool, cleanup_instance: bool): - if reset: - self._cancel_dangling_timer() - self._set_dangling_timer(cleanup_instance) + servicer = get_servicer(config) + start_server(servicer, config) def parse_sys_args(): @@ -664,39 +105,25 @@ def parse_sys_args(): return parser.parse_args() -def launch_graphscope(): - args = parse_sys_args() - if args.config: - config = base64.b64decode(args.config).decode("utf-8", errors="ignore") - config = Config.loads_json(config) - elif args.config_file: - config = Config.load(args.config_file) - else: - raise RuntimeError("Must specify a config or config-file") - logger.info("Start server with args \n%s", config.dumps_yaml()) - launcher = get_launcher(config) - start_server(launcher, config) - +def get_servicer(config: Config): + """Get servicer of specified solution under FLEX architecture""" + service_initializers = { + "GraphScope One": init_graphscope_one_service_servicer, + } -def get_launcher(config: Config): - if config.launcher_type == "hosts": - launcher = LocalLauncher(config) - elif config.launcher_type == "k8s": - launcher = KubernetesClusterLauncher(config) - elif config.launcher_type == "operator": - launcher = OperatorLauncher(config) - else: - raise RuntimeError("Expect hosts, k8s or operator of launcher_type parameter") - return launcher + initializer = service_initializers.get(config.solution) + if initializer is None: + raise RuntimeError( + f"Expect {service_initializers.keys()} of solution parameter" + ) + return initializer(config) -def start_server(launcher, config: Config): - coordinator_service_servicer = CoordinatorServiceServicer( - launcher=launcher, - dangling_timeout_seconds=config.session.dangling_timeout_seconds, - log_level=config.session.log_level, - ) +def start_server( + coordinator_service_servicer: coordinator_service_pb2_grpc.CoordinatorServiceServicer, + config: 
Config, +): # register gRPC server server = grpc.server( futures.ThreadPoolExecutor(max(4, os.cpu_count() or 1)), @@ -716,19 +143,6 @@ def start_server(launcher, config: Config): server.start() - if config.coordinator.monitor: - try: - Monitor.startServer(config.coordinator.monitor_port, "0.0.0.0") - logger.info( - "Coordinator monitor server listen at 0.0.0.0:%d", - config.coordinator.monitor_port, - ) - except Exception: # noqa: E722, pylint: disable=broad-except - logger.exception( - "Failed to start monitor server 0.0.0.0:%d", - config.coordinator.monitor_port, - ) - # handle SIGTERM signal def terminate(signum, frame): server.stop(True) @@ -741,6 +155,7 @@ def terminate(signum, frame): server.wait_for_termination() except KeyboardInterrupt: coordinator_service_servicer.cleanup() + server.stop(True) if __name__ == "__main__": diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py index 3c48e6a13f3d..305e98d51fe8 100644 --- a/coordinator/gscoordinator/kubernetes_launcher.py +++ b/coordinator/gscoordinator/kubernetes_launcher.py @@ -90,9 +90,11 @@ def __init__(self, config: Config): self._config.kubernetes_launcher.engine.post_setup() launcher_config = config.kubernetes_launcher + # glog level + self._glog_level = parse_as_glog_level(config.log_level) + # Session Config self._num_workers = config.session.num_workers - self._glog_level = parse_as_glog_level(config.session.log_level) self._instance_id = config.session.instance_id self._timeout_seconds = config.session.timeout_seconds self._retry_time_seconds = config.session.retry_time_seconds diff --git a/coordinator/gscoordinator/local_launcher.py b/coordinator/gscoordinator/local_launcher.py index bd1aeed7140e..d4bacaff0af2 100644 --- a/coordinator/gscoordinator/local_launcher.py +++ b/coordinator/gscoordinator/local_launcher.py @@ -59,9 +59,11 @@ def __init__(self, config): vineyard_config = config.vineyard launcher_config = config.hosts_launcher + # glog level + self._glog_level = parse_as_glog_level(config.log_level) + # Session Config self._num_workers = session_config.num_workers - self._glog_level = parse_as_glog_level(session_config.log_level) self._instance_id = session_config.instance_id self._timeout_seconds = session_config.timeout_seconds self._retry_time_seconds = session_config.retry_time_seconds diff --git a/coordinator/gscoordinator/servicer/__init__.py b/coordinator/gscoordinator/servicer/__init__.py new file mode 100644 index 000000000000..9cc0c51d02ed --- /dev/null +++ b/coordinator/gscoordinator/servicer/__init__.py @@ -0,0 +1,19 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from gscoordinator.servicer.graphscope_one.service import * diff --git a/coordinator/gscoordinator/servicer/graphscope_one/__init__.py b/coordinator/gscoordinator/servicer/graphscope_one/__init__.py new file mode 100644 index 000000000000..7c0d26b525f8 --- /dev/null +++ b/coordinator/gscoordinator/servicer/graphscope_one/__init__.py @@ -0,0 +1,17 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/coordinator/gscoordinator/servicer/graphscope_one/service.py b/coordinator/gscoordinator/servicer/graphscope_one/service.py new file mode 100644 index 000000000000..df90481f432b --- /dev/null +++ b/coordinator/gscoordinator/servicer/graphscope_one/service.py @@ -0,0 +1,595 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""GraphScope One Service""" + +import atexit +import base64 +import json +import logging +import os +import pickle +import queue +import random +import re +import sys +import threading +import traceback +from string import ascii_letters + +import grpc +from packaging import version +from simple_parsing import ArgumentParser + +from gscoordinator.io_utils import StdStreamWrapper + +# capture system stdout +sys.stdout = StdStreamWrapper(sys.stdout) +sys.stderr = StdStreamWrapper(sys.stderr) + +from graphscope.config import Config +from graphscope.framework.utils import PipeMerger +from graphscope.framework.utils import i_to_attr +from graphscope.framework.utils import s_to_attr +from graphscope.proto import coordinator_service_pb2_grpc +from graphscope.proto import error_codes_pb2 +from graphscope.proto import message_pb2 +from graphscope.proto import types_pb2 + +from gscoordinator.dag_manager import DAGManager +from gscoordinator.dag_manager import GSEngine +from gscoordinator.kubernetes_launcher import KubernetesClusterLauncher +from gscoordinator.launcher import AbstractLauncher +from gscoordinator.local_launcher import LocalLauncher +from gscoordinator.monitor import Monitor +from gscoordinator.object_manager import InteractiveInstanceManager +from gscoordinator.object_manager import LearningInstanceManager +from gscoordinator.object_manager import ObjectManager +from gscoordinator.op_executor import OperationExecutor +from gscoordinator.operator_launcher import OperatorLauncher +from gscoordinator.utils import catch_unknown_errors +from gscoordinator.utils import check_server_ready +from gscoordinator.utils import create_single_op_dag +from gscoordinator.version import __version__ + +__all__ = ["GraphScopeOneServiceServicer", "init_graphscope_one_service_servicer"] + +logger = logging.getLogger("graphscope") + + +class GraphScopeOneServiceServicer( + coordinator_service_pb2_grpc.CoordinatorServiceServicer +): + """Provides methods that implement functionality of master service server. + Holding: + 1. launcher: the engine launcher. + 2. session_id: the handle for a particular session to engine + 3. object_manager: the object manager for the session + 4. operation_executor: the operation executor for the session + """ + + def __init__( + self, launcher: AbstractLauncher, dangling_timeout_seconds, log_level="INFO" + ): + self._operator_mode = False + + self._object_manager = ObjectManager() + + # only one connection is allowed at the same time + self._connected = False + + # control log fetching + self._streaming_logs = False + self._pipe_merged = PipeMerger(sys.stdout, sys.stderr) + + self._session_id = "session_" + "".join(random.choices(ascii_letters, k=8)) + + # dangling check + self._dangling_timeout_seconds = dangling_timeout_seconds + self._dangling_detecting_timer = None + self._cleanup_instance = False + # the dangling timer should be initialized after the launcher started, + # otherwise there would be a deadlock if `self._launcher.start()` failed. 
+ self._set_dangling_timer(cleanup_instance=True) + + # a lock that protects the coordinator + self._lock = threading.RLock() + atexit.register(self.cleanup) + + self._launcher = launcher + self._launcher.set_session_workspace(self._session_id) + if not self._launcher.start(): + raise RuntimeError("Coordinator launching instance failed.") + + self._operation_executor: OperationExecutor = OperationExecutor( + self._session_id, self._launcher, self._object_manager + ) + + def __del__(self): + self.cleanup() + + @Monitor.connectSession + def ConnectSession(self, request, context): + if self._launcher.analytical_engine_endpoint is not None: + engine_config = self._operation_executor.get_analytical_engine_config() + engine_config.update(self._launcher.get_engine_config()) + host_names = self._launcher.hosts + else: + engine_config = {} + host_names = [] + + # A session is already connected. + if self._connected: + if getattr(request, "reconnect", False): + return message_pb2.ConnectSessionResponse( + session_id=self._session_id, + cluster_type=self._launcher.type(), + num_workers=self._launcher.num_workers, + namespace=self._launcher.get_namespace(), + engine_config=json.dumps(engine_config), + host_names=host_names, + ) + else: + # connect failed, more than one connection at the same time. + context.set_code(grpc.StatusCode.ALREADY_EXISTS) + context.set_details( + "Cannot setup more than one connection at the same time." + ) + return message_pb2.ConnectSessionResponse() + # check version compatibility from client + sv = version.parse(__version__) + cv = version.parse(request.version) + if sv.major != cv.major or sv.minor != cv.minor: + error_msg = f"Version between client and server is inconsistent: {request.version} vs {__version__}" + logger.warning(error_msg) + context.set_code(grpc.StatusCode.FAILED_PRECONDITION) + context.set_details(error_msg) + return message_pb2.ConnectSessionResponse() + + # Connect to serving coordinator. + self._connected = True + # Cleanup after timeout seconds + self._dangling_timeout_seconds = request.dangling_timeout_seconds + # other timeout seconds + self._comm_timeout_seconds = getattr(request, "comm_timeout_seconds", 120) + # If true, also delete graphscope instance (such as pods) in closing process + self._cleanup_instance = request.cleanup_instance + + # Session connected, fetch logs via gRPC. + self._streaming_logs = True + sys.stdout.drop(False) + sys.stderr.drop(False) + + return message_pb2.ConnectSessionResponse( + session_id=self._session_id, + cluster_type=self._launcher.type(), + num_workers=self._launcher.num_workers, + namespace=self._launcher.get_namespace(), + engine_config=json.dumps(engine_config), + host_names=host_names, + ) + + @Monitor.closeSession + def CloseSession(self, request, context): + """ + Disconnect session, note that it won't clean up any resources if self._cleanup_instance is False. 
+ """ + if request.session_id != self._session_id: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details( + f"Session handle not matched, {request.session_id} versus {self._session_id}" + ) + return message_pb2.CloseSessionResponse() + + self._connected = False + + self.cleanup(cleanup_instance=self._cleanup_instance, is_dangling=False) + if self._cleanup_instance: + self._session_id = None + self._operation_executor = None + + # Session closed, stop streaming logs + sys.stdout.drop(True) + sys.stderr.drop(True) + self._streaming_logs = False + return message_pb2.CloseSessionResponse() + + def HeartBeat(self, request, context): + self._reset_dangling_timer(self._connected, self._cleanup_instance) + # analytical engine + # if self._operation_executor is not None: + # return self._operation_executor.heart_beat(request) + return message_pb2.HeartBeatResponse() + + def RunStep(self, request_iterator, context): + with self._lock: + for response in self.RunStepWrapped(request_iterator, context): + yield response + + def _RunStep(self, request_iterator, context): + from gremlin_python.driver.protocol import GremlinServerError + + # split dag + dag_manager = DAGManager(request_iterator) + loader_op_bodies = {} + + # response list for stream + responses = [ + message_pb2.RunStepResponse(head=message_pb2.RunStepResponseHead()) + ] + + while not dag_manager.empty(): + run_dag_on, dag, dag_bodies = dag_manager.next_dag() + error_code = error_codes_pb2.COORDINATOR_INTERNAL_ERROR + head, bodies = None, None + try: + # run on analytical engine + if run_dag_on == GSEngine.analytical_engine: + # need dag_bodies to load graph from pandas/numpy + error_code = error_codes_pb2.ANALYTICAL_ENGINE_INTERNAL_ERROR + head, bodies = self._operation_executor.run_on_analytical_engine( + dag, dag_bodies, loader_op_bodies + ) + # run on interactive engine + elif run_dag_on == GSEngine.interactive_engine: + error_code = error_codes_pb2.INTERACTIVE_ENGINE_INTERNAL_ERROR + head, bodies = self._operation_executor.run_on_interactive_engine( + dag + ) + # run on learning engine + elif run_dag_on == GSEngine.learning_engine: + error_code = error_codes_pb2.LEARNING_ENGINE_INTERNAL_ERROR + head, bodies = self._operation_executor.run_on_learning_engine(dag) + # run on coordinator + elif run_dag_on == GSEngine.coordinator: + error_code = error_codes_pb2.COORDINATOR_INTERNAL_ERROR + head, bodies = self._operation_executor.run_on_coordinator( + dag, dag_bodies, loader_op_bodies + ) + # merge the responses + responses[0].head.results.extend(head.head.results) + responses.extend(bodies) + + except grpc.RpcError as exc: + # Not raised by graphscope, maybe socket closed, etc. + context.set_code(exc.code()) + context.set_details( + f"{exc.details()}. The traceback is: {traceback.format_exc()}" + ) + + # stop yield to raise the grpc errors to the client, as the + # client may consume nothing if error happens + return + + except GremlinServerError as exc: + response_head = responses[0] + response_head.head.code = error_code + response_head.head.error_msg = f"Error occurred during RunStep. 
The traceback is: {traceback.format_exc()}" + if hasattr(exc, "status_message"): + exc_message = exc.status_message + else: + exc_message = str(exc) + response_head.head.full_exception = pickle.dumps( + ( + GremlinServerError, + { + "code": exc.status_code, + "message": exc_message, + "attributes": exc.status_attributes, + }, + ) + ) + # stop iteration to propagate the error to client immediately + break + + except Exception as exc: + response_head = responses[0] + response_head.head.code = error_code + response_head.head.error_msg = f"Error occurred during RunStep, The traceback is: {traceback.format_exc()}" + response_head.head.full_exception = pickle.dumps(exc) + + # stop iteration to propagate the error to client immediately + break + + for response in responses: + yield response + + RunStepWrapped = catch_unknown_errors( + message_pb2.RunStepResponse(head=message_pb2.RunStepResponseHead()), True + )(_RunStep) + + def FetchLogs(self, request, context): + while self._streaming_logs: + try: + info_message, error_message = self._pipe_merged.poll(timeout=2) + except queue.Empty: + info_message, error_message = "", "" + except Exception as e: + info_message, error_message = ( + f"WARNING: failed to read log: {e}. The traceback is: {traceback.format_exc()}", + "", + ) + + if info_message or error_message: + if self._streaming_logs: + yield message_pb2.FetchLogsResponse( + info_message=info_message, error_message=error_message + ) + + def AddLib(self, request, context): + try: + self._operation_executor.add_lib(request) + except Exception as e: + context.abort(grpc.StatusCode.ABORTED, str(e)) + return message_pb2.AddLibResponse() + + def CreateAnalyticalInstance(self, request, context): + engine_config = {} + try: + # create GAE rpc service + self._launcher.create_analytical_instance() + engine_config = self._operation_executor.get_analytical_engine_config() + engine_config.update(self._launcher.get_engine_config()) + except NotImplementedError: + # TODO: This is a workaround for that we launching gae unconditionally after session connects, + # make it an error when above logic has been changed. + logger.warning("Analytical engine is not enabled.") + except grpc.RpcError as e: + context.set_code(e.code()) + context.set_details( + f"Get engine config failed: {e.details()}. The traceback is: {traceback.format_exc()}" + ) + return message_pb2.CreateAnalyticalInstanceResponse() + except Exception as e: + context.abort( + grpc.StatusCode.ABORTED, + f"${e}. 
The traceback is: {traceback.format_exc()}", + ) + return message_pb2.CreateAnalyticalInstanceResponse() + return message_pb2.CreateAnalyticalInstanceResponse( + engine_config=json.dumps(engine_config), + host_names=self._launcher.hosts, + ) + + def CreateInteractiveInstance(self, request, context): + def _match_frontend_endpoint(pattern, lines): + for line in lines.split("\n"): + rlt = re.findall(pattern, line) + if rlt: + return rlt[0].strip() + return "" + + # frontend endpoint pattern + FRONTEND_GREMLIN_PATTERN = re.compile("(?<=FRONTEND_GREMLIN_ENDPOINT:).*$") + FRONTEND_CYPHER_PATTERN = re.compile("(?<=FRONTEND_CYPHER_ENDPOINT:).*$") + # frontend external endpoint, for clients that are outside of cluster to connect + # only available in kubernetes mode, exposed by NodePort or LoadBalancer + FRONTEND_EXTERNAL_GREMLIN_PATTERN = re.compile( + "(?<=FRONTEND_EXTERNAL_GREMLIN_ENDPOINT:).*$" + ) + FRONTEND_EXTERNAL_CYPHER_PATTERN = re.compile( + "(?<=FRONTEND_EXTERNAL_CYPHER_ENDPOINT:).*$" + ) + + # create instance + object_id = request.object_id + schema_path = request.schema_path + params = request.params + with_cypher = request.with_cypher + try: + proc = self._launcher.create_interactive_instance( + object_id, schema_path, params, with_cypher + ) + gie_manager = InteractiveInstanceManager(object_id) + # Put it to object_manager to ensure it could be killed during coordinator cleanup + # If coordinator is shutdown by force when creating interactive instance + self._object_manager.put(object_id, gie_manager) + # 60 seconds is enough, see also GH#1024; try 120 + # already add errs to outs + outs, _ = proc.communicate(timeout=120) # throws TimeoutError + return_code = proc.poll() + if return_code != 0: + raise RuntimeError(f"Error code: {return_code}, message {outs}") + # match frontend endpoints and check for ready + gremlin_endpoint = _match_frontend_endpoint(FRONTEND_GREMLIN_PATTERN, outs) + cypher_endpoint = _match_frontend_endpoint(FRONTEND_CYPHER_PATTERN, outs) + logger.debug("Got endpoints: %s %s", gremlin_endpoint, cypher_endpoint) + # coordinator use internal endpoint + gie_manager.set_endpoint(gremlin_endpoint) + if check_server_ready(gremlin_endpoint, server="gremlin"): + logger.info( + "Built interactive gremlin frontend: %s for graph %ld", + gremlin_endpoint, + object_id, + ) + + if with_cypher and check_server_ready( + cypher_endpoint, server="cypher" + ): # throws TimeoutError + logger.info( + "Built interactive cypher frontend: %s for graph %ld", + cypher_endpoint, + object_id, + ) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details( + f"Create interactive instance failed: ${e}. 
The traceback is: {traceback.format_exc()}" + ) + self._launcher.close_interactive_instance(object_id) + self._object_manager.pop(object_id) + return message_pb2.CreateInteractiveInstanceResponse() + external_gremlin_endpoint = _match_frontend_endpoint( + FRONTEND_EXTERNAL_GREMLIN_PATTERN, outs + ) + external_cypher_endpoint = _match_frontend_endpoint( + FRONTEND_EXTERNAL_CYPHER_PATTERN, outs + ) + logger.debug( + "Got external endpoints: %s %s", + external_gremlin_endpoint, + external_cypher_endpoint, + ) + + # client use external endpoint (k8s mode), or internal endpoint (standalone mode) + gremlin_endpoint = external_gremlin_endpoint or gremlin_endpoint + cypher_endpoint = external_cypher_endpoint or cypher_endpoint + return message_pb2.CreateInteractiveInstanceResponse( + gremlin_endpoint=gremlin_endpoint, + cypher_endpoint=cypher_endpoint, + object_id=object_id, + ) + + def CreateLearningInstance(self, request, context): + object_id = request.object_id + logger.info("Create learning instance with object id %ld", object_id) + handle, config, learning_backend = ( + request.handle, + request.config, + request.learning_backend, + ) + try: + endpoints = self._launcher.create_learning_instance( + object_id, handle, config, learning_backend + ) + self._object_manager.put(object_id, LearningInstanceManager(object_id)) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details( + f"Create learning instance failed: ${e}. The traceback is: {traceback.format_exc()}" + ) + self._launcher.close_learning_instance(object_id) + self._object_manager.pop(object_id) + return message_pb2.CreateLearningInstanceResponse() + return message_pb2.CreateLearningInstanceResponse( + object_id=object_id, handle=handle, config=config, endpoints=endpoints + ) + + def CloseAnalyticalInstance(self, request, context): + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("CloseAnalyticalInstance is not implemented") + return message_pb2.CloseAnalyticalInstanceResponse() + + def CloseInteractiveInstance(self, request, context): + object_id = request.object_id + if object_id in self._object_manager: + self._object_manager.pop(object_id) + try: + self._launcher.close_interactive_instance(object_id) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details( + f"Close interactive instance failed: ${e}. The traceback is: {traceback.format_exc()}" + ) + return message_pb2.CloseInteractiveInstanceResponse() + + def CloseLearningInstance(self, request, context): + object_id = request.object_id + if object_id in self._object_manager: + self._object_manager.pop(object_id) + logger.info("Close learning instance with object id %ld", object_id) + try: + self._launcher.close_learning_instance(object_id) + except Exception as e: + context.set_code(grpc.StatusCode.ABORTED) + context.set_details( + f"Close learning instance failed: ${e}. The traceback is: {traceback.format_exc()}" + ) + return message_pb2.CloseLearningInstanceResponse() + + @Monitor.cleanup + def cleanup(self, cleanup_instance=True, is_dangling=False): + # clean up session resources. 
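+        # Cleanup order (descriptive note): unload apps and graphs via
+        # single-op DAGs on the analytical engine, close any interactive (GIE)
+        # and learning (GLE) instances, clear the object manager and cancel
+        # the dangling timer, then stop the launcher when `cleanup_instance`
+        # is set.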
+ logger.info( + "Clean up resources, cleanup_instance: %s, is_dangling: %s", + cleanup_instance, + is_dangling, + ) + for _, obj in self._object_manager.items(): + op_type, config = None, {} + if obj.type == "app": + op_type = types_pb2.UNLOAD_APP + config[types_pb2.APP_NAME] = s_to_attr(obj.key) + elif obj.type == "graph": + op_type = types_pb2.UNLOAD_GRAPH + config[types_pb2.GRAPH_NAME] = s_to_attr(obj.key) + # dynamic graph doesn't have a object id + if obj.object_id != -1: + config[types_pb2.VINEYARD_ID] = i_to_attr(obj.object_id) + elif obj.type == "gie_manager": + self._launcher.close_interactive_instance(obj.object_id) + elif obj.type == "gle_manager": + self._launcher.close_learning_instance(obj.object_id) + + if op_type is not None: + dag_def = create_single_op_dag(op_type, config) + try: + self._operation_executor.run_step(dag_def, []) + except grpc.RpcError as e: + logger.error( + "Cleanup failed, code: %s, details: %s. The traceback is: %s", + e.code().name, + e.details(), + traceback.format_exc(), + ) + + self._object_manager.clear() + self._cancel_dangling_timer() + + if cleanup_instance: + self._launcher.stop(is_dangling=is_dangling) + + def _set_dangling_timer(self, cleanup_instance: bool): + if self._dangling_timeout_seconds > 0: + self._dangling_detecting_timer = threading.Timer( + interval=self._dangling_timeout_seconds, + function=self.cleanup, + args=( + cleanup_instance, + True, + ), + ) + self._dangling_detecting_timer.start() + + def _cancel_dangling_timer(self): + if self._dangling_detecting_timer is not None: + self._dangling_detecting_timer.cancel() + self._dangling_detecting_timer = None + + def _reset_dangling_timer(self, reset: bool, cleanup_instance: bool): + if reset: + self._cancel_dangling_timer() + self._set_dangling_timer(cleanup_instance) + + +def init_graphscope_one_service_servicer(config: Config): + type2launcher = { + "hosts": LocalLauncher, + "k8s": KubernetesClusterLauncher, + "operator": OperatorLauncher, + } + + launcher = type2launcher.get(config.launcher_type) + if launcher is None: + raise RuntimeError(f"Expect {type2launcher.keys()} of launcher_type parameter") + + return GraphScopeOneServiceServicer( + launcher=launcher(config), + dangling_timeout_seconds=config.session.dangling_timeout_seconds, + log_level=config.log_level, + ) diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index 4c7bb438510f..f39d99f2b036 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -19,6 +19,7 @@ import copy import datetime +import functools import glob import hashlib import inspect @@ -30,6 +31,7 @@ import subprocess import sys import time +import traceback import uuid import zipfile from concurrent.futures import ThreadPoolExecutor @@ -38,6 +40,7 @@ from string import Template from typing import List +import grpc import yaml from google.protobuf.any_pb2 import Any from graphscope.framework import utils @@ -182,6 +185,36 @@ INTERACTIVE_ENGINE_THREADS_PER_WORKER = 2 +def catch_unknown_errors(response_on_error=None, using_yield=False): + """A catcher that catches all (unknown) exceptions in gRPC handlers to ensure + the client not think the coordinator services is crashed. 
+ """ + + def catch_exceptions(handler): + @functools.wraps(handler) + def handler_execution(self, request, context): + try: + if using_yield: + for result in handler(self, request, context): + yield result + else: + yield handler(self, request, context) + except Exception as exc: + error_message = repr(exc) + error_traceback = traceback.format_exc() + context.set_code(grpc.StatusCode.ABORTED) + context.set_details( + 'Error occurs in handler: "%s", with traceback: ' % error_message + + error_traceback + ) + if response_on_error is not None: + yield response_on_error + + return handler_execution + + return catch_exceptions + + def get_timestamp() -> float: return datetime.datetime.timestamp(datetime.datetime.now()) diff --git a/docs/utilities/gs.md b/docs/utilities/gs.md index f5f9034fea7a..1b6dced3e10e 100644 --- a/docs/utilities/gs.md +++ b/docs/utilities/gs.md @@ -23,11 +23,28 @@ This will install `gsctl` in an editable mode, which means that any changes you ## Commands -With `gsctl`, you can do the following things. Always remember to - use `--help` on a command to get more information. +With `gsctl`, you can do the following things. Always remember to use `--help` on a command to get more information. + +The `gsctl` command-line utility supports two modes of operation: utility scripts and client/server mode. You can switch between these modes using the +`gsctl connect` and `gsctl close` commands. + +### Utility Scripts + +Default, the `gsctl` provide helper functions and utilities that can be run using gsctl alone. +`gsctl` acts as the command-line entrypoint for GraphScope. Some examples of utility scripts are: - `gsctl install-deps`, install dependencies for building GraphScope. - `gsctl make`, build GraphScope executable binaries and artifacts. - `gsctl make-image`, build GraphScope docker images. - `gsctl test`, trigger test suites. +- `gsctl connect`, connect to the launched coordinator by ~/.gs/config. +- `gsctl close`, Close the connection from the coordinator. + + +### Client/Server Mode + +To switch to the client/server mode, use the `gsctl connect` command. This command connects gsctl to a launched coordinator using the configuration file located at ~/.gsconfig. +Once connected, you can use `gsctl` to communicate with the coordinator and send commands that will be executed on the coordinator side. +To disconnect from the coordinator and switch back to the utility scripts mode, you can use the `gsctl close` command. This command closes the connection from the coordinator +and allows you to use `gsctl` as a standalone utility again. 
diff --git a/proto/coordinator_service.proto b/proto/coordinator_service.proto
index 038709bd66ff..1d5ed680f8d5 100644
--- a/proto/coordinator_service.proto
+++ b/proto/coordinator_service.proto
@@ -48,4 +48,7 @@ service CoordinatorService {
   rpc CloseInteractiveInstance (CloseInteractiveInstanceRequest) returns (CloseInteractiveInstanceResponse);
 
   rpc CloseLearningInstance (CloseLearningInstanceRequest) returns (CloseLearningInstanceResponse);
+
+  // Service functions under the FLEX architecture
+  rpc Connect(ConnectRequest) returns (ConnectResponse);
 }
diff --git a/proto/message.proto b/proto/message.proto
index 2a7bbf6ab54b..caf6859709d7 100644
--- a/proto/message.proto
+++ b/proto/message.proto
@@ -246,3 +246,21 @@ message CloseLearningInstanceRequest {
 
 message CloseLearningInstanceResponse {
 };
+
+
+////////////////////////////////////////////////////////////////////////////////
+//
+// Protos under the FLEX architecture
+//
+////////////////////////////////////////////////////////////////////////////////
+
+message ConnectRequest {
+  // check version compatibility
+  string version = 1;
+}
+
+message ConnectResponse {
+  // The solution encompasses the various applications and use cases of the
+  // product across different industries and business scenarios, e.g.,
+  // interactive, graphscope insight.
+  string solution = 1;
+}
diff --git a/python/graphscope/client/utils.py b/python/graphscope/client/utils.py
index 53672115222c..a6e25021c40d 100644
--- a/python/graphscope/client/utils.py
+++ b/python/graphscope/client/utils.py
@@ -280,8 +280,8 @@ def init():
 
     @staticmethod
     def update():
-        if gs_config.session.show_log:
-            log_level = gs_config.session.log_level
+        if gs_config.show_log:
+            log_level = gs_config.log_level
         else:
             log_level = logging.ERROR
         if isinstance(log_level, str):
diff --git a/python/graphscope/config.py b/python/graphscope/config.py
index de86c90d68bb..c9ef280ad2ff 100644
--- a/python/graphscope/config.py
+++ b/python/graphscope/config.py
@@ -309,9 +309,6 @@ class SessionConfig:
     reconnect: bool = False  # Connect to an existing GraphScope cluster.
     instance_id: Union[str, None] = None  # Unique id for each GraphScope instance.
 
-    show_log: bool = False  # Show log or not.
-    log_level: str = "info"  # Log level, choose from 'info' or 'debug'.
-
     # The length of time to wait before giving up launching graphscope.
     timeout_seconds: int = 600
     # The length of time to wait starting from client disconnected before killing the graphscope instance.
@@ -325,9 +322,17 @@ class SessionConfig:
 
 @dataclass
 class Config(Serializable):
+    # Solution under the FLEX architecture, choose from 'GraphScope One', 'Interactive' or 'GraphScope Insight'
+    solution: str = "GraphScope One"
+
     # Launcher type, choose from 'hosts', 'k8s' or 'operator'.
     launcher_type: str = "k8s"
 
+    # Show log or not.
+    show_log: bool = False
+    # Log level, choose from 'info' or 'debug'.
+    log_level: str = "info"
+
     session: SessionConfig = field(default_factory=SessionConfig)
 
     # Coordinator configuration.
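The `set_option` hunk below keeps the public knob names stable while the two fields move to the top level. A minimal sketch of the resulting behavior; constructing `Config()` directly with its defaults is an assumption for illustration:

```python
from graphscope.config import Config

config = Config()  # defaults: solution="GraphScope One", launcher_type="k8s", log_level="info"
config.set_option("show_log", True)      # now writes config.show_log, not config.session.show_log
config.set_option("log_level", "debug")  # now writes config.log_level
assert config.show_log and config.log_level == "debug"
```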
@@ -424,9 +429,9 @@ def set_option(self, key, value): # noqa: C901 self.session.num_workers = value self.session.default_local_num_workers = value elif key == "show_log": - self.session.show_log = value + self.show_log = value elif key == "log_level": - self.session.log_level = value + self.log_level = value elif key == "timeout_seconds": self.session.timeout_seconds = value elif key == "dangling_timeout_seconds": diff --git a/python/graphscope/deploy/hosts/cluster.py b/python/graphscope/deploy/hosts/cluster.py index e9714d5c9b2c..1f3affb14cc6 100644 --- a/python/graphscope/deploy/hosts/cluster.py +++ b/python/graphscope/deploy/hosts/cluster.py @@ -106,7 +106,7 @@ def _launch_coordinator(self): bufsize=1, ) stdout_watcher = PipeWatcher(process.stdout, sys.stdout) - if not self._config.session.show_log: + if not self._config.show_log: stdout_watcher.add_filter( lambda line: "Loading" in line and "it/s]" in line ) diff --git a/python/graphscope/gsctl/__init__.py b/python/graphscope/gsctl/__init__.py index e69de29bb2d1..895287b8d746 100644 --- a/python/graphscope/gsctl/__init__.py +++ b/python/graphscope/gsctl/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py new file mode 100644 index 000000000000..29ab31f2d6ab --- /dev/null +++ b/python/graphscope/gsctl/commands/__init__.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from graphscope.gsctl.commands.utils import * diff --git a/python/graphscope/gsctl/commands/common_command.py b/python/graphscope/gsctl/commands/common_command.py new file mode 100644 index 000000000000..ff3a02a4a3e2 --- /dev/null +++ b/python/graphscope/gsctl/commands/common_command.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Group of commands used by all products under the FLEX architecture"""
+
+import click
+
+from graphscope.gsctl.config import Context
+from graphscope.gsctl.config import load_gs_config
+from graphscope.gsctl.rpc import get_grpc_client
+
+
+@click.group()
+def cli():
+    # nothing happens
+    pass
+
+
+@click.command()
+@click.option(
+    "--coordinator-endpoint",
+    help="Coordinator endpoint which gsctl connects to, e.g. http://127.0.0.1:9527",
+)
+def connect(coordinator_endpoint):
+    """Connect to a launched coordinator using the context recorded in ~/.graphscope/config.
+    If '--coordinator-endpoint' is specified, use it as the current context and override the config file.
+    """
+    if coordinator_endpoint is not None:
+        click.secho(
+            f"Connecting to the coordinator at {coordinator_endpoint}.", fg="green"
+        )
+
+    grpc_client = get_grpc_client(coordinator_endpoint)
+    solution = grpc_client.connect()
+
+    if coordinator_endpoint is not None:
+        context = Context(solution=solution, coordinator_endpoint=coordinator_endpoint)
+        config = load_gs_config()
+        config.set_and_write(context)
+
+    click.secho("Coordinator service connected.", fg="green")
+
+
+@click.command()
+def close():
+    """Close the connection to the coordinator."""
+    config = load_gs_config()
+
+    current_context = config.current_context()
+    if current_context is None:
+        return
+
+    config.remove_and_write(current_context)
+    click.secho(f"Disconnected from {current_context.to_dict()}.", fg="green")
+
+
+cli.add_command(connect)
+cli.add_command(close)
+
+
+if __name__ == "__main__":
+    cli()
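For reviewers who want to smoke-test the new group locally, click's test runner is a convenient harness; the endpoint below is only a placeholder and a coordinator must actually be serving there:

```python
from click.testing import CliRunner

from graphscope.gsctl.commands.common_command import cli

runner = CliRunner()
# On success, 'connect' records a new context in ~/.graphscope/config.
result = runner.invoke(cli, ["connect", "--coordinator-endpoint", "127.0.0.1:9527"])
print(result.output)
```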
diff --git a/python/graphscope/gsctl/commands/dev_command.py b/python/graphscope/gsctl/commands/dev_command.py
new file mode 100644
index 000000000000..8df7e70b59ae
--- /dev/null
+++ b/python/graphscope/gsctl/commands/dev_command.py
@@ -0,0 +1,366 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Group of commands for GraphScope development"""
+
+import io
+import os
+import subprocess
+
+import click
+
+scripts_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "scripts")
+install_deps_script = os.path.join(scripts_dir, "install_deps_command.sh")
+make_script = os.path.join(scripts_dir, "make_command.sh")
+make_image_script = os.path.join(scripts_dir, "make_image_command.sh")
+test_script = os.path.join(scripts_dir, "test_command.sh")
+
+
+def run_shell_cmd(cmd, workingdir):
+    """Wrapper function to run a shell command or script."""
+    click.echo(f"Run a shell command with cwd={workingdir}.\ncmd=\"{' '.join(cmd)}\"")
+    proc = subprocess.Popen(
+        cmd, cwd=workingdir, env=os.environ.copy(), stdout=subprocess.PIPE
+    )
+    for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):
+        print(line.rstrip())
+
+
+@click.group()
+def cli():
+    # nothing happens
+    pass
+
+
+@click.command()
+@click.argument(
+    "type",
+    type=click.Choice(
+        ["dev", "client"],
+        case_sensitive=False,
+    ),
+    required=True,
+)
+@click.option(
+    "--graphscope-repo",
+    envvar="GRAPHSCOPE_REPO",
+    type=click.Path(),
+    default=os.path.abspath("."),
+    show_default=True,
+    help="GraphScope code repo location.",
+)
+@click.option(
+    "--cn",
+    is_flag=True,
+    default=False,
+    help="Whether to use CN-located mirrors to speed up download.",
+)
+@click.option(
+    "--install-prefix",
+    type=click.Path(),
+    default="/opt/graphscope",
+    show_default=True,
+    help="Install built binaries to customized location.",
+)
+@click.option(
+    "--from-local",
+    type=click.Path(),
+    default="/tmp/gs-local-deps",
+    show_default=True,
+    help="""Find raw dependencies of GraphScope from a local directory. The raw
+    dependencies would then be built and installed to [prefix]. If the directory
+    is empty or does not exist, dependency files would be downloaded to [directory].""",
+)
+@click.option(
+    "--v6d-version",
+    default="main",
+    show_default=True,
+    help="v6d version to clone.",
+)
+@click.option(
+    "-j",
+    "--jobs",
+    default="2",
+    help="Concurrent jobs in building, i.e., the -j argument passed to make.",
+)
+@click.option(
+    "--for-analytical",
+    is_flag=True,
+    default=False,
+    help="Only install analytical engine dependencies.",
+)
+@click.option(
+    "--no-v6d",
+    is_flag=True,
+    default=False,
+    help="Do not install v6d, for building base docker images; can only be used with '--for-analytical'.",
+)
+def install_deps(
+    type,
+    graphscope_repo,
+    cn,
+    install_prefix,
+    from_local,
+    v6d_version,
+    jobs,
+    for_analytical,
+    no_v6d,
+):
+    """Install dependencies for building GraphScope."""
+    cmd = [
+        "bash",
+        install_deps_script,
+        "-t",
+        type,
+        "-i",
+        install_prefix,
+        "-d",
+        from_local,
+        "-v",
+        str(v6d_version),
+        "-j",
+        str(jobs),
+    ]
+    if for_analytical:
+        cmd.append("--for-analytical")
+    if no_v6d:
+        if not for_analytical:
+            # could only be used with '--for-analytical'
+            raise RuntimeError("Missing --for-analytical with --no-v6d parameter")
+        cmd.append("--no-v6d")
+    if cn:
+        cmd.append("--cn")
+    run_shell_cmd(cmd, graphscope_repo)
+
+
+@click.command()
+@click.argument(
+    "component",
+    type=click.Choice(
+        [
+            "interactive",
+            "interactive-install",
+            "analytical",
+            "analytical-java-install",
+            "analytical-install",
+            "learning",
+            "learning-install",
+            "coordinator",
+            "client",
+            "clean",
+            "all",
+        ],
+        case_sensitive=False,
+    ),
+    required=False,
+)
+@click.option(
+    "--graphscope-repo",
+    envvar="GRAPHSCOPE_REPO",
+    type=click.Path(),
+    default=os.path.abspath("."),
+    show_default=True,
+    help="GraphScope code repo location.",
+)
+@click.option(
+    "--install-prefix",
+    type=click.Path(),
+    default="/opt/graphscope",
+    show_default=True,
+    help="Install built binaries to customized location.",
+)
+@click.option(
+    "--storage-type",
+    default="default",
+    help="Make gie with specified storage type.",
+)
+def make(component, graphscope_repo, install_prefix, storage_type):
+    """Build executable binaries of COMPONENT. If no component is given, build all.
+    \f
+    TODO: maybe without make?
+    """
+    click.secho(
+        "Before making artifacts, please manually source ENVs from ~/.graphscope_env.",
+        fg="yellow",
+    )
+    click.secho(
+        f"Begin the make command to build component [{component}] of GraphScope, with repo = {graphscope_repo}",
+        fg="green",
+    )
+    if component is None:
+        component = "all"
+
+    cmd = [
+        "bash",
+        make_script,
+        "-c",
+        component,
+        "-i",
+        install_prefix,
+        "-s",
+        storage_type,
+    ]
+    run_shell_cmd(cmd, graphscope_repo)
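For orientation, `gsctl make coordinator` ultimately reduces to a subprocess call along these lines; the script path and prefix below are illustrative stand-ins for the module-level defaults above:

```python
import os
import subprocess

# Illustrative stand-ins for scripts_dir/make_script defined above.
graphscope_repo = os.path.abspath(".")
make_script = "python/graphscope/gsctl/scripts/make_command.sh"

# Rough equivalent of: gsctl make coordinator
subprocess.run(
    ["bash", make_script, "-c", "coordinator", "-i", "/opt/graphscope", "-s", "default"],
    cwd=graphscope_repo,
    env=os.environ.copy(),
    check=False,  # run_shell_cmd streams stdout rather than checking the exit code
)
```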
+ """ + click.secho( + "Before making artifacts, please manually source ENVs from ~/.graphscope_env.", + fg="yellow", + ) + click.secho( + f"Begin the make command, to build components [{component}] of GraphScope, with repo = {graphscope_repo}", + fg="green", + ) + if component is None: + component = "all" + + cmd = [ + "bash", + make_script, + "-c", + component, + "-i", + install_prefix, + "-s", + storage_type, + ] + run_shell_cmd(cmd, graphscope_repo) + + +@click.command() +@click.argument( + "component", + type=click.Choice( + [ + "all", + "graphscope-dev", + "coordinator", + "analytical", + "analytical-java", + "interactive", + "interactive-frontend", + "interactive-executor", + "learning,", + "vineyard-dev", + "vineyard-runtime", + "manylinux2014-ext", + ], + case_sensitive=False, + ), + required=False, +) +@click.option( + "--graphscope-repo", + envvar="GRAPHSCOPE_REPO", + type=click.Path(), + default=os.path.abspath("."), + show_default=True, + help="GraphScope code repo location.", +) +@click.option( + "--tag", + default="latest", + show_default=True, + help="image tag name to build", +) +@click.option( + "--registry", + default="registry.cn-hongkong.aliyuncs.com", + show_default=True, + help="registry name", +) +def make_image(component, graphscope_repo, registry, tag): + """Make docker images from source code for deployment. + \f + TODO: fulfill this. + """ + if component is None: + component = "all" + + cmd = ["bash", make_image_script, "-c", component, "-r", registry, "-t", tag] + run_shell_cmd(cmd, graphscope_repo) + + +@click.command() +@click.argument( + "type", + type=click.Choice( + [ + "analytical", + "analytical-java", + "interactive", + "learning", + "local-e2e", + "k8s-e2e", + "groot", + ], + case_sensitive=False, + ), + required=False, +) +@click.option( + "--graphscope-repo", + envvar="GRAPHSCOPE_REPO", + type=click.Path(), + default=os.path.abspath("."), + show_default=True, + help="GraphScope code repo location.", +) +@click.option( + "--testdata", + type=click.Path(), + default="/tmp/gstest", + show_default=True, + help="""assign a custom test data location. This could be cloned from + https://github.com/graphscope/gstest""", +) +@click.option( + "--local", + is_flag=True, + default=False, + help="Run local tests", +) +@click.option( + "--storage-type", + default="default", + show_default=True, + help="test gie with specified storage type", +) +@click.option( + "--k8s", + is_flag=True, + default=False, + help="Run local tests", +) +@click.option( + "--nx", + is_flag=True, + default=False, + help="Run nx tests", +) +def test(type, graphscope_repo, testdata, local, storage_type, k8s, nx): + """Trigger tests on built artifacts. + + \f + TODO: fulfill this.""" + click.secho(f"graphscope_repo = {graphscope_repo}", fg="green") + click.echo("test") + if type is None: + type = "" + cmd = [ + "bash", + test_script, + "-t", + type, + "-d", + testdata, + "-l", + str(local), + "-s", + storage_type, + "-k", + str(k8s), + "-n", + str(nx), + ] + run_shell_cmd(cmd, graphscope_repo) + + +cli.add_command(install_deps) +cli.add_command(make) +cli.add_command(make_image) +cli.add_command(test) + +if __name__ == "__main__": + cli() diff --git a/python/graphscope/gsctl/commands/utils.py b/python/graphscope/gsctl/commands/utils.py new file mode 100644 index 000000000000..711a7b32e7f5 --- /dev/null +++ b/python/graphscope/gsctl/commands/utils.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. 
diff --git a/python/graphscope/gsctl/commands/utils.py b/python/graphscope/gsctl/commands/utils.py
new file mode 100644
index 000000000000..711a7b32e7f5
--- /dev/null
+++ b/python/graphscope/gsctl/commands/utils.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import click
+
+from graphscope.gsctl.commands.common_command import cli as common_cli
+from graphscope.gsctl.commands.dev_command import cli as dev_cli
+from graphscope.gsctl.config import Context
+
+
+def get_command_collection(context: Context):
+    if context is None:
+        # treat gsctl as a utility script, providing helper functions or utilities, e.g.
+        # initialize and manage clusters, install the dependencies required to build graphscope locally
+        commands = click.CommandCollection(sources=[common_cli, dev_cli])
+
+    elif context.solution == "interactive":
+        commands = click.CommandCollection(sources=[common_cli])
+
+    else:
+        raise RuntimeError(
+            f"Failed to get command collection with context {context.name}"
+        )
+
+    return commands
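A minimal sketch of how this dispatcher is meant to be driven (the endpoint in the Context is a placeholder):

```python
from graphscope.gsctl.commands.utils import get_command_collection
from graphscope.gsctl.config import Context

# Without a context on disk, gsctl exposes both the common and the dev commands.
dev_and_common = get_command_collection(None)

# With an 'interactive' context, only the common commands (connect/close) are exposed.
ctx = Context(solution="interactive", coordinator_endpoint="127.0.0.1:9527")
interactive_only = get_command_collection(ctx)
# interactive_only()  # would serve the restricted command collection
```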
diff --git a/python/graphscope/gsctl/config.py b/python/graphscope/gsctl/config.py
new file mode 100644
index 000000000000..e9357c0b4919
--- /dev/null
+++ b/python/graphscope/gsctl/config.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Global configuration"""
+
+import os
+import random
+from string import ascii_letters
+
+import yaml
+
+GS_CONFIG_DEFAULT_LOCATION = os.environ.get(
+    "GSCONFIG", os.path.expanduser("~/.graphscope/config")
+)
+
+
+class Context(object):
+    def __init__(self, solution, coordinator_endpoint, name=None):
+        self.supported_solutions = ["interactive"]
+        if solution not in self.supported_solutions:
+            raise RuntimeError(
+                "The solution {0} in context {1} is not supported yet.".format(
+                    solution, name
+                )
+            )
+
+        if name is None:
+            name = "context_" + "".join(random.choices(ascii_letters, k=6))
+
+        self.name = name
+        self.solution = solution
+        self.coordinator_endpoint = coordinator_endpoint
+
+    def to_dict(self):
+        return {
+            "name": self.name,
+            "solution": self.solution,
+            "coordinator_endpoint": self.coordinator_endpoint,
+        }
+
+
+class GSConfig(object):
+    def __init__(self, contexts, current_context: str):
+        self._contexts = contexts
+        self._current_context = current_context
+
+    def current_context(self) -> Context:
+        if self._current_context is None:
+            return None
+        if self._current_context not in self._contexts.keys():
+            raise RuntimeError(
+                f"Failed to get current context: {self._current_context}"
+            )
+        return self._contexts[self._current_context]
+
+    def set_and_write(self, context: Context):
+        # treat the same endpoint as the same coordinator
+        for _, v in self._contexts.items():
+            if (
+                context.coordinator_endpoint == v.coordinator_endpoint
+                and context.solution == v.solution
+            ):
+                return
+
+        # set
+        self._current_context = context.name
+        self._contexts[context.name] = context
+
+        # write
+        contexts = [v.to_dict() for _, v in self._contexts.items()]
+        with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file:
+            yaml.dump(
+                {"contexts": contexts, "current-context": self._current_context}, file
+            )
+
+    def remove_and_write(self, current_context: Context):
+        # remove
+        del self._contexts[current_context.name]
+        self._current_context = None
+
+        # write
+        contexts = [v.to_dict() for _, v in self._contexts.items()]
+        with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file:
+            yaml.dump(
+                {"contexts": contexts, "current-context": self._current_context}, file
+            )
+
+
+class GSConfigLoader(object):
+    def __init__(self, config_file):
+        self._config_file = config_file
+
+    def _parse_config(self, config_dict):
+        if not config_dict:
+            return {}, None
+
+        current_context = config_dict["current-context"]
+        if current_context is None:
+            return {}, None
+
+        contexts = {}
+        current_context_exists = False
+        for c in config_dict["contexts"]:
+            if current_context == c["name"]:
+                current_context_exists = True
+            contexts[c["name"]] = Context(
+                name=c["name"],
+                solution=c["solution"],
+                coordinator_endpoint=c["coordinator_endpoint"],
+            )
+
+        if not current_context_exists:
+            raise RuntimeError(
+                f"Current context {current_context} does not exist in config file {GS_CONFIG_DEFAULT_LOCATION}"
+            )
+
+        return contexts, current_context
+
+    def load_config(self):
+        config_dict = None
+        with open(self._config_file, "r") as file:
+            config_dict = yaml.safe_load(file)
+        contexts, current_context = self._parse_config(config_dict)
+        return GSConfig(contexts, current_context)
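Concretely, the file this loader parses is plain YAML keyed by `contexts` and `current-context`. A short round-trip sketch, with a placeholder endpoint and a randomly generated context name:

```python
import os

from graphscope.gsctl.config import Context
from graphscope.gsctl.config import load_gs_config

# Register a hypothetical interactive coordinator and persist it.
config = load_gs_config()
config.set_and_write(Context(solution="interactive", coordinator_endpoint="127.0.0.1:9527"))

# The resulting ~/.graphscope/config looks roughly like:
#   contexts:
#   - coordinator_endpoint: 127.0.0.1:9527
#     name: context_AbCdEf      # random name chosen in Context.__init__
#     solution: interactive
#   current-context: context_AbCdEf
with open(os.path.expanduser("~/.graphscope/config")) as f:
    print(f.read())
```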
+ """ + config_file = GS_CONFIG_DEFAULT_LOCATION + + # create config file is not exists + if not os.path.exists(config_file): + workdir = os.path.dirname(config_file) + os.makedirs(workdir, exist_ok=True) + with open(config_file, "w") as file: + yaml.safe_dump({}, file) + + loader = GSConfigLoader(config_file) + return loader.load_config() + + +def get_current_context(): + config = load_gs_config() + return config.current_context() diff --git a/python/graphscope/gsctl/gsctl.py b/python/graphscope/gsctl/gsctl.py index 0c9b04c70a92..21d9dc7a69a7 100644 --- a/python/graphscope/gsctl/gsctl.py +++ b/python/graphscope/gsctl/gsctl.py @@ -16,338 +16,37 @@ # limitations under the License. # -import io import os -import subprocess +import sys import click -scripts_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts") -install_deps_script = os.path.join(scripts_dir, "install_deps_command.sh") -make_script = os.path.join(scripts_dir, "make_command.sh") -make_image_script = os.path.join(scripts_dir, "make_image_command.sh") -test_script = os.path.join(scripts_dir, "test_command.sh") +try: + import graphscope +except ModuleNotFoundError: + # if graphscope is not installed, only basic functions or utilities + # can be used, e.g. install dependencies + graphscope = None -def run_shell_cmd(cmd, workingdir): - """wrapper function to run a shell command/scripts.""" - click.echo(f"run a shell command on cwd={workingdir}. \ncmd=\"{' '.join(cmd)}\"") - proc = subprocess.Popen( - cmd, cwd=workingdir, env=os.environ.copy(), stdout=subprocess.PIPE - ) - for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"): - print(line.rstrip()) +def cli(): + if graphscope is None: + sys.path.insert( + 0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "commands") + ) + from dev_command import cli as dev_cli + dev_cli() -class GSCtl(object): - """GraphScope command-line utility + from graphscope.gsctl.commands import get_command_collection + from graphscope.gsctl.config import get_current_context - This is a context for the utility. - """ + context = get_current_context() + # get the specified commands under the FLEX architecture + commands = get_command_collection(context) + # serve the command + commands() - def __init__(self, repo_home=None, debug=False): - self.home = os.path.abspath(".") - self.debug = debug - - -@click.group() -@click.option( - "--repo-home", - envvar="REPO_HOME", - type=click.Path(), - help="GraphScope code repo location.", -) -@click.pass_context -def cli(ctx, repo_home): - ctx.obj = GSCtl(repo_home) - - -@click.command() -@click.argument( - "type", - type=click.Choice( - ["dev", "client"], - case_sensitive=False, - ), - required=True, -) -@click.option( - "--cn", - is_flag=True, - default=False, - help="Whether to use CN located mirrors to speed up download.", -) -@click.option( - "--install-prefix", - type=click.Path(), - default="/opt/graphscope", - show_default=True, - help="Install built binaries to customized location.", -) -@click.option( - "--from-local", - type=click.Path(), - default="/tmp/gs-local-deps", - show_default=True, - help="""Find raw dependencies of GraphScope from a local directory. The raw - dependencies would then be built and installed to [prefix]. 
If the directory - is empty or not exists, dependency files would be downloaded to [directory].""", -) -@click.option( - "--v6d-version", - default="main", - show_default=True, - help="v6d version to clone.", -) -@click.option( - "-j", - "--jobs", - default="2", - help="Concurrent jobs in building, i.e., -j argument passed to make.", -) -@click.option( - "--for-analytical", - is_flag=True, - default=False, - help="Only install analytical engine dependencies.", -) -@click.option( - "--no-v6d", - is_flag=True, - default=False, - help="Do not install v6d, for build base docker images, could only be used with '--for-analytical'", -) -@click.pass_obj -def install_deps( - repo, - type, - cn, - install_prefix, - from_local, - v6d_version, - jobs, - for_analytical, - no_v6d, -): - """Install dependencies for building GraphScope.""" - cmd = [ - "bash", - install_deps_script, - "-t", - type, - "-i", - install_prefix, - "-d", - from_local, - "-v", - str(v6d_version), - "-j", - str(jobs), - ] - if for_analytical: - cmd.append("--for-analytical") - if no_v6d: - if not for_analytical: - # could only be used with '--for-analytical' - raise RuntimeError("Missing --for-analytical with --no-v6d parameter") - cmd.append("--no-v6d") - if cn: - cmd.append("--cn") - run_shell_cmd(cmd, repo.home) - - -@click.command() -@click.argument( - "component", - type=click.Choice( - [ - "interactive", - "interactive-install", - "analytical", - "analytical-java-install", - "analytical-install", - "learning", - "learning-install", - "coordinator", - "client", - "clean", - "all", - ], - case_sensitive=False, - ), - required=False, -) -@click.option( - "--install-prefix", - type=click.Path(), - default="/opt/graphscope", - show_default=True, - help="Install built binaries to customized location.", -) -@click.option( - "--storage-type", - default="default", - help="Make gie with specified storage type.", -) -@click.pass_obj -def make(repo, component, install_prefix, storage_type): - """Build executive binaries of COMPONENT. If not given a specific component, build all. - \f - TODO: maybe without make? - """ - click.secho( - "Before making artifacts, please manually source ENVs from ~/.graphscope_env.", - fg="yellow", - ) - click.secho( - f"Begin the make command, to build components [{component}] of GraphScope, with repo = {repo.home}", - fg="green", - ) - if component is None: - component = "all" - - cmd = [ - "bash", - make_script, - "-c", - component, - "-i", - install_prefix, - "-s", - storage_type, - ] - run_shell_cmd(cmd, repo.home) - - -@click.command() -@click.argument( - "component", - type=click.Choice( - [ - "all", - "graphscope-dev", - "coordinator", - "analytical", - "analytical-java", - "interactive", - "interactive-frontend", - "interactive-executor", - "learning,", - "vineyard-dev", - "vineyard-runtime", - "manylinux2014-ext", - ], - case_sensitive=False, - ), - required=False, -) -@click.option( - "--tag", - default="latest", - show_default=True, - help="image tag name to build", -) -@click.option( - "--registry", - default="registry.cn-hongkong.aliyuncs.com", - show_default=True, - help="registry name", -) -@click.pass_obj -def make_image(repo, component, registry, tag): - """Make docker images from source code for deployment. - \f - TODO: fulfill this. 
- """ - if component is None: - component = "all" - - cmd = ["bash", make_image_script, "-c", component, "-r", registry, "-t", tag] - run_shell_cmd(cmd, repo.home) - - -@click.command() -@click.argument( - "type", - type=click.Choice( - [ - "analytical", - "analytical-java", - "interactive", - "learning", - "local-e2e", - "k8s-e2e", - "groot", - ], - case_sensitive=False, - ), - required=False, -) -@click.option( - "--testdata", - type=click.Path(), - default="/tmp/gstest", - show_default=True, - help="""assign a custom test data location. This could be cloned from - https://github.com/graphscope/gstest""", -) -@click.option( - "--local", - is_flag=True, - default=False, - help="Run local tests", -) -@click.option( - "--storage-type", - default="default", - show_default=True, - help="test gie with specified storage type", -) -@click.option( - "--k8s", - is_flag=True, - default=False, - help="Run local tests", -) -@click.option( - "--nx", - is_flag=True, - default=False, - help="Run nx tests", -) -@click.pass_obj -def test(repo, type, testdata, local, storage_type, k8s, nx): - """Trigger tests on built artifacts. - - \f - TODO: fulfill this.""" - click.secho(f"repo.home = {repo.home}", fg="green") - click.echo("test") - if type is None: - type = "" - cmd = [ - "bash", - test_script, - "-t", - type, - "-d", - testdata, - "-l", - str(local), - "-s", - storage_type, - "-k", - str(k8s), - "-n", - str(nx), - ] - run_shell_cmd(cmd, repo.home) - - -cli.add_command(install_deps) -cli.add_command(make) -cli.add_command(make_image) -cli.add_command(test) if __name__ == "__main__": cli() diff --git a/python/graphscope/gsctl/rpc.py b/python/graphscope/gsctl/rpc.py new file mode 100644 index 000000000000..f77d0fe58bae --- /dev/null +++ b/python/graphscope/gsctl/rpc.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+import atexit
+import time
+
+import click
+import grpc
+
+from graphscope.client.utils import GS_GRPC_MAX_MESSAGE_LENGTH
+from graphscope.client.utils import handle_grpc_error
+from graphscope.gsctl.config import get_current_context
+from graphscope.proto import coordinator_service_pb2_grpc
+from graphscope.proto import message_pb2
+from graphscope.version import __version__
+
+
+class GRPCClient(object):
+    def __init__(self, endpoint):
+        # create the grpc stub
+        options = [
+            ("grpc.max_send_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
+            ("grpc.max_receive_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
+            ("grpc.max_metadata_size", GS_GRPC_MAX_MESSAGE_LENGTH),
+        ]
+        self._channel = grpc.insecure_channel(endpoint, options=options)
+        self._stub = coordinator_service_pb2_grpc.CoordinatorServiceStub(self._channel)
+
+        atexit.register(self.close)
+
+    @handle_grpc_error
+    def _connect_impl(self, timeout_seconds):
+        begin_time = time.time()
+
+        request = message_pb2.ConnectRequest(version=__version__)
+        while True:
+            try:
+                response = self._stub.Connect(request)
+                break
+            except grpc.RpcError as e:
+                msg = f"code: {e.code().name}, details: {e.details()}"
+                click.secho(
+                    f"Failed to connect to coordinator: {e}, retrying in 1 second...",
+                    fg="yellow",
+                )
+                if time.time() - begin_time >= timeout_seconds:
+                    raise ConnectionError(f"Timed out connecting to coordinator, {msg}")
+                time.sleep(1)
+
+        return response.solution
+
+    def connect(self, timeout_seconds=10):
+        return self._connect_impl(timeout_seconds)
+
+    def close(self):
+        try:
+            self._channel.close()
+        except:  # noqa: E722
+            pass
+
+
+def get_grpc_client(coordinator_endpoint=None):
+    if coordinator_endpoint is not None:
+        return GRPCClient(coordinator_endpoint)
+
+    # use the latest context in config file
+    current_context = get_current_context()
+    if current_context is None:
+        raise RuntimeError(
+            "No available context found, please connect to a launched coordinator first."
+        )
+    return GRPCClient(current_context.coordinator_endpoint)
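To close, a sketch of how the pieces compose from the client side, assuming a coordinator that already serves the new Connect RPC at the placeholder endpoint:

```python
from graphscope.gsctl.rpc import GRPCClient
from graphscope.gsctl.rpc import get_grpc_client

# Explicit endpoint: bypasses the config file entirely.
client = GRPCClient("127.0.0.1:9527")
solution = client.connect(timeout_seconds=10)  # retries once a second until timeout
print(f"Coordinator solution: {solution}")

# No endpoint: falls back to the current context in ~/.graphscope/config,
# raising RuntimeError if no context has been recorded yet.
client = get_grpc_client()
```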