forked from kytos/mef_eline
-
Notifications
You must be signed in to change notification settings - Fork 9
/
utils.py
198 lines (169 loc) · 6.43 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""Utility functions."""
from typing import Union
from kytos.core.common import EntityStatus
from kytos.core.events import KytosEvent
from kytos.core.interface import UNI, Interface, TAGRange
from napps.kytos.mef_eline.exceptions import DisabledSwitch
def map_evc_event_content(evc, **kwargs) -> dict:
    """Build the content dict used in events about an EVC.

    Any extra keyword arguments are carried into the result. When a keyword
    clashes with one of the EVC-derived keys, the EVC-derived value wins.
    """
    evc_fields = {
        "evc_id": evc.id,
        "id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    # kwargs first so that the EVC fields override on duplicated keys.
    return {**kwargs, **evc_fields}
def emit_event(controller, name, context="kytos/mef_eline", content=None,
               timeout=None):
    """Put a KytosEvent named "<context>.<name>" on the app buffer.

    The event carries ``content`` and ``timeout`` is forwarded to the
    buffer's ``put`` call.
    """
    kytos_event = KytosEvent(name=f"{context}.{name}", content=content)
    controller.buffers.app.put(kytos_event, timeout=timeout)
def merge_flow_dicts(
    dst: dict[str, list], *srcs: dict[str, list]
) -> dict[str, list]:
    """Merge srcs dict flows into dst.

    For every key of every source dict, the source list items are appended
    to the list stored under the same key in ``dst`` (created when missing).
    ``dst`` is modified in place and also returned. The source dicts and
    their lists are never modified.
    """
    for src in srcs:
        for key, flows in src.items():
            # Extend a list owned by dst instead of storing the source list
            # itself: storing it would make a later extend on the same key
            # silently mutate the caller's source dict.
            dst.setdefault(key, []).extend(flows)
    return dst
async def aemit_event(controller, name, content):
    """Asynchronously put a KytosEvent on the controller's app buffer.

    ``name`` is used as the event name as given (no context is prepended).
    """
    kytos_event = KytosEvent(name=name, content=content)
    await controller.buffers.app.aput(kytos_event)
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The endpoint's switch dpid and port number must equal the step's
    "dpid" and "port". The vlan is compared as well, but only when a truthy
    ``vlan`` is given and the step has a "vlan" entry.
    """
    same_hop = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if vlan and "vlan" in trace:
        return same_hop and vlan == trace["vlan"]
    return same_hop
def map_dl_vlan(value: Union[str, int]) -> Union[int, None]:
    """Map dl_vlan value with the following criteria:

    dl_vlan = "untagged" or 0 -> None
    dl_vlan = "any" or "4096/4096" -> 1
    dl_vlan = any other int -> the same int
    dl_vlan = "num1/num2" -> num1 & (num2 & 4095)
    """
    if value in {"untagged", 0}:
        return None
    if value in {"any", "4096/4096"}:
        return 1
    if isinstance(value, int):
        return value
    # Remaining values are expected to be "vlan/mask" strings; the mask is
    # limited to its lower 12 bits before being applied to the vlan.
    vlan, mask = map(int, value.split("/"))
    return vlan & (mask & 4095)
def compare_uni_out_trace(
    tag_value: Union[None, int, str],
    interface: Interface,
    trace: dict
) -> bool:
    """Tell whether the trace output step matches the UNI port and vlan.

    A trace without an "out" entry is accepted, and an "out" entry that is
    not a dict is rejected. Otherwise the interface port number and the
    mapped vlan (None when ``tag_value`` is falsy) must both equal the
    "port" and "vlan" values of the "out" entry.
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    out_step = trace["out"]
    if not isinstance(out_step, dict):
        return False
    expected_vlan = None
    if tag_value:
        expected_vlan = map_dl_vlan(tag_value)
    return (
        interface.port_number == out_step.get("port")
        and expected_vlan == out_step.get("vlan")
    )
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Return the largest divisor of number found by halving limit.

    Starting at ``limit``, the candidate is halved until it divides
    ``number`` exactly; with the default limit the result is the greatest
    power of 2 (up to 4096) that divides ``number``.
    """
    divisor = limit
    while True:
        if not number % divisor > 0:
            return divisor
        divisor //= 2
