diff --git a/include/kafka/KafkaConsumer.h b/include/kafka/KafkaConsumer.h
index 32dccd59..6feaccab 100644
--- a/include/kafka/KafkaConsumer.h
+++ b/include/kafka/KafkaConsumer.h
@@ -130,24 +130,32 @@ class KafkaConsumer: public KafkaClient
     /**
      * Get the first offset for the given partitions.
      * This method does not change the current consumer position of the partitions.
+     * If the timeout is 0, cached offsets will be loaded and no call to the broker is made.
      * Throws KafkaException with errors:
      *   - RD_KAFKA_RESP_ERR__FAIL: Generic failure
      */
     std::map<TopicPartition, Offset> beginningOffsets(const TopicPartitions& topicPartitions,
                                                       std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
     {
+        if (timeout == std::chrono::milliseconds::zero()) {
+            return getOffsetsCached(topicPartitions, true);
+        }
         return getOffsets(topicPartitions, true, timeout);
     }
 
     /**
      * Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
      * This method does not change the current consumer position of the partitions.
+     * If the timeout is 0, cached offsets will be loaded and no call to the broker is made.
      * Throws KafkaException with errors:
      *   - RD_KAFKA_RESP_ERR__FAIL: Generic failure
      */
     std::map<TopicPartition, Offset> endOffsets(const TopicPartitions& topicPartitions,
                                                 std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const
     {
+        if (timeout == std::chrono::milliseconds::zero()) {
+            return getOffsetsCached(topicPartitions, false);
+        }
         return getOffsets(topicPartitions, false, timeout);
     }
 
@@ -279,6 +287,8 @@ class KafkaConsumer: public KafkaClient
     std::map<TopicPartition, Offset> getOffsets(const TopicPartitions& topicPartitions,
                                                 bool atBeginning,
                                                 std::chrono::milliseconds timeout) const;
+    std::map<TopicPartition, Offset> getOffsetsCached(const TopicPartitions& topicPartitions,
+                                                      bool atBeginning) const;
 
     enum class PartitionsRebalanceEvent { Assign, Revoke, IncrementalAssign, IncrementalUnassign };
     void changeAssignment(PartitionsRebalanceEvent event, const TopicPartitions& tps);
@@ -752,6 +762,32 @@ KafkaConsumer::getOffsets(const TopicPartitions& topicPartitions,
     return result;
 }
 
+
+// Read the locally cached watermark offsets for each partition (no broker round-trip).
+// NOTE(review): rd_kafka_get_watermark_offsets() serves cached values, which are
+// RD_KAFKA_OFFSET_INVALID for watermarks not yet populated (e.g. before the first
+// fetch from a partition) -- confirm callers of the timeout==0 path handle that.
+inline std::map<TopicPartition, Offset>
+KafkaConsumer::getOffsetsCached(const TopicPartitions& topicPartitions,
+                                bool atBeginning) const
+{
+    std::map<TopicPartition, Offset> result;
+
+    for (const auto& topicPartition: topicPartitions)
+    {
+        Offset beginning{}, end{};
+        const Error error{ rd_kafka_get_watermark_offsets(getClientHandle(),
+                                                          topicPartition.first.c_str(),
+                                                          topicPartition.second,
+                                                          &beginning,
+                                                          &end) };
+        KAFKA_THROW_IF_WITH_ERROR(error);
+
+        result[topicPartition] = (atBeginning ? beginning : end);
+    }
+
+    return result;
+}
 // Commit
 inline
 void
 KafkaConsumer::commit(const TopicPartitionOffsets& topicPartitionOffsets, CommitType type)