From a9e729537899aca5ea60071415b9a66b57424201 Mon Sep 17 00:00:00 2001 From: Jojii <15957865+JojiiOfficial@users.noreply.github.com> Date: Fri, 26 Apr 2024 11:09:51 +0200 Subject: [PATCH] extract collection functions (#103) --- src/client/collection.rs | 357 +++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 370 ++------------------------------------- 2 files changed, 369 insertions(+), 358 deletions(-) create mode 100644 src/client/collection.rs diff --git a/src/client/collection.rs b/src/client/collection.rs new file mode 100644 index 0000000..249adc8 --- /dev/null +++ b/src/client/collection.rs @@ -0,0 +1,357 @@ +use crate::auth::TokenInterceptor; +use crate::client::QdrantClient; +use crate::qdrant::alias_operations::Action; +use crate::qdrant::collections_client::CollectionsClient; +use crate::qdrant::update_collection_cluster_setup_request::Operation; +use crate::qdrant::{ + shard_key, AliasOperations, ChangeAliases, CollectionClusterInfoRequest, + CollectionClusterInfoResponse, CollectionExistsRequest, CollectionOperationResponse, + CollectionParamsDiff, CreateAlias, CreateCollection, CreateShardKey, CreateShardKeyRequest, + CreateShardKeyResponse, DeleteAlias, DeleteCollection, DeleteShardKey, DeleteShardKeyRequest, + DeleteShardKeyResponse, GetCollectionInfoRequest, GetCollectionInfoResponse, HnswConfigDiff, + ListAliasesRequest, ListAliasesResponse, ListCollectionAliasesRequest, ListCollectionsRequest, + ListCollectionsResponse, OptimizersConfigDiff, QuantizationConfigDiff, RenameAlias, ShardKey, + SparseVectorConfig, UpdateCollection, UpdateCollectionClusterSetupRequest, + UpdateCollectionClusterSetupResponse, VectorsConfigDiff, +}; +use std::future::Future; +use tonic::codegen::InterceptedService; +use tonic::transport::Channel; +use tonic::Status; + +impl QdrantClient { + // Access to raw collection API + pub async fn with_collections_client>>( + &self, + f: impl Fn(CollectionsClient>) -> O, + ) -> anyhow::Result { + self.channel + .with_channel( + |channel| { + let service = self.with_api_key(channel); + let mut client = + CollectionsClient::new(service).max_decoding_message_size(usize::MAX); + if let Some(compression) = self.cfg.compression { + client = client + .send_compressed(compression.into()) + .accept_compressed(compression.into()); + } + f(client) + }, + false, + ) + .await + } + + pub async fn list_collections(&self) -> anyhow::Result { + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api.list(ListCollectionsRequest {}).await?; + Ok(result.into_inner()) + }) + .await?) + } + + #[deprecated(since = "1.8.0", note = "Please use `collection_exists` instead")] + pub async fn has_collection(&self, collection_name: impl ToString) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let response = self.list_collections().await?; + let result = response + .collections + .into_iter() + .any(|c| c.name == collection_name); + + Ok(result) + } + + pub async fn collection_exists(&self, collection_name: impl ToString) -> anyhow::Result { + let collection_name_ref = &collection_name.to_string(); + Ok(self + .with_collections_client(|mut collection_api| async move { + let request = CollectionExistsRequest { + collection_name: collection_name_ref.clone(), + }; + let result = collection_api.collection_exists(request).await?; + Ok(result + .into_inner() + .result + .map(|r| r.exists) + .unwrap_or(false)) + }) + .await?) + } + + pub async fn create_collection( + &self, + details: &CreateCollection, + ) -> anyhow::Result { + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api.create(details.clone()).await?; + Ok(result.into_inner()) + }) + .await?) + } + + #[allow(clippy::too_many_arguments)] + pub async fn update_collection( + &self, + collection_name: impl ToString, + optimizers_config: Option<&OptimizersConfigDiff>, + params: Option<&CollectionParamsDiff>, + sparse_vectors_config: Option<&SparseVectorConfig>, + hnsw_config: Option<&HnswConfigDiff>, + vectors_config: Option<&VectorsConfigDiff>, + quantization_config: Option<&QuantizationConfigDiff>, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .update(UpdateCollection { + collection_name: collection_name_ref.to_string(), + optimizers_config: optimizers_config.cloned(), + timeout: None, + params: params.cloned(), + sparse_vectors_config: sparse_vectors_config.cloned(), + hnsw_config: hnsw_config.cloned(), + vectors_config: vectors_config.cloned(), + quantization_config: quantization_config.cloned(), + }) + .await?; + + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn delete_collection( + &self, + collection_name: impl ToString, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .delete(DeleteCollection { + collection_name: collection_name_ref.to_string(), + ..Default::default() + }) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn collection_info( + &self, + collection_name: impl ToString, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .get(GetCollectionInfoRequest { + collection_name: collection_name_ref.to_string(), + }) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn create_alias( + &self, + collection_name: impl ToString, + alias_name: impl ToString, + ) -> anyhow::Result { + let create_alias = CreateAlias { + collection_name: collection_name.to_string(), + alias_name: alias_name.to_string(), + }; + let change_aliases = ChangeAliases { + actions: vec![AliasOperations { + action: Some(Action::CreateAlias(create_alias)), + }], + timeout: None, + }; + self.update_aliases(change_aliases).await + } + + pub async fn delete_alias( + &self, + alias_name: impl ToString, + ) -> anyhow::Result { + let delete_alias = DeleteAlias { + alias_name: alias_name.to_string(), + }; + let change_aliases = ChangeAliases { + actions: vec![AliasOperations { + action: Some(Action::DeleteAlias(delete_alias)), + }], + timeout: None, + }; + self.update_aliases(change_aliases).await + } + + pub async fn rename_alias( + &self, + old_alias_name: impl ToString, + new_alias_name: impl ToString, + ) -> anyhow::Result { + let rename_alias = RenameAlias { + old_alias_name: old_alias_name.to_string(), + new_alias_name: new_alias_name.to_string(), + }; + let change_aliases = ChangeAliases { + actions: vec![AliasOperations { + action: Some(Action::RenameAlias(rename_alias)), + }], + timeout: None, + }; + self.update_aliases(change_aliases).await + } + + // lower level API + pub async fn update_aliases( + &self, + change_aliases: ChangeAliases, + ) -> anyhow::Result { + let change_aliases = change_aliases.clone(); + let chang_aliases_ref = &change_aliases; + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .update_aliases(chang_aliases_ref.clone()) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn list_collection_aliases( + &self, + collection_name: impl ToString, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .list_collection_aliases(ListCollectionAliasesRequest { + collection_name: collection_name_ref.to_string(), + }) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn list_aliases(&self) -> anyhow::Result { + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api.list_aliases(ListAliasesRequest {}).await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn collection_cluster_info( + &self, + collection_name: impl ToString, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let request = CollectionClusterInfoRequest { + collection_name: collection_name_ref.to_string(), + }; + let result = collection_api.collection_cluster_info(request).await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn create_shard_key( + &self, + collection_name: impl AsRef, + shard_key: &shard_key::Key, + shards_number: Option, + replication_factor: Option, + placement: &[u64], + ) -> anyhow::Result { + let collection_name = collection_name.as_ref(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .create_shard_key(CreateShardKeyRequest { + collection_name: collection_name.to_string(), + request: Some(CreateShardKey { + shard_key: Some(ShardKey::from(shard_key.clone())), + shards_number, + replication_factor, + placement: placement.to_vec(), + }), + timeout: None, + }) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn delete_shard_key( + &self, + collection_name: impl AsRef, + shard_key: &shard_key::Key, + ) -> anyhow::Result { + let collection_name = collection_name.as_ref(); + + Ok(self + .with_collections_client(|mut collection_api| async move { + let result = collection_api + .delete_shard_key(DeleteShardKeyRequest { + collection_name: collection_name.to_string(), + request: Some(DeleteShardKey { + shard_key: Some(ShardKey::from(shard_key.clone())), + }), + timeout: None, + }) + .await?; + Ok(result.into_inner()) + }) + .await?) + } + + pub async fn update_collection_cluster_setup( + &self, + collection_name: impl ToString, + operation: Operation, + ) -> anyhow::Result { + let collection_name = collection_name.to_string(); + let collection_name_ref = collection_name.as_str(); + let operation_ref = &operation; + Ok(self + .with_collections_client(|mut collection_api| async move { + let request = UpdateCollectionClusterSetupRequest { + collection_name: collection_name_ref.to_string(), + timeout: None, + operation: Some(operation_ref.clone()), + }; + let result = collection_api + .update_collection_cluster_setup(request) + .await?; + Ok(result.into_inner()) + }) + .await?) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 3858867..a0c2413 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,31 +1,20 @@ +pub mod collection; pub mod snapshot; use crate::channel_pool::ChannelPool; -use crate::qdrant::alias_operations::Action; -use crate::qdrant::collections_client::CollectionsClient; use crate::qdrant::points_client::PointsClient; -use crate::qdrant::update_collection_cluster_setup_request::Operation; use crate::qdrant::{ - qdrant_client, shard_key, AliasOperations, ChangeAliases, ClearPayloadPoints, - CollectionClusterInfoRequest, CollectionClusterInfoResponse, CollectionExistsRequest, - CollectionOperationResponse, CollectionParamsDiff, CountPoints, CountResponse, CreateAlias, - CreateCollection, CreateFieldIndexCollection, CreateShardKey, CreateShardKeyRequest, - CreateShardKeyResponse, DeleteAlias, DeleteCollection, DeleteFieldIndexCollection, - DeletePayloadPoints, DeletePointVectors, DeletePoints, DeleteShardKey, DeleteShardKeyRequest, - DeleteShardKeyResponse, DiscoverBatchPoints, DiscoverBatchResponse, DiscoverPoints, - DiscoverResponse, FieldType, GetCollectionInfoRequest, GetCollectionInfoResponse, GetPoints, - GetResponse, HealthCheckReply, HealthCheckRequest, HnswConfigDiff, ListAliasesRequest, - ListAliasesResponse, ListCollectionAliasesRequest, ListCollectionsRequest, - ListCollectionsResponse, OptimizersConfigDiff, PayloadIndexParams, PointId, PointStruct, - PointVectors, PointsOperationResponse, PointsSelector, PointsUpdateOperation, - QuantizationConfigDiff, ReadConsistency, RecommendBatchPoints, RecommendBatchResponse, - RecommendGroupsResponse, RecommendPointGroups, RecommendPoints, RecommendResponse, RenameAlias, - ScrollPoints, ScrollResponse, SearchBatchPoints, SearchBatchResponse, SearchGroupsResponse, - SearchPointGroups, SearchPoints, SearchResponse, SetPayloadPoints, ShardKey, ShardKeySelector, - SparseVectorConfig, UpdateBatchPoints, UpdateBatchResponse, UpdateCollection, - UpdateCollectionClusterSetupRequest, UpdateCollectionClusterSetupResponse, UpdatePointVectors, - UpsertPoints, VectorsConfigDiff, VectorsSelector, WithPayloadSelector, WithVectorsSelector, - WriteOrdering, + qdrant_client, shard_key, ClearPayloadPoints, CountPoints, CountResponse, + CreateFieldIndexCollection, DeleteFieldIndexCollection, DeletePayloadPoints, + DeletePointVectors, DeletePoints, DiscoverBatchPoints, DiscoverBatchResponse, DiscoverPoints, + DiscoverResponse, FieldType, GetPoints, GetResponse, HealthCheckReply, HealthCheckRequest, + PayloadIndexParams, PointId, PointStruct, PointVectors, PointsOperationResponse, + PointsSelector, PointsUpdateOperation, ReadConsistency, RecommendBatchPoints, + RecommendBatchResponse, RecommendGroupsResponse, RecommendPointGroups, RecommendPoints, + RecommendResponse, ScrollPoints, ScrollResponse, SearchBatchPoints, SearchBatchResponse, + SearchGroupsResponse, SearchPointGroups, SearchPoints, SearchResponse, SetPayloadPoints, + ShardKeySelector, UpdateBatchPoints, UpdateBatchResponse, UpdatePointVectors, UpsertPoints, + VectorsSelector, WithPayloadSelector, WithVectorsSelector, WriteOrdering, }; use anyhow::Result; use std::future::Future; @@ -57,29 +46,6 @@ impl QdrantClient { InterceptedService::new(channel, interceptor) } - // Access to raw collection API - pub async fn with_collections_client>>( - &self, - f: impl Fn(CollectionsClient>) -> O, - ) -> Result { - self.channel - .with_channel( - |channel| { - let service = self.with_api_key(channel); - let mut client = - CollectionsClient::new(service).max_decoding_message_size(usize::MAX); - if let Some(compression) = self.cfg.compression { - client = client - .send_compressed(compression.into()) - .accept_compressed(compression.into()); - } - f(client) - }, - false, - ) - .await - } - // Access to raw points API pub async fn with_points_client>>( &self, @@ -150,318 +116,6 @@ impl QdrantClient { .await?) } - pub async fn list_collections(&self) -> Result { - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api.list(ListCollectionsRequest {}).await?; - Ok(result.into_inner()) - }) - .await?) - } - - #[deprecated(since = "1.8.0", note = "Please use `collection_exists` instead")] - pub async fn has_collection(&self, collection_name: impl ToString) -> Result { - let collection_name = collection_name.to_string(); - let response = self.list_collections().await?; - let result = response - .collections - .into_iter() - .any(|c| c.name == collection_name); - - Ok(result) - } - - pub async fn collection_exists(&self, collection_name: impl ToString) -> Result { - let collection_name_ref = &collection_name.to_string(); - Ok(self - .with_collections_client(|mut collection_api| async move { - let request = CollectionExistsRequest { - collection_name: collection_name_ref.clone(), - }; - let result = collection_api.collection_exists(request).await?; - Ok(result - .into_inner() - .result - .map(|r| r.exists) - .unwrap_or(false)) - }) - .await?) - } - - pub async fn create_collection( - &self, - details: &CreateCollection, - ) -> Result { - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api.create(details.clone()).await?; - Ok(result.into_inner()) - }) - .await?) - } - - #[allow(clippy::too_many_arguments)] - pub async fn update_collection( - &self, - collection_name: impl ToString, - optimizers_config: Option<&OptimizersConfigDiff>, - params: Option<&CollectionParamsDiff>, - sparse_vectors_config: Option<&SparseVectorConfig>, - hnsw_config: Option<&HnswConfigDiff>, - vectors_config: Option<&VectorsConfigDiff>, - quantization_config: Option<&QuantizationConfigDiff>, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .update(UpdateCollection { - collection_name: collection_name_ref.to_string(), - optimizers_config: optimizers_config.cloned(), - timeout: None, - params: params.cloned(), - sparse_vectors_config: sparse_vectors_config.cloned(), - hnsw_config: hnsw_config.cloned(), - vectors_config: vectors_config.cloned(), - quantization_config: quantization_config.cloned(), - }) - .await?; - - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn delete_collection( - &self, - collection_name: impl ToString, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .delete(DeleteCollection { - collection_name: collection_name_ref.to_string(), - ..Default::default() - }) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn collection_info( - &self, - collection_name: impl ToString, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .get(GetCollectionInfoRequest { - collection_name: collection_name_ref.to_string(), - }) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn create_alias( - &self, - collection_name: impl ToString, - alias_name: impl ToString, - ) -> Result { - let create_alias = CreateAlias { - collection_name: collection_name.to_string(), - alias_name: alias_name.to_string(), - }; - let change_aliases = ChangeAliases { - actions: vec![AliasOperations { - action: Some(Action::CreateAlias(create_alias)), - }], - timeout: None, - }; - self.update_aliases(change_aliases).await - } - - pub async fn delete_alias( - &self, - alias_name: impl ToString, - ) -> Result { - let delete_alias = DeleteAlias { - alias_name: alias_name.to_string(), - }; - let change_aliases = ChangeAliases { - actions: vec![AliasOperations { - action: Some(Action::DeleteAlias(delete_alias)), - }], - timeout: None, - }; - self.update_aliases(change_aliases).await - } - - pub async fn rename_alias( - &self, - old_alias_name: impl ToString, - new_alias_name: impl ToString, - ) -> Result { - let rename_alias = RenameAlias { - old_alias_name: old_alias_name.to_string(), - new_alias_name: new_alias_name.to_string(), - }; - let change_aliases = ChangeAliases { - actions: vec![AliasOperations { - action: Some(Action::RenameAlias(rename_alias)), - }], - timeout: None, - }; - self.update_aliases(change_aliases).await - } - - // lower level API - pub async fn update_aliases( - &self, - change_aliases: ChangeAliases, - ) -> Result { - let change_aliases = change_aliases.clone(); - let chang_aliases_ref = &change_aliases; - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .update_aliases(chang_aliases_ref.clone()) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn list_collection_aliases( - &self, - collection_name: impl ToString, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .list_collection_aliases(ListCollectionAliasesRequest { - collection_name: collection_name_ref.to_string(), - }) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn list_aliases(&self) -> Result { - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api.list_aliases(ListAliasesRequest {}).await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn collection_cluster_info( - &self, - collection_name: impl ToString, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let request = CollectionClusterInfoRequest { - collection_name: collection_name_ref.to_string(), - }; - let result = collection_api.collection_cluster_info(request).await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn create_shard_key( - &self, - collection_name: impl AsRef, - shard_key: &shard_key::Key, - shards_number: Option, - replication_factor: Option, - placement: &[u64], - ) -> Result { - let collection_name = collection_name.as_ref(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .create_shard_key(CreateShardKeyRequest { - collection_name: collection_name.to_string(), - request: Some(CreateShardKey { - shard_key: Some(ShardKey::from(shard_key.clone())), - shards_number, - replication_factor, - placement: placement.to_vec(), - }), - timeout: None, - }) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn delete_shard_key( - &self, - collection_name: impl AsRef, - shard_key: &shard_key::Key, - ) -> Result { - let collection_name = collection_name.as_ref(); - - Ok(self - .with_collections_client(|mut collection_api| async move { - let result = collection_api - .delete_shard_key(DeleteShardKeyRequest { - collection_name: collection_name.to_string(), - request: Some(DeleteShardKey { - shard_key: Some(ShardKey::from(shard_key.clone())), - }), - timeout: None, - }) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - - pub async fn update_collection_cluster_setup( - &self, - collection_name: impl ToString, - operation: Operation, - ) -> Result { - let collection_name = collection_name.to_string(); - let collection_name_ref = collection_name.as_str(); - let operation_ref = &operation; - Ok(self - .with_collections_client(|mut collection_api| async move { - let request = UpdateCollectionClusterSetupRequest { - collection_name: collection_name_ref.to_string(), - timeout: None, - operation: Some(operation_ref.clone()), - }; - let result = collection_api - .update_collection_cluster_setup(request) - .await?; - Ok(result.into_inner()) - }) - .await?) - } - async fn _batch_updates( &self, collection_name: impl ToString,