Skip to content

Commit

Permalink
Add streaming reads and writes to tsgrpc.
Browse files Browse the repository at this point in the history
Fixes: #137

The kvstore.proto is not backwards compatible.

Obsoletes: #152
PiperOrigin-RevId: 712684335
Change-Id: I50ed31d694f724b3dd336f8f2fc60f3417eac7fd
  • Loading branch information
laramiel authored and copybara-github committed Jan 7, 2025
1 parent 5e0f3c1 commit 14d7ce1
Show file tree
Hide file tree
Showing 9 changed files with 615 additions and 252 deletions.
2 changes: 2 additions & 0 deletions tensorstore/kvstore/test_matchers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef TENSORSTORE_KVSTORE_TEST_MATCHERS_H_
#define TENSORSTORE_KVSTORE_TEST_MATCHERS_H_

#include <stdint.h>

#include <string>
#include <type_traits>

Expand Down
2 changes: 1 addition & 1 deletion tensorstore/kvstore/tsgrpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ tensorstore_cc_library(
"//tensorstore/util:result",
"@com_github_grpc_grpc//:grpc++",
"@com_github_grpc_grpc//:grpc++_public_hdrs",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/time",
],
Expand Down Expand Up @@ -215,6 +214,7 @@ tensorstore_cc_test(
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
45 changes: 35 additions & 10 deletions tensorstore/kvstore/tsgrpc/handler_template.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
#ifndef TENSORSTORE_KVSTORE_TSGRPC_HANDLER_TEMPLATE_H_
#define TENSORSTORE_KVSTORE_TSGRPC_HANDLER_TEMPLATE_H_

#include <assert.h>
#include <cassert>

#include <new>

#include "absl/log/absl_log.h"
#include "absl/status/status.h"
#include "grpcpp/grpcpp.h" // third_party
#include "grpcpp/server_context.h" // third_party
Expand All @@ -37,7 +34,7 @@ class HandlerBase

HandlerBase(::grpc::CallbackServerContext* grpc_context)
: grpc_context_(grpc_context) {
// This refcount should be adopoted by calling Adopt.
// This refcount should be adopted by calling Adopt.
intrusive_ptr_increment(this);
}

Expand Down Expand Up @@ -78,17 +75,18 @@ class Handler : public HandlerBase, public grpc::ServerUnaryReactor {
Response* response_;
};

// Handler base class for a stream request.
// Handler base class for an RPC with a streaming response.
template <typename RequestProto, typename ResponseProto>
class StreamHandler : public HandlerBase,
public grpc::ServerWriteReactor<ResponseProto> {
class StreamServerResponseHandler
: public HandlerBase,
public grpc::ServerWriteReactor<ResponseProto> {
public:
using Request = RequestProto;
using Response = ResponseProto;
using Reactor = typename grpc::ServerWriteReactor<ResponseProto>;

StreamHandler(::grpc::CallbackServerContext* grpc_context,
const Request* request)
StreamServerResponseHandler(::grpc::CallbackServerContext* grpc_context,
const Request* request)
: HandlerBase(grpc_context), request_(request) {}

using Reactor::Finish;
Expand All @@ -104,6 +102,33 @@ class StreamHandler : public HandlerBase,
const Request* request_;
};

// Handler base class for an RPC with a streaming request.
template <typename RequestProto, typename ResponseProto>
class StreamClientRequestHandler
: public HandlerBase,
public grpc::ServerReadReactor<RequestProto> {
public:
using Request = RequestProto;
using Response = ResponseProto;
using Reactor = typename grpc::ServerReadReactor<RequestProto>;

StreamClientRequestHandler(::grpc::CallbackServerContext* grpc_context,
Response* response)
: HandlerBase(grpc_context), response_(response) {}

using Reactor::Finish;
void Finish(absl::Status status) {
Finish(tensorstore::internal::AbslStatusToGrpcStatus(status));
}

Response* response() { return response_; }

protected:
void OnDone() final { auto adopted = Adopt(); }

Response* response_;
};

} // namespace tensorstore_grpc

#endif // TENSORSTORE_KVSTORE_TSGRPC_HANDLER_TEMPLATE_H_
30 changes: 25 additions & 5 deletions tensorstore/kvstore/tsgrpc/kvstore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import "tensorstore/kvstore/tsgrpc/common.proto";
// Proto-api for a remote key-value store
service KvStoreService {
/// Attempts to read the specified key.
rpc Read(ReadRequest) returns (ReadResponse);
rpc Read(ReadRequest) returns (stream ReadResponse);

/// Performs an optionally-conditional write.
rpc Write(WriteRequest) returns (WriteResponse);
rpc Write(stream WriteRequest) returns (WriteResponse);

/// Performs an optionally-conditional delete.
rpc Delete(DeleteRequest) returns (DeleteResponse);
Expand Down Expand Up @@ -73,6 +73,11 @@ message ReadRequest {
}

message ReadResponse {
// When multiple ReadResponse messages are received, all messages after the
// initial message are partial responses, and only the value_part field is
// meaningful. In such a case, the value is the catenation of all value_parts
// in order.

/// Optionally, a non-ok status message may be returned.
StatusMessage status = 1;

Expand All @@ -92,14 +97,22 @@ message ReadResponse {
/// Indicates a value is present.
VALUE = 2;
}

State state = 3;
bytes value = 4 [ctype = CORD];

// Partial value. Only meaningful when state is VALUE.
// The actaual value is the catenation of all value_part fields in order.
bytes value_part = 4 [ctype = CORD];
}

/// See tensorstore/kvstore/operations.h
/// kvstore::WriteOptions
message WriteRequest {
// When multiple WriteRequest messages are sent, all messages after the
// initial message are partial requests, and only the value_part field
// is meaningful. In such a case, the value is the catenation of all
// value_parts in order.

/// The key to write.
bytes key = 1;

/// The write is aborted if the existing generation associated with the
Expand All @@ -111,7 +124,10 @@ message WriteRequest {
/// - The special value of `StorageGeneration::NoValue()` specifies a
/// condition that the `key` does not have an existing value.
bytes generation_if_equal = 2;
bytes value = 3 [ctype = CORD];

// Partial value.
// The actaual value is the catenation of all value_part fields in order.
bytes value_part = 3 [ctype = CORD];
}

message WriteResponse {
Expand All @@ -125,8 +141,12 @@ message WriteResponse {
/// See tensorstore/kvstore/operations.h
/// kvstore::WriteOptions
message DeleteRequest {
// The key to delete.
bytes key = 1;
KeyRange range = 2;

/// The delete is aborted if the existing generation associated with the
/// stored `key` does not match `if_equal`.
bytes generation_if_equal = 3;
}

Expand Down
Loading

0 comments on commit 14d7ce1

Please sign in to comment.