Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor(eos_designs): Refactor eos_designs structured_config code ip_access_list #4972

Open
wants to merge 26 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
71e5763
Refactor(eos_designs): Refactor eos_designs structured_config code fo…
MaheshGSLAB Feb 3, 2025
1ac9a3f
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 4, 2025
3d17a23
updated the logic
MaheshGSLAB Feb 4, 2025
1a99067
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2025
9344997
replace append_new with append
MaheshGSLAB Feb 4, 2025
1a009d7
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 5, 2025
3f363de
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 10, 2025
101daac
Added network services ip_access_lists
Feb 10, 2025
1624017
fix the exact duplicate item issue
Feb 10, 2025
eccbc14
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 10, 2025
b7251cc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 10, 2025
5603424
added natural sort
Feb 11, 2025
9181511
Merge branch 'devel' into ip-access-list
Vibhu-gslab Feb 11, 2025
440bc64
Merge branch 'devel' into ip-access-list
Shivani-gslab Feb 12, 2025
e3255b9
optimized the code
Feb 12, 2025
8091c01
used seprator as ..
Feb 12, 2025
1a55bec
Merge branch 'devel' into ip-access-list
Feb 13, 2025
de1cd61
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 13, 2025
a36672e
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 21, 2025
5d18ee6
run cv-pathfinder molecule
MaheshGSLAB Feb 21, 2025
87b4ff3
Merge branch 'devel' into ip-access-list
ClausHolbechArista Feb 21, 2025
bebb455
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 25, 2025
726e6f9
updated the logic to render ip_access_list in network services
MaheshGSLAB Feb 25, 2025
e41b879
Merge branch 'devel' into ip-access-list
MaheshGSLAB Feb 25, 2025
40db13f
updated negative unit test
MaheshGSLAB Feb 25, 2025
5ce98c8
fix the type error and return type
MaheshGSLAB Feb 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ expected_error_message: >-
Found duplicate objects with conflicting data while generating configuration for Network Services point-to-point EthernetInterfaces.
Interface Ethernet6.1000 defined under tenants[TENANT_A].point_to_point_services[TEN_A_site2_site5_eline_port_based].endpoints[0] conflicts with
{'name': 'Ethernet6.1000', 'description': None, 'shutdown': False, 'mtu': None, 'vrf': 'TEST', 'encapsulation_dot1q':
{'vlan': 1000}, 'ip_address': '10.42.255.0/31', 'access_group_in': None, 'access_group_out': None, 'peer_type': 'l3_interface', 'eos_cli': None}.
{'vlan': 1000}, 'ip_address': '10.42.255.0/31', 'peer_type': 'l3_interface', 'eos_cli': None}.
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,20 @@ ip_access_lists:
protocol: ip
source: any
destination: 10.10.40.10
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10
entries:
- sequence: 15
action: deny
protocol: ip
source: any
destination: 10.10.40.20
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet12
entries:
- remark: Some remark will not require source and destination fields.
- action: permit
protocol: ip
source: 10.10.40.10
destination: any
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10
entries:
- sequence: 15
action: deny
protocol: ip
source: any
destination: 10.10.40.20
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet13.10
entries:
- remark: Some remark will not require source and destination fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,20 @@ ip_access_lists:
protocol: ip
source: any
destination: 10.10.50.10
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10
entries:
- sequence: 15
action: deny
protocol: ip
source: any
destination: 10.10.50.20
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet12
entries:
- remark: Some remark will not require source and destination fields.
- action: permit
protocol: ip
source: 10.10.50.10
destination: any
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10
entries:
- sequence: 15
action: deny
protocol: ip
source: any
destination: 10.10.50.20
- name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet13.10
entries:
- remark: Some remark will not require source and destination fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pyavd._eos_cli_config_gen.schema import EosCliConfigGen
from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor
from pyavd._errors import AristaAvdError, AristaAvdInvalidInputsError
from pyavd._utils import get
from pyavd._utils import get_ip_from_ip_prefix
from pyavd.j2filters import natural_sort

