Skip to content

Commit

Permalink
Reliable subscriptions by numeric ids (#115)
Browse files Browse the repository at this point in the history
* Allow definition of custom gRPC ChannelArguments for databroker communication

* SubscribeById seems working

* Add env var SDV_SUBSCRIBE_BUFFER_SIZE

* Add reliable messaging

* Reliable subscribeById is working

* Fix Markus's review findings

* Remove assert
  • Loading branch information
BjoernAtBosch authored Jan 16, 2025
1 parent 1577878 commit 8aeaed8
Show file tree
Hide file tree
Showing 21 changed files with 719 additions and 219 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,35 @@ SampleApp::SampleApp()
"username", "password")) {}
```
### Optimizing the gRPC communication channel settings
For possible optimizations of the communication with the KUKSA Databroker you can define setting for
the used gRPC channel via so-called ChannelArguments as defined here: https://grpc.github.io/grpc/core/channel__arg__names_8h.html.
You have to define those settings in a JSON based file and pass its filepath via environment varaiable
`SDV_VDB_CHANNEL_CONFIG_PATH` to the Velocitas SDK based application. The JSON has this structure:
```
{
"channelArguments": {
"<name of channel arg>": <value - either string or integer>
}
}
```
The possible names of channel arguments are as they are defined in the above linked page, for example:
```
{
"channelArguments": {
"grpc.http2.lookahead_bytes": 65536,
"grpc.default_authority": "Some authority defining string"
}
}
```
The buffer size for subscribe requests to the databroker can be set via environment variable `SDV_SUBSCRIBE_BUFFER_SIZE` (default is 0).
## Documentation
* [Velocitas Development Model](https://eclipse.dev/velocitas/docs/concepts/development_model/)
* [Vehicle App SDK Overview](https://eclipse.dev/velocitas/docs/concepts/development_model/vehicle_app_sdk/)
Expand Down
11 changes: 11 additions & 0 deletions sdk/include/sdk/AsyncResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ScopedBoolInverter {
*/
struct VoidResult {};

enum class CallState { ONGOING, CANCELING, COMPLETED, FAILED };

/**
* @brief Single result of an asynchronous operation which provides
* an item of type TResultType.
Expand All @@ -80,6 +82,7 @@ template <typename TResultType> class AsyncResult {
* @param result Result to insert.
*/
void insertResult(TResultType&& result) {
m_callState = CallState::COMPLETED;
if (m_callback != nullptr) {
m_callback(result);
} else {
Expand All @@ -94,6 +97,7 @@ template <typename TResultType> class AsyncResult {
* @param error Status containing error information.
*/
void insertError(Status&& error) {
m_callState = CallState::FAILED;
if (m_errorCallback != nullptr) {
m_errorCallback(error);
} else {
Expand Down Expand Up @@ -140,6 +144,9 @@ template <typename TResultType> class AsyncResult {
"Invalid usage: Either call await() or register an onResult callback!");
}
m_callback = callback;
if (m_callState == CallState::COMPLETED) {
m_callback(m_result);
}
return this;
}

Expand All @@ -152,6 +159,9 @@ template <typename TResultType> class AsyncResult {
*/
AsyncResult* onError(ErrorCallback_t callback) {
m_errorCallback = callback;
if (m_callState == CallState::FAILED) {
m_errorCallback(m_status);
}
return this;
}

Expand Down Expand Up @@ -192,6 +202,7 @@ template <typename TResultType> class AsyncResult {
}

private:
CallState m_callState{CallState::ONGOING};
TResultType m_result;
ResultCallback_t m_callback;
ErrorCallback_t m_errorCallback;
Expand Down
25 changes: 23 additions & 2 deletions sdk/include/sdk/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
#define VEHICLE_APP_SDK_JOB_H

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>

namespace velocitas {

using Clock = std::chrono::steady_clock;
using Timepoint = std::chrono::time_point<Clock>;

/**
* @brief Interface for jobs which can be executed by a worker in the thread pool.
*/
Expand All @@ -32,6 +36,10 @@ class IJob {
IJob() = default;
virtual ~IJob() = default;

[[nodiscard]] virtual bool isDue() const { return true; }

[[nodiscard]] virtual Timepoint getTimepointToExecute() const { return {}; }

/**
* @brief Execute the job.
*/
Expand All @@ -58,19 +66,32 @@ using JobPtr_t = std::shared_ptr<IJob>;
*/
class Job : public IJob {
public:
static JobPtr_t create(std::function<void()> fun) { return std::make_shared<Job>(fun); }
static JobPtr_t create(std::function<void()> fun,
std::chrono::milliseconds delay = std::chrono::milliseconds::zero()) {
return std::make_shared<Job>(fun, delay);
}

explicit Job(std::function<void()> fun);
explicit Job(std::function<void()> fun,
std::chrono::milliseconds delay = std::chrono::milliseconds::zero());

bool isDue() const override {
return (m_timepointToExecute == Timepoint()) || (m_timepointToExecute <= Clock::now());
}

Timepoint getTimepointToExecute() const override { return m_timepointToExecute; }

void execute() override;

void waitForTermination() const;

private:
std::function<void()> m_fun;
Timepoint m_timepointToExecute;
mutable std::mutex m_terminationMutex;
};

bool lowerJobPriority(const JobPtr_t& left, const JobPtr_t& right);

/**
* @brief A recurring job which can be cancelled manually.
*/
Expand Down
18 changes: 12 additions & 6 deletions sdk/include/sdk/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <queue>
Expand Down Expand Up @@ -62,13 +63,18 @@ class ThreadPool final {
ThreadPool& operator=(ThreadPool&&) = delete;

private:
void threadLoop();
JobPtr_t getNextExecutableJob();
void waitForPotentiallyExecutableJob() const;
void threadLoop();

std::mutex m_queueMutex;
std::condition_variable m_cv;
std::queue<JobPtr_t> m_jobs;
std::vector<std::thread> m_workerThreads;
std::atomic_bool m_isRunning{true};
using QueueType =
std::priority_queue<JobPtr_t, std::deque<JobPtr_t>, decltype(lowerJobPriority)*>;

mutable std::mutex m_queueMutex;
mutable std::condition_variable m_cv;
QueueType m_jobs;
std::vector<std::thread> m_workerThreads;
std::atomic_bool m_isRunning{true};
};

} // namespace velocitas
Expand Down
97 changes: 45 additions & 52 deletions sdk/include/sdk/grpc/GrpcCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,83 +37,76 @@ class GrpcCall {
};

/**
* @brief A GRPC call where a request is followed up by a single reply.
* @brief A GRPC call where a request is followed up by a single response.
*
* @tparam TRequestType The data type of the request.
* @tparam TReplyType The data type of the reply.
* @tparam TRequestType The data type of the request.
* @tparam TResponseType The data type of the (success) response.
*/
template <class TRequestType, class TReplyType> class GrpcSingleResponseCall : public GrpcCall {
template <class TRequestType, class TResponseType> class GrpcSingleResponseCall : public GrpcCall {
public:
TRequestType m_request;
TReplyType m_reply;
GrpcSingleResponseCall() = default;
explicit GrpcSingleResponseCall(TRequestType request)
: m_request(std::move(request)) {}
TRequestType m_request;
TResponseType m_response;
};

/**
* @brief A GRPC call where a request is followed up by multiple streamed replies.
* @brief A GRPC call where a request is followed up by multiple streamed responses.
*
* @tparam TRequestType The data type of the request.
* @tparam TReplyType The data type of the reply.
* @tparam TRequestType The data type of the request.
* @tparam TResponseType The data type of a single response.
*/
template <class TRequestType, class TReplyType> class GrpcStreamingResponseCall : public GrpcCall {
private:
class ReadReactor : public grpc::ClientReadReactor<TReplyType> {
public:
explicit ReadReactor(GrpcStreamingResponseCall& parent) // NOLINT
: m_parent(parent) {}

void OnReadDone(bool isOk) override {
if (isOk) {
try {
m_onDataHandler(m_reply);
} catch (std::exception& e) {
velocitas::logger().error(
"GRPC: Exception occurred during \"GetDatapoints\": {}", e.what());
}
this->StartRead(&m_reply);
}
}

void OnDone(const grpc::Status& status) override {
m_onFinishHandler(status);
m_parent.m_isComplete = true;
}

private:
TReplyType m_reply;
std::function<void(const TReplyType&)> m_onDataHandler;
std::function<void(const grpc::Status&)> m_onFinishHandler;
GrpcStreamingResponseCall& m_parent;

friend class GrpcStreamingResponseCall;
};

template <class TRequestType, class TResponseType>
class GrpcStreamingResponseCall : public GrpcCall, private grpc::ClientReadReactor<TResponseType> {
public:
GrpcStreamingResponseCall()
: m_readReactor(std::make_shared<ReadReactor>(*this)) {}
GrpcStreamingResponseCall() = default;
explicit GrpcStreamingResponseCall(TRequestType request)
: m_request(std::move(request)) {}

GrpcStreamingResponseCall& startCall() {
m_readReactor->StartCall();
m_readReactor->StartRead(&m_readReactor->m_reply);
this->StartRead(&m_response);
this->StartCall();
return *this;
}

GrpcStreamingResponseCall& onData(std::function<void(const TReplyType&)> handler) {
m_readReactor->m_onDataHandler = handler;
GrpcStreamingResponseCall& onData(std::function<void(const TResponseType&)> handler) {
m_onResponseHandler = handler;
return *this;
}

GrpcStreamingResponseCall& onFinish(std::function<void(const grpc::Status&)> handler) {
m_readReactor->m_onFinishHandler = handler;
m_onFinishHandler = handler;
return *this;
}

TRequestType& getRequest() { return m_request; }

ReadReactor& getReactor() { return *(m_readReactor.get()); }
grpc::ClientReadReactor<TResponseType>& getReactor() { return *this; }

private:
TRequestType m_request;
std::shared_ptr<ReadReactor> m_readReactor;
void OnReadDone(bool isOk) override {
if (isOk) {
try {
m_onResponseHandler(m_response);
} catch (const std::exception& e) {
velocitas::logger().error(
"GrpcCall: Exception occurred during response handler notification: {}",
e.what());
}
this->StartRead(&m_response);
}
}

void OnDone(const grpc::Status& status) override {
m_onFinishHandler(status);
m_isComplete = true;
}

TRequestType m_request;
TResponseType m_response;
std::function<void(const TResponseType&)> m_onResponseHandler;
std::function<void(const grpc::Status&)> m_onFinishHandler;
};

} // namespace velocitas
Expand Down
1 change: 1 addition & 0 deletions sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_library(${TARGET_NAME}
sdk/pubsub/MqttPubSubClient.cpp
sdk/vdb/DataPointBatch.cpp
sdk/vdb/IVehicleDataBrokerClient.cpp
sdk/vdb/grpc/common/ChannelConfiguration.cpp
sdk/vdb/grpc/common/TypeConversions.cpp
sdk/vdb/grpc/kuksa_val_v2/BrokerAsyncGrpcFacade.cpp
sdk/vdb/grpc/kuksa_val_v2/BrokerClient.cpp
Expand Down
12 changes: 10 additions & 2 deletions sdk/src/sdk/Job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@

namespace velocitas {

Job::Job(std::function<void()> fun)
: m_fun(std::move(fun)) {}
bool lowerJobPriority(const JobPtr_t& left, const JobPtr_t& right) {
return left->getTimepointToExecute() > right->getTimepointToExecute();
}

Job::Job(std::function<void()> fun, std::chrono::milliseconds delay)
: m_fun(std::move(fun)) {
if (delay > std::chrono::milliseconds::zero()) {
m_timepointToExecute = Clock::now() + delay;
}
}

void Job::waitForTermination() const { std::lock_guard lock(m_terminationMutex); }

Expand Down
Loading

0 comments on commit 8aeaed8

Please sign in to comment.