From 8a2389121c0ddc96369f0f16b42569afcd04e8c7 Mon Sep 17 00:00:00 2001 From: Yinxiu Jia <48558845+kenneth-jia@users.noreply.github.com> Date: Wed, 19 Apr 2023 19:57:11 +0800 Subject: [PATCH] Remove KafkaConsumer::poll(std::chrono::milliseconds timeout, std::vector& output) --- .clang-tidy | 2 ++ include/kafka/KafkaConsumer.h | 43 ++++++----------------------------- 2 files changed, 9 insertions(+), 36 deletions(-) diff --git a/.clang-tidy b/.clang-tidy index 446bc659f..71c2b9b31 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -25,6 +25,8 @@ Checks: "*,\ -cppcoreguidelines-macro-usage,\ -cppcoreguidelines-avoid-magic-numbers,\ -cppcoreguidelines-avoid-non-const-global-variables,\ + -cppcoreguidelines-avoid-const-or-ref-data-members,\ + -cppcoreguidelines-avoid-do-while,\ -cppcoreguidelines-pro-type-vararg,\ -cppcoreguidelines-pro-bounds-array-to-pointer-decay,\ -cppcoreguidelines-pro-bounds-pointer-arithmetic,\ diff --git a/include/kafka/KafkaConsumer.h b/include/kafka/KafkaConsumer.h index 5c3f26ba4..32dccd597 100644 --- a/include/kafka/KafkaConsumer.h +++ b/include/kafka/KafkaConsumer.h @@ -213,16 +213,6 @@ class KafkaConsumer: public KafkaClient */ std::vector poll(std::chrono::milliseconds timeout); - /** - * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. - * Returns the number of polled records (which have been saved into parameter `output`). - * Note: 1) The result could be fetched through ConsumerRecord (with member function `error`). - * 2) Make sure the `ConsumerRecord` be destructed before the `KafkaConsumer.close()`. - * Throws KafkaException with errors: - * - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Unknow partition - */ - std::size_t poll(std::chrono::milliseconds timeout, std::vector& output); - /** * Suspend fetching from the requested partitions. Future calls to poll() will not return any records from these partitions until they have been resumed using resume(). * Note: 1) After pausing, the application still need to call `poll()` at regular intervals. @@ -320,8 +310,6 @@ class KafkaConsumer: public KafkaClient // Register Callbacks for rd_kafka_conf_t static void registerConfigCallbacks(rd_kafka_conf_t* conf); - void pollMessages(int timeoutMs, std::vector& output); - enum class PauseOrResumeOperation { Pause, Resume }; void pauseOrResumePartitions(const TopicPartitions& topicPartitions, PauseOrResumeOperation op); @@ -820,45 +808,28 @@ KafkaConsumer::storeOffsetsIfNecessary(const std::vector& output) +// Fetch messages +inline std::vector +KafkaConsumer::poll(std::chrono::milliseconds timeout) { // Commit the offsets for these messages which had been polled last time (for "enable.auto.commit=true" case) commitStoredOffsetsIfNecessary(CommitType::Async); // Poll messages with librdkafka's API std::vector msgPtrArray(_maxPollRecords); - auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), timeoutMs, msgPtrArray.data(), _maxPollRecords); + auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords); if (msgReceived < 0) { KAFKA_THROW_ERROR(Error(rd_kafka_last_error())); } // Wrap messages with ConsumerRecord - output.clear(); - output.reserve(static_cast(msgReceived)); - std::for_each(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived, [&output](rd_kafka_message_t* rkMsg) { output.emplace_back(rkMsg); }); + std::vector records(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived); // Store the offsets for all these polled messages (for "enable.auto.commit=true" case) - storeOffsetsIfNecessary(output); -} - -// Fetch messages (return via return value) -inline std::vector -KafkaConsumer::poll(std::chrono::milliseconds timeout) -{ - std::vector result; - poll(timeout, result); - return result; -} + storeOffsetsIfNecessary(records); -// Fetch messages (return via input parameter) -inline std::size_t -KafkaConsumer::poll(std::chrono::milliseconds timeout, std::vector& output) -{ - pollMessages(convertMsDurationToInt(timeout), output); - return output.size(); + return records; } inline void