Skip to content

Commit

Permalink
Add GRPC endpoint to get segments
Browse files Browse the repository at this point in the history
  • Loading branch information
khoa165 committed Jan 20, 2025
1 parent 863539e commit 2d10ff4
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
4 changes: 4 additions & 0 deletions rs/index/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ impl Collection {

current_version
}

pub fn get_all_segment_names(&self) -> Vec<String> {
self.all_segments.iter().map(|pair| pair.key().clone()).collect()
}
}

// Test
Expand Down
36 changes: 33 additions & 3 deletions rs/index_server/src/index_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use index::utils::SearchContext;
use log::{debug, info};
use proto::muopdb::index_server_server::IndexServer;
use proto::muopdb::{
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
InsertPackedRequest, InsertPackedResponse, InsertRequest, InsertResponse, SearchRequest,
SearchResponse,
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse, InsertPackedRequest, InsertPackedResponse, InsertRequest, InsertResponse, SearchRequest, SearchResponse, GetSegmentsRequest, GetSegmentsResponse,
};
use tokio::sync::Mutex;
use utils::mem::transmute_u8_to_slice;
Expand Down Expand Up @@ -337,4 +335,36 @@ impl IndexServer for IndexServerImpl {
)),
}
}

async fn get_segments(
&self,
request: tonic::Request<GetSegmentsRequest>,
) -> Result<tonic::Response<GetSegmentsResponse>, tonic::Status> {
let start = std::time::Instant::now();
let req = request.into_inner();
let collection_name = req.collection_name;

let collection_opt = self
.collection_catalog
.lock()
.await
.get_collection(&collection_name)
.await;

match collection_opt {
Some(collection) => {
let segments = collection.get_all_segment_names();
let end = std::time::Instant::now();
let duration = end.duration_since(start);
debug!("Get segments for collection {} in {:?}", collection_name, duration);
Ok(tonic::Response::new(GetSegmentsResponse {
segment_names: segments
}))
}
None => Err(tonic::Status::new(
tonic::Code::NotFound,
"Collection not found",
)),
}
}
}
10 changes: 10 additions & 0 deletions rs/proto/proto/muopdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ service IndexServer {
rpc InsertPacked(InsertPackedRequest) returns (InsertPackedResponse) {}

rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}
}

message GetSegmentsRequest {
string collection_name = 1;
}

message GetSegmentsResponse {
repeated string segment_names = 1;
}

message CreateCollectionRequest {
Expand Down
61 changes: 61 additions & 0 deletions rs/proto/src/muopdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ pub struct GetResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSegmentsRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSegmentsResponse {
#[prost(string, repeated, tag = "1")]
pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCollectionRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -428,6 +440,20 @@ pub mod index_server_client {
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/Flush");
self.inner.unary(request.into_request(), path, codec).await
}
pub async fn get_segments(
&mut self,
request: impl tonic::IntoRequest<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
self.inner.unary(request.into_request(), path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -588,6 +614,10 @@ pub mod index_server_server {
&self,
request: tonic::Request<super::FlushRequest>,
) -> Result<tonic::Response<super::FlushResponse>, tonic::Status>;
async fn get_segments(
&self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct IndexServerServer<T: IndexServer> {
Expand Down Expand Up @@ -801,6 +831,37 @@ pub mod index_server_server {
};
Box::pin(fut)
}
"/muopdb.IndexServer/GetSegments" => {
#[allow(non_camel_case_types)]
struct GetSegmentsSvc<T: IndexServer>(pub Arc<T>);
impl<T: IndexServer> tonic::server::UnaryService<super::GetSegmentsRequest> for GetSegmentsSvc<T> {
type Response = super::GetSegmentsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Self::Future {
let inner = self.0.clone();
let fut = async move { (*inner).get_segments(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetSegmentsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
Expand Down

0 comments on commit 2d10ff4

Please sign in to comment.