Skip to content

Commit

Permalink
Generate service trait with comments (#3840)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Sep 18, 2023
1 parent 08ab948 commit 8c22505
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 29 deletions.
9 changes: 9 additions & 0 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ pub type HelloStream<T> = quickwit_common::ServiceStream<crate::HelloResult<T>>;
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait Hello: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
/// Says hello.
async fn hello(
&mut self,
request: HelloRequest,
) -> crate::HelloResult<HelloResponse>;
/// Says goodbye.
async fn goodbye(
&mut self,
request: GoodbyeRequest,
) -> crate::HelloResult<GoodbyeResponse>;
/// Ping pong.
async fn ping(
&mut self,
request: quickwit_common::ServiceStream<PingRequest>,
Expand Down Expand Up @@ -739,6 +742,7 @@ pub mod hello_grpc_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// Says hello.
pub async fn hello(
&mut self,
request: impl tonic::IntoRequest<super::HelloRequest>,
Expand All @@ -758,6 +762,7 @@ pub mod hello_grpc_client {
req.extensions_mut().insert(GrpcMethod::new("hello.Hello", "Hello"));
self.inner.unary(req, path, codec).await
}
/// Says goodbye.
pub async fn goodbye(
&mut self,
request: impl tonic::IntoRequest<super::GoodbyeRequest>,
Expand All @@ -780,6 +785,7 @@ pub mod hello_grpc_client {
req.extensions_mut().insert(GrpcMethod::new("hello.Hello", "Goodbye"));
self.inner.unary(req, path, codec).await
}
/// Ping pong.
pub async fn ping(
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::PingRequest>,
Expand Down Expand Up @@ -811,10 +817,12 @@ pub mod hello_grpc_server {
/// Generated trait containing gRPC methods that should be implemented for use with HelloGrpcServer.
#[async_trait]
pub trait HelloGrpc: Send + Sync + 'static {
/// Says hello.
async fn hello(
&self,
request: tonic::Request<super::HelloRequest>,
) -> std::result::Result<tonic::Response<super::HelloResponse>, tonic::Status>;
/// Says goodbye.
async fn goodbye(
&self,
request: tonic::Request<super::GoodbyeRequest>,
Expand All @@ -825,6 +833,7 @@ pub mod hello_grpc_server {
>
+ Send
+ 'static;
/// Ping pong.
async fn ping(
&self,
request: tonic::Request<tonic::Streaming<super::PingRequest>>,
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-codegen/example/src/hello.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ message PingResponse {
}

service Hello {
// Says hello.
rpc Hello(HelloRequest) returns (HelloResponse);

// Says goodbye.
rpc Goodbye(GoodbyeRequest) returns (GoodbyeResponse);

// Ping pong.
rpc Ping(stream PingRequest) returns (stream PingResponse);
}
22 changes: 20 additions & 2 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

use heck::ToSnakeCase;
use proc_macro2::TokenStream;
use prost_build::{Method, Service, ServiceGenerator};
use prost_build::{Comments, Method, Service, ServiceGenerator};
use quote::{quote, ToTokens};
use syn::Ident;
use syn::{parse_quote, Ident};

use crate::ProstConfig;

Expand Down Expand Up @@ -242,6 +242,7 @@ fn generate_all(service: &Service, result_type_path: &str, error_type_path: &str
struct SynMethod {
name: Ident,
proto_name: Ident,
comments: Vec<syn::Attribute>,
request_type: syn::Path,
response_type: syn::Path,
client_streaming: bool,
Expand Down Expand Up @@ -284,12 +285,14 @@ impl SynMethod {
for method in methods {
let name = quote::format_ident!("{}", method.name);
let proto_name = quote::format_ident!("{}", method.proto_name);
let comments = generate_comment_attributes(&method.comments);
let request_type = syn::parse_str::<syn::Path>(&method.input_type).unwrap();
let response_type = syn::parse_str::<syn::Path>(&method.output_type).unwrap();

let syn_method = SynMethod {
name,
proto_name,
comments,
request_type,
response_type,
client_streaming: method.client_streaming,
Expand All @@ -301,6 +304,19 @@ impl SynMethod {
}
}

fn generate_comment_attributes(comments: &Comments) -> Vec<syn::Attribute> {
let mut attributes = Vec::with_capacity(comments.leading.len());

for comment in &comments.leading {
let comment = syn::LitStr::new(comment, proc_macro2::Span::call_site());
let attribute: syn::Attribute = parse_quote! {
#[doc = #comment]
};
attributes.push(attribute);
}
attributes
}

fn generate_service_trait(context: &CodegenContext) -> TokenStream {
let service_name = &context.service_name;
let trait_methods = generate_service_trait_methods(context);
Expand Down Expand Up @@ -330,10 +346,12 @@ fn generate_service_trait_methods(context: &CodegenContext) -> TokenStream {
let mut stream = TokenStream::new();

for syn_method in &context.methods {
let comments = &syn_method.comments;
let method_name = syn_method.name.to_token_stream();
let request_type = syn_method.request_type(false);
let response_type = syn_method.response_type(context, false);
let method = quote! {
#(#comments)*
async fn #method_name(&mut self, request: #request_type) -> #result_type<#response_type>;
};
stream.extend(method);
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-ingest/src/codegen/ingest_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,30 @@ use tower::{Layer, Service, ServiceExt};
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait IngestService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
/// Ingests document in a given queue.
///
/// Upon any kind of error, the client should
/// - retry to get at least once delivery.
/// - not retry to get at most once delivery.
///
/// Exactly once delivery is not supported yet.
async fn ingest(&mut self, request: IngestRequest) -> crate::Result<IngestResponse>;
/// Fetches record from a given queue.
///
/// Records are returned in order.
///
/// The returned `FetchResponse` object is meant to be read with the
/// `crate::iter_records` function.
///
/// Fetching does not necessarily return all of the available records.
/// If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// the response will be partial.
async fn fetch(&mut self, request: FetchRequest) -> crate::Result<FetchResponse>;
/// Returns a batch containing the last records.
///
/// It returns the last documents, from the newest
/// to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// is exceeded.
async fn tail(&mut self, request: TailRequest) -> crate::Result<FetchResponse>;
}
dyn_clone::clone_trait_object!(IngestService);
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ service ControlPlaneService {

// Shard API

/// Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
// Returns the list of open shards for one or several sources. If the control plane is not able to find any
// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
rpc GetOpenShards(GetOpenShardsRequest) returns (GetOpenShardsResponse);

rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse);

/// Notify the Control Plane that a change on an index occurred. The change
/// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
// Notify the Control Plane that a change on an index occurred. The change
// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification.
// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the
// type of change. The index ID and/or source ID could also be added.
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-proto/protos/quickwit/indexing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ syntax = "proto3";
package quickwit.indexing;

service IndexingService {
/// Apply an indexing plan on the node.
// Apply an indexing plan on the node.
rpc ApplyIndexingPlan(ApplyIndexingPlanRequest) returns (ApplyIndexingPlanResponse);
}

Expand All @@ -31,11 +31,11 @@ message ApplyIndexingPlanRequest {
}

message IndexingTask {
/// The tasks's index UID.
// The tasks's index UID.
string index_uid = 1;
/// The task's source ID.
// The task's source ID.
string source_id = 2;
/// The shards assigned to the indexer.
// The shards assigned to the indexer.
repeated uint64 shard_ids = 3;
}

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-proto/protos/quickwit/router.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ package quickwit.ingest.router;
import "quickwit/ingest.proto";

service IngestRouterService {
/// Ingests batches of documents for one or multiple indexes.
/// TODO: Describe error cases and how to handle them.
// Ingests batches of documents for one or multiple indexes.
// TODO: Describe error cases and how to handle them.
rpc Ingest(IngestRequestV2) returns (IngestResponseV2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,33 @@ use tower::{Layer, Service, ServiceExt};
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
/// Creates a new index.
async fn create_index(
&mut self,
request: super::metastore::CreateIndexRequest,
) -> crate::control_plane::ControlPlaneResult<super::metastore::CreateIndexResponse>;
/// Deletes an index.
async fn delete_index(
&mut self,
request: super::metastore::DeleteIndexRequest,
) -> crate::control_plane::ControlPlaneResult<super::metastore::EmptyResponse>;
/// Adds a source to an index.
async fn add_source(
&mut self,
request: super::metastore::AddSourceRequest,
) -> crate::control_plane::ControlPlaneResult<super::metastore::EmptyResponse>;
/// Enables or disables a source.
async fn toggle_source(
&mut self,
request: super::metastore::ToggleSourceRequest,
) -> crate::control_plane::ControlPlaneResult<super::metastore::EmptyResponse>;
/// Removes a source from an index.
async fn delete_source(
&mut self,
request: super::metastore::DeleteSourceRequest,
) -> crate::control_plane::ControlPlaneResult<super::metastore::EmptyResponse>;
/// Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
async fn get_open_shards(
&mut self,
request: GetOpenShardsRequest,
Expand All @@ -102,6 +109,12 @@ pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sy
&mut self,
request: CloseShardsRequest,
) -> crate::control_plane::ControlPlaneResult<CloseShardsResponse>;
/// Notify the Control Plane that a change on an index occurred. The change
/// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
/// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification.
/// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the
/// type of change. The index ID and/or source ID could also be added.
/// However, these attributes will not be used by the Control Plane, at least at short term.
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
Expand Down Expand Up @@ -1581,8 +1594,8 @@ pub mod control_plane_service_grpc_client {
);
self.inner.unary(req, path, codec).await
}
/// / Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// / for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
/// Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
pub async fn get_open_shards(
&mut self,
request: impl tonic::IntoRequest<super::GetOpenShardsRequest>,
Expand Down Expand Up @@ -1643,8 +1656,8 @@ pub mod control_plane_service_grpc_client {
);
self.inner.unary(req, path, codec).await
}
/// / Notify the Control Plane that a change on an index occurred. The change
/// / can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
/// Notify the Control Plane that a change on an index occurred. The change
/// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
/// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification.
/// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the
/// type of change. The index ID and/or source ID could also be added.
Expand Down Expand Up @@ -1728,8 +1741,8 @@ pub mod control_plane_service_grpc_server {
tonic::Response<super::super::metastore::EmptyResponse>,
tonic::Status,
>;
/// / Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// / for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
/// Returns the list of open shards for one or several sources. If the control plane is not able to find any
/// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
async fn get_open_shards(
&self,
request: tonic::Request<super::GetOpenShardsRequest>,
Expand All @@ -1744,8 +1757,8 @@ pub mod control_plane_service_grpc_server {
tonic::Response<super::CloseShardsResponse>,
tonic::Status,
>;
/// / Notify the Control Plane that a change on an index occurred. The change
/// / can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
/// Notify the Control Plane that a change on an index occurred. The change
/// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
/// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification.
/// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the
/// type of change. The index ID and/or source ID could also be added.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ pub struct ApplyIndexingPlanRequest {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct IndexingTask {
/// / The tasks's index UID.
/// The tasks's index UID.
#[prost(string, tag = "1")]
pub index_uid: ::prost::alloc::string::String,
/// / The task's source ID.
/// The task's source ID.
#[prost(string, tag = "2")]
pub source_id: ::prost::alloc::string::String,
/// / The shards assigned to the indexer.
/// The shards assigned to the indexer.
#[prost(uint64, repeated, tag = "3")]
pub shard_ids: ::prost::alloc::vec::Vec<u64>,
}
Expand All @@ -28,6 +28,7 @@ use tower::{Layer, Service, ServiceExt};
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait IndexingService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
/// Apply an indexing plan on the node.
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
Expand Down Expand Up @@ -500,7 +501,7 @@ pub mod indexing_service_grpc_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// / Apply an indexing plan on the node.
/// Apply an indexing plan on the node.
pub async fn apply_indexing_plan(
&mut self,
request: impl tonic::IntoRequest<super::ApplyIndexingPlanRequest>,
Expand Down Expand Up @@ -540,7 +541,7 @@ pub mod indexing_service_grpc_server {
/// Generated trait containing gRPC methods that should be implemented for use with IndexingServiceGrpcServer.
#[async_trait]
pub trait IndexingServiceGrpc: Send + Sync + 'static {
/// / Apply an indexing plan on the node.
/// Apply an indexing plan on the node.
async fn apply_indexing_plan(
&self,
request: tonic::Request<super::ApplyIndexingPlanRequest>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,27 @@ pub type IngesterServiceStream<T> = quickwit_common::ServiceStream<
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
/// Persists batches of documents to primary shards owned by a leader.
async fn persist(
&mut self,
request: PersistRequest,
) -> crate::ingest::IngestV2Result<PersistResponse>;
/// Opens a replication stream from a leader to a follower.
async fn open_replication_stream(
&mut self,
request: quickwit_common::ServiceStream<SynReplicationMessage>,
) -> crate::ingest::IngestV2Result<IngesterServiceStream<AckReplicationMessage>>;
/// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch.
async fn open_fetch_stream(
&mut self,
request: OpenFetchStreamRequest,
) -> crate::ingest::IngestV2Result<IngesterServiceStream<FetchResponseV2>>;
/// Pings an ingester to check if it is ready to host shards and serve requests.
async fn ping(
&mut self,
request: PingRequest,
) -> crate::ingest::IngestV2Result<PingResponse>;
/// Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers.
async fn truncate(
&mut self,
request: TruncateRequest,
Expand Down
Loading

0 comments on commit 8c22505

Please sign in to comment.