forked from scylladb/scylla-cluster-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_utils_k8s.py
306 lines (242 loc) · 12.2 KB
/
test_utils_k8s.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
from copy import deepcopy
from unittest import mock
from sdcm.utils.k8s import (
HelmValues,
KubernetesOps,
ScyllaPodsIPChangeTrackerThread,
)
# Baseline values used by the HelmValues tests in this module: one flat key,
# one nested mapping and one list.
# NOTE(review): this is a module-level mutable object shared by every test;
# whether HelmValues copies the dict it receives is not visible here, so tests
# that modify values should work on a copy -- TODO confirm.
BASE_HELM_VALUES = {
    "no_nesting_key": "no_nesting_value",
    "nested_dict": {
        "first_nested_dict_key": "first_nested_dict_value",
        "second_nested_dict_key": "second_nested_dict_value",
    },
    "nested_list": [1, 2, 3],
}
def test_helm_values_init_with_dict_arg():
    """A HelmValues built from one positional dict exports equal data via as_dict()."""
    exported = HelmValues(BASE_HELM_VALUES).as_dict()
    assert exported == BASE_HELM_VALUES
def test_helm_values_init_with_kwargs():
    """A HelmValues built from keyword arguments exports equal data via as_dict()."""
    exported = HelmValues(**BASE_HELM_VALUES).as_dict()
    assert exported == BASE_HELM_VALUES
def test_helm_values_get():
    """get() returns the value stored under a top-level key."""
    fetched = HelmValues(BASE_HELM_VALUES).get("no_nesting_key")
    assert fetched == "no_nesting_value"
def test_helm_values_get_nonexistent():
    """get() returns None for a key that is absent from the values."""
    missing = HelmValues(BASE_HELM_VALUES).get("fake_key")
    assert missing is None
def test_helm_values_set_new():
    """set() with a previously unknown top-level key makes it readable via get()."""
    # Work on a private copy, like the delete tests in this module do, so that
    # this mutation cannot leak into the module-level BASE_HELM_VALUES.
    # NOTE(review): assumes HelmValues may keep a reference to the dict it is
    # given -- TODO confirm; the copy is harmless either way.
    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))

    helm_values.set("new_key", "new_value")

    assert helm_values.get("new_key") == "new_value"
def test_helm_values_set_nested_new():
    """set() with a dotted key adds a nested entry and keeps its existing siblings."""
    # Work on a private copy, like the delete tests in this module do, so that
    # this mutation cannot leak into the module-level BASE_HELM_VALUES.
    # NOTE(review): assumes HelmValues may keep a reference to the dict it is
    # given -- TODO confirm; the copy is harmless either way.
    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))

    helm_values.set("nested_dict.third_nested_dict_key", "third_nested_dict_value")

    data = helm_values.as_dict()
    assert "nested_dict" in data
    nested = data["nested_dict"]
    # The new key first, then the two entries that must have survived the update.
    expected_entries = (
        ("third_nested_dict_key", "third_nested_dict_value"),
        ("first_nested_dict_key", "first_nested_dict_value"),
        ("second_nested_dict_key", "second_nested_dict_value"),
    )
    for key, expected_value in expected_entries:
        assert key in nested, nested
        assert nested[key] == expected_value, nested
def test_helm_values_set_override():
    """set() on an existing top-level key replaces the value returned by get()."""
    # Work on a private copy, like the delete tests in this module do, so that
    # overriding "no_nesting_key" cannot leak into the module-level
    # BASE_HELM_VALUES that other tests compare against.
    # NOTE(review): assumes HelmValues may keep a reference to the dict it is
    # given -- TODO confirm; the copy is harmless either way.
    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))

    helm_values.set("no_nesting_key", "custom_value")

    assert helm_values.get("no_nesting_key") == "custom_value"
