Skip to content

Commit

Permalink
fix: kafka offset
Browse files Browse the repository at this point in the history
  • Loading branch information
tavallaie committed Aug 4, 2024
1 parent 0d797bf commit 99cc433
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
11 changes: 8 additions & 3 deletions connectiva/protocols/kafka_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def connect(self):
self.topic,
bootstrap_servers=self.broker_list,
group_id=self.group_id,
auto_offset_reset='earliest', # Start from the earliest message
enable_auto_commit=True, # Automatically commit offsets
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.logger.info("Kafka consumer connected.")
Expand All @@ -80,9 +82,12 @@ def send(self, message: Message) -> Dict[str, Any]:
def receive(self) -> Message:
self.logger.info(f"Receiving message from Kafka topic '{self.topic}'...")
try:
for message in self.consumer:
self.logger.info("Message received successfully!")
return Message(action="receive", data=message.value) # Return the first message received
message = next(self.consumer) # Fetch the next message
self.logger.info("Message received successfully!")
return Message(action="receive", data=message.value)
except StopIteration:
self.logger.info("No message received.")
return Message(action="error", data={}, metadata={"error": "No message found"})
except KafkaError as e:
self.logger.error(f"Failed to receive message: {e}")
return Message(action="error", data={}, metadata={"error": str(e)})
Expand Down
30 changes: 18 additions & 12 deletions tests/test_kafka_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ class TestKafkaWithConnectiva(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Set up logging configuration
cls.log_file = "connectiva_kafka_test.log" # Log file path
cls.logger = logging.getLogger("ConnectivaKafkaTest")

# Initialize Connectiva with Kafka configuration
cls.connectiva = Connectiva(
endpoint='kafka://localhost:9092',
endpoint='kafka://localhost:9092', # Correct Kafka endpoint format
topic='test_topic',
group_id='test_group',
log=True, # Enable logging to stdout
log_file=cls.log_file, # Enable logging to file
log_level="DEBUG"
)
cls.connectiva.connect()
Expand All @@ -28,30 +30,34 @@ def tearDownClass(cls):
cls.connectiva.disconnect()
cls.logger.info("Disconnected from Kafka using Connectiva")

def test_send_message(self):
self.logger.debug("Testing send_message")
message = Message(action="send", data={"key": "value"})
result = self.connectiva.send(message)
self.logger.debug(f"Send result: {result}")
self.assertEqual(result["status"], "sent", "Message send status should be 'sent'")
def test_send_and_receive_message(self):
self.logger.debug("Testing send_and_receive_message")

def test_receive_message(self):
self.logger.debug("Testing receive_message")
# Send a message to the Kafka topic
sent_message = Message(action="send", data={"key": "value"})
self.connectiva.send(sent_message)
send_result = self.connectiva.send(sent_message)
self.logger.debug(f"Send result: {send_result}")
self.assertEqual(send_result["status"], "sent", "Message send status should be 'sent'")

time.sleep(1) # Allow some time for the message to be available
# Allow some time for the message to be available
time.sleep(2)

# Receive the message from the Kafka topic
received_message = self.connectiva.receive()
self.logger.debug(f"Received message: {received_message}")

# Validate the received message
self.assertEqual(received_message.action, "receive", "Received action should be 'receive'")
self.assertEqual(received_message.data, sent_message.__dict__, "Received data should match sent message")

def test_receive_no_message(self):
self.logger.debug("Testing receive_no_message")
time.sleep(1) # Allow time for the consumer to poll

# Attempt to receive a message when no messages are expected
received_message = self.connectiva.receive()
self.logger.debug(f"Receive result: {received_message}")

# Check that an error action is returned when no message is found
self.assertEqual(received_message.action, "error", "Action should be 'error' when no message is found")
self.assertIn("No message found", received_message.metadata.get("error", ""), "Error metadata should indicate no message found")

Expand Down

0 comments on commit 99cc433

Please sign in to comment.