Periodically error on committing offset #1843

Open
4 of 7 tasks
V4kodin opened this issue Oct 25, 2024 · 0 comments
V4kodin commented Oct 25, 2024

Description

I intermittently get an error when committing offsets.
Here is the consumer code:

import logging
import threading
import time
from typing import Any, Callable

from confluent_kafka import Consumer, KafkaError, KafkaException

from dags.entities.config_types import BdConfig, KafkaConsumerConfig
from dags.pkg.proceed_data import proceed_messages

# Module-level stop flag; stop_loop() clears it from the timer thread.
consumer_running = True

def stop_loop():
    global consumer_running
    consumer_running = False
    print("End consumer on timer")

def kafka_consumer(kafka_config: dict,
                   kafka_consumer_config: KafkaConsumerConfig,
                   bd_config: BdConfig,
                   logger: logging.Logger,
                   reformat_record: Callable[[dict, Any], dict | None],
                   table_name: str | None = None,
                   topic: str | None = None
                   ):
    global consumer_running

    if table_name is not None:
        bd_config['table_name'] = table_name
    if topic is not None:
        kafka_consumer_config['topic'] = topic
    if reformat_record is None:
        raise Exception('reformat_record is None')
    if logger is None:
        raise Exception('logger is None')
    logger.info(f'reformat_record: {reformat_record.__name__}')

    consumer = Consumer(kafka_config)
    consumer.subscribe([kafka_consumer_config['topic']])
    messages_batch = []
    consumer_running = True

    # Ends the poll loop after consume_time seconds.
    timer = threading.Timer(kafka_consumer_config['consume_time'], stop_loop)
    timer.start()
    try:
        while consumer_running:
            message = consumer.poll(kafka_consumer_config['poll_timeout'])
            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    logger.info('%s [%d] reached end at offset %d',
                                message.topic(), message.partition(), message.offset())
                else:
                    logger.error(f'Error while consuming: {message.error()}')
                    # KafkaError is not an exception class, so wrap it before raising.
                    raise KafkaException(message.error())
                continue

            messages_batch.append(message.value())
            if len(messages_batch) >= kafka_consumer_config['max_messages']:
                logger.debug(f'messages batch: {messages_batch}')
                try:
                    proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                     port=bd_config['port'], username=bd_config['username'],
                                     password=bd_config['password'], table_name=bd_config['table_name'],
                                     reformat_record=reformat_record)
                    # With no arguments, commit() commits the offsets stored for the
                    # current assignment (stored automatically by poll() with this config).
                    consumer.commit(asynchronous=False)
                    messages_batch.clear()
                except Exception:
                    logger.error('Error with proceed_messages func')
                    raise
        # Flush the partial batch left over when the timer stops the loop.
        if len(messages_batch) > 0:
            time.sleep(5)
            try:
                proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                 port=bd_config['port'], username=bd_config['username'],
                                 password=bd_config['password'], table_name=bd_config['table_name'],
                                 reformat_record=reformat_record)
                consumer.commit(asynchronous=False)  # this commit intermittently raises NO_OFFSET
                messages_batch.clear()
            except Exception:
                logger.error('Error with proceed_messages func')
                raise
    except Exception as e:
        logger.error(f'Error in Kafka consumer: {e}')
        raise
    finally:
        timer.cancel()
        consumer.close()
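The timed exit above uses a threading.Timer that flips a module-level flag from another thread. An alternative sketch (not the author's code) checks a time.monotonic() deadline inside the loop instead, so no second thread or global is needed. consume_until_deadline and its parameters are hypothetical names; poll stands in for consumer.poll and handle for the per-message error check and batching.

```python
import time
from typing import Any, Callable, Optional


def consume_until_deadline(
    poll: Callable[[float], Optional[Any]],
    consume_time: float,
    poll_timeout: float,
    handle: Callable[[Any], None],
) -> int:
    """Poll until consume_time seconds have elapsed, passing each non-None
    result to handle. Returns the number of messages handled."""
    # time.monotonic() is not affected by wall-clock adjustments.
    deadline = time.monotonic() + consume_time
    handled = 0
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never block inside poll() past the deadline.
        message = poll(min(poll_timeout, remaining))
        if message is None:
            continue
        handle(message)
        handled += 1
    return handled
```

With the real client this would be called as consume_until_deadline(consumer.poll, kafka_consumer_config['consume_time'], kafka_consumer_config['poll_timeout'], handle), and the leftover-batch flush would run right after it returns.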

I recently added a timed exit from the loop (the stop_loop() function, fired by threading.Timer after kafka_consumer_config['consume_time'] seconds). When the loop ends, I also want to process the messages still sitting in messages_batch because the batch never reached the len(messages_batch) >= kafka_consumer_config['max_messages'] threshold.
So I copied the proceed-and-commit block and placed it after the loop, guarded by len(messages_batch) > 0. However, this block sometimes fails with an unexpected error: KafkaError{code=NO_OFFSET, val=-168, str="Commit failed: Local: No offset stored"}.
I don't understand why: this block only runs when there are unprocessed messages, so in theory there should be offsets to commit. Yet on some runs I get this error and on others I don't.
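Calling consumer.commit() with no arguments commits whatever offsets librdkafka has stored for the current assignment; "No offset stored" means there were none to commit at that moment. Because messages_batch only keeps message.value(), the code cannot tell which offsets it is about to commit. A minimal sketch, assuming the batch is changed to keep the full Message objects, commits explicit per-partition offsets instead of relying on stored state. offsets_to_commit and commit_batch are hypothetical helpers. This should avoid NO_OFFSET specifically, but it is not confirmed to address the root cause, and other commit errors (for example around a rebalance) can still occur.

```python
from typing import Any, Dict, List, Tuple


def offsets_to_commit(batch: List[Any]) -> List[Tuple[str, int, int]]:
    """Return (topic, partition, next_offset) for each partition in batch.

    Kafka expects the committed offset to be the offset of the next message
    to consume, i.e. the highest offset seen per partition plus one.
    """
    highest: Dict[Tuple[str, int], int] = {}
    for msg in batch:
        key = (msg.topic(), msg.partition())
        highest[key] = max(highest.get(key, -1), msg.offset())
    return [(t, p, off + 1) for (t, p), off in sorted(highest.items())]


def commit_batch(consumer: Any, batch: List[Any]) -> None:
    """Synchronously commit explicit offsets for the messages in batch."""
    entries = offsets_to_commit(batch)
    if not entries:
        return  # nothing consumed, nothing to commit
    # Imported here so offsets_to_commit() can be used without the client installed.
    from confluent_kafka import TopicPartition

    offsets = [TopicPartition(t, p, off) for t, p, off in entries]
    consumer.commit(offsets=offsets, asynchronous=False)
```

In kafka_consumer() this would mean appending message instead of message.value(), passing [m.value() for m in messages_batch] to proceed_messages(), and calling commit_batch(consumer, messages_batch) in place of consumer.commit(asynchronous=False).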

Am I missing something?

I can provide more information or logs if needed.

P.S.

confluent_kafka.version - 2.5.0
confluent_kafka.libversion - 2.5.0

Client config (secrets omitted):

kafka_config['auto.offset.reset'] = 'earliest'
kafka_config['enable.auto.offset.store'] = True
kafka_config['enable.auto.commit'] = False
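With enable.auto.offset.store = True, librdkafka stores a message's offset as soon as poll() hands it to the application, before proceed_messages() has run. A common alternative is to disable automatic storing and call Consumer.store_offsets(message=...) (which stores the message's offset + 1) only after a batch has been processed, so that a later commit() without arguments commits exactly the processed messages. A minimal sketch, assuming the batch keeps full Message objects; the broker address, group id and process_and_store are placeholders. This changes when offsets become committable; it is not confirmed to change the NO_OFFSET behavior.

```python
from typing import Any, Callable, Dict, List

# Placeholder connection settings; the three offset settings mirror the
# issue's config except enable.auto.offset.store.
kafka_config: Dict[str, Any] = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    # Offsets are stored explicitly after processing, not when poll() returns.
    "enable.auto.offset.store": False,
}


def process_and_store(consumer: Any, batch: List[Any],
                      process: Callable[[List[bytes]], None]) -> None:
    """Process a batch, then mark each message's offset as stored.

    If process() raises, nothing is stored, so the batch is not committed.
    """
    process([msg.value() for msg in batch])
    for msg in batch:
        # Stores msg.offset() + 1 for msg's partition.
        consumer.store_offsets(message=msg)
```

After process_and_store() returns, consumer.commit(asynchronous=False) commits the stored offsets, as in the original loop.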

Operating system: Docker container, image apache/airflow:2.9.2

How to reproduce

I don't have a reliable reproduction; the error appears intermittently when running the code above.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
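The checklist asks for client logs. One unconfirmed hypothesis for the intermittent NO_OFFSET is a consumer-group rebalance between the last poll() and the final commit(), which can change which partitions (and stored offsets) the consumer holds. A minimal sketch for checking this, assuming confluent-kafka-python's on_assign/on_revoke callbacks for subscribe() and its on_commit, debug and logger config properties; the helper names and the chosen debug contexts are illustrative.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger("kafka_consumer")


def log_assign(consumer: Any, partitions: List[Any]) -> None:
    """on_assign callback: log newly assigned partitions."""
    logger.info("Assigned: %s", [(p.topic, p.partition) for p in partitions])


def log_revoke(consumer: Any, partitions: List[Any]) -> None:
    """on_revoke callback: log partitions taken away by a rebalance."""
    logger.info("Revoked: %s", [(p.topic, p.partition) for p in partitions])


def log_commit(err: Any, partitions: List[Any]) -> None:
    """on_commit callback: log each commit result the client reports."""
    if err is not None:
        logger.error("Commit failed: %s", err)
    else:
        logger.info("Committed: %s",
                    [(p.topic, p.partition, p.offset) for p in partitions])


def debug_config(base: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with consumer-group debugging enabled."""
    cfg = dict(base)
    cfg["debug"] = "consumer,cgrp,topic"  # librdkafka debug contexts
    cfg["on_commit"] = log_commit
    # Forwards librdkafka log lines (including debug output) to this logger.
    cfg["logger"] = logger
    return cfg
```

With the real client this would be wired up as Consumer(debug_config(kafka_config)) followed by consumer.subscribe([kafka_consumer_config['topic']], on_assign=log_assign, on_revoke=log_revoke); a "Revoked" line shortly before the failing commit would support the rebalance hypothesis.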