From 52cac825715d7fb0f6b024ee9424066cdb2a1f1a Mon Sep 17 00:00:00 2001 From: Tao He Date: Thu, 21 Sep 2023 19:03:55 +0800 Subject: [PATCH] fix(python): default session needs to be aware of g.set_option(num_workers=...) (#3246) Signed-off-by: Tao He --- python/graphscope/client/session.py | 7 +- python/graphscope/config.py | 99 +++++++++++++++-------------- 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index 2a96cdafd245..d5ea6d61808d 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1466,7 +1466,7 @@ def get_default_session() -> Session: def get_session_by_id(handle): """Return the session by handle.""" if handle not in _session_dict: - raise ValueError("Session {} not exists.".format(handle)) + raise ValueError(f"Session {handle} not exists.") return _session_dict.get(handle) @@ -1480,7 +1480,10 @@ def __init__(self): def get_default(self) -> Session: if not self.stack: logger.info("Creating default session ...") - sess = session(cluster_type="hosts", num_workers=1) + sess = session( + cluster_type="hosts", + num_workers=gs_config.session.default_local_num_workers, + ) sess.as_default() return self.stack[-1] diff --git a/python/graphscope/config.py b/python/graphscope/config.py index 1c8e7315470e..bab1fc69b763 100644 --- a/python/graphscope/config.py +++ b/python/graphscope/config.py @@ -40,9 +40,8 @@ class ResourceSpec: """Resource requirements for a container in kubernetes.""" cpu: Union[str, float, None] = None # CPU cores of container. - memory: Union[ - str, None - ] = None # Memory of container, suffix with ['Mi', 'Gi', 'Ti']. + # Memory of container, suffix with ['Mi', 'Gi', 'Ti']. + memory: Union[str, None] = None def as_dict(self): ret = {} @@ -87,12 +86,12 @@ def make_burstable(cpu, memory): class ImageConfig: """Image related stuffs.""" - registry: Union[str, None] = "registry.cn-hongkong.aliyuncs.com" # k8s image registry. + registry: Union[str, None] = "registry.cn-hongkong.aliyuncs.com" repository: str = "graphscope" # k8s image repository. tag: str = __version__ # k8s image tag. - pull_secrets: List[str] = field(default_factory=list) # A list of secrets to pull image. + pull_secrets: List[str] = field(default_factory=list) pull_policy: str = "IfNotPresent" # Kubernetes image pull policy. @@ -114,8 +113,8 @@ class DatasetConfig: """A Dataset container could be shipped with GraphScope in kubernetes.""" enable: bool = False # Mount the aliyun dataset bucket as a volume by ossfs. - proxy: Union[str, None] = None # A json string specifies the dataset proxy info. Available options of proxy: http_proxy, https_proxy, no_proxy. + proxy: Union[str, None] = None @dataclass @@ -123,38 +122,36 @@ class EngineConfig: """Engine configuration""" enabled_engines: str = "gae,gie,gle" # A set of engines to enable. - node_selector: Union[ - str, None - ] = None # Node selector for engine pods, default is None. + # Node selector for engine pods, default is None. + node_selector: Union[str, None] = None enable_gae: bool = True # Enable or disable analytical engine. - enable_gae_java: bool = ( - False # Enable or disable analytical engine with java support. - ) + # Enable or disable analytical engine with java support. + enable_gae_java: bool = False enable_gie: bool = True # Enable or disable interactive engine. enable_gle: bool = True # Enable or disable learning engine. preemptive: bool = True + # Resource for analytical pod gae_resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(1, "4Gi") ) - # Resource for analytical pod + # Resource for interactive executor pod gie_executor_resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(1, "2Gi") ) - # Resource for interactive executor pod + # Resource for interactive frontend pod gie_frontend_resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(0.5, "1Gi") ) - # Resource for interactive frontend pod + # Resource for learning pod gle_resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi") ) - # Resource for learning pod def post_setup(self): valid_engines = set( @@ -190,10 +187,11 @@ class EtcdConfig: If address is set, all other etcd configurations are ignored. """ - listening_client_port: int = 2379 # The port that etcd server will bind to for accepting client connections. Defaults to 2379. - listening_peer_port: int = 2380 + listening_client_port: int = 2379 + # The port that etcd server will bind to for accepting peer connections. Defaults to 2380. + listening_peer_port: int = 2380 # Kubernetes related config replicas: int = 1 @@ -203,20 +201,21 @@ class EtcdConfig: class VineyardConfig: """Vineyard configuration""" - socket: Union[str, None] = None # Vineyard IPC socket path, a socket suffixed by timestamp will be created in '/tmp' if not given. + socket: Union[str, None] = None rpc_port: int = 9600 # Vineyard RPC port. # Kubernetes related config - deployment_name: Union[str, None] = None + # The name of vineyard deployment, it should exist as expected. + deployment_name: Union[str, None] = None image: str = "vineyardcloudnative/vineyardd:latest" # Image for vineyard container. + # Resource for vineyard sidecar container resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(0.2, "256Mi") ) - # Resource for vineyard sidecar container @dataclass @@ -231,62 +230,63 @@ class CoordinatorConfig: monitor_port: int = 9090 # Coordinator prometheus exporter service port. # Kubernetes related config - deployment_name: Union[str, None] = None + # Name of the coordinator deployment and service. - node_selector: Union[str, None] = None + deployment_name: Union[str, None] = None # Node selector for coordinator pod in kubernetes + node_selector: Union[str, None] = None + # Resource configuration of coordinator. resource: ResourceConfig = field( default_factory=lambda: ResourceConfig.make_burstable(0.5, "512Mi") ) - # Resource configuration of coordinator. # For GraphScope operator - operator_mode: bool = False # Launch coordinator only, do not let coordinator launch resources or delete resources. # It would try to find existing resources and connect to it. + operator_mode: bool = False @dataclass class HostsLauncherConfig: """Local cluster configuration.""" - hosts: List[str] = list_field("localhost") # list of hostnames of graphscope engine workers. - etcd: EtcdConfig = field(default_factory=EtcdConfig) + hosts: List[str] = list_field("localhost") # Etcd configuration. Only local session needs to configure etcd. + etcd: EtcdConfig = field(default_factory=EtcdConfig) - dataset_download_retries: int = 3 # The number of retries when downloading dataset from internet. + dataset_download_retries: int = 3 @dataclass class KubernetesLauncherConfig: """Kubernetes cluster configuration.""" - namespace: Union[str, None] = None # The namespace to create all resource, which must exist in advance. + namespace: Union[str, None] = None delete_namespace: bool = False # Delete the namespace that created by graphscope. config_file: Union[str, None] = None # kube config file path - deployment_mode = "eager" # The deployment mode of engines on the kubernetes cluster, choose from 'eager' or 'lazy'. + # The deployment mode of engines on the kubernetes cluster, choose from 'eager' or 'lazy'. + deployment_mode: str = "eager" - service_type: str = "NodePort" # Service type, choose from 'NodePort' or 'LoadBalancer'. + service_type: str = "NodePort" - volumes: Union[str, None] = None # A base64 encoded json string specifies the kubernetes volumes to mount. + volumes: Union[str, None] = None - waiting_for_delete: bool = False # Wait until the graphscope instance has been deleted successfully. + waiting_for_delete: bool = False image: ImageConfig = field(default_factory=ImageConfig) # Image configuration. engine: EngineConfig = field(default_factory=EngineConfig) # Engine configuration. - dataset: DatasetConfig = field( - default_factory=DatasetConfig - ) # Dataset configuration. + # Dataset configuration. + dataset: DatasetConfig = field(default_factory=DatasetConfig) mars: MarsConfig = field(default_factory=MarsConfig) # Mars configuration. @@ -303,6 +303,8 @@ class SessionConfig: """Session configuration""" num_workers: int = 2 # The number of graphscope engine workers. + # The number of graphscope engine workers when launch local workers. + default_local_num_workers: int = 1 reconnect: bool = False # Connect to an existed GraphScope Cluster instance_id: Union[str, None] = None # Unique id for each GraphScope instance. @@ -310,43 +312,41 @@ class SessionConfig: show_log: bool = False # Show log or not. log_level: str = "info" # Log level, choose from 'info' or 'debug'. + # The length of time to wait before giving up launching graphscope.z timeout_seconds: int = 600 - # The length of time to wait before giving up launching graphscope. - dangling_timeout_seconds: int = 600 # The length of time to wait starting from client disconnected before killing the graphscope instance. + dangling_timeout_seconds: int = 600 - retry_time_seconds: int = 1 # The length of time to wait before retrying to launch graphscope. + retry_time_seconds: int = 1 execution_mode: str = "eager" # The deploying mode of graphscope, eager or lazy. @dataclass class Config(Serializable): - launcher_type: str = "k8s" # Launcher type, choose from 'hosts', 'k8s' or 'operator'. + launcher_type: str = "k8s" session: SessionConfig = field(default_factory=SessionConfig) - coordinator: CoordinatorConfig = field( - default_factory=CoordinatorConfig - ) # Coordinator configuration. - vineyard: VineyardConfig = field( - default_factory=VineyardConfig - ) # Vineyard configuration. + # Coordinator configuration. + coordinator: CoordinatorConfig = field(default_factory=CoordinatorConfig) + # Vineyard configuration. + vineyard: VineyardConfig = field(default_factory=VineyardConfig) - hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig) # Local cluster configuration. + hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig) + # Kubernetes cluster configuration. kubernetes_launcher: KubernetesLauncherConfig = field( default_factory=KubernetesLauncherConfig ) - # Kubernetes cluster configuration. + # Launcher used in operator mode. operator_launcher: OperatorLauncherConfig = field( default_factory=OperatorLauncherConfig ) - # Launcher used in operator mode. def set_option(self, key, value): # noqa: C901 """Forward set_option target to actual config fields""" @@ -422,6 +422,7 @@ def set_option(self, key, value): # noqa: C901 self.kubernetes_launcher.waiting_for_delete = value elif key == "num_workers": self.session.num_workers = value + self.session.default_local_num_workers = value elif key == "show_log": self.session.show_log = value elif key == "log_level":