if TYPE_CHECKING:
Expand Down Expand Up @@ -83,6 +83,9 @@ def _set_l3_interfaces(
continue

interface_name = l3_interface.interfaces[node_index]
interface_ip = l3_interface.ip_addresses[node_index]
if "/" in interface_ip:
interface_ip = get_ip_from_ip_prefix(interface_ip)
# if 'descriptions' is set, it is preferred
interface_description = l3_interface.descriptions[node_index] if l3_interface.descriptions else l3_interface.description
interface = EosCliConfigGen.EthernetInterfacesItem(
Expand All @@ -106,11 +109,23 @@ def _set_l3_interfaces(
if self.inputs.fabric_sflow.l3_interfaces is not None:
interface.sflow.enable = self.inputs.fabric_sflow.l3_interfaces

if self._l3_interface_acls is not None:
interface._update(
access_group_in=get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_in..name", separator=".."),
access_group_out=get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_out..name", separator=".."),
if l3_interface.ipv4_acl_in:
acl = self.shared_utils.get_ipv4_acl(
name=l3_interface.ipv4_acl_in,
interface_name=interface_name,
interface_ip=interface_ip,
)
interface.access_group_in = acl.name
self._set_ipv4_acl(acl)

if l3_interface.ipv4_acl_out:
acl = self.shared_utils.get_ipv4_acl(
name=l3_interface.ipv4_acl_out,
interface_name=interface_name,
interface_ip=interface_ip,
)
interface.access_group_out = acl.name
self._set_ipv4_acl(acl)

if "." in interface_name:
# This is a subinterface so we need to ensure that the parent is created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
# that can be found in the LICENSE file.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Literal, Protocol

from pyavd._eos_cli_config_gen.schema import EosCliConfigGen
from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor
from pyavd._errors import AristaAvdError
from pyavd._utils import append_if_not_duplicate, get_ip_from_ip_prefix
from pyavd.j2filters import natural_sort
from pyavd._utils import get_ip_from_ip_prefix

if TYPE_CHECKING:
from pyavd._eos_designs.schema import EosDesigns

from . import AvdStructuredConfigNetworkServicesProtocol


Expand All @@ -21,62 +23,51 @@ class IpAccesslistsMixin(Protocol):
Class should only be used as Mixin to a AvdStructuredConfig class.
"""

@cached_property
def _acl_internet_exit_zscaler(self: AvdStructuredConfigNetworkServicesProtocol) -> dict:
return {
"name": self.get_internet_exit_nat_acl_name("zscaler"),
"entries": [
{
"sequence": 10,
"action": "permit",
"protocol": "ip",
"source": "any",
"destination": "any",
},
],
}

@cached_property
def _acl_internet_exit_direct(self: AvdStructuredConfigNetworkServicesProtocol) -> dict | None:
@structured_config_contributor
def ip_access_lists(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
"""Set the structured config for ip_access_lists."""
for ie_policy_type in self._filtered_internet_exit_policy_types:
self._acl_internet_exit(ie_policy_type)

def _set_ipv4_acl(self: AvdStructuredConfigNetworkServicesProtocol, ipv4_acl: EosDesigns.Ipv4AclsItem) -> None:
"""
Set structured config for ip_access_lists.

Called for each interface in l3_interfaces and l3_port_channels when applying ipv4_acls
"""
self.structured_config.ip_access_lists.append(ipv4_acl._cast_as(EosCliConfigGen.IpAccessListsItem))

def _acl_internet_exit_zscaler(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
ip_access_list = EosCliConfigGen.IpAccessListsItem(name=self.get_internet_exit_nat_acl_name("zscaler"))
ip_access_list.entries.append_new(sequence=10, action="permit", protocol="ip", source="any", destination="any")
self.structured_config.ip_access_lists.append(ip_access_list)

def _acl_internet_exit_direct(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
interface_ips = set()
for ie_policy, connections in self._filtered_internet_exit_policies_and_connections:
if ie_policy.type == "direct":
for connection in connections:
interface_ips.add(connection["source_interface_ip_address"])

if interface_ips:
acl = EosCliConfigGen.IpAccessListsItem(name=self.get_internet_exit_nat_acl_name("direct"))
interface_ips = sorted(interface_ips)
entries = []
i = 0
for i, interface_ip in enumerate(interface_ips):
entries.append(
{
"sequence": 10 + i * 10,
"action": "deny",
"protocol": "ip",
"source": get_ip_from_ip_prefix(interface_ip),
"destination": "any",
},
)
entries.append(
{
"sequence": 20 + i * 10,
"action": "permit",
"protocol": "ip",
"source": "any",
"destination": "any",
},
acl.entries.append_new(sequence=10 + i * 10, action="deny", protocol="ip", source=get_ip_from_ip_prefix(interface_ip), destination="any")
acl.entries.append_new(
sequence=20 + i * 10,
action="permit",
protocol="ip",
source="any",
destination="any",
)

return {
"name": self.get_internet_exit_nat_acl_name("direct"),
"entries": entries,
}
return None
self.structured_config.ip_access_lists.append(acl)

def _acl_internet_exit_user_defined(
self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]
) -> list[dict] | None:
) -> EosDesigns.Ipv4AclsItem | None:
acl_name = self.get_internet_exit_nat_acl_name(internet_exit_policy_type)
if acl_name not in self.inputs.ipv4_acls:
# TODO: Evaluate if we should continue so we raise when there is no ACL.
Expand All @@ -86,41 +77,19 @@ def _acl_internet_exit_user_defined(
acl = self.shared_utils.get_ipv4_acl(acl_name, "random", interface_ip="random", peer_ip="random")
if acl.name == acl_name:
# ACL doesn't need replacement
return [acl._as_dict()]
return acl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a type error

Suggested change
return acl
return acl._cast_as(EosCliConfigGen.IpAccessListsItem)

Update the return type in line 54 as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, this change fixes the ordering as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated


# TODO: We still have one nat for all interfaces, need to also add logic to make nat per interface
# if acl needs substitution
msg = f"ipv4_acls[name={acl_name}] field substitution is not supported for internet exit access lists"
raise AristaAvdError(msg)

def _acl_internet_exit(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> list[dict] | None:
def _acl_internet_exit(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> None:
acls = self._acl_internet_exit_user_defined(internet_exit_policy_type)
if acls:
return acls

if internet_exit_policy_type == "zscaler":
return [self._acl_internet_exit_zscaler]
if internet_exit_policy_type == "direct" and (acl := self._acl_internet_exit_direct):
return [acl]
return None

@cached_property
def ip_access_lists(self: AvdStructuredConfigNetworkServicesProtocol) -> list | None:
"""Return structured config for ip_access_lists."""
ip_access_lists = []
if self._svi_acls:
for interface_acls in self._svi_acls.values():
for acl in interface_acls.values():
append_if_not_duplicate(ip_access_lists, "name", acl, context="IPv4 Access lists for SVI", context_keys=["name"])

if self._l3_interface_acls:
for l3_interface_acl in self._l3_interface_acls.values():
for acl in l3_interface_acl.values():
append_if_not_duplicate(ip_access_lists, "name", acl, context="IPv4 Access lists for L3 interface", context_keys=["name"])

for ie_policy_type in self._filtered_internet_exit_policy_types:
acls = self._acl_internet_exit(ie_policy_type)
if acls:
ip_access_lists.extend(acls)
self.structured_config.ip_access_lists.append(acls._cast_as(EosCliConfigGen.IpAccessListsItem))

return natural_sort(ip_access_lists, "name") or None
elif internet_exit_policy_type == "zscaler":
self._acl_internet_exit_zscaler()
elif internet_exit_policy_type == "direct":
self._acl_internet_exit_direct()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from pyavd._eos_designs.schema import EosDesigns
from pyavd._errors import AristaAvdError, AristaAvdInvalidInputsError, AristaAvdMissingVariableError
from pyavd._utils import get, get_ip_from_ip_prefix
from pyavd._utils import get
from pyavd._utils.password_utils.password import simple_7_encrypt
from pyavd.j2filters import natural_sort, range_expand

Expand Down Expand Up @@ -418,51 +418,6 @@ def _local_path_groups_connected_to_pathfinder(self: AvdStructuredConfigNetworkS
if any(wan_interface["connected_to_pathfinder"] for wan_interface in path_group._internal_data.interfaces)
]

@cached_property
def _svi_acls(self: AvdStructuredConfigNetworkServicesProtocol) -> dict[str, dict[str, dict]] | None:
"""
Returns a dict of SVI ACLs.

<interface_name>: {
"ipv4_acl_in": <generated_ipv4_acl>,
"ipv4_acl_out": <generated_ipv4_acl>,
}

Only contains interfaces with ACLs and only the ACLs that are set,
so use `get(self._svi_acls, f"{interface_name}.ipv4_acl_in")` to get the value.
"""
if not self.shared_utils.network_services_l3:
return None

svi_acls = {}
for tenant in self.shared_utils.filtered_tenants:
for vrf in tenant.vrfs:
for svi in vrf.svis:
ipv4_acl_in = svi.ipv4_acl_in
ipv4_acl_out = svi.ipv4_acl_out
if ipv4_acl_in is None and ipv4_acl_out is None:
continue

interface_name = f"Vlan{svi.id}"
interface_ip = svi.ip_address_virtual
if interface_ip is not None and "/" in interface_ip:
interface_ip = get_ip_from_ip_prefix(interface_ip)

if ipv4_acl_in is not None:
svi_acls.setdefault(interface_name, {})["ipv4_acl_in"] = self.shared_utils.get_ipv4_acl(
name=ipv4_acl_in,
interface_name=interface_name,
interface_ip=interface_ip,
)._as_dict()
if ipv4_acl_out is not None:
svi_acls.setdefault(interface_name, {})["ipv4_acl_out"] = self.shared_utils.get_ipv4_acl(
name=ipv4_acl_out,
interface_name=interface_name,
interface_ip=interface_ip,
)._as_dict()

return svi_acls

def get_internet_exit_nat_profile_name(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> str:
if internet_exit_policy_type == "zscaler":
return "NAT-IE-ZSCALER"
Expand All @@ -475,53 +430,6 @@ def get_internet_exit_nat_acl_name(self: AvdStructuredConfigNetworkServicesProto
def _filtered_internet_exit_policy_types(self: AvdStructuredConfigNetworkServicesProtocol) -> list:
return sorted({internet_exit_policy.type for internet_exit_policy, _connections in self._filtered_internet_exit_policies_and_connections})

@cached_property
def _l3_interface_acls(self: AvdStructuredConfigNetworkServicesProtocol) -> dict | None:
"""
Returns a dict of interfaces and ACLs set on the interfaces.

{
<interface_name>: {
"ipv4_acl_in": <generated_ipv4_acl>,
"ipv4_acl_out": <generated_ipv4_acl>,
}
}
Only contains interfaces with ACLs and only the ACLs that are set,
so use `get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_in", separator="..")` to get the value.
"""
if not self.shared_utils.network_services_l3:
return None

l3_interface_acls = {}
for tenant in self.shared_utils.filtered_tenants:
for vrf in tenant.vrfs:
for l3_interface in vrf.l3_interfaces:
for interface_idx, interface in enumerate(l3_interface.interfaces):
if l3_interface.nodes[interface_idx] != self.shared_utils.hostname:
continue

ipv4_acl_in = l3_interface.ipv4_acl_in
ipv4_acl_out = l3_interface.ipv4_acl_out
if ipv4_acl_in is None and ipv4_acl_out is None:
continue
interface_name = interface
interface_ip: str | None = l3_interface.ip_addresses[interface_idx]
if interface_ip is not None:
interface_ip = get_ip_from_ip_prefix(interface_ip)
if ipv4_acl_in is not None:
l3_interface_acls.setdefault(interface_name, {})["ipv4_acl_in"] = self.shared_utils.get_ipv4_acl(
name=ipv4_acl_in,
interface_name=interface_name,
interface_ip=interface_ip,
)._as_dict()
if ipv4_acl_out is not None:
l3_interface_acls.setdefault(interface_name, {})["ipv4_acl_out"] = self.shared_utils.get_ipv4_acl(
name=ipv4_acl_out,
interface_name=interface_name,
interface_ip=interface_ip,
)._as_dict()
return l3_interface_acls

@cached_property
def _filtered_internet_exit_policies_and_connections(
self: AvdStructuredConfigNetworkServicesProtocol,
Expand Down
Loading