diff --git a/Packs/Kafka/Integrations/KafkaV3/KafkaV3.py b/Packs/Kafka/Integrations/KafkaV3/KafkaV3.py index fe04adac4751..f272b914a479 100644 --- a/Packs/Kafka/Integrations/KafkaV3/KafkaV3.py +++ b/Packs/Kafka/Integrations/KafkaV3/KafkaV3.py @@ -37,7 +37,8 @@ class KafkaCommunicator: SESSION_TIMEOUT: int = 10000 REQUESTS_TIMEOUT: float = 10.0 - POLL_TIMEOUT: float = 10.0 # Increased from 1.0 to prevent frequent 'No results' responses in the kafka-consume-msg command, + POLL_TIMEOUT: float = 1.0 # Increased from 1.0 to prevent frequent 'No results' responses in the kafka-consume-msg command. + POLL_TIMEOUT_STOP_UPON_TIMEOUT = 10.0 # which caused test playbook failures in builds. MAX_POLLS_FOR_LOG: int = 100 @@ -788,7 +789,7 @@ def fetch_incidents(kafka: KafkaCommunicator, demisto_params: dict) -> None: last_fetched_offsets = demisto.getLastRun().get('last_fetched_offsets', {}) last_topic = demisto.getLastRun().get('last_topic', '') stop_consuming_upon_timeout = argToBoolean(demisto_params.get('stop_consuming_upon_timeout', False)) - + poll_timeout = kafka.POLL_TIMEOUT if stop_consuming_upon_timeout else kafka.POLL_TIMEOUT_STOP_UPON_TIMEOUT demisto.debug(f"Starting fetch incidents with:\n last_topic: {last_topic}, " f"last_fetched_offsets: {last_fetched_offsets}, " f"topic: {topic}, partitions: {partitions}, offset: {offset}, " @@ -828,18 +829,18 @@ def fetch_incidents(kafka: KafkaCommunicator, demisto_params: dict) -> None: demisto.debug("Beginning to poll messages from kafka") num_polled_msg = 0 - for messages_num in range(max_messages): + for _ in range(max_messages): # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming - polled_msg = kafka_consumer.poll(kafka.POLL_TIMEOUT) + polled_msg = kafka_consumer.poll(poll_timeout) if polled_msg: num_polled_msg += 1 demisto.debug(f"Received a message {num_polled_msg}# from Kafka.") incidents.append(create_incident(message=polled_msg, topic=topic)) last_fetched_offsets[f'{polled_msg.partition()}'] = polled_msg.offset() elif stop_consuming_upon_timeout and (not polled_msg): - demisto.debug(f"Didn't get a message after {kafka.POLL_TIMEOUT} seconds" + demisto.debug(f"Didn't get a message after {poll_timeout} seconds" f", stop_consuming_upon_timeout is true, break the loop. {num_polled_msg=}") break diff --git a/Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py b/Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py index 0560b65b7daa..e238bc18be00 100644 --- a/Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py +++ b/Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py @@ -832,6 +832,7 @@ def test_fetch_incidents_stop_consuming_upon_timeout_is_true( - Assert the created incidents are as expected - Assert setting the last run - Assert break method was called + - Assert poll method was called with timeout 10.0 """ mocker.patch.object(KConsumer, "__init__", return_value=None) cluster_metadata = create_cluster_metadata(cluster_tree) @@ -869,6 +870,7 @@ def test_fetch_incidents_stop_consuming_upon_timeout_is_true( debug.call_args_list[-2][0][0] == "Didn't get a message after 10.0 seconds, stop_consuming_upon_timeout is true, break the loop. num_polled_msg=1" ) + assert poll_mock.assert_any_call(10.0) close_mock.assert_called_once() incidents_mock.assert_called_once_with(incidents) set_last_run_mock.assert_called_once_with(next_run)