Skip to content

Commit

Permalink
[pfsense_nat_outbound] Allow for NET:<interface> addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
opoplawski committed Jan 6, 2024
1 parent 5ffa049 commit 9538d8a
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 52 deletions.
81 changes: 50 additions & 31 deletions plugins/module_utils/nat_outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ def _params_to_obj(self):

obj = dict()
obj['descr'] = self.params['descr']
if self.params['state'] == 'present':
params = self.params
if params['state'] == 'present':
obj['sourceport'] = ''
obj['interface'] = self.pfsense.parse_interface(self.params['interface'])
obj['interface'] = self.pfsense.parse_interface(params['interface'])
self._get_ansible_param(obj, 'ipprotocol')
if obj['ipprotocol'] == 'inet46':
del obj['ipprotocol']
Expand All @@ -93,15 +94,15 @@ def _params_to_obj(self):
self._get_ansible_param_bool(obj, 'staticnatport', value='')
self._get_ansible_param_bool(obj, 'nosync', value='')

if 'after' in self.params and self.params['after'] is not None:
self.after = self.params['after']
if 'after' in params and params['after'] is not None:
self.after = params['after']

if 'before' in self.params and self.params['before'] is not None:
self.before = self.params['before']
if 'before' in params and params['before'] is not None:
self.before = params['before']

self._parse_address(obj, 'source', 'sourceport', True, 'network')
self._parse_address(obj, 'destination', 'dstport', False, 'address')
if self.params['invert']:
self._parse_address(obj, 'destination', 'dstport', False, 'network')
if params['invert']:
obj['destination']['not'] = None
self._parse_translated_address(obj)

Expand All @@ -120,32 +121,49 @@ def _parse_address(self, obj, field, field_port, allow_self, target):

param = self.params[field]
addr = param.split(':')
if len(addr) > 2:
if len(addr) > 3:
self.module.fail_json(msg='Cannot parse address %s' % (param))

address = addr[0]

ret = dict()
ports = addr[1] if len(addr) > 1 else None
if address == 'any':
if field == 'source':
ret[target] = 'any'
else:
ret['any'] = ''
# rule with this firewall
elif allow_self and address == '(self)':
ret[target] = '(self)'
elif self.pfsense.is_ipv4_address(address):
ret[target] = address + '/32'
elif self.pfsense.is_ipv4_network(address, False):
(addr, bits) = self.pfsense.parse_ip_network(address, False, False)
ret[target] = addr + '/' + str(bits)
elif self.pfsense.find_alias(address, 'host') is not None or self.pfsense.find_alias(address, 'network') is not None:
ret[target] = address

if address == 'NET':
interface = addr[1] if len(addr) > 1 else None
ports = addr[2] if len(addr) > 2 else None
if interface is None or interface == '':
self.module.fail_json(msg='Cannot parse address %s' % (param))

ret['network'] = self.pfsense.parse_interface(interface)
else:
self.module.fail_json(msg='Cannot parse address %s, not IP or alias' % (address))
ports = addr[1] if len(addr) > 1 else None
if address == 'any':
if field == 'source':
ret[target] = 'any'
else:
ret['any'] = ''
# rule with this firewall
elif allow_self and address == '(self)':
ret[target] = '(self)'
elif self.params['ipprotocol'] != 'inet6' and self.pfsense.is_ipv4_address(address):
ret[target] = address + '/32'
self.module.warn('Specifying an address without a CIDR prefix is depracated. Please add /32 if you want a single host address')
elif self.params['ipprotocol'] != 'inet4' and self.pfsense.is_ipv6_address(address):
ret[target] = address + '/128'
self.module.warn('Specifying an address without a CIDR prefix is depracated. Please add /128 if you want a single host address')
elif self.params['ipprotocol'] != 'inet6' and self.pfsense.is_ipv4_network(address, False):
(addr, bits) = self.pfsense.parse_ip_network(address, False, False)
ret[target] = addr + '/' + str(bits)
elif self.params['ipprotocol'] != 'inet4' and self.pfsense.is_ipv6_network(address, False):
(addr, bits) = self.pfsense.parse_ip_network(address, False, False)
ret[target] = addr + '/' + str(bits)
elif self.pfsense.find_alias(address, 'host') is not None or self.pfsense.find_alias(address, 'network') is not None:
ret[target] = address
else:
self.module.fail_json(msg='Cannot parse address %s, not %s network or alias' % (address, self.params['ipprotocol']))

self._parse_ports(obj, ports, field_port, param)
if ports is not None:
self._parse_ports(obj, ports, field_port, param)

obj[field] = ret

Expand Down Expand Up @@ -427,13 +445,14 @@ def _log_fields(self, before=None):

return values

@staticmethod
def _obj_address_to_log_field(rule, addr, target, port):
def _obj_address_to_log_field(self, rule, addr, target, port):
""" return formated address from dict """
field = ''
if addr in rule:
if target in rule[addr]:
field = rule[addr][target]
if self.pfsense.interfaces.find(rule[addr][target]):
field = 'NET:'
field += rule[addr][target]
elif addr == 'destination' and 'any' in rule[addr]:
field = 'any'

