Skip to content

Commit

Permalink
format test
Browse files Browse the repository at this point in the history
  • Loading branch information
sapirshuker committed Nov 20, 2024
1 parent 9fce422 commit 89c9a40
Showing 1 changed file with 72 additions and 31 deletions.
103 changes: 72 additions & 31 deletions Packs/Kafka/Integrations/KafkaV3/KafkaV3_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from CommonServerPython import DemistoException, demisto

from KafkaV3 import KafkaCommunicator, command_test_module, KConsumer, KProducer, print_topics, fetch_partitions, \
Expand Down Expand Up @@ -770,25 +769,55 @@ def test_fetch_incidents(mocker, demisto_params, last_run, cluster_tree, topic_p
incidents_mock.assert_called_once_with(incidents)
set_last_run_mock.assert_called_once_with(next_run)


@pytest.mark.parametrize(
'demisto_params, last_run, cluster_tree, topic_partitions, incidents, next_run, polled_msgs, offsets',
"demisto_params, last_run, cluster_tree, topic_partitions, incidents, next_run, polled_msgs, offsets",
[
pytest.param(
{'topic': 'some-topic',
'partition': '',
'first_fetch': '0',
'max_fetch': '2',
'stop_consuming_upon_timeout': True}, {}, {'some-topic': [0]}, [TopicPartition(topic='some-topic', partition=0, offset=1)],
[{'name': 'Kafka some-topic partition:0 offset:1',
'details': 'polled_msg',
'rawJSON': '{"Topic": "some-topic", "Partition": 0, "Offset": 1, '
'"Message": "polled_msg"}'}],
{'last_fetched_offsets': {'0': 1}, 'last_topic': 'some-topic'},
[MessageMock(message='polled_msg', partition=0, offset=1,
timestamp=(TIMESTAMP_NOT_AVAILABLE, 0)), None], [(0, 2), (0, 2), (0, 2)], id="first run, offset is 0,"
"stop_consuming_upon_timeout is true")])
def test_fetch_incidents_stop_consuming_upon_timeout_is_true(mocker, demisto_params, last_run, cluster_tree, topic_partitions,
incidents, next_run, polled_msgs, offsets):
{
"topic": "some-topic",
"partition": "",
"first_fetch": "0",
"max_fetch": "2",
"stop_consuming_upon_timeout": True,
},
{},
{"some-topic": [0]},
[TopicPartition(topic="some-topic", partition=0, offset=1)],
[
{
"name": "Kafka some-topic partition:0 offset:1",
"details": "polled_msg",
"rawJSON": '{"Topic": "some-topic", "Partition": 0, "Offset": 1, '
'"Message": "polled_msg"}',
}
],
{"last_fetched_offsets": {"0": 1}, "last_topic": "some-topic"},
[
MessageMock(
message="polled_msg",
partition=0,
offset=1,
timestamp=(TIMESTAMP_NOT_AVAILABLE, 0),
),
None,
],
[(0, 2), (0, 2), (0, 2)],
id="first run, offset is 0," "stop_consuming_upon_timeout is true",
)
],
)
def test_fetch_incidents_stop_consuming_upon_timeout_is_true(
mocker,
demisto_params,
last_run,
cluster_tree,
topic_partitions,
incidents,
next_run,
polled_msgs,
offsets,
):
"""
Given:
- initialized KafkaCommunicator
Expand All @@ -805,30 +834,42 @@ def test_fetch_incidents_stop_consuming_upon_timeout_is_true(mocker, demisto_par
- Assert setting the last run
- Assert break method was called
"""
mocker.patch.object(KConsumer, '__init__', return_value=None)
mocker.patch.object(KConsumer, "__init__", return_value=None)
cluster_metadata = create_cluster_metadata(cluster_tree)
mocker.patch.object(KConsumer, 'list_topics', return_value=cluster_metadata)
mocker.patch.object(demisto, 'getLastRun', return_value=last_run)
assign_mock = mocker.patch.object(KConsumer, 'assign')
poll_mock = mocker.patch.object(KConsumer, 'poll', side_effect=polled_msgs)
mocker.patch.object(KConsumer, 'get_watermark_offsets', side_effect=offsets)
close_mock = mocker.patch.object(KConsumer, 'close')
set_last_run_mock = mocker.patch.object(demisto, 'setLastRun')
incidents_mock = mocker.patch.object(demisto, 'incidents')
debug = mocker.patch.object(demisto, 'debug')
mocker.patch.object(KConsumer, "list_topics", return_value=cluster_metadata)
mocker.patch.object(demisto, "getLastRun", return_value=last_run)
assign_mock = mocker.patch.object(KConsumer, "assign")
poll_mock = mocker.patch.object(KConsumer, "poll", side_effect=polled_msgs)
mocker.patch.object(KConsumer, "get_watermark_offsets", side_effect=offsets)
close_mock = mocker.patch.object(KConsumer, "close")
set_last_run_mock = mocker.patch.object(demisto, "setLastRun")
incidents_mock = mocker.patch.object(demisto, "incidents")
debug = mocker.patch.object(demisto, "debug")

fetch_incidents(KAFKA, demisto_params)

assign_mock.assert_called_once_with(topic_partitions)
called_topic_partitions = assign_mock.call_args.args[0]
for partition_num in range(len(topic_partitions)):
assert called_topic_partitions[partition_num].topic == topic_partitions[partition_num].topic
assert called_topic_partitions[partition_num].partition == topic_partitions[partition_num].partition
assert called_topic_partitions[partition_num].offset == topic_partitions[partition_num].offset
assert (
called_topic_partitions[partition_num].topic
== topic_partitions[partition_num].topic
)
assert (
called_topic_partitions[partition_num].partition
== topic_partitions[partition_num].partition
)
assert (
called_topic_partitions[partition_num].offset
== topic_partitions[partition_num].offset
)

assert len(polled_msgs) == poll_mock.call_count
debug.assert_called_with(f"Fetching finished, setting last run to {next_run}")
assert debug.call_args_list[-2][0][0] == "Didn't get a message after 10.0 seconds, stop_consuming_upon_timeout is true, break the loop. num_polled_msg=1"
assert (
debug.call_args_list[-2][0][0]
== "Didn't get a message after 10.0 seconds, stop_consuming_upon_timeout is true, break the loop. num_polled_msg=1"
)
close_mock.assert_called_once()
incidents_mock.assert_called_once_with(incidents)
set_last_run_mock.assert_called_once_with(next_run)
Expand Down

0 comments on commit 89c9a40

Please sign in to comment.