Skip to content

Commit

Permalink
enhance: add resource group declarative api (#2002)
Browse files Browse the repository at this point in the history
issue: milvus-io/milvus#30931

- Add param `config` for `create_resource_group` api.

- Add `update_resource_groups` api to implement declarative resource
group api.

- Add new example for declarative resource group api.

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Apr 26, 2024
1 parent 66af344 commit 39fdc43
Show file tree
Hide file tree
Showing 15 changed files with 441 additions and 112 deletions.
5 changes: 3 additions & 2 deletions examples/resource_group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pymilvus import utility, connections, DEFAULT_RESOURCE_GROUP
from pymilvus import utility, connections
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP
from example import *

_HOST = '127.0.0.1'
Expand Down Expand Up @@ -52,7 +53,7 @@ def transfer_replica(source, target, collection_name, num_replica):
f"transfer {num_replica} replicas in {collection_name} from {source} to {target}")
utility.transfer_replica(
source, target, collection_name, num_replica, using=_CONNECTION_NAME)

def run():
create_connection("root", "123456")
coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
Expand Down
157 changes: 157 additions & 0 deletions examples/resource_group_declarative_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from pymilvus import utility, connections, Collection
from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP
from pymilvus.client.types import ResourceGroupConfig
from typing import List
from example import create_connection, create_collection, insert, create_index

_PENDING_NODES_RESOURCE_GROUP="pending_nodes"
# Vector parameters
_DIM = 128
_COLLECTION_NAME = 'rg_declarative_demo'
_ID_FIELD_NAME = 'id_field'
_VECTOR_FIELD_NAME = 'float_vector_field'

def create_example_collection_and_load(replica_number: int, resource_groups: List[str]):
print(f"\nCreate collection and load...")
coll = create_collection(_COLLECTION_NAME, _ID_FIELD_NAME, _VECTOR_FIELD_NAME)
insert(coll, 10000, _DIM)
coll.flush()
create_index(coll, _VECTOR_FIELD_NAME)
coll.load(replica_number=replica_number, _resource_groups=resource_groups)

def transfer_replica(src: str, dest: str, num_replica: int):
utility.transfer_replica(source_group=src, target_group=dest, collection_name=_COLLECTION_NAME, num_replicas=num_replica)

def list_replica():
coll = Collection(name=_COLLECTION_NAME)
replicas = coll.get_replicas()
print(replicas)

def init_cluster(node_num: int):
print(f"Init cluster with {node_num} nodes, all nodes will be put in default resource group")
# create a pending resource group, which can used to hold the pending nodes that do not hold any data.
utility.create_resource_group(name=_PENDING_NODES_RESOURCE_GROUP, config=ResourceGroupConfig(
requests={"node_num": 0}, # this resource group can hold 0 nodes, no data will be load on it.
limits={"node_num": 10000}, # this resource group can hold at most 10000 nodes
))

# create a default resource group, which can used to hold the nodes that all initial node in it.
utility.update_resource_groups({
DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(
requests={"node_num": node_num},
limits={"node_num": node_num},
transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover missing node from pending resource group at high priority.
transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover redundant node to pending resource group at low priority.
)})

def list_all_resource_groups():
rg_names = utility.list_resource_groups()

for rg_name in rg_names:
resource_group = utility.describe_resource_group(rg_name)
print(resource_group)
# print(f"Resource group {rg_name} has {resource_group.nodes} with config: {resource_group.config}")

def scale_resource_group_to(name :str, node_num: int):
"""scale resource group to node_num nodes, new query node need to be added from outside orchestration system"""
utility.update_resource_groups({
name: ResourceGroupConfig(
requests={"node_num": node_num},
limits={"node_num": node_num},
transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover missing node from pending resource group at high priority.
transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover redundant node to pending resource group at low priority.
)
})

def create_resource_group(name: str, node_num: int):
print(f"Create resource group {name} with {node_num} nodes")
utility.create_resource_group(name, config=ResourceGroupConfig(
requests={"node_num": node_num},
limits={"node_num": node_num},
transfer_from=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover missing node from pending resource group at high priority.
transfer_to=[{"resource_group": _PENDING_NODES_RESOURCE_GROUP}], # recover redundant node to pending resource group at low priority.
))

def resource_group_management():
# cluster is initialized with 1 node in default resource group, and 0 node in pending resource group.
init_cluster(1)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 0

# rg1 missing two query node.
create_resource_group("rg1", 2)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 0
# rg1: 0(missing 2)

# scale_out(2)
# scale out two new query node into cluster by orchestration system, these node will be added to rg1 automatically.
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 0
# rg1: 2


# rg1 missing one query node.
scale_resource_group_to("rg1", 3)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 0
# rg1: 2(missing 1)

# scale_out(2)
# scale out two new query node into cluster by orchestration system, one node will be added to rg1 automatically
# and one redundant node will be added to pending resource group.
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 1
# rg1: 3

scale_resource_group_to("rg1", 1)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 3
# rg1: 1

