diff --git a/ansible_collections/arista/avd/molecule/eos_designs_negative_unit_tests/inventory/host_vars/duplicate-interfaces-point-to-point-services-4.yml b/ansible_collections/arista/avd/molecule/eos_designs_negative_unit_tests/inventory/host_vars/duplicate-interfaces-point-to-point-services-4.yml index 64caf8906fc..73b1896be70 100644 --- a/ansible_collections/arista/avd/molecule/eos_designs_negative_unit_tests/inventory/host_vars/duplicate-interfaces-point-to-point-services-4.yml +++ b/ansible_collections/arista/avd/molecule/eos_designs_negative_unit_tests/inventory/host_vars/duplicate-interfaces-point-to-point-services-4.yml @@ -52,4 +52,4 @@ expected_error_message: >- Found duplicate objects with conflicting data while generating configuration for Network Services point-to-point EthernetInterfaces. Interface Ethernet6.1000 defined under tenants[TENANT_A].point_to_point_services[TEN_A_site2_site5_eline_port_based].endpoints[0] conflicts with {'name': 'Ethernet6.1000', 'description': None, 'shutdown': False, 'mtu': None, 'vrf': 'TEST', 'encapsulation_dot1q': - {'vlan': 1000}, 'ip_address': '10.42.255.0/31', 'access_group_in': None, 'access_group_out': None, 'peer_type': 'l3_interface', 'eos_cli': None}. + {'vlan': 1000}, 'ip_address': '10.42.255.0/31', 'peer_type': 'l3_interface', 'eos_cli': None}. diff --git a/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1A.yml b/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1A.yml index 0c081e53885..be3381f4fdf 100644 --- a/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1A.yml +++ b/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1A.yml @@ -185,13 +185,6 @@ ip_access_lists: protocol: ip source: any destination: 10.10.40.10 -- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10 - entries: - - sequence: 15 - action: deny - protocol: ip - source: any - destination: 10.10.40.20 - name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet12 entries: - remark: Some remark will not require source and destination fields. @@ -199,6 +192,13 @@ ip_access_lists: protocol: ip source: 10.10.40.10 destination: any +- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10 + entries: + - sequence: 15 + action: deny + protocol: ip + source: any + destination: 10.10.40.20 - name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet13.10 entries: - remark: Some remark will not require source and destination fields. diff --git a/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1B.yml b/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1B.yml index 61327414613..e47fe98ff3f 100644 --- a/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1B.yml +++ b/ansible_collections/arista/avd/molecule/eos_designs_unit_tests/intended/structured_configs/DC1-BL1B.yml @@ -161,13 +161,6 @@ ip_access_lists: protocol: ip source: any destination: 10.10.50.10 -- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10 - entries: - - sequence: 15 - action: deny - protocol: ip - source: any - destination: 10.10.50.20 - name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet12 entries: - remark: Some remark will not require source and destination fields. @@ -175,6 +168,13 @@ ip_access_lists: protocol: ip source: 10.10.50.10 destination: any +- name: TEST-IPV4-ACL-WITH-IP-FIELDS-IN_Ethernet13.10 + entries: + - sequence: 15 + action: deny + protocol: ip + source: any + destination: 10.10.50.20 - name: TEST-IPV4-ACL-WITH-IP-FIELDS-OUT_Ethernet13.10 entries: - remark: Some remark will not require source and destination fields. diff --git a/python-avd/pyavd/_eos_designs/structured_config/network_services/ethernet_interfaces.py b/python-avd/pyavd/_eos_designs/structured_config/network_services/ethernet_interfaces.py index fb1897fb8d5..d5aba50a40e 100644 --- a/python-avd/pyavd/_eos_designs/structured_config/network_services/ethernet_interfaces.py +++ b/python-avd/pyavd/_eos_designs/structured_config/network_services/ethernet_interfaces.py @@ -9,7 +9,7 @@ from pyavd._eos_cli_config_gen.schema import EosCliConfigGen from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor from pyavd._errors import AristaAvdError, AristaAvdInvalidInputsError -from pyavd._utils import get +from pyavd._utils import get_ip_from_ip_prefix from pyavd.j2filters import natural_sort if TYPE_CHECKING: @@ -83,6 +83,9 @@ def _set_l3_interfaces( continue interface_name = l3_interface.interfaces[node_index] + interface_ip = l3_interface.ip_addresses[node_index] + if "/" in interface_ip: + interface_ip = get_ip_from_ip_prefix(interface_ip) # if 'descriptions' is set, it is preferred interface_description = l3_interface.descriptions[node_index] if l3_interface.descriptions else l3_interface.description interface = EosCliConfigGen.EthernetInterfacesItem( @@ -106,11 +109,23 @@ def _set_l3_interfaces( if self.inputs.fabric_sflow.l3_interfaces is not None: interface.sflow.enable = self.inputs.fabric_sflow.l3_interfaces - if self._l3_interface_acls is not None: - interface._update( - access_group_in=get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_in..name", separator=".."), - access_group_out=get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_out..name", separator=".."), + if l3_interface.ipv4_acl_in: + acl = self.shared_utils.get_ipv4_acl( + name=l3_interface.ipv4_acl_in, + interface_name=interface_name, + interface_ip=interface_ip, + ) + interface.access_group_in = acl.name + self._set_ipv4_acl(acl) + + if l3_interface.ipv4_acl_out: + acl = self.shared_utils.get_ipv4_acl( + name=l3_interface.ipv4_acl_out, + interface_name=interface_name, + interface_ip=interface_ip, ) + interface.access_group_out = acl.name + self._set_ipv4_acl(acl) if "." in interface_name: # This is a subinterface so we need to ensure that the parent is created diff --git a/python-avd/pyavd/_eos_designs/structured_config/network_services/ip_access_lists.py b/python-avd/pyavd/_eos_designs/structured_config/network_services/ip_access_lists.py index 41ade49766c..d027e881f4f 100644 --- a/python-avd/pyavd/_eos_designs/structured_config/network_services/ip_access_lists.py +++ b/python-avd/pyavd/_eos_designs/structured_config/network_services/ip_access_lists.py @@ -3,14 +3,16 @@ # that can be found in the LICENSE file. from __future__ import annotations -from functools import cached_property from typing import TYPE_CHECKING, Literal, Protocol +from pyavd._eos_cli_config_gen.schema import EosCliConfigGen +from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor from pyavd._errors import AristaAvdError -from pyavd._utils import append_if_not_duplicate, get_ip_from_ip_prefix -from pyavd.j2filters import natural_sort +from pyavd._utils import get_ip_from_ip_prefix if TYPE_CHECKING: + from pyavd._eos_designs.schema import EosDesigns + from . import AvdStructuredConfigNetworkServicesProtocol @@ -21,23 +23,26 @@ class IpAccesslistsMixin(Protocol): Class should only be used as Mixin to a AvdStructuredConfig class. """ - @cached_property - def _acl_internet_exit_zscaler(self: AvdStructuredConfigNetworkServicesProtocol) -> dict: - return { - "name": self.get_internet_exit_nat_acl_name("zscaler"), - "entries": [ - { - "sequence": 10, - "action": "permit", - "protocol": "ip", - "source": "any", - "destination": "any", - }, - ], - } - - @cached_property - def _acl_internet_exit_direct(self: AvdStructuredConfigNetworkServicesProtocol) -> dict | None: + @structured_config_contributor + def ip_access_lists(self: AvdStructuredConfigNetworkServicesProtocol) -> None: + """Set the structured config for ip_access_lists.""" + for ie_policy_type in self._filtered_internet_exit_policy_types: + self._acl_internet_exit(ie_policy_type) + + def _set_ipv4_acl(self: AvdStructuredConfigNetworkServicesProtocol, ipv4_acl: EosDesigns.Ipv4AclsItem) -> None: + """ + Set structured config for ip_access_lists. + + Called for each interface in l3_interfaces and l3_port_channels when applying ipv4_acls + """ + self.structured_config.ip_access_lists.append(ipv4_acl._cast_as(EosCliConfigGen.IpAccessListsItem)) + + def _acl_internet_exit_zscaler(self: AvdStructuredConfigNetworkServicesProtocol) -> None: + ip_access_list = EosCliConfigGen.IpAccessListsItem(name=self.get_internet_exit_nat_acl_name("zscaler")) + ip_access_list.entries.append_new(sequence=10, action="permit", protocol="ip", source="any", destination="any") + self.structured_config.ip_access_lists.append(ip_access_list) + + def _acl_internet_exit_direct(self: AvdStructuredConfigNetworkServicesProtocol) -> None: interface_ips = set() for ie_policy, connections in self._filtered_internet_exit_policies_and_connections: if ie_policy.type == "direct": @@ -45,38 +50,24 @@ def _acl_internet_exit_direct(self: AvdStructuredConfigNetworkServicesProtocol) interface_ips.add(connection["source_interface_ip_address"]) if interface_ips: + acl = EosCliConfigGen.IpAccessListsItem(name=self.get_internet_exit_nat_acl_name("direct")) interface_ips = sorted(interface_ips) - entries = [] i = 0 for i, interface_ip in enumerate(interface_ips): - entries.append( - { - "sequence": 10 + i * 10, - "action": "deny", - "protocol": "ip", - "source": get_ip_from_ip_prefix(interface_ip), - "destination": "any", - }, - ) - entries.append( - { - "sequence": 20 + i * 10, - "action": "permit", - "protocol": "ip", - "source": "any", - "destination": "any", - }, + acl.entries.append_new(sequence=10 + i * 10, action="deny", protocol="ip", source=get_ip_from_ip_prefix(interface_ip), destination="any") + acl.entries.append_new( + sequence=20 + i * 10, + action="permit", + protocol="ip", + source="any", + destination="any", ) - return { - "name": self.get_internet_exit_nat_acl_name("direct"), - "entries": entries, - } - return None + self.structured_config.ip_access_lists.append(acl) def _acl_internet_exit_user_defined( self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"] - ) -> list[dict] | None: + ) -> EosDesigns.Ipv4AclsItem | None: acl_name = self.get_internet_exit_nat_acl_name(internet_exit_policy_type) if acl_name not in self.inputs.ipv4_acls: # TODO: Evaluate if we should continue so we raise when there is no ACL. @@ -86,41 +77,19 @@ def _acl_internet_exit_user_defined( acl = self.shared_utils.get_ipv4_acl(acl_name, "random", interface_ip="random", peer_ip="random") if acl.name == acl_name: # ACL doesn't need replacement - return [acl._as_dict()] + return acl # TODO: We still have one nat for all interfaces, need to also add logic to make nat per interface # if acl needs substitution msg = f"ipv4_acls[name={acl_name}] field substitution is not supported for internet exit access lists" raise AristaAvdError(msg) - def _acl_internet_exit(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> list[dict] | None: + def _acl_internet_exit(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> None: acls = self._acl_internet_exit_user_defined(internet_exit_policy_type) if acls: - return acls - - if internet_exit_policy_type == "zscaler": - return [self._acl_internet_exit_zscaler] - if internet_exit_policy_type == "direct" and (acl := self._acl_internet_exit_direct): - return [acl] - return None - - @cached_property - def ip_access_lists(self: AvdStructuredConfigNetworkServicesProtocol) -> list | None: - """Return structured config for ip_access_lists.""" - ip_access_lists = [] - if self._svi_acls: - for interface_acls in self._svi_acls.values(): - for acl in interface_acls.values(): - append_if_not_duplicate(ip_access_lists, "name", acl, context="IPv4 Access lists for SVI", context_keys=["name"]) - - if self._l3_interface_acls: - for l3_interface_acl in self._l3_interface_acls.values(): - for acl in l3_interface_acl.values(): - append_if_not_duplicate(ip_access_lists, "name", acl, context="IPv4 Access lists for L3 interface", context_keys=["name"]) - - for ie_policy_type in self._filtered_internet_exit_policy_types: - acls = self._acl_internet_exit(ie_policy_type) - if acls: - ip_access_lists.extend(acls) + self.structured_config.ip_access_lists.append(acls._cast_as(EosCliConfigGen.IpAccessListsItem)) - return natural_sort(ip_access_lists, "name") or None + elif internet_exit_policy_type == "zscaler": + self._acl_internet_exit_zscaler() + elif internet_exit_policy_type == "direct": + self._acl_internet_exit_direct() diff --git a/python-avd/pyavd/_eos_designs/structured_config/network_services/utils_wan.py b/python-avd/pyavd/_eos_designs/structured_config/network_services/utils_wan.py index 0667773c856..2a49d11a798 100644 --- a/python-avd/pyavd/_eos_designs/structured_config/network_services/utils_wan.py +++ b/python-avd/pyavd/_eos_designs/structured_config/network_services/utils_wan.py @@ -8,7 +8,7 @@ from pyavd._eos_designs.schema import EosDesigns from pyavd._errors import AristaAvdError, AristaAvdInvalidInputsError, AristaAvdMissingVariableError -from pyavd._utils import get, get_ip_from_ip_prefix +from pyavd._utils import get from pyavd._utils.password_utils.password import simple_7_encrypt from pyavd.j2filters import natural_sort, range_expand @@ -418,51 +418,6 @@ def _local_path_groups_connected_to_pathfinder(self: AvdStructuredConfigNetworkS if any(wan_interface["connected_to_pathfinder"] for wan_interface in path_group._internal_data.interfaces) ] - @cached_property - def _svi_acls(self: AvdStructuredConfigNetworkServicesProtocol) -> dict[str, dict[str, dict]] | None: - """ - Returns a dict of SVI ACLs. - - : { - "ipv4_acl_in": , - "ipv4_acl_out": , - } - - Only contains interfaces with ACLs and only the ACLs that are set, - so use `get(self._svi_acls, f"{interface_name}.ipv4_acl_in")` to get the value. - """ - if not self.shared_utils.network_services_l3: - return None - - svi_acls = {} - for tenant in self.shared_utils.filtered_tenants: - for vrf in tenant.vrfs: - for svi in vrf.svis: - ipv4_acl_in = svi.ipv4_acl_in - ipv4_acl_out = svi.ipv4_acl_out - if ipv4_acl_in is None and ipv4_acl_out is None: - continue - - interface_name = f"Vlan{svi.id}" - interface_ip = svi.ip_address_virtual - if interface_ip is not None and "/" in interface_ip: - interface_ip = get_ip_from_ip_prefix(interface_ip) - - if ipv4_acl_in is not None: - svi_acls.setdefault(interface_name, {})["ipv4_acl_in"] = self.shared_utils.get_ipv4_acl( - name=ipv4_acl_in, - interface_name=interface_name, - interface_ip=interface_ip, - )._as_dict() - if ipv4_acl_out is not None: - svi_acls.setdefault(interface_name, {})["ipv4_acl_out"] = self.shared_utils.get_ipv4_acl( - name=ipv4_acl_out, - interface_name=interface_name, - interface_ip=interface_ip, - )._as_dict() - - return svi_acls - def get_internet_exit_nat_profile_name(self: AvdStructuredConfigNetworkServicesProtocol, internet_exit_policy_type: Literal["zscaler", "direct"]) -> str: if internet_exit_policy_type == "zscaler": return "NAT-IE-ZSCALER" @@ -475,53 +430,6 @@ def get_internet_exit_nat_acl_name(self: AvdStructuredConfigNetworkServicesProto def _filtered_internet_exit_policy_types(self: AvdStructuredConfigNetworkServicesProtocol) -> list: return sorted({internet_exit_policy.type for internet_exit_policy, _connections in self._filtered_internet_exit_policies_and_connections}) - @cached_property - def _l3_interface_acls(self: AvdStructuredConfigNetworkServicesProtocol) -> dict | None: - """ - Returns a dict of interfaces and ACLs set on the interfaces. - - { - : { - "ipv4_acl_in": , - "ipv4_acl_out": , - } - } - Only contains interfaces with ACLs and only the ACLs that are set, - so use `get(self._l3_interface_acls, f"{interface_name}..ipv4_acl_in", separator="..")` to get the value. - """ - if not self.shared_utils.network_services_l3: - return None - - l3_interface_acls = {} - for tenant in self.shared_utils.filtered_tenants: - for vrf in tenant.vrfs: - for l3_interface in vrf.l3_interfaces: - for interface_idx, interface in enumerate(l3_interface.interfaces): - if l3_interface.nodes[interface_idx] != self.shared_utils.hostname: - continue - - ipv4_acl_in = l3_interface.ipv4_acl_in - ipv4_acl_out = l3_interface.ipv4_acl_out - if ipv4_acl_in is None and ipv4_acl_out is None: - continue - interface_name = interface - interface_ip: str | None = l3_interface.ip_addresses[interface_idx] - if interface_ip is not None: - interface_ip = get_ip_from_ip_prefix(interface_ip) - if ipv4_acl_in is not None: - l3_interface_acls.setdefault(interface_name, {})["ipv4_acl_in"] = self.shared_utils.get_ipv4_acl( - name=ipv4_acl_in, - interface_name=interface_name, - interface_ip=interface_ip, - )._as_dict() - if ipv4_acl_out is not None: - l3_interface_acls.setdefault(interface_name, {})["ipv4_acl_out"] = self.shared_utils.get_ipv4_acl( - name=ipv4_acl_out, - interface_name=interface_name, - interface_ip=interface_ip, - )._as_dict() - return l3_interface_acls - @cached_property def _filtered_internet_exit_policies_and_connections( self: AvdStructuredConfigNetworkServicesProtocol, diff --git a/python-avd/pyavd/_eos_designs/structured_config/network_services/vlan_interfaces.py b/python-avd/pyavd/_eos_designs/structured_config/network_services/vlan_interfaces.py index e48471a2848..941226347c6 100644 --- a/python-avd/pyavd/_eos_designs/structured_config/network_services/vlan_interfaces.py +++ b/python-avd/pyavd/_eos_designs/structured_config/network_services/vlan_interfaces.py @@ -8,7 +8,7 @@ from pyavd._eos_cli_config_gen.schema import EosCliConfigGen from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor from pyavd._errors import AristaAvdInvalidInputsError -from pyavd._utils import default, get +from pyavd._utils import default, get_ip_from_ip_prefix from pyavd.api.interface_descriptions import InterfaceDescriptionData if TYPE_CHECKING: @@ -61,6 +61,9 @@ def _get_vlan_interface_config_for_svi( pim_source_interface_needed = False interface_name = f"Vlan{svi.id}" + interface_ip = svi.ip_address_virtual + if interface_ip is not None and "/" in interface_ip: + interface_ip = get_ip_from_ip_prefix(interface_ip) vlan_interface_config = EosCliConfigGen.VlanInterfacesItem( name=interface_name, tenant=tenant.name, @@ -70,12 +73,28 @@ def _get_vlan_interface_config_for_svi( ip_address=svi.ip_address, ipv6_address=svi.ipv6_address, ipv6_enable=svi.ipv6_enable, - access_group_in=get(self._svi_acls, f"{interface_name}.ipv4_acl_in.name"), - access_group_out=get(self._svi_acls, f"{interface_name}.ipv4_acl_out.name"), mtu=svi.mtu if self.shared_utils.platform_settings.feature_support.per_interface_mtu else None, eos_cli=svi.raw_eos_cli, ) + if svi.ipv4_acl_in: + acl = self.shared_utils.get_ipv4_acl( + name=svi.ipv4_acl_in, + interface_name=interface_name, + interface_ip=interface_ip, + ) + vlan_interface_config.access_group_in = acl.name + self._set_ipv4_acl(acl) + + if svi.ipv4_acl_out: + acl = self.shared_utils.get_ipv4_acl( + name=svi.ipv4_acl_out, + interface_name=interface_name, + interface_ip=interface_ip, + ) + vlan_interface_config.access_group_out = acl.name + self._set_ipv4_acl(acl) + if svi.structured_config: self.custom_structured_configs.nested.vlan_interfaces.obtain(interface_name)._deepmerge( svi.structured_config, list_merge=self.custom_structured_configs.list_merge_strategy