def test_helm_values_set_nested_override():
    """set() with a dotted key replaces the value of an existing nested entry."""
    # Work on a private copy, like the delete tests in this module do, so that
    # overriding the nested value cannot leak into the module-level
    # BASE_HELM_VALUES that other tests compare against.
    # NOTE(review): assumes HelmValues may keep a reference to the dict it is
    # given -- TODO confirm; the copy is harmless either way.
    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))

    helm_values.set("nested_dict.first_nested_dict_key", "new_value")

    data = helm_values.as_dict()
    assert "nested_dict" in data
    assert "first_nested_dict_key" in data["nested_dict"]
    assert data["nested_dict"]["first_nested_dict_key"] == "new_value"
def test_helm_values_get_list():
    """get() on a key holding a list returns the whole list."""
    fetched = HelmValues(BASE_HELM_VALUES).get("nested_list")
    assert fetched == [1, 2, 3]
def test_helm_values_get_by_list_index():
    """get() with the "<key>.[<index>]" form returns the list item at that index."""
    first_item = HelmValues(BASE_HELM_VALUES).get("nested_list.[0]")
    assert first_item == 1
def test_helm_delete_from_list():
    """delete() with the "<key>.[<index>]" form removes that item from the list."""
    # Expected outcome: the baseline values without the first list item.
    expected = deepcopy(BASE_HELM_VALUES)
    expected['nested_list'].pop(0)

    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))
    helm_values.delete("nested_list.[0]")

    assert helm_values == expected
def test_helm_delete_from_dict():
    """delete() with a dotted key removes that entry from the nested dict."""
    # Expected outcome: the baseline values without the first nested entry.
    expected = deepcopy(BASE_HELM_VALUES)
    expected['nested_dict'].pop('first_nested_dict_key')

    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))
    helm_values.delete("nested_dict.first_nested_dict_key")

    assert helm_values == expected
def test_helm_values_try_set_by_list_index():
    """set() called with the key "nested_list[0]" is expected to raise ValueError."""
    # Work on a private copy so that a set() call which unexpectedly succeeds
    # cannot alter the module-level BASE_HELM_VALUES.
    # NOTE(review): assumes HelmValues may keep a reference to the dict it is
    # given -- TODO confirm; the copy is harmless either way.
    helm_values = HelmValues(deepcopy(BASE_HELM_VALUES))
    try:
        helm_values.set("nested_list[0]", 4)
    except ValueError:
        return
    # Raised explicitly instead of "assert False": assert statements are
    # removed when Python runs with -O, which would let this test pass silently.
    raise AssertionError("expected 'ValueError' exception was not raised")
class FakeK8SKluster:  # pylint: disable=too-few-public-methods
    """Cluster stand-in exposing a mocked ``get_api_client`` and a fixed ``region_name``."""

    def __init__(self):
        self.region_name = 'fake-region-1'
        # MagicMock so the tests can assert on how the API client was requested.
        self.get_api_client = mock.MagicMock()
def get_k8s_endpoint_update(namespace, pod_name, ip, required_labels_in_place=True,
                            as_str=True):
    """Build a fake K8S 'Endpoints' update event for a single pod.

    :param namespace: value placed into ``object.metadata.namespace``.
    :param pod_name: value placed into ``object.metadata.name``.
    :param ip: value placed into the only address of the only subset.
    :param required_labels_in_place: when truthy, every key listed in
        ``ScyllaPodsIPChangeTrackerThread.SCYLLA_PODS_EXPECTED_LABEL_KEYS`` is
        added to the labels with the value "fake_value"; otherwise the labels
        are empty.
    :param as_str: when truthy, return ``str()`` of the event dict with
        newline characters removed; otherwise return the dict itself.
    :return: the event as a string or as a dict, depending on ``as_str``.
    """
    labels = {}
    if required_labels_in_place:
        for label_key in ScyllaPodsIPChangeTrackerThread.SCYLLA_PODS_EXPECTED_LABEL_KEYS:
            labels[label_key] = "fake_value"

    # NOTE: every field that exists in real but makes no effect in our case
    #       is replaced with "fake_%its-key-name%" value.
    metadata = {
        "name": pod_name,
        "namespace": namespace,
        "uid": "fake_uid",
        "resourceVersion": "fake_resourceVersion",
        "creationTimestamp": "fake_creationTimestamp",
        "labels": labels,
        "annotations": "fake_annotations",
        "managedFields": "fake_managedFields",
    }
    address = {
        "ip": ip,
        "hostname": "fake_hostname",
        "nodeName": "fake_nodeName",
        "targetRef": "fake_targetRef",
    }
    event = {
        "type": "fake_type",
        "object": {
            "kind": "Endpoints",
            "apiVersion": "v1",
            "metadata": metadata,
            "subsets": [{
                "addresses": [address],
                "ports": "fake_ports",
            }],
        },
    }

    if as_str:
        return str(event).replace("\n", "")
    return event