# rg2 missing three query node, will be added from pending resource group.
create_resource_group("rg2", 3)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1
# _PENDING_NODES_RESOURCE_GROUP: 0
# rg1: 1
# rg2: 3

scale_resource_group_to(DEFAULT_RESOURCE_GROUP, 5)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 1(missing 4)
# _PENDING_NODES_RESOURCE_GROUP: 0
# rg1: 1
# rg2: 3

# scale_out(4)
list_all_resource_groups()
# DEFAULT_RESOURCE_GROUP: 5
# _PENDING_NODES_RESOURCE_GROUP: 1
# rg1: 1
# rg2: 3

def replica_management():
# load collection into default.
# create_example_collection_and_load(4, ["rg1", "rg2", "rg2", "rg2"])
# one replica per node in default resource group.
list_replica()
transfer_replica("rg1", DEFAULT_RESOURCE_GROUP, 1)
list_replica()
transfer_replica("rg2", DEFAULT_RESOURCE_GROUP, 1)
list_replica()
# DEFAULT_RESOURCE_GROUP: 2 replica on 5 nodes.
# rg1: 0 replica.
# rg2: 2 replica on 3 nodes.

if __name__ == "__main__":
create_connection()
resource_group_management()
create_example_collection_and_load(4, ["rg1", "rg2", "rg2", "rg2"])
replica_management()
2 changes: 2 additions & 0 deletions pymilvus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
transfer_node,
transfer_replica,
update_password,
update_resource_groups,
wait_for_index_building_complete,
wait_for_loading_complete,
)
Expand Down Expand Up @@ -106,6 +107,7 @@
"reset_password",
"create_user",
"update_password",
"update_resource_groups",
"delete_user",
"list_usernames",
"SearchResult",
Expand Down
13 changes: 11 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Union
from urllib import parse

import grpc
Expand Down Expand Up @@ -49,6 +49,7 @@
LoadState,
Plan,
Replica,
ResourceGroupConfig,
ResourceGroupInfo,
RoleInfo,
Shard,
Expand Down Expand Up @@ -1838,10 +1839,18 @@ def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str:

@retry_on_rpc_failure()
def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
req = Prepare.create_resource_group(name)
req = Prepare.create_resource_group(name, **kwargs)
resp = self._stub.CreateResourceGroup(req, wait_for_ready=True, timeout=timeout)
check_status(resp)

@retry_on_rpc_failure()
def update_resource_groups(
self, configs: Mapping[str, ResourceGroupConfig], timeout: Optional[float] = None, **kwargs
):
req = Prepare.update_resource_groups(configs)
resp = self._stub.UpdateResourceGroups(req, wait_for_ready=True, timeout=timeout)
check_status(resp)

@retry_on_rpc_failure()
def drop_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs):
req = Prepare.drop_resource_group(name)
Expand Down
22 changes: 18 additions & 4 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64
import datetime
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union

import numpy as np

Expand All @@ -19,7 +19,12 @@
ITERATOR_FIELD,
REDUCE_STOP_FOR_BEST,
)
from .types import DataType, PlaceholderType, get_consistency_level
from .types import (
DataType,
PlaceholderType,
ResourceGroupConfig,
get_consistency_level,
)
from .utils import traverse_info, traverse_rows_info


Expand Down Expand Up @@ -1146,9 +1151,18 @@ def get_server_version(cls):
return milvus_types.GetVersionRequest()

@classmethod
def create_resource_group(cls, name: str):
def create_resource_group(cls, name: str, **kwargs):
check_pass_param(resource_group_name=name)
return milvus_types.CreateResourceGroupRequest(resource_group=name)
return milvus_types.CreateResourceGroupRequest(
resource_group=name,
config=kwargs.get("config"),
)

@classmethod
def update_resource_groups(cls, configs: Mapping[str, ResourceGroupConfig]):
return milvus_types.UpdateResourceGroupsRequest(
resource_groups=configs,
)

@classmethod
def drop_resource_group(cls, name: str):
Expand Down
13 changes: 13 additions & 0 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pymilvus.decorators import deprecated
from pymilvus.exceptions import MilvusException, ParamError
from pymilvus.settings import Config
from typing import Mapping

from .check import is_legal_host, is_legal_port
from .grpc_handler import GrpcHandler
Expand All @@ -12,6 +13,7 @@
CompactionState,
Replica,
ResourceGroupInfo,
ResourceGroupConfig,
)


Expand Down Expand Up @@ -1381,6 +1383,17 @@ def create_resource_group(self, name, timeout=None, **kwargs):
with self._connection() as handler:
handler.create_resource_group(name, timeout=timeout, **kwargs)

def update_resource_groups(
self, configs: Mapping[str, ResourceGroupConfig], timeout=None, **kwargs
):
"""update resource groups with specific configs
:param configs: resource group configs
:type name: Mapping
"""
with self._connection() as handler:
handler.update_resource_groups(configs=configs, timeout=timeout, **kwargs)

def drop_resource_group(self, name, timeout=None, **kwargs):
"""drop resource group with specific name
Expand Down
Loading

0 comments on commit 39fdc43

Please sign in to comment.