From f6c8526d24a9c21e4c778465f7e65ccade6ff13c Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Thu, 11 Jan 2024 22:04:07 +0530
Subject: [PATCH 1/7] added initial code to seek specific offset list from partition

---
 examples/seek_specific_offset_partition.py | 189 +++++++++++++++++++++
 1 file changed, 189 insertions(+)
 create mode 100644 examples/seek_specific_offset_partition.py

diff --git a/examples/seek_specific_offset_partition.py b/examples/seek_specific_offset_partition.py
new file mode 100644
index 000000000..7636684b7
--- /dev/null
+++ b/examples/seek_specific_offset_partition.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# A simple example demonstrating use of JSONDeserializer.
+#
+
+import sys
+import numpy as np
+import logging
+import argparse
+from confluent_kafka import Consumer,TopicPartition
+from confluent_kafka.error import KafkaException
+
+
+# TODO: MAX OFFSET TO EXTRACT AT A TIME
+
+
+def partition_list(kafka_consumer:Consumer, topic:str):
+    """
+    Parameters
+    ----------
+    kafka_consumer : Consumer
+        Consumer object of the confluent_kafka (cimpl) module,
+        e.g. kafka_consumer = Consumer({'bootstrap.servers': "servers"})
+    topic : str
+        Name of the topic on the Kafka server to subscribe to.
+        Ex: "Food"
+
+    Returns
+    -------
+    partition_list : list
+        list of partitions in the topic.
+    topic_partition: list
+        list of TopicPartition objects (to iterate or multi-thread over)
+    partition_offset: list
+        list of [partition, (minimum offset, maximum offset)] entries, one per partition
+
+    Operation
+    ----------
+    c1=kafka_consumer.list_topics() # Admin cluster metadata
+    c2=kafka_consumer.list_topics().topics # Dictionary with "topic_name":topic_metadata as key-value pairs
+    c3=kafka_consumer.list_topics().topics.get(topic) # Get specific "topic" metadata
+    c4=kafka_consumer.list_topics().topics.get(topic).partitions # Dictionary with "partition":"partition_metadata" as key-value pairs
+    c5=list(kafka_consumer.list_topics().topics.get(topic).partitions.keys()) # List all keys (partitions) of the above dictionary
+
+    """
+    try:
+        kafka_consumer.subscribe([topic])
+    except KafkaException as exc:
+        logging.error("Unable to subscribe to topic {}.\n Exception: {}".format(str(topic),str(exc)))
+        return [],[],[]
+
+    try:
+        partition_list=list(kafka_consumer.list_topics().topics.get(topic).partitions.keys())
+        topic_partition=[TopicPartition(topic,partition) for partition in partition_list]
+
+        partition_offset=[[ele1,(kafka_consumer.get_watermark_offsets(ele2))] for ele1,ele2
+                          in zip(partition_list,topic_partition)]
+    except KafkaException as exc:
+        logging.error("Unable to extract offset list from topic {}.\n Exception: {}".format(str(topic),str(exc)))
+        return [],[],[]
+    return partition_list,topic_partition,partition_offset
+
+
+
+
+def fetch_message_partition(consumer_conf,partition,min_offset,max_offset,topic="my"):
+    """
+    Returns list of messages and [offset,partition] pairs between min and max offsets for a given partition and topic.
+
+    Parameters
+    ----------
+    consumer_conf
+    partition : TYPE
+        DESCRIPTION.
+    min_offset : TYPE
+        DESCRIPTION.
+    max_offset : TYPE
+        DESCRIPTION.
+    topic : TYPE, optional
+        DESCRIPTION. The default is "my".
+
+    Returns
+    -------
+    partition : TYPE
+        DESCRIPTION.
+    TYPE
+        DESCRIPTION.
+    TYPE
+        DESCRIPTION.
+
+    """
+    total_messages=[]
+    total_partition_offset=[]
+    consumer = Consumer(consumer_conf)
+    available_min_offset,available_max_offset=consumer.get_watermark_offsets(TopicPartition(topic,partition))
+
+    if min_offset>max_offset:
+        logging.info("Provided minimum offset: {} greater than provided max offset: {} in partition:{} Topic:{}".format(str(min_offset),
+                                str(max_offset),str(partition),str(topic)))
+        return [],[]
+    if min_offset< available_min_offset:
+        logging.info("Minimum Offset: {} less than available minimum offset: {} in partition:{} Topic:{}".format(str(min_offset),
+                            str(available_min_offset),str(partition),str(topic)))
+        min_offset=available_min_offset
+    if max_offset> available_max_offset:
+        logging.info("Maximum Offset: {} greater than available maximum offset: {} in partition:{} Topic:{}".format(str(max_offset),
+                            str(available_max_offset),str(partition),str(topic)))
+        max_offset=available_max_offset
+
+    try:
+        partition1=TopicPartition(topic,partition,min_offset)
+        consumer.assign([partition1])
+        consumer.seek(partition1)
+        start_offset=min_offset
+    except Exception as exc:
+        logging.error("Unable to seek consumer to Topic:{} Partition:{} and Offset:{}.\nException:{}".format(
+            str(topic),str(partition),str(min_offset),str(exc)))
+
+    try:
+        message=None
+        while message is None:
+            message=consumer.poll()
+            start_offset=message.offset()
+            break
+        total_messages.append(str(message.value()))
+        total_partition_offset.append([message.offset(),message.partition()])
+
+        while start_offset<max_offset:
+            message=consumer.poll()
+            start_offset=message.offset()
+            total_messages.append(str(message.value()))
+            total_partition_offset.append([message.offset(),message.partition()])
+    except Exception as exc:
+        logging.error("Unable to fetch messages from Topic:{} Partition:{} between Offsets {}-{}.\nException:{}".format(
+            str(topic),str(partition),str(min_offset),str(max_offset),str(exc)))
+
+    consumer.close()
+    return total_messages,total_partition_offset
+
+
+def main(args):
+    topic = args.topic
+
+    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
+                     'group.id': args.group,
+                     'auto.offset.reset': "earliest"}
+
+
+
+    try:
+        consumer = Consumer(consumer_conf)
+    except KafkaException as exc:
+        logging.error("Unable to connect to Kafka Server.\n Exception:{} "+str(exc))
+    partition_list,topic_partition,partition_offset= partition_list(consumer, topic)
+
+    total_messages,total_partition_offset=fetch_message_partition(partition,min_offset,max_offset,topic="my")
+    consumer.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="seek specific offset partition example")
+    parser.add_argument('-b', dest="bootstrap_servers", required=True,
+                        help="Bootstrap broker(s) (host[:port])")
+    parser.add_argument('-s', dest="schema_registry", required=True,
+                        help="Schema Registry (http(s)://host[:port]")
+    parser.add_argument('-t', dest="topic", default="example_serde_json",
+                        help="Topic name")
+    parser.add_argument('-g', dest="group", default="example_serde_json",
+                        help="Consumer group")
+
+    main(parser.parse_args())

From: DevanjanMishra
Date: Thu, 11 Jan 2024 22:43:17 +0530
Subject: [PATCH 2/7] Updated main function

---
 examples/seek_specific_offset_partition.py | 38 +++++++++++++++-------
 1 file changed, 27 insertions(+), 11 deletions(-)

diff --git a/examples/seek_specific_offset_partition.py b/examples/seek_specific_offset_partition.py
index 7636684b7..10652067c 100644
--- a/examples/seek_specific_offset_partition.py
+++ b/examples/seek_specific_offset_partition.py
@@ -17,8 +17,7 @@
 # A simple example demonstrating use of JSONDeserializer.
 #

-import sys
-import numpy as np
+
 import logging
 import argparse
 from confluent_kafka import Consumer,TopicPartition
@@ -28,7 +27,7 @@

 # TODO: MAX OFFSET TO EXTRACT AT A TIME


-def partition_list(kafka_consumer:Consumer, topic:str):
+def fetch_partition_list(kafka_consumer:Consumer, topic:str):
     """
     Parameters
     ----------
@@ -157,18 +156,26 @@ def main(args):
     consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                      'group.id': args.group,
                      'auto.offset.reset': "earliest"}
+    partition=args.partition
+    start_offset=args.start_offset
+    end_offset=args.end_offset

-
-
     try:
         consumer = Consumer(consumer_conf)
     except KafkaException as exc:
         logging.error("Unable to connect to Kafka Server.\n Exception:{} "+str(exc))
-    partition_list,topic_partition,partition_offset= partition_list(consumer, topic)
+    partition_list,topic_partition,partition_offset= fetch_partition_list(consumer, topic)

-    total_messages,total_partition_offset=fetch_message_partition(partition,min_offset,max_offset,topic="my")
+    if partition in partition_list:
+        total_messages,total_partition_offset=fetch_message_partition(consumer_conf,partition,start_offset,end_offset,topic=topic)
+        if len(total_messages)>0:
+            total_messages_output=[[ele1,ele2[0],ele2[1]] for ele1,ele2 in zip(total_messages,total_partition_offset)]
+            with open("file.txt", "w") as output:
+                output.write(str(total_messages_output))
+    else:
+        logging.error("Partition {} not in consumer."+str(partition))
     consumer.close()

@@ -176,12 +183,21 @@ def main(args):
     parser = argparse.ArgumentParser(description="seek specific offset partition example")
     parser.add_argument('-b', dest="bootstrap_servers", required=True,
                         help="Bootstrap broker(s) (host[:port])")
-    parser.add_argument('-s', dest="schema_registry", required=True,
-                        help="Schema Registry (http(s)://host[:port]")
-    parser.add_argument('-t', dest="topic", default="example_serde_json",
-                        help="Topic name")
     parser.add_argument('-g', dest="group", default="example_serde_json",
                         help="Consumer group")
+    parser.add_argument('-t', dest="topic", default="topic_z",
+                        help="Topic name")
+    parser.add_argument('-p', dest="partition", type=int, default=0,
+                        help="Partition of topic to fetch data from")
+    parser.add_argument('-sof', dest="start_offset", required=True, type=int,
+                        help="Start Offset for Partition p")
+    parser.add_argument('-eof', dest="end_offset", required=True, type=int,
+                        help="End Offset for Partition p")
+
+
+
+

     main(parser.parse_args())

From 756f3ee193664ddb6d22c0b08551146c0eaa6d0b Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Thu, 11 Jan 2024 22:47:01 +0530
Subject: [PATCH 3/7] Updated Readme

---
 examples/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/README.md b/examples/README.md
index 32a248a1d..0801d29b0 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,7 +14,7 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
 * [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
-
+* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):

 * [confluent_cloud.py](confluent_cloud.py): Produce messages to Confluent Cloud and then read them back again.

From 02046a44788c6ee6ddeac4c047b29d3c70ebc55e Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Thu, 11 Jan 2024 22:52:05 +0530
Subject: [PATCH 4/7] Updated Readme

---
 examples/README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/examples/README.md b/examples/README.md
index 0801d29b0..c1cf1c2bc 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,7 +14,8 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
 * [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
-* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
+*[seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
+

 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):

 * [confluent_cloud.py](confluent_cloud.py): Produce messages to Confluent Cloud and then read them back again.

From 966af0f1bec1d3e1e7073bfec9fe6ba56ec21756 Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Thu, 11 Jan 2024 22:53:11 +0530
Subject: [PATCH 5/7] Updated Readme

---
 examples/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/README.md b/examples/README.md
index c1cf1c2bc..8e2ddcbfd 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,7 +14,7 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
 * [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
-*[seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
+* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):

From 4a230635800241118aac32924f06cd56fc276ab6 Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Sat, 13 Jan 2024 13:38:35 +0530
Subject: [PATCH 6/7] added documentation to the function and code flow

---
 examples/seek_specific_offset_partition.py | 37 ++++++++++++++--------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/examples/seek_specific_offset_partition.py b/examples/seek_specific_offset_partition.py
index 10652067c..38aaec841 100644
--- a/examples/seek_specific_offset_partition.py
+++ b/examples/seek_specific_offset_partition.py
@@ -75,21 +75,25 @@



-def fetch_message_partition(consumer_conf,partition,min_offset,max_offset,topic="my"):
+def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset):
     """
     Returns list of messages and [offset,partition] pairs between min and max offsets for a given partition and topic.

     Parameters
     ----------
-    consumer_conf
-    partition : TYPE
-        DESCRIPTION.
-    min_offset : TYPE
-        DESCRIPTION.
-    max_offset : TYPE
-        DESCRIPTION.
-    topic : TYPE, optional
-        DESCRIPTION. The default is "my".
+    consumer_conf : dict
+        Kafka client configuration used to create the consumer/producer object.
+    topic : str
+        Topic on the Kafka server from which messages are to be consumed.
+    partition : int
+        Partition of the topic from which data is to be fetched.
+    min_offset : int
+        Start offset from which data is to be fetched from the partition (if available),
+        else from the lowest available offset of the partition.
+    max_offset : int
+        End offset till which data is to be fetched from the partition (if available),
+        else till the highest available offset of the partition.
+

     Returns
     -------
@@ -106,6 +110,7 @@ def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset)
     consumer = Consumer(consumer_conf)
     available_min_offset,available_max_offset=consumer.get_watermark_offsets(TopicPartition(topic,partition))

+    # Check available min and max offsets for the partition of the topic
     if min_offset>max_offset:
         logging.info("Provided minimum offset: {} greater than provided max offset: {} in partition:{} Topic:{}".format(str(min_offset),
                                 str(max_offset),str(partition),str(topic)))
         return [],[]
@@ -119,6 +124,7 @@ def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset)
             str(available_max_offset),str(partition),str(topic)))
         max_offset=available_max_offset

+    # Seek the consumer to start reading from the min offset
     try:
         partition1=TopicPartition(topic,partition,min_offset)
         consumer.assign([partition1])
         consumer.seek(partition1)
         start_offset=min_offset
@@ -127,7 +133,8 @@ def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset)
     except Exception as exc:
         logging.error("Unable to seek consumer to Topic:{} Partition:{} and Offset:{}.\nException:{}".format(
             str(topic),str(partition),str(min_offset),str(exc)))
-
+
+    # Read only the 1st message along with its offset
     try:
         message=None
         while message is None:
             message=consumer.poll()
@@ -136,7 +143,7 @@ def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset)
             break
         total_messages.append(str(message.value()))
         total_partition_offset.append([message.offset(),message.partition()])
-
+        # Read the messages after the 1st message
         while start_offset<max_offset:
@@ -176,7 +183,8 @@ def main(args):
     partition_list,topic_partition,partition_offset= fetch_partition_list(consumer, topic)

     if partition in partition_list:
-        total_messages,total_partition_offset=fetch_message_partition(consumer_conf,partition,start_offset,end_offset,topic=topic)
+        total_messages,total_partition_offset=fetch_message_partition(consumer_conf,topic,partition,start_offset,end_offset)
         if len(total_messages)>0:
             total_messages_output=[[ele1,ele2[0],ele2[1]] for ele1,ele2 in zip(total_messages,total_partition_offset)]
+            # Save the messages along with offset and partition to a txt file
             with open("file.txt", "w") as output:
                 output.write(str(total_messages_output))
     else:
@@ -180,6 +190,7 @@ def main(args):
     consumer.close()

 if __name__ == '__main__':
+
     parser = argparse.ArgumentParser(description="seek specific offset partition example")
     parser.add_argument('-b', dest="bootstrap_servers", required=True,
                         help="Bootstrap broker(s) (host[:port])")

From b0bcda5f471e6b2d4808ce28ab5f8af2a122ec39 Mon Sep 17 00:00:00 2001
From: DevanjanMishra
Date: Sun, 18 Feb 2024 23:47:56 +0530
Subject: [PATCH 7/7] Updated exception handling for usage of seek method for consumers

---
 examples/README.md                         |  2 +-
 examples/seek_specific_offset_partition.py | 32 ++++++++++++----------
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/examples/README.md b/examples/README.md
index 8e2ddcbfd..f79e20d9f 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,7 +14,7 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
 * [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
-* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of seek method for consumer to fetch specific offset messages from specific partition.
+* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates usage of the consumer seek method to fetch messages from specific offsets in a specific partition.


 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):

diff --git a/examples/seek_specific_offset_partition.py b/examples/seek_specific_offset_partition.py
index 38aaec841..e95bac185 100644
--- a/examples/seek_specific_offset_partition.py
+++ b/examples/seek_specific_offset_partition.py
@@ -14,7 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-# A simple example demonstrating use of JSONDeserializer.
+# A simple example demonstrating use of the seek method to read messages from one specified offset to another for a partition in a topic.
+# Conventionally, using the seek method with a while loop can result in some messages being skipped.
+# The proposed approach of reading a single message first and then reading the remaining messages solves this issue.
 #

@@ -94,15 +96,20 @@ def fetch_message_partition(consumer_conf,topic,partition,min_offset,max_offset)
         End offset till which data is to be fetched from the partition (if available),
         else till the highest available offset of the partition.

-
     Returns
     -------
-    partition : TYPE
-        DESCRIPTION.
-    TYPE
-        DESCRIPTION.
-    TYPE
-        DESCRIPTION.
+    total_messages : list
+        list of messages from min offset to max offset in the partition of the topic.
+    total_partition_offset : list of lists
+        list of [offset,partition] lists, one for the corresponding index of each message in total_messages.
+
+    Operation
+    ----------
+    1) Find the min/max available offsets in the partition of the topic.
+    2) Check whether the required min/max offsets are within the available min/max range; otherwise clamp the required min/max to the available min/max accordingly.
+    3) Seek the consumer to the required min offset.
+    4) Read a single message from the consumer and store the message and its [offset,partition].
+    5) Read the remaining (max - min) messages from the consumer and store the messages and their [offset,partition] lists.
+
     """
     total_messages=[]
@@ -170,7 +177,7 @@ def main(args):
     try:
         consumer = Consumer(consumer_conf)
     except KafkaException as exc:
-        logging.error("Unable to connect to Kafka Server.\n Exception:{} "+str(exc))
+        logging.error("Unable to connect to Kafka Server.\n Exception: {}".format(str(exc)))
     partition_list,topic_partition,partition_offset= fetch_partition_list(consumer, topic)

@@ -185,7 +192,7 @@ def main(args):
             with open("file.txt", "w") as output:
                 output.write(str(total_messages_output))
     else:
-        logging.error("Partition {} not in consumer."+str(partition))
+        logging.error("Partition {} not present in topic.".format(str(partition)))
     consumer.close()

@@ -204,11 +211,6 @@ def main(args):
                         help="Start Offset for Partition p")
     parser.add_argument('-eof', dest="end_offset", required=True, type=int,
                         help="End Offset for Partition p")
-
-
-
-
-
     main(parser.parse_args())
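---

The metadata traversal documented in fetch_partition_list's "Operation" section (list_topics → topics → partitions → watermark offsets) can also be exercised on its own. Below is a minimal standalone sketch of that traversal; the broker address, topic name, and group id are illustrative assumptions, not values from the patch:

```python
from confluent_kafka import Consumer, TopicPartition

# Assumed configuration: local broker, throwaway group id, existing topic.
conf = {"bootstrap.servers": "localhost:9092", "group.id": "metadata-demo"}
topic = "example_topic"

consumer = Consumer(conf)
try:
    # list_topics() returns cluster metadata; .topics maps topic name to
    # topic metadata, whose .partitions maps partition id to metadata.
    metadata = consumer.list_topics(topic, timeout=10)
    partition_ids = list(metadata.topics[topic].partitions.keys())

    # get_watermark_offsets() returns the (low, high) watermarks per partition.
    for pid in partition_ids:
        low, high = consumer.get_watermark_offsets(TopicPartition(topic, pid), timeout=10)
        print("partition {}: low={} high={}".format(pid, low, high))
finally:
    consumer.close()
```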
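The offset-range read that patch 7's header comment and "Operation" steps describe can likewise be sketched in condensed form. This is not the patch's exact code: instead of calling seek() after assign() and reading the first message separately, it passes the start offset directly to assign() (which begins consumption at that offset) and bounds the loop by message offset rather than a counter. The helper name read_offset_range, the demo topic, and the group id are assumptions:

```python
from confluent_kafka import Consumer, TopicPartition, KafkaException


def read_offset_range(conf, topic, partition, start, end):
    """Return [(offset, value)] for messages with offsets in [start, end)."""
    consumer = Consumer(conf)
    try:
        # Clamp the requested range to what the partition actually holds.
        low, high = consumer.get_watermark_offsets(TopicPartition(topic, partition), timeout=10)
        start, end = max(start, low), min(end, high)
        if start >= end:
            return []
        # Assigning with an explicit offset starts consumption at that offset.
        consumer.assign([TopicPartition(topic, partition, start)])
        messages = []
        while True:
            msg = consumer.poll(timeout=10)
            if msg is None:
                continue              # nothing fetched within the timeout; keep polling
            if msg.error():
                raise KafkaException(msg.error())
            if msg.offset() >= end:
                break                 # past the requested range (e.g. gaps from compaction)
            messages.append((msg.offset(), msg.value()))
            if msg.offset() == end - 1:
                break                 # reached the last requested offset
        return messages
    finally:
        consumer.close()


if __name__ == "__main__":
    conf = {"bootstrap.servers": "localhost:9092",   # assumed local broker
            "group.id": "offset-range-demo",
            "auto.offset.reset": "earliest"}
    for offset, value in read_offset_range(conf, "example_topic", 0, 5, 15):
        print(offset, value)
```

Stopping on the offset of the last delivered message, rather than counting iterations, is what makes the loop safe against the skipped-message problem the patch describes: every delivered offset is recorded before the loop decides whether to continue.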