refine clang-format
martinzink committed Nov 26, 2024
1 parent 39690f9 commit 3fd27e0
Showing 5 changed files with 185 additions and 157 deletions.
.clang-format (6 additions, 1 deletion)
@@ -13,12 +13,13 @@ AllowShortLambdasOnASingleLine: All
AllowShortLoopsOnASingleLine: true
AlwaysBreakAfterReturnType: None
AlwaysBreakTemplateDeclarations: Yes
AllowAllArgumentsOnNextLine: false
BreakBeforeBraces: Attach
BreakBeforeBinaryOperators: None
BreakBeforeTernaryOperators: true
BreakConstructorInitializers: BeforeColon
BreakInheritanceList: BeforeColon
ColumnLimit: 125
ColumnLimit: 150
CompactNamespaces: false
ContinuationIndentWidth: 4
IndentCaseLabels: true
@@ -53,3 +54,7 @@ SpacesInSquareBrackets: false
Standard: c++20
TabWidth: 2
UseTab: Never
BinPackArguments: false # Prevents arguments from being packed onto the same line
PenaltyBreakAssignment: 200
PenaltyBreakOpenParenthesis: 200
PenaltyReturnTypeOnItsOwnLine: 200
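
Taken together, the new options change how over-long call expressions wrap: with ColumnLimit raised from 125 to 150, and BinPackArguments and AllowAllArgumentsOnNextLine both false, a call either fits on a single line or places each argument on its own continuation line, instead of packing several arguments per line up to the old limit. The rd_kafka_poll_set_consumer error log in the ConsumeKafka.cpp hunks below shows the effect; a minimal before/after sketch of that call:

  // Before (ColumnLimit: 125): arguments were bin-packed onto continuation lines.
  logger_->log_error("rd_kafka_poll_set_consumer error {}: {}", magic_enum::enum_underlying(poll_set_consumer_response),
      rd_kafka_err2str(poll_set_consumer_response));

  // After (ColumnLimit: 150, BinPackArguments: false): the call still exceeds the limit,
  // so each argument gets its own line rather than being packed.
  logger_->log_error("rd_kafka_poll_set_consumer error {}: {}",
      magic_enum::enum_underlying(poll_set_consumer_response),
      rd_kafka_err2str(poll_set_consumer_response));

The three Penalty values (200 each) make breaks after an assignment's '=', immediately after an opening parenthesis, and before a function's return type comparatively expensive, so the formatter prefers the argument-per-line layout shown above.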
extensions/kafka/ConsumeKafka.cpp (25 additions, 43 deletions)
@@ -64,9 +64,8 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
context.getProperty(DuplicateHeaderHandling, duplicate_header_handling_);

headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.name);
max_poll_records_ = gsl::narrow<std::size_t>(
context.getProperty<uint64_t>(MaxPollRecords)
.value_or(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE.parse(DEFAULT_MAX_POLL_RECORDS)));
max_poll_records_ = gsl::narrow<std::size_t>(context.getProperty<uint64_t>(MaxPollRecords)
.value_or(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE.parse(DEFAULT_MAX_POLL_RECORDS)));

if (!utils::string::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) &&
!utils::string::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
@@ -82,8 +81,7 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
}

namespace {
void rebalance_cb(
rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
// Cooperative, incremental assignment is not supported in the current librdkafka version
std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
logger->log_debug("Rebalance triggered.");
@@ -133,8 +131,7 @@ void ConsumeKafka::create_topic_partition_list() {
// This might happen until the cross-overship between processors and connections is settled
rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(), kf_topic_partition_list_.get());
if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) {
logger_->log_error("rd_kafka_subscribe error {}: {}", magic_enum::enum_underlying(subscribe_response),
rd_kafka_err2str(subscribe_response));
logger_->log_error("rd_kafka_subscribe error {}: {}", magic_enum::enum_underlying(subscribe_response), rd_kafka_err2str(subscribe_response));
}
}

@@ -192,8 +189,7 @@ void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
extend_config_from_dynamic_properties(context);

std::array<char, 512U> errstr{};
consumer_ = {
rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter()};
consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter()};
if (consumer_ == nullptr) {
const std::string error_msg{errstr.data()};
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer " + error_msg);
@@ -208,23 +204,22 @@ void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
//
// As far as I understand, instead of rd_kafka_position() an rd_kafka_committed() call is preferred here,
// as it properly fetches offsets from the broker
if (RD_KAFKA_RESP_ERR_NO_ERROR !=
rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
logger_->log_error("Retrieving committed offsets for topics+partitions failed.");
}

rd_kafka_resp_err_t poll_set_consumer_response = rd_kafka_poll_set_consumer(consumer_.get());
if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
logger_->log_error("rd_kafka_poll_set_consumer error {}: {}", magic_enum::enum_underlying(poll_set_consumer_response),
logger_->log_error("rd_kafka_poll_set_consumer error {}: {}",
magic_enum::enum_underlying(poll_set_consumer_response),
rd_kafka_err2str(poll_set_consumer_response));
}
}

std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage) {
if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION,
"ConsumeKafka: received error message from broker: " + std::to_string(rkmessage.err) + " " +
rd_kafka_err2str(rkmessage.err));
"ConsumeKafka: received error message from broker: " + std::to_string(rkmessage.err) + " " + rd_kafka_err2str(rkmessage.err));
}
return {static_cast<char*>(rkmessage.payload), rkmessage.len};
}
@@ -236,13 +231,11 @@ std::vector<utils::rd_kafka_message_unique_ptr> ConsumeKafka::poll_kafka_message
auto elapsed = std::chrono::steady_clock::now() - start;
while (messages.size() < max_poll_records_ && elapsed < max_poll_time_milliseconds_) {
logger_->log_debug("Polling for new messages for {}...", max_poll_time_milliseconds_);
const auto timeout_ms = gsl::narrow<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(max_poll_time_milliseconds_ - elapsed).count());
const auto timeout_ms = gsl::narrow<int>(std::chrono::duration_cast<std::chrono::milliseconds>(max_poll_time_milliseconds_ - elapsed).count());
utils::rd_kafka_message_unique_ptr message{rd_kafka_consumer_poll(consumer_.get(), timeout_ms)};
if (!message) { break; }
if (RD_KAFKA_RESP_ERR_NO_ERROR != message->err) {
logger_->log_error(
"Received message with error {}: {}", magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err));
logger_->log_error("Received message with error {}: {}", magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err));
break;
}
utils::print_kafka_message(*message, *logger_);
@@ -253,20 +246,14 @@
}

utils::KafkaEncoding ConsumeKafka::key_attr_encoding_attr_to_enum() const {
if (utils::string::equalsIgnoreCase(key_attribute_encoding_, KEY_ATTR_ENCODING_UTF_8)) {
return utils::KafkaEncoding::UTF8;
}
if (utils::string::equalsIgnoreCase(key_attribute_encoding_, KEY_ATTR_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
if (utils::string::equalsIgnoreCase(key_attribute_encoding_, KEY_ATTR_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Key Attribute Encoding\" property not recognized.");
}

utils::KafkaEncoding ConsumeKafka::message_header_encoding_attr_to_enum() const {
if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_UTF_8)) {
return utils::KafkaEncoding::UTF8;
}
if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_HEX)) {
return utils::KafkaEncoding::HEX;
}
if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
if (utils::string::equalsIgnoreCase(message_header_encoding_, MSG_HEADER_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Message Header Encoding\" property not recognized.");
}

@@ -277,15 +264,15 @@ std::string ConsumeKafka::resolve_duplicate_headers(const std::vector<std::strin
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Duplicate Header Handling\" property not recognized.");
}

std::vector<std::string> ConsumeKafka::get_matching_headers(
const rd_kafka_message_t& message, const std::string& header_name) const {
std::vector<std::string> ConsumeKafka::get_matching_headers(const rd_kafka_message_t& message, const std::string& header_name) const {
// Headers fetched this way are freed when rd_kafka_message_destroy is called
// Detaching them using rd_kafka_message_detach_headers does not seem to work
rd_kafka_headers_t* headers_raw = nullptr;
const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(&message, &headers_raw);
if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) { return {}; }
if (RD_KAFKA_RESP_ERR_NO_ERROR != get_header_response) {
logger_->log_error("Failed to fetch message headers: {}: {}", magic_enum::enum_underlying(rd_kafka_last_error()),
logger_->log_error("Failed to fetch message headers: {}: {}",
magic_enum::enum_underlying(rd_kafka_last_error()),
rd_kafka_err2str(rd_kafka_last_error()));
}
std::vector<std::string> matching_headers;
@@ -306,8 +293,7 @@ std::vector<std::string> ConsumeKafka::get_matching_headers(
return matching_headers;
}

std::vector<std::pair<std::string, std::string>> ConsumeKafka::get_flowfile_attributes_from_message_header(
const rd_kafka_message_t& message) const {
std::vector<std::pair<std::string, std::string>> ConsumeKafka::get_flowfile_attributes_from_message_header(const rd_kafka_message_t& message) const {
std::vector<std::pair<std::string, std::string>> attributes_from_headers;
for (const std::string& header_name: headers_to_add_as_attributes_) {
const std::vector<std::string> matching_headers = get_matching_headers(message, header_name);
@@ -330,16 +316,14 @@ void ConsumeKafka::add_kafka_attributes_to_flowfile(core::FlowFile& flow_file, c
flow_file.setAttribute(KAFKA_TOPIC_ATTR, rd_kafka_topic_name(message.rkt));
}

std::optional<std::vector<std::shared_ptr<core::FlowFile>>> ConsumeKafka::transform_pending_messages_into_flowfiles(
core::ProcessSession& session) const {
std::optional<std::vector<std::shared_ptr<core::FlowFile>>> ConsumeKafka::transform_pending_messages_into_flowfiles(core::ProcessSession& session)
const {
std::vector<std::shared_ptr<core::FlowFile>> flow_files_created;
for (const auto& message: pending_messages_) {
std::string message_content = extract_message(*message);
std::vector<std::pair<std::string, std::string>> attributes_from_headers =
get_flowfile_attributes_from_message_header(*message);
std::vector<std::string> split_message{!message_demarcator_.empty()
? utils::string::split(message_content, message_demarcator_)
: std::vector<std::string>{message_content}};
std::vector<std::pair<std::string, std::string>> attributes_from_headers = get_flowfile_attributes_from_message_header(*message);
std::vector<std::string> split_message{
!message_demarcator_.empty() ? utils::string::split(message_content, message_demarcator_) : std::vector<std::string>{message_content}};
for (auto& flowfile_content: split_message) {
auto flow_file = session.create();
if (flow_file == nullptr) {
@@ -358,14 +342,12 @@ std::optional<std::vector<std::shared_ptr<core::FlowFile>>> ConsumeKafka::transf
}

void ConsumeKafka::process_pending_messages(core::ProcessSession& session) {
std::optional<std::vector<std::shared_ptr<core::FlowFile>>> flow_files_created =
transform_pending_messages_into_flowfiles(session);
std::optional<std::vector<std::shared_ptr<core::FlowFile>>> flow_files_created = transform_pending_messages_into_flowfiles(session);
if (!flow_files_created) { return; }
for (const auto& flow_file: flow_files_created.value()) { session.transfer(flow_file, Success); }
session.commit();
// Commit the offset from the latest message only
if (RD_KAFKA_RESP_ERR_NO_ERROR !=
rd_kafka_commit_message(consumer_.get(), pending_messages_.back().get(), /* async = */ 0)) {
if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_commit_message(consumer_.get(), pending_messages_.back().get(), /* async = */ 0)) {
logger_->log_error("Committing offset failed.");
}
pending_messages_.clear();