Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add close method to producer #15

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ docker-compose.yml
Dockerfile
scratch.R
tmp*
^.*\.Rproj$
^\.Rproj\.user$
5 changes: 5 additions & 0 deletions .lintr
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
linters: with_defaults(
object_name_linter("snake_case"),
line_length_linter(100)
)
exclude: "# Exclude Linting"
17 changes: 17 additions & 0 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**",
"/usr/local/lib/R/site-library/Rcpp/include",
"/usr/share/R/include"
],
"defines": [],
"compilerPath": "/usr/bin/gcc",
"cStandard": "c11",
"cppStandard": "c++17"
}
],
"version": 4
}
12 changes: 6 additions & 6 deletions R/Producer.R
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#' Create Kafka Producer
#'
#'
#' @details This function creates an instance of a Kafka Producer based on a list with configuration options. The following methods are currently implemented: ...
#'
#'
#' @examples
#' \dontrun{
#' config <- list(
#' "bootstrap.servers" = "my-boostrap-server.com"
#' )
#'
#'
#' producer <- Producer(config)
#'
#'
#' producer$produce(topic = "test-topic", message = "Hello World!")
#'
#'
#' producer$flush(1000)
#' }
#' @param config list with configuration options accepted by librdkafka. See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for details.
Expand All @@ -22,4 +22,4 @@ Producer <- function(config) {
producer_rcpp_module <- Rcpp::Module("producer_module")

new(producer_rcpp_module$Producer, config)
}
}
17 changes: 17 additions & 0 deletions r-kafka-client.Rproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Version: 1.0

RestoreWorkspace: Default
SaveWorkspace: Default
AlwaysSaveHistory: Default

EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 2
Encoding: UTF-8

RnwWeave: Sweave
LaTeX: pdfLaTeX

BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source
161 changes: 145 additions & 16 deletions src/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,52 @@

using namespace Rcpp;

class PersistentDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
PersistentDeliveryReportCb(const bool verbose)
{
verbose_flag = verbose;
}

void dr_cb(RdKafka::Message &message)
{
switch (message.status())
{
case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
status_name = "NotPersisted";
++count_not_persisted;
break;
case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
status_name = "PossiblyPersisted";
++count_persisted;
break;
case RdKafka::Message::MSG_STATUS_PERSISTED:
status_name = "Persisted";
++count_persisted;
break;
default:
status_name = "Unknown";
++count_unknown;
break;
}
if (verbose_flag)
{
std::cout << "Message delivery for (" << message.len()
<< " bytes): " << status_name << ": " << message.errstr()
<< std::endl;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
}

std::string status_name;
bool verbose_flag = TRUE;
int count_persisted = 0;
int count_not_persisted = 0;
int count_unknown = 0;
};

class Producer
{
public:
Expand All @@ -13,6 +59,8 @@ class Producer

conf = generate_kafka_config(conf_);

conf->set("dr_cb", &dr_cb, errstr);

producer = RdKafka::Producer::create(conf, errstr);

if (!producer)
Expand All @@ -25,22 +73,23 @@ class Producer
{
if (!producer)
{
Rcpp::stop("Kafka producer is not initialized.");
Rcpp::stop("Kafka producer is not running.");
}

RdKafka::ErrorCode err = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
/* Value */
const_cast<char *>(message.data()), message.size(),
/* Key */
NULL, 0,
/* Timestamp (defaults to now) */
0,
/* Message headers, if any */
NULL,
/* Per-message opaque value passed to
* delivery report */
NULL);
RdKafka::ErrorCode err = producer->produce(
topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
/* Value */
const_cast<char *>(message.data()), message.size(),
/* Key */
NULL, 0,
/* Timestamp (defaults to now) */
0,
/* Message headers, if any */
NULL,
/* Per-message opaque value passed to
* delivery report */
NULL);

if (err != RdKafka::ERR_NO_ERROR)
{
Expand All @@ -50,6 +99,11 @@ class Producer

void flush(const int timeout_ms)
{
if (!producer)
{
Rcpp::stop("Kafka producer is not running.");
}

RdKafka::ErrorCode err = producer->flush(timeout_ms);

if (err != RdKafka::ERR_NO_ERROR)
Expand All @@ -58,16 +112,91 @@ class Producer
}
}

void close()
{
// Following
// https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#producer
// and https://github.com/confluentinc/librdkafka/issues/1499
if (!producer)
{
Rcpp::stop("Kafka producer is not running.");
}

producer->flush(60 * 1000);
delete producer;
}

void fatal_error()
{
if (!producer)
{
Rcpp::stop("Kafka producer is not running.");
}

std::string errstr;
RdKafka::ErrorCode err = producer->fatal_error(errstr);
if (err != RdKafka::ERR_NO_ERROR)
{
Rcpp::stop("Fatal error: " + errstr);
}
}

int poll()
{
if (!producer)
{
Rcpp::stop("Kafka producer is not running.");
}

return producer->poll(0);
}

Rcpp::List delivered_messages()
{
poll();
return Rcpp::List::create(
Named("count_persisted") = dr_cb.count_persisted,
Named("count_unknown") = dr_cb.count_unknown,
Named("count_not_persisted") = dr_cb.count_not_persisted);
}

int status(const std::string topic)
{
if (!producer)
{
Rcpp::stop("Kafka producer is not running.");
}

dr_cb.status_name = "Unknown";
produce(topic, "");
producer->poll(60 * 1000);
producer->flush(5 * 1000);

if (dr_cb.status_name == "Persisted")
{
return 0;
}
else
{
return 1;
}
}

private:
PersistentDeliveryReportCb dr_cb = PersistentDeliveryReportCb(TRUE);
RdKafka::Conf *conf;
RdKafka::Producer *producer;
};


RCPP_MODULE(producer_module)
{
class_<Producer>("Producer")
.constructor<Rcpp::List>()
.method("produce", &Producer::produce)
.method("flush", &Producer::flush);
.method("flush", &Producer::flush)
.method("close", &Producer::close)
.method("fatal_error", &Producer::fatal_error)
.method("poll", &Producer::poll)
.method("status", &Producer::status)
.method("delivered_messages", &Producer::delivered_messages);
}
1 change: 0 additions & 1 deletion tests/testthat/test-producer.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ test_that("Teardown producer", {

expect_error(Producer(config_producer), NA)
})