Skip to content

Commit

Permalink
Add support for GZip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
GodTamIt authored and generall committed Apr 22, 2024
1 parent 3a889a8 commit 9db4279
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ categories = ["database", "api-bindings"]
keywords = ["qdrant", "vector-search", "search-engine", "client", "grpc"]

[dependencies]
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic = { version = "0.11.0", features = ["tls", "tls-roots", "gzip"] }
prost = "0.12.4"
prost-types = "0.12.4"
anyhow = "1"
Expand Down
120 changes: 83 additions & 37 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct QdrantClientConfig {

/// API key or token to use for authorization
pub api_key: Option<String>,
pub compression: Option<CompressionEncoding>,
}

/// A builder type for `QdrantClient`s
Expand Down Expand Up @@ -118,6 +119,20 @@ impl AsTimeout for u64 {
}
}

/// The type of compression to use for requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionEncoding {
Gzip,
}

impl From<CompressionEncoding> for tonic::codec::CompressionEncoding {
fn from(encoding: CompressionEncoding) -> Self {
match encoding {
CompressionEncoding::Gzip => tonic::codec::CompressionEncoding::Gzip,
}
}
}

impl QdrantClientConfig {
pub fn from_url(url: &str) -> Self {
QdrantClientConfig {
Expand All @@ -142,9 +157,13 @@ impl QdrantClientConfig {
pub fn set_keep_alive_while_idle(&mut self, keep_alive_while_idle: bool) {
self.keep_alive_while_idle = keep_alive_while_idle;
}

pub fn set_compression(&mut self, compression: Option<CompressionEncoding>) {
self.compression = compression;
}

/// Set the API key or token, builder-like. The API key argument can be any of
/// `&str`, `String`, `Option<&str>`, `Option<String>` or `Result<String>`.
/// set the API key, builder-like. The API key argument can be any of
/// `&str`, `String`, `Option<&str>``, `Option<String>` or `Result<String>`.`
///
/// # Examples:
///
Expand Down Expand Up @@ -188,6 +207,12 @@ impl QdrantClientConfig {
self
}

/// Set the compression to use for this client
pub fn with_compression(mut self, compression: Option<CompressionEncoding>) -> Self {
self.compression = compression;
self
}

/// Build the QdrantClient
pub fn build(self) -> Result<QdrantClient> {
QdrantClient::new(Some(self))
Expand Down Expand Up @@ -339,6 +364,7 @@ impl Default for QdrantClientConfig {
connect_timeout: Duration::from_secs(5),
keep_alive_while_idle: true,
api_key: None,
compression: None,
}
}
}
Expand Down Expand Up @@ -384,16 +410,21 @@ impl QdrantClient {
InterceptedService::new(channel, interceptor)
}

pub async fn with_snapshot_client<T, O: Future<Output = Result<T, Status>>>(
pub async fn with_snapshot_client<T, O: Future<Output=Result<T, Status>>>(
&self,
f: impl Fn(SnapshotsClient<InterceptedService<Channel, TokenInterceptor>>) -> O,
) -> Result<T, Status> {
self.channel
.with_channel(
|channel| {
let service = self.with_api_key(channel);
let client = SnapshotsClient::new(service);
let client = client.max_decoding_message_size(usize::MAX);
let mut client =
SnapshotsClient::new(service).max_decoding_message_size(usize::MAX);
if let Some(compression) = self.cfg.compression {
client = client
.send_compressed(compression.into())
.accept_compressed(compression.into());
}
f(client)
},
false,
Expand All @@ -402,16 +433,21 @@ impl QdrantClient {
}

// Access to raw collection API
pub async fn with_collections_client<T, O: Future<Output = Result<T, Status>>>(
pub async fn with_collections_client<T, O: Future<Output=Result<T, Status>>>(
&self,
f: impl Fn(CollectionsClient<InterceptedService<Channel, TokenInterceptor>>) -> O,
) -> Result<T, Status> {
self.channel
.with_channel(
|channel| {
let service = self.with_api_key(channel);
let client = CollectionsClient::new(service);
let client = client.max_decoding_message_size(usize::MAX);
let mut client =
CollectionsClient::new(service).max_decoding_message_size(usize::MAX);
if let Some(compression) = self.cfg.compression {
client = client
.send_compressed(compression.into())
.accept_compressed(compression.into());
}
f(client)
},
false,
Expand All @@ -420,16 +456,21 @@ impl QdrantClient {
}

// Access to raw points API
pub async fn with_points_client<T, O: Future<Output = Result<T, Status>>>(
pub async fn with_points_client<T, O: Future<Output=Result<T, Status>>>(
&self,
f: impl Fn(PointsClient<InterceptedService<Channel, TokenInterceptor>>) -> O,
) -> Result<T, Status> {
self.channel
.with_channel(
|channel| {
let service = self.with_api_key(channel);
let client = PointsClient::new(service);
let client = client.max_decoding_message_size(usize::MAX);
let mut client =
PointsClient::new(service).max_decoding_message_size(usize::MAX);
if let Some(compression) = self.cfg.compression {
client = client
.send_compressed(compression.into())
.accept_compressed(compression.into());
}
f(client)
},
true,
Expand All @@ -438,16 +479,21 @@ impl QdrantClient {
}

// Access to raw root qdrant API
pub async fn with_root_qdrant_client<T, O: Future<Output = Result<T, Status>>>(
pub async fn with_root_qdrant_client<T, O: Future<Output=Result<T, Status>>>(
&self,
f: impl Fn(qdrant_client::QdrantClient<InterceptedService<Channel, TokenInterceptor>>) -> O,
) -> Result<T, Status> {
self.channel
.with_channel(
|channel| {
let service = self.with_api_key(channel);
let client = qdrant_client::QdrantClient::new(service);
let client = client.max_decoding_message_size(usize::MAX);
let mut client = qdrant_client::QdrantClient::new(service)
.max_decoding_message_size(usize::MAX);
if let Some(compression) = self.cfg.compression {
client = client
.send_compressed(compression.into())
.accept_compressed(compression.into());
}
f(client)
},
true,
Expand Down Expand Up @@ -856,7 +902,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

/// Update or insert points into the collection, wait for completion.
Expand Down Expand Up @@ -924,7 +970,7 @@ impl QdrantClient {
ordering,
chunk_size,
)
.await
.await
}

/// Update or insert points into the collection, splitting in chunks and
Expand All @@ -946,7 +992,7 @@ impl QdrantClient {
ordering,
chunk_size,
)
.await
.await
}

#[inline]
Expand Down Expand Up @@ -1012,7 +1058,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

pub async fn set_payload_blocking(
Expand All @@ -1033,7 +1079,7 @@ impl QdrantClient {
true,
ordering,
)
.await
.await
}

#[inline]
Expand Down Expand Up @@ -1091,7 +1137,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

pub async fn overwrite_payload_blocking(
Expand All @@ -1112,7 +1158,7 @@ impl QdrantClient {
true,
ordering,
)
.await
.await
}

#[inline]
Expand Down Expand Up @@ -1168,7 +1214,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

pub async fn delete_payload_blocking(
Expand All @@ -1187,7 +1233,7 @@ impl QdrantClient {
true,
ordering,
)
.await
.await
}

#[inline]
Expand Down Expand Up @@ -1237,7 +1283,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

pub async fn clear_payload_blocking(
Expand All @@ -1254,7 +1300,7 @@ impl QdrantClient {
true,
ordering,
)
.await
.await
}

#[inline]
Expand Down Expand Up @@ -1426,7 +1472,7 @@ impl QdrantClient {
vector_selector,
ordering,
)
.await
.await
}

pub async fn delete_vectors_blocking(
Expand All @@ -1445,7 +1491,7 @@ impl QdrantClient {
vector_selector,
ordering,
)
.await
.await
}

async fn _delete_vectors(
Expand Down Expand Up @@ -1704,7 +1750,7 @@ impl QdrantClient {
false,
ordering,
)
.await
.await
}

pub async fn create_field_index_blocking(
Expand All @@ -1723,7 +1769,7 @@ impl QdrantClient {
true,
ordering,
)
.await
.await
}

pub async fn _delete_field_index(
Expand Down Expand Up @@ -1878,8 +1924,8 @@ impl QdrantClient {
snapshot_name: Option<T>,
rest_api_uri: Option<T>,
) -> Result<()>
where
T: ToString + Clone,
where
T: ToString + Clone,
{
use futures_util::StreamExt;
use std::io::Write;
Expand Down Expand Up @@ -1908,8 +1954,8 @@ impl QdrantClient {
collection_name.to_string(),
snapshot_name
))
.await?
.bytes_stream();
.await?
.bytes_stream();

let out_path = out_path.into();
let _ = std::fs::remove_file(&out_path);
Expand Down Expand Up @@ -2038,8 +2084,8 @@ impl From<Payload> for Value {
}

impl<T> From<Vec<T>> for Value
where
T: Into<Value>,
where
T: Into<Value>,
{
fn from(val: Vec<T>) -> Self {
Self {
Expand All @@ -2051,8 +2097,8 @@ where
}

impl<T> From<Vec<(&str, T)>> for Value
where
T: Into<Value>,
where
T: Into<Value>,
{
fn from(val: Vec<(&str, T)>) -> Self {
Self {
Expand Down

0 comments on commit 9db4279

Please sign in to comment.