From e4b045d8d3c87d2d28002a03365fc89ef09f5145 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 2 Apr 2024 15:16:24 +0800 Subject: [PATCH] Support major compaction in ManualCompaction Signed-off-by: wayblink --- pymilvus/client/grpc_handler.py | 4 ++-- pymilvus/client/prepare.py | 3 ++- pymilvus/client/stub.py | 14 ++++++++++---- pymilvus/orm/collection.py | 26 +++++++++++++++++++++----- 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index 1e132b3fb..4e9fd780c 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -1523,13 +1523,13 @@ def load_balance( check_status(status) @retry_on_rpc_failure() - def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int: + def compact(self, collection_name: str, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs) -> int: request = Prepare.describe_collection_request(collection_name) rf = self._stub.DescribeCollection.future(request, timeout=timeout) response = rf.result() check_status(response.status) - req = Prepare.manual_compaction(response.collectionID) + req = Prepare.manual_compaction(response.collectionID, is_major) future = self._stub.ManualCompaction.future(req, timeout=timeout) response = future.result() check_status(response.status) diff --git a/pymilvus/client/prepare.py b/pymilvus/client/prepare.py index 2217bb480..dcf449d34 100644 --- a/pymilvus/client/prepare.py +++ b/pymilvus/client/prepare.py @@ -960,12 +960,13 @@ def load_balance_request( ) @classmethod - def manual_compaction(cls, collection_id: int): + def manual_compaction(cls, collection_id: int, is_major: bool): if collection_id is None or not isinstance(collection_id, int): raise ParamError(message=f"collection_id value {collection_id} is illegal") request = milvus_types.ManualCompactionRequest() request.collectionID = collection_id + request.majorCompaction = is_major return request diff --git a/pymilvus/client/stub.py b/pymilvus/client/stub.py index e5b200d0e..4e6d26499 100644 --- a/pymilvus/client/stub.py +++ b/pymilvus/client/stub.py @@ -1044,7 +1044,7 @@ def load_balance( **kwargs, ) - def compact(self, collection_name, timeout=None, **kwargs) -> int: + def compact(self, collection_name, timeout=None, is_major=False, **kwargs) -> int: """ Do compaction for the collection. @@ -1054,15 +1054,18 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int: :param timeout: The timeout for this method, unit: second :type timeout: int + :param is_major: trigger major compaction + :type is_major: bool + :return: the compaction ID :rtype: int :raises MilvusException: If collection name not exist. """ with self._connection() as handler: - return handler.compact(collection_name, timeout=timeout, **kwargs) + return handler.compact(collection_name, timeout=timeout, is_major=is_major, **kwargs) - def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState: + def get_compaction_state(self, compaction_id: int, timeout=None, is_major=False, **kwargs) -> CompactionState: """ Get compaction states of a targeted compaction id @@ -1072,6 +1075,9 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co :param timeout: The timeout for this method, unit: second :type timeout: int + :param is_major: get major compaction + :type is_major: bool + :return: the state of the compaction :rtype: CompactionState @@ -1079,7 +1085,7 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co """ with self._connection() as handler: - return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs) + return handler.get_compaction_state(compaction_id, timeout=timeout, is_major=is_major, **kwargs) def wait_for_compaction_completed( self, compaction_id: int, timeout=None, **kwargs diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 27fdfee71..fa0fe9f5e 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -1489,7 +1489,7 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs): ) index.drop(timeout=timeout, **kwargs) - def compact(self, timeout: Optional[float] = None, **kwargs): + def compact(self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs): """Compact merge the small segments in a collection Args: @@ -1497,13 +1497,18 @@ def compact(self, timeout: Optional[float] = None, **kwargs): for the RPC. When timeout is set to None, client waits until server response or error occur. + is_major (``bool``, optional): An optional setting to trigger major compaction. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() - self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs) + if is_major: + self.major_compaction_id = conn.compact(self._name, timeout=timeout, is_major=is_major, **kwargs) + else: + self.compaction_id = conn.compact(self._name, timeout=timeout, is_major=is_major, **kwargs) - def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState: + def get_compaction_state(self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs) -> CompactionState: """Get the current compaction state Args: @@ -1511,15 +1516,21 @@ def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> Com for the RPC. When timeout is set to None, client waits until server response or error occur. + is_major (``bool``, optional): An optional setting to get major compaction state. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() - return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs) + if is_major: + return conn.get_compaction_state(self.major_compaction_id, timeout=timeout, **kwargs) + else: + return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs) def wait_for_compaction_completed( self, timeout: Optional[float] = None, + is_major: Optional[bool] = False, **kwargs, ) -> CompactionState: """Block until the current collection's compaction completed @@ -1529,11 +1540,16 @@ def wait_for_compaction_completed( for the RPC. When timeout is set to None, client waits until server response or error occur. + is_major (``bool``, optional): An optional setting to get major compaction state. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() - return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs) + if is_major: + return conn.wait_for_compaction_completed(self.major_compaction_id, timeout=timeout, **kwargs) + else: + return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs) def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans: """Get the current compaction plans