Expand All @@ -447,7 +466,7 @@ def _obj_to_log_fields(self, rule):
""" return formated source and destination from dict """
res = {}
res['source'] = self._obj_address_to_log_field(rule, 'source', 'network', 'sourceport')
res['destination'] = self._obj_address_to_log_field(rule, 'destination', 'address', 'dstport')
res['destination'] = self._obj_address_to_log_field(rule, 'destination', 'network', 'dstport')
res['interface'] = self.pfsense.get_interface_display_name(rule['interface'])

if rule['target'] == 'other-subnet':
Expand Down
4 changes: 2 additions & 2 deletions plugins/modules/pfsense_nat_outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
choices: [ "any", "tcp", "udp", "tcp/udp", "icmp", "esp", "ah", "gre", "ipv6", "igmp", "carp", "pfsync" ]
type: str
source:
description: The matching source address, in {any,(self),ALIAS,NETWORK}[:port] format.
description: The matching source address, in {any,(self),ALIAS,NETWORK,NET:INTERFACE}[:port] format.
required: false
default: null
type: str
destination:
description: The matching destination address, in {any,ALIAS,NETWORK}[:port] format.
description: The matching destination address, in {any,ALIAS,NETWORK,NET:INTERFACE}[:port] format.
required: false
default: null
type: str
Expand Down
62 changes: 43 additions & 19 deletions tests/unit/plugins/modules/test_pfsense_nat_outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,35 @@ def is_ipv4_address(address):
pass
return False

def parse_address(self, addr, field):
def parse_address(self, name, addr, field, invert=False):
""" return address parsed in dict """
parts = addr.split(':')
res = {}
port = None
if parts[0] == 'any':
if field == 'network':
res[field] = 'any'
else:
res['any'] = None
elif parts[0] == '(self)':
res[field] = '(self)'
elif parts[0] in ['lan', 'vpn', 'vt1', 'lan_100']:
res[field] = self.unalias_interface(parts[0])
if parts[0] == 'NET':
res[field] = parts[1]
if len(parts) > 2:
port = parts[2].replace('-', ':')
else:
res[field] = parts[0]
if parts[0] == 'any':
if name == 'source':
res[field] = 'any'
else:
res['any'] = None
elif parts[0] == '(self)':
res[field] = '(self)'
elif parts[0] in ['lan', 'vpn', 'vt1', 'lan_100']:
res[field] = self.unalias_interface(parts[0])
else:
res[field] = parts[0]

if field in res and self.is_ipv4_address(res[field]) and res[field].find('/') == -1:
res[field] += '/32'
if field in res and self.is_ipv4_address(res[field]) and res[field].find('/') == -1:
res[field] += '/32'

if len(parts) > 1:
port = parts[1].replace('-', ':')
if len(parts) > 1:
port = parts[1].replace('-', ':')
if invert:
res['not'] = None

return (res, port)

Expand All @@ -68,9 +75,9 @@ def reparse_network(value):
return '2.3.4.0/24'
return value

def check_addr(self, params, target_elt, addr, field, port):
def check_addr(self, params, target_elt, addr, field, port, invert=False):
""" test the addresses definition """
(addr_dict, port_value) = self.parse_address(params[addr], field)
(addr_dict, port_value) = self.parse_address(addr, params[addr], field, invert=invert)
addr_elt = self.assert_find_xml_elt(target_elt, addr)
for key, value in addr_dict.items():
self.check_value_equal(addr_elt, key, self.reparse_network(value))
Expand Down Expand Up @@ -107,12 +114,11 @@ def md5(value):
def check_target_elt(self, obj, target_elt, target_idx=-1):
""" test the xml definition """
self.check_addr(obj, target_elt, 'source', 'network', 'sourceport')
self.check_addr(obj, target_elt, 'destination', 'address', 'dstport')
self.check_addr(obj, target_elt, 'destination', 'network', 'dstport', invert=obj.get('invert'))
self.check_target_addr(obj, target_elt)

self.check_param_equal_or_not_find(obj, target_elt, 'disabled')
self.check_param_equal_or_not_find(obj, target_elt, 'nonat')
self.check_param_equal_or_not_find(obj, target_elt, 'invert')
self.check_param_equal_or_not_find(obj, target_elt, 'staticnatport')
self.check_param_equal_or_not_find(obj, target_elt, 'nosync')
self.check_param_equal_or_not_find(obj, target_elt, 'nonat')
Expand Down Expand Up @@ -191,6 +197,24 @@ def test_nat_outbound_create_networks(self):
command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443'"
self.do_module_test(obj, command=command, target_idx=3)

def test_nat_outbound_create_networks_invert(self):
""" test """
obj = dict(descr='https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443', invert=True)
command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443', invert=True"
self.do_module_test(obj, command=command, target_idx=3)

def test_nat_outbound_create_interface_destination_network(self):
""" test """
obj = dict(descr='https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='NET:lan:443')
command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='NET:lan:443'"
self.do_module_test(obj, command=command, target_idx=3)

def test_nat_outbound_create_interface_source_network(self):
""" test """
obj = dict(descr='https-source-rewriting', interface='lan', source='NET:lan', destination='2.3.4.5/24:443')
command = "create nat_outbound 'https-source-rewriting', interface='lan', source='NET:lan', destination='2.3.4.5/24:443'"
self.do_module_test(obj, command=command, target_idx=3)

def test_nat_outbound_create_top(self):
""" test """
obj = dict(descr='https-source-rewriting', interface='lan', source='any', destination='1.2.3.4:443', after='top')
Expand Down

0 comments on commit 9538d8a

Please sign in to comment.