Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sapirshuker committed Nov 21, 2024
1 parent ed18726 commit 8671995
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 6 additions & 5 deletions Packs/Kafka/Integrations/KafkaV3/KafkaV3.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class KafkaCommunicator:

SESSION_TIMEOUT: int = 10000
REQUESTS_TIMEOUT: float = 10.0
POLL_TIMEOUT: float = 10.0 # Increased from 1.0 to prevent frequent 'No results' responses in the kafka-consume-msg command,
POLL_TIMEOUT: float = 1.0 # Increased from 1.0 to prevent frequent 'No results' responses in the kafka-consume-msg command.
POLL_TIMEOUT_STOP_UPON_TIMEOUT = 10.0
# which caused test playbook failures in builds.
MAX_POLLS_FOR_LOG: int = 100

Expand Down Expand Up @@ -788,7 +789,7 @@ def fetch_incidents(kafka: KafkaCommunicator, demisto_params: dict) -> None:
last_fetched_offsets = demisto.getLastRun().get('last_fetched_offsets', {})
last_topic = demisto.getLastRun().get('last_topic', '')
stop_consuming_upon_timeout = argToBoolean(demisto_params.get('stop_consuming_upon_timeout', False))

poll_timeout = kafka.POLL_TIMEOUT if stop_consuming_upon_timeout else kafka.POLL_TIMEOUT_STOP_UPON_TIMEOUT
demisto.debug(f"Starting fetch incidents with:\n last_topic: {last_topic}, "
f"last_fetched_offsets: {last_fetched_offsets}, "
f"topic: {topic}, partitions: {partitions}, offset: {offset}, "
Expand Down Expand Up @@ -828,18 +829,18 @@ def fetch_incidents(kafka: KafkaCommunicator, demisto_params: dict) -> None:

demisto.debug("Beginning to poll messages from kafka")
num_polled_msg = 0
for messages_num in range(max_messages):
for _ in range(max_messages):
# Initial message consumption may take up to
# `session.timeout.ms` for the consumer group to
# rebalance and start consuming
polled_msg = kafka_consumer.poll(kafka.POLL_TIMEOUT)
polled_msg = kafka_consumer.poll(poll_timeout)
if polled_msg:
num_polled_msg += 1
demisto.debug(f"Received a message {num_polled_msg}# from Kafka.")
incidents.append(create_incident(message=polled_msg, topic=topic))
last_fetched_offsets[f'{polled_msg.partition()}'] = polled_msg.offset()
elif stop_consuming_upon_timeout and (not polled_msg):
demisto.debug(f"Didn't get a message after {kafka.POLL_TIMEOUT} seconds"
demisto.debug(f"Didn't get a message after {poll_timeout} seconds"
f", stop_consuming_upon_timeout is true, break the loop. {num_polled_msg=}")
break

Expand Down
2 changes: 2 additions & 0 deletions Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ def test_fetch_incidents_stop_consuming_upon_timeout_is_true(
- Assert the created incidents are as expected
- Assert setting the last run
- Assert break method was called
- Assert poll method was called with timeout 10.0
"""
mocker.patch.object(KConsumer, "__init__", return_value=None)
cluster_metadata = create_cluster_metadata(cluster_tree)
Expand Down Expand Up @@ -869,6 +870,7 @@ def test_fetch_incidents_stop_consuming_upon_timeout_is_true(
debug.call_args_list[-2][0][0]
== "Didn't get a message after 10.0 seconds, stop_consuming_upon_timeout is true, break the loop. num_polled_msg=1"
)
assert poll_mock.assert_any_call(10.0)
close_mock.assert_called_once()
incidents_mock.assert_called_once_with(incidents)
set_last_run_mock.assert_called_once_with(next_run)
Expand Down

0 comments on commit 8671995

Please sign in to comment.