def get_vlan_tags_and_masks(
    tag_ranges: list[list[int]]
) -> list[Union[int, str]]:
    """Get a list of vlan/mask pairs for a given list of ranges.

    Each inclusive ``[start, end]`` range is covered by consecutive blocks
    whose size is a power of 2 that divides the block's first vlan. A block
    of size 1 is emitted as the plain int vlan; a larger block is emitted as
    the string "vlan/mask", where mask is 4096 minus the block size.
    """
    masks_list = []
    for start, end in tag_ranges:
        limit = end + 1  # exclusive upper bound of the range
        while start < limit:
            # Largest power-of-2 block that is aligned on start ...
            divisor = max_power2_divisor(start)
            # ... shrunk until it no longer runs past the end of the range.
            while divisor > limit - start:
                divisor //= 2
            mask = 4096 - divisor
            if mask == 4095:
                # Single vlan: no mask needed.
                masks_list.append(start)
            else:
                masks_list.append(f"{start}/{mask}")
            start += divisor
    return masks_list
def check_disabled_component(uni_a: UNI, uni_z: UNI):
    """Raise DisabledSwitch if the UNIs' switch or interfaces are disabled.

    The check is only performed when both UNIs are on the same switch; for
    UNIs on different switches the function returns without checking.
    The switch is checked first, then the uni_a and uni_z interfaces, in
    that order.
    """
    switch = uni_a.interface.switch
    if switch != uni_z.interface.switch:
        return
    if switch.status == EntityStatus.DISABLED:
        dpid = switch.dpid
        raise DisabledSwitch(f"Switch {dpid} is disabled")
    for uni in (uni_a, uni_z):
        if uni.interface.status == EntityStatus.DISABLED:
            id_ = uni.interface.id
            raise DisabledSwitch(f"Interface {id_} is disabled")
def make_uni_list(list_circuits: list) -> list:
    """Build the list of (interface, tag) tuples to be sent to sdntrace.

    For a circuit whose uni_a tag is a TAGRange, one pair of tuples
    (uni_a then uni_z) is added per mask of the range. For any other
    circuit, one tuple per UNI is added with the UNI tag value, or None when
    the UNI has no tag.
    """
    uni_list = []
    for circuit in list_circuits:
        uni_a, uni_z = circuit.uni_a, circuit.uni_z
        if isinstance(uni_a.user_tag, TAGRange):
            # TAGRange value from uni_a and uni_z are currently mirrored
            masks = uni_a.user_tag.mask_list or uni_z.user_tag.mask_list
            for mask in masks:
                uni_list.extend(
                    ((uni_a.interface, mask), (uni_z.interface, mask))
                )
            continue
        for uni in (uni_a, uni_z):
            tag = uni.user_tag.value if uni.user_tag else None
            uni_list.append((uni.interface, tag))
    return uni_list
def send_flow_mods_event(
    controller, flow_dict: dict[str, list], action: str, force=True
):
    """Send flow mods to flow_manager through events.

    One event with context "kytos.flow_manager" and name "flows.<action>"
    is emitted per entry of ``flow_dict``, carrying the entry key as "dpid",
    its flows and the ``force`` flag.
    """
    event_name = f"flows.{action}"
    for dpid, flows in flow_dict.items():
        content = {
            "dpid": dpid,
            "flow_dict": {"flows": flows},
            "force": force,
        }
        emit_event(
            controller,
            context="kytos.flow_manager",
            name=event_name,
            content=content,
        )
def prepare_delete_flow(evc_flows: dict[str, list[dict]]):
    """Build, per dpid, the flow mods used to delete the given flows.

    Each resulting flow keeps only the "cookie" and "match" of the original
    flow, and adds the "mef_eline" owner and a cookie mask with all 64 bits
    set. An empty or missing ``evc_flows`` yields an empty dict.
    """
    if not evc_flows:
        return {}
    return {
        dpid: [
            {
                "cookie": flow["cookie"],
                "match": flow["match"],
                "owner": "mef_eline",
                "cookie_mask": int(0xffffffffffffffff),
            }
            for flow in flows
        ]
        for dpid, flows in evc_flows.items()
    }