# Callback fixtures for the IP-change-tracker positive scenario test below.
# Each mock gets an explicit __name__ (presumably because the tracker reads
# it from registered callables -- TODO confirm against sdcm.utils.k8s).
# NOTE(review): these mocks are module-level, so their call history is shared
# by everything that runs in the same process.

# Callbacks for one pod given as a list mixing two forms:
# a [callable, args, kwargs] triple and a bare callable.
ns1_p1_callbacks_as_list = [
    [mock.Mock(__name__="ns1_p1_callbacks_as_list[0]"), [mock.Mock()], {"fake_kwarg": mock.Mock()}],
    mock.Mock(__name__="ns1_p1_callbacks_as_list[1]"),
]
# Callback for one pod given as a single bare callable.
ns1_p2_callbacks_as_callable = mock.Mock(__name__="ns1_p2_callbacks_as_callable")
# Callback registered under the '__each__' pod name, given as a single bare callable.
ns1_eachpod_callbacks_as_callable = mock.Mock(__name__="ns1_eachpod_callbacks_as_callable")
# Callbacks registered without a pod name, given as a list of bare callables.
ns2_eachpod_callbacks_as_list = [
    mock.Mock(__name__="ns2_eachpod_callbacks_as_list[0]"),
    mock.Mock(__name__="ns2_eachpod_callbacks_as_list[1]"),
]
# pylint: disable=protected-access
def test_scylla_pods_ip_change_tracker_01_positive_scenario():  # pylint: disable=too-many-statements
    """Register callbacks, feed endpoint updates and check IP bookkeeping and callback calls.

    Steps, in order:
    1. Create the tracker with a patched ``KubernetesOps.core_v1_api`` and an
       empty ``ip_mapper`` dict.
    2. Register callbacks for two specific pods and for '__each__' in the first
       namespace, and without a pod name in the second namespace, then check
       how they are stored in ``ip_mapper``.
    3. Feed endpoint updates through ``_process_line`` and check 'current_ip',
       'old_ips' and which callbacks were called after each step.
    """
    # Init objects
    core_v1_api_mock, k8s_kluster, ip_mapper = mock.Mock(), FakeK8SKluster(), {}
    namespace1, pod_names1 = 'scylla', ("pod-name-1", "pod-name-2", "pod-name-3")
    namespace2, pod_names2 = 'another-namespace', ("an-pod-name-1", "an-pod-name-2", "an-pod-name-3")
    fake_ip1, fake_ip2, fake_ip1a, fake_ip3, fake_ip3a = (
        'fake-ip-1', 'fake-ip-2', 'fake-ip-1a', 'fake-ip-3', 'fake-ip-3a')
    with mock.patch.object(KubernetesOps, "core_v1_api", side_effect=core_v1_api_mock):
        ip_tracker = ScyllaPodsIPChangeTrackerThread(k8s_kluster, ip_mapper)
    k8s_kluster.get_api_client.assert_called_once()
    core_v1_api_mock.assert_called_once()
    # Creating the tracker alone must not put anything into the mapper.
    assert not ip_mapper, ip_mapper

    # Register callbacks
    ip_tracker.register_callbacks(
        callbacks=ns1_p1_callbacks_as_list,
        namespace=namespace1, pod_name=pod_names1[0], add_pod_name_as_kwarg=False)
    ip_tracker.register_callbacks(
        callbacks=ns1_p2_callbacks_as_callable,
        namespace=namespace1, pod_name=pod_names1[1], add_pod_name_as_kwarg=True)
    ip_tracker.register_callbacks(
        callbacks=ns1_eachpod_callbacks_as_callable,
        namespace=namespace1, pod_name='__each__', add_pod_name_as_kwarg=False)

    # Verify registered callbacks
    # Only the first namespace has been used so far, hence a single top-level key.
    assert len(ip_mapper) == 1, ip_mapper
    # No pod_name is passed here; the entries are expected under '__each__' (checked below).
    ip_tracker.register_callbacks(
        callbacks=ns2_eachpod_callbacks_as_list, namespace=namespace2, add_pod_name_as_kwarg=True)
    assert len(ip_mapper) == 2, ip_mapper
    assert namespace1 in ip_mapper and namespace2 in ip_mapper, ip_mapper

    # Every stored callback is expected as a
    # (callable, args, kwargs, add_pod_name_as_kwarg) tuple.
    assert pod_names1[0] in ip_mapper[namespace1], ip_mapper
    assert 'callbacks' in ip_mapper[namespace1][pod_names1[0]], ip_mapper
    assert ip_mapper[namespace1][pod_names1[0]]['callbacks'] == [
        (ns1_p1_callbacks_as_list[0][0], ns1_p1_callbacks_as_list[0][1],
         ns1_p1_callbacks_as_list[0][2], False),
        (ns1_p1_callbacks_as_list[1], [], {}, False),
    ], ip_mapper

    assert pod_names1[1] in ip_mapper[namespace1], ip_mapper
    assert 'callbacks' in ip_mapper[namespace1][pod_names1[1]], ip_mapper
    assert ip_mapper[namespace1][pod_names1[1]]['callbacks'] == [
        (ns1_p2_callbacks_as_callable, [], {}, True),
    ], ip_mapper

    assert '__each__' in ip_mapper[namespace1], ip_mapper
    assert 'callbacks' in ip_mapper[namespace1]['__each__'], ip_mapper
    assert ip_mapper[namespace1]['__each__']['callbacks'] == [
        (ns1_eachpod_callbacks_as_callable, [], {}, False),
    ], ip_mapper

    assert ip_mapper[namespace2]['__each__']['callbacks'] == [
        (ns2_eachpod_callbacks_as_list[0], [], {}, True),
        (ns2_eachpod_callbacks_as_list[1], [], {}, True),
    ], ip_mapper

    # Process pods updates and assert it's data and callbacks call status
    # First update for pod 1 of namespace 1: IP is recorded, no old IPs yet.
    ip_tracker._process_line(get_k8s_endpoint_update(namespace1, pod_names1[0], fake_ip1))
    assert pod_names1[0] in ip_mapper[namespace1], ip_mapper
    assert 'current_ip' in ip_mapper[namespace1][pod_names1[0]], ip_mapper
    assert fake_ip1 == ip_mapper[namespace1][pod_names1[0]]['current_ip'], ip_mapper
    assert ip_mapper[namespace1][pod_names1[0]]['old_ips'] == [], ip_mapper

    # First update for pod 2 of namespace 1: same expectations.
    ip_tracker._process_line(get_k8s_endpoint_update(namespace1, pod_names1[1], fake_ip2))
    assert pod_names1[1] in ip_mapper[namespace1], ip_mapper
    assert 'current_ip' in ip_mapper[namespace1][pod_names1[1]], ip_mapper
    assert fake_ip2 == ip_mapper[namespace1][pod_names1[1]]['current_ip'], ip_mapper
    assert ip_mapper[namespace1][pod_names1[1]]['old_ips'] == [], ip_mapper

    # The first IP seen for a pod is not expected to trigger any callback.
    ns1_p1_callbacks_as_list[0][0].assert_not_called()
    ns1_p1_callbacks_as_list[1].assert_not_called()
    ns1_p2_callbacks_as_callable.assert_not_called()
    ns1_eachpod_callbacks_as_callable.assert_not_called()
    ns2_eachpod_callbacks_as_list[0].assert_not_called()
    ns2_eachpod_callbacks_as_list[1].assert_not_called()

    # Pod 1 of namespace 1 gets a different IP: the previous one moves to 'old_ips'.
    ip_tracker._process_line(get_k8s_endpoint_update(namespace1, pod_names1[0], fake_ip1a))
    assert pod_names1[0] in ip_mapper[namespace1], ip_mapper
    assert 'current_ip' in ip_mapper[namespace1][pod_names1[0]], ip_mapper
    assert fake_ip1a == ip_mapper[namespace1][pod_names1[0]]['current_ip'], ip_mapper
    assert ip_mapper[namespace1][pod_names1[0]]['old_ips'] == [fake_ip1], ip_mapper

    # Expected calls: that pod's own callbacks and the namespace 1 '__each__'
    # callback; pod 2's callback and the namespace 2 callbacks stay untouched.
    ns1_p1_callbacks_as_list[0][0].assert_called_once_with(
        *ns1_p1_callbacks_as_list[0][1],
        **ns1_p1_callbacks_as_list[0][2])
    ns1_p1_callbacks_as_list[1].assert_called_once_with()
    ns1_p2_callbacks_as_callable.assert_not_called()
    ns1_eachpod_callbacks_as_callable.assert_called_once_with()
    ns2_eachpod_callbacks_as_list[0].assert_not_called()
    ns2_eachpod_callbacks_as_list[1].assert_not_called()

    # First update for a pod of namespace 2, which has no callbacks of its own.
    ip_tracker._process_line(get_k8s_endpoint_update(namespace2, pod_names2[0], fake_ip3))
    assert pod_names2[0] in ip_mapper[namespace2], ip_mapper
    assert 'current_ip' in ip_mapper[namespace2][pod_names2[0]], ip_mapper
    assert fake_ip3 == ip_mapper[namespace2][pod_names2[0]]['current_ip'], ip_mapper
    assert ip_mapper[namespace2][pod_names2[0]]['old_ips'] == [], ip_mapper

    # The same namespace 2 pod gets a different IP.
    ip_tracker._process_line(get_k8s_endpoint_update(namespace2, pod_names2[0], fake_ip3a))
    assert fake_ip3a == ip_mapper[namespace2][pod_names2[0]]['current_ip'], ip_mapper
    assert ip_mapper[namespace2][pod_names2[0]]['old_ips'] == [fake_ip3], ip_mapper

    # Namespace 1 callbacks keep their earlier call counts; the namespace 2
    # '__each__' callbacks were registered with add_pod_name_as_kwarg=True and
    # are expected to receive the pod name as the 'pod_name' keyword argument.
    ns1_p1_callbacks_as_list[0][0].assert_called_once_with(
        *ns1_p1_callbacks_as_list[0][1],
        **ns1_p1_callbacks_as_list[0][2])
    ns1_p1_callbacks_as_list[1].assert_called_once_with()
    ns1_p2_callbacks_as_callable.assert_not_called()
    ns1_eachpod_callbacks_as_callable.assert_called_once_with()
    ns2_eachpod_callbacks_as_list[0].assert_called_once_with(pod_name=pod_names2[0])
    ns2_eachpod_callbacks_as_list[1].assert_called_once_with(pod_name=pod_names2[0])
def test_scylla_pods_ip_change_tracker_02_negative():
    """Feed the tracker unusable input and check that the IP mapper stays empty."""
    api_mock = mock.Mock()
    fake_kluster = FakeK8SKluster()
    ip_mapper = {}
    with mock.patch.object(KubernetesOps, "core_v1_api", side_effect=api_mock):
        ip_tracker = ScyllaPodsIPChangeTrackerThread(fake_kluster, ip_mapper)
    fake_kluster.get_api_client.assert_called_once()
    api_mock.assert_called_once()

    # Lines of a wrong type, an empty line and a line that is not an event.
    for bad_line in (1, [], "", "some fake not parsible line"):
        ip_tracker._process_line(bad_line)

    # Registering an empty list of callbacks.
    ip_tracker.register_callbacks(callbacks=[], namespace='fake', pod_name='fake')

    # An otherwise complete event whose metadata has no 'namespace' field.
    event_without_ns = get_k8s_endpoint_update(
        "fake-ns", "fake-pod-name", "fake-ip", as_str=False)
    del event_without_ns['object']['metadata']['namespace']
    ip_tracker._process_line(str(event_without_ns).replace("\n", ""))

    assert not ip_mapper, ip_mapper