Help: kafka-python consumer not receiving messages #2112

Open
vnmrbu opened this issue Aug 25, 2020 · 6 comments

@vnmrbu

vnmrbu commented Aug 25, 2020

I'm new to Python.
I wrote the code below and ran into this problem: the producer publishes messages to Kafka, but the consumer does not receive any of them.
I followed the issue below, but it did not work for me.
[https://github.com//issues/535]
Here is my code:

[Producer.py]
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         api_version=(0,11,3),
                         value_serializer=lambda x:dumps(x).encode('utf-8'))
print('Processing...')
for num in range(1000):
    data={'number': num}
    # send message
    producer.send('numbertest', data)
    print('send data: ' + str(data))
    # sleep 2s
    sleep(2)

[Consumer.py]

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'numbertest',  # topic
    group_id=None,
    bootstrap_servers=['localhost:9092'],  # bootstrap server
    api_version=(0,11,3),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: loads(x.decode('utf-8')))

print('Receiving message...')
for message in consumer:
    print('Message: ' + str(message.value))

Thank you all!
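
One thing that can be ruled out without a broker is the serialization itself: the `value_serializer` and `value_deserializer` lambdas in the two scripts above should be inverses of each other. A minimal sketch, reusing those two lambdas under the hypothetical names `serialize` and `deserialize`:

```python
from json import dumps, loads

# Same lambdas as in Producer.py and Consumer.py above; the names
# `serialize` and `deserialize` are made up for this illustration.
serialize = lambda x: dumps(x).encode('utf-8')
deserialize = lambda x: loads(x.decode('utf-8'))

data = {'number': 7}
payload = serialize(data)            # bytes, the form handed to Kafka
assert isinstance(payload, bytes)
assert deserialize(payload) == data  # the round trip preserves the value
```

If this passes, the (de)serialization is probably not the cause, and the connection or offset settings are the more likely place to look.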

@sanvir10

I have the same problem; I'm running the consumer on GKE.

@vnmrbu

vnmrbu commented Aug 31, 2020

Can anyone help me, please?

@gravityshouldbenaut

I had the same issue and moved to pykafka because I saw no new responses here. However, be sure to pass bytes when sending messages via the producer. I opened an issue to get the pykafka README changed accordingly, since the README shows a string being used: Parsely/pykafka#1021

https://github.com/Parsely/pykafka
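
The "bytes style" mentioned above means handing the producer a `bytes` object rather than a `str`. A minimal sketch of that conversion in plain Python, with no Kafka client involved (the message text is made up for illustration):

```python
text = "hello kafka"             # a str (hypothetical message text)
payload = text.encode("utf-8")   # bytes, the form to pass to the producer

assert isinstance(payload, bytes)
assert payload.decode("utf-8") == text  # decoding on the consumer side recovers the str
```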

@sanvir10

sanvir10 commented Dec 11, 2020

Hi, I had the same problem, but now it is working fine:

Example:

from kafka import KafkaConsumer
import logging
import sys

root = logging.getLogger()
root.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

consumidor= KafkaConsumer("MI_TOPIC",
                        bootstrap_servers= "MI_SERVER",
                        group_id=None,
                        auto_offset_reset='latest',
                        api_version=(0,10)
        )

# Poll once, waiting up to 2 seconds for records
consumidor.poll(timeout_ms=2000)

for msg in consumidor:
    mensaje = str(msg.value.decode('utf-8'))
    logging.info(f"====>>>>>: {str(msg)}")

I hope this helps. Regards.
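
One caveat about the example above, as far as I understand kafka-python: `poll()` returns the records it fetched as a dict mapping each partition to a list of records, and the example discards that return value, so any messages fetched by that first call would not reach the `for` loop. A minimal sketch of keeping them instead; `flatten_poll_result` is a hypothetical helper name, and it only assumes its argument has the shape of the dict that `poll()` returns:

```python
def flatten_poll_result(records_by_partition):
    """Flatten a poll()-style result into a single list of records.

    Assumes a dict mapping partition -> list of records, which is the
    shape kafka-python's KafkaConsumer.poll() returns.
    """
    records = []
    for partition_records in records_by_partition.values():
        records.extend(partition_records)
    return records


# Stand-in data so the sketch runs without a broker. With a real consumer
# this would be: flatten_poll_result(consumidor.poll(timeout_ms=2000))
fake_result = {"MI_TOPIC-0": [b"a", b"b"], "MI_TOPIC-1": [b"c"]}
first_batch = flatten_poll_result(fake_result)
```

The records in `first_batch` could then be processed before entering the `for msg in consumidor:` loop.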

@AnikethSDeshpande

Somehow, after the poll, nothing happens.

@mohamed-rasvi-badal

[Quoted @sanvir10's Dec 11, 2020 comment and example above in full.]

It worked for me
