Skip to content

Commit

Permalink
✨ Add client_cq class
Browse files Browse the repository at this point in the history
  • Loading branch information
Thalhammer committed Mar 23, 2024
1 parent e9723ab commit f8c2999
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 64 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ else()
include(cmake/Fetch_asyncpp.cmake)
endif()

add_library(asyncpp_grpc ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc/server.cpp
add_library(asyncpp_grpc ${CMAKE_CURRENT_SOURCE_DIR}/src/grpc/client_cq.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/grpc/server.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/grpc/util.cpp)
target_link_libraries(
asyncpp_grpc PUBLIC asyncpp gRPC::grpc++ protobuf::libprotobuf
Expand All @@ -49,6 +50,7 @@ if(ASYNCPP_BUILD_TEST)
asyncpp_grpc-test
${PROTO_SRCS} ${CMAKE_CURRENT_SOURCE_DIR}/test/traits.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/call.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/client_cq.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/task.cpp)
target_link_libraries(asyncpp_grpc-test PRIVATE asyncpp_grpc GTest::gtest
GTest::gtest_main)
Expand Down
80 changes: 52 additions & 28 deletions include/asyncpp/grpc/call.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <asyncpp/detail/std_import.h>
#include <asyncpp/grpc/calldata_interface.h>
#include <asyncpp/grpc/client_cq.h>
#include <asyncpp/grpc/traits.h>
#include <asyncpp/ptr_tag.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
Expand All @@ -26,20 +27,26 @@ namespace asyncpp::grpc {
unary_call& operator=(const unary_call&) = delete;
unary_call& operator=(unary_call&&) = delete;

auto operator()(const typename traits::request_type& request, typename traits::response_type& response, ::grpc::CompletionQueue* cq) {
auto operator()(const typename traits::request_type& request, typename traits::response_type& response, ::grpc::CompletionQueue* cq = nullptr) {
struct awaiter : calldata_interface {
::grpc::ClientContext* m_context;
typename traits::service_type* m_stub;
const typename traits::request_type& m_request;
typename traits::response_type& m_response;
std::shared_ptr<client_cq> m_default_client_cq{};
::grpc::CompletionQueue* m_cq{};
typename traits::stream_type m_reader{};
::grpc::Status m_status{};
coroutine_handle<> m_handle{};

awaiter(::grpc::ClientContext* context, decltype(m_stub) s, const typename traits::request_type& request,
typename traits::response_type& response, ::grpc::CompletionQueue* cq)
: m_context{context}, m_stub{s}, m_request{request}, m_response{response}, m_cq{cq} {}
: m_context{context}, m_stub{s}, m_request{request}, m_response{response}, m_cq{cq} {
if (m_cq == nullptr) {
m_default_client_cq = client_cq::get_default();
m_cq = &m_default_client_cq->cq();
}
}

void handle_event([[maybe_unused]] size_t evt, bool ok) noexcept override {
assert(evt == 0);
Expand All @@ -53,7 +60,7 @@ namespace asyncpp::grpc {
m_handle = h;

m_reader = (m_stub->*FN)(m_context, m_request, m_cq);
m_reader->Finish(&m_response, &m_status, this);
m_reader->Finish(&m_response, &m_status, ptr_tag<0, calldata_interface>(this));
}
::grpc::Status await_resume() noexcept { return m_status; }
};
Expand All @@ -77,28 +84,30 @@ namespace asyncpp::grpc {
typename traits::service_type* stub;
typename traits::stream_type stream{};
bool m_need_writes_done{false};
std::shared_ptr<client_cq> default_client_cq{};
::grpc::CompletionQueue* cq{};

void destruct() {
if (stream) {
context->TryCancel();
if constexpr (traits::is_client_streaming) {
if (m_need_writes_done) {
stream->WritesDone(ptr_tag<0>(this));
stream->WritesDone(ptr_tag<0, calldata_interface>(this));
} else {
stream->Finish(&m_exit_status, ptr_tag<1>(this));
stream->Finish(&m_exit_status, ptr_tag<1, calldata_interface>(this));
}
} else
stream->Finish(&m_exit_status, ptr_tag<1>(this));
stream->Finish(&m_exit_status, ptr_tag<1, calldata_interface>(this));
} else {
delete this;
}
}

private:
::grpc::Status m_exit_status{};
void handle_event(size_t evt, bool ok) noexcept override {
void handle_event(size_t evt, [[maybe_unused]] bool ok) noexcept override {
if (evt == 0) {
return stream->Finish(&m_exit_status, ptr_tag<1>(this));
return stream->Finish(&m_exit_status, ptr_tag<1, calldata_interface>(this));
} else {
stream.reset();
context.reset();
Expand Down Expand Up @@ -131,17 +140,22 @@ namespace asyncpp::grpc {
::grpc::ClientContext& context() noexcept { return *m_state->context; }
const ::grpc::ClientContext& context() const noexcept { return *m_state->context; }

auto start(const typename traits::request_type& req, ::grpc::CompletionQueue* cq)
auto start(const typename traits::request_type& req, ::grpc::CompletionQueue* cq = nullptr)
requires(!traits::is_client_streaming && traits::is_server_streaming)
{
struct awaiter : calldata_interface {
state* m_state;
const typename traits::request_type& m_request{};
::grpc::CompletionQueue* m_cq{};
coroutine_handle<> m_handle{};
bool m_was_ok = false;

awaiter(state* state, const typename traits::request_type& req, ::grpc::CompletionQueue* cq) : m_state{state}, m_request{req}, m_cq{cq} {}
awaiter(state* state, const typename traits::request_type& req, ::grpc::CompletionQueue* cq) : m_state{state}, m_request{req} {
if (cq == nullptr) {
m_state->default_client_cq = client_cq::get_default();
cq = &m_state->default_client_cq->cq();
}
m_state->cq = cq;
}

void handle_event([[maybe_unused]] size_t evt, bool ok) noexcept override {
assert(evt == 0);
Expand All @@ -154,24 +168,29 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), m_request, m_cq, this);
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), m_request, m_state->cq, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() const noexcept { return m_was_ok; }
};
return awaiter{m_state.get(), req, cq};
}

auto start(typename traits::response_type& resp, ::grpc::CompletionQueue* cq)
auto start(typename traits::response_type& resp, ::grpc::CompletionQueue* cq = nullptr)
requires(traits::is_client_streaming && !traits::is_server_streaming)
{
struct awaiter : calldata_interface {
state* m_state;
typename traits::response_type& m_response;
::grpc::CompletionQueue* m_cq{};
coroutine_handle<> m_handle{};
bool m_was_ok = false;

awaiter(state* state, typename traits::response_type& resp, ::grpc::CompletionQueue* cq) : m_state{state}, m_response{resp}, m_cq{cq} {}
awaiter(state* state, typename traits::response_type& resp, ::grpc::CompletionQueue* cq) : m_state{state}, m_response{resp} {
if (cq == nullptr) {
m_state->default_client_cq = client_cq::get_default();
cq = &m_state->default_client_cq->cq();
}
m_state->cq = cq;
}

void handle_event([[maybe_unused]] size_t evt, bool ok) noexcept override {
assert(evt == 0);
Expand All @@ -185,23 +204,28 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), &m_response, m_cq, this);
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), &m_response, m_state->cq, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
return awaiter{m_state.get(), resp, cq};
}

auto start(::grpc::CompletionQueue* cq)
auto start(::grpc::CompletionQueue* cq = nullptr)
requires(traits::is_client_streaming && traits::is_server_streaming)
{
struct awaiter : calldata_interface {
state* m_state;
::grpc::CompletionQueue* m_cq{};
coroutine_handle<> m_handle{};
bool m_was_ok = false;

awaiter(state* state, ::grpc::CompletionQueue* cq) : m_state{state}, m_cq{cq} {}
awaiter(state* state, ::grpc::CompletionQueue* cq) : m_state{state} {
if (cq == nullptr) {
m_state->default_client_cq = client_cq::get_default();
cq = &m_state->default_client_cq->cq();
}
m_state->cq = cq;
}

void handle_event([[maybe_unused]] size_t evt, bool ok) noexcept override {
assert(evt == 0);
Expand All @@ -215,7 +239,7 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), m_cq, this);
m_state->stream = (m_state->stub->*FN)(m_state->context.get(), m_state->cq, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
Expand Down Expand Up @@ -243,7 +267,7 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream->Read(m_resp, this);
m_state->stream->Read(m_resp, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
Expand Down Expand Up @@ -272,7 +296,7 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream->Write(m_msg, this);
m_state->stream->Write(m_msg, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
Expand Down Expand Up @@ -302,7 +326,7 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream->WriteLast(m_msg, ::grpc::WriteOptions{}, this);
m_state->stream->WriteLast(m_msg, ::grpc::WriteOptions{}, ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
Expand Down Expand Up @@ -331,7 +355,7 @@ namespace asyncpp::grpc {
void await_suspend(coroutine_handle<> h) noexcept {
assert(m_state);
m_handle = h;
m_state->stream->WritesDone(this);
m_state->stream->WritesDone(ptr_tag<0, calldata_interface>(this));
}
bool await_resume() noexcept { return m_was_ok; }
};
Expand All @@ -350,7 +374,7 @@ namespace asyncpp::grpc {
assert(m_handle);
if (!ok) m_status = ::grpc::Status(::grpc::StatusCode::UNKNOWN, "Event returned ok=false");
switch (evt) {
case 0: return m_state->stream->Finish(&m_status, asyncpp::ptr_tag<1>(this)); break;
case 0: return m_state->stream->Finish(&m_status, asyncpp::ptr_tag<1, calldata_interface>(this)); break;
case 1: m_state->stream.reset(); break;
default: assert(false); break;
}
Expand All @@ -362,12 +386,12 @@ namespace asyncpp::grpc {
m_handle = h;
if constexpr (traits::is_client_streaming) {
if (m_state->m_need_writes_done) {
m_state->stream->WritesDone(ptr_tag<0>(this));
m_state->stream->WritesDone(ptr_tag<0, calldata_interface>(this));
} else {
m_state->stream->Finish(&m_status, ptr_tag<1>(this));
m_state->stream->Finish(&m_status, ptr_tag<1, calldata_interface>(this));
}
} else {
m_state->stream->Finish(&m_status, ptr_tag<1>(this));
m_state->stream->Finish(&m_status, ptr_tag<1, calldata_interface>(this));
}
}
::grpc::Status await_resume() noexcept { return m_status; }
Expand Down
36 changes: 36 additions & 0 deletions include/asyncpp/grpc/client_cq.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <asyncpp/dispatcher.h>
#include <asyncpp/grpc/calldata_interface.h>
#include <asyncpp/threadsafe_queue.h>
#include <grpcpp/alarm.h>
#include <grpcpp/completion_queue.h>

#include <memory>
#include <thread>

namespace asyncpp::grpc {
class client_cq : public dispatcher, private grpc::calldata_interface {
std::thread m_thread;
threadsafe_queue<std::function<void()>> m_dispatched;
std::atomic_flag m_alarm_set;
::grpc::Alarm m_alarm;
::grpc::CompletionQueue m_cq;

void handle_event(size_t evt, bool ok) noexcept override;

public:
client_cq();
client_cq(const client_cq&) = delete;
client_cq(client_cq&&) = delete;
client_cq& operator=(const client_cq&) = delete;
client_cq& operator=(client_cq&&) = delete;
~client_cq();

void push(std::function<void()> fn) override;

::grpc::CompletionQueue& cq() noexcept { return m_cq; }
const ::grpc::CompletionQueue& cq() const noexcept { return m_cq; }

static std::shared_ptr<client_cq> get_default();
};
} // namespace asyncpp::grpc
43 changes: 43 additions & 0 deletions src/grpc/client_cq.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include <asyncpp/grpc/client_cq.h>
#include <asyncpp/ptr_tag.h>

namespace asyncpp::grpc {
void client_cq::handle_event(size_t, bool) noexcept {
m_alarm_set.clear();
auto task = m_dispatched.pop();
while (task) {
if (*task) (*task)();
task = m_dispatched.pop();
}
}

client_cq::client_cq() {
m_thread = std::thread([this]() {
#ifdef __linux__
pthread_setname_np(pthread_self(), "grpc_client_cq");
#endif
void* tag = nullptr;
bool ok = false;
while (this->m_cq.Next(&tag, &ok)) {
auto [i, t] = ptr_untag<calldata_interface>(tag);
if (!i) continue;
i->handle_event(t, ok);
}
});
}

client_cq::~client_cq() {
m_cq.Shutdown();
if (m_thread.joinable()) m_thread.join();
}

void client_cq::push(std::function<void()> fn) {
m_dispatched.emplace(std::move(fn));
if (!m_alarm_set.test_and_set()) { m_alarm.Set(&m_cq, gpr_time_0(GPR_CLOCK_REALTIME), ptr_tag<0, calldata_interface>(this)); }
}

std::shared_ptr<client_cq> client_cq::get_default() {
static auto instance = std::make_shared<client_cq>();
return instance;
}
} // namespace asyncpp::grpc
Loading

0 comments on commit f8c2999

Please sign in to comment.