Skip to content

Commit

Permalink
Small improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
badrogger committed Dec 30, 2024
1 parent 80f773b commit 71e1389
Showing 1 changed file with 66 additions and 77 deletions.
143 changes: 66 additions & 77 deletions core/schains/firewall/nftables.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@


import importlib
import ipaddress
import json
import logging
import multiprocessing
import os
from typing import Iterable, TypeVar
from typing import Iterable

from core.schains.firewall.types import IHostFirewallController, SChainRule

from tools.configs import NFT_CHAIN_BASE_PATH

T = TypeVar('T')


logger = logging.getLogger(__name__)

Expand All @@ -55,6 +52,69 @@ def __init__(self, table: str = TABLE, chain: str = CHAIN) -> None:
self.nft = self._nftables.Nftables()
self.nft.set_json_output(True)

Check warning on line 53 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L49-L53

Added lines #L49 - L53 were not covered by tests

@classmethod
def rule_to_expr(cls, rule: SChainRule, counter: bool = True) -> list:
expr = []

Check warning on line 57 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L57

Added line #L57 was not covered by tests

if rule.first_ip:
if rule.last_ip == rule.first_ip:
expr.append(

Check warning on line 61 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L59-L61

Added lines #L59 - L61 were not covered by tests
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': f'{rule.first_ip}',
}
}
)
else:
expr.append(

Check warning on line 71 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L71

Added line #L71 was not covered by tests
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': {'range': [f'{rule.first_ip}', f'{rule.last_ip}']},
}
}
)

if rule.port:
expr.append(

Check warning on line 82 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L81-L82

Added lines #L81 - L82 were not covered by tests
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'tcp', 'field': 'dport'}},
'right': rule.port,
}
}
)

if counter:
expr.append({'counter': None})

Check warning on line 93 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L92-L93

Added lines #L92 - L93 were not covered by tests

expr.append({'accept': None})
return expr

Check warning on line 96 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L95-L96

Added lines #L95 - L96 were not covered by tests

@classmethod
def expr_to_rule(self, expr: list) -> None:
port, first_ip, last_ip = None, None, None
for item in expr:
if 'match' in item:
match = item['match']

Check warning on line 103 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L100-L103

Added lines #L100 - L103 were not covered by tests

if match.get('left', {}).get('payload', {}).get('field') == 'dport':
port = match.get('right')

Check warning on line 106 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L105-L106

Added lines #L105 - L106 were not covered by tests

if match.get('left', {}).get('payload', {}).get('field') == 'saddr':
right = match.get('right')
if isinstance(right, str):
first_ip = right

Check warning on line 111 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L108-L111

Added lines #L108 - L111 were not covered by tests
else:
first_ip, last_ip = right['range']

Check warning on line 113 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L113

Added line #L113 was not covered by tests

if any([port, first_ip, last_ip]):
return SChainRule(port=port, first_ip=first_ip, last_ip=last_ip)

Check warning on line 116 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L115-L116

Added lines #L115 - L116 were not covered by tests

def _compose_json(self, commands: list[dict]) -> dict:
json_cmd = {'nftables': commands}
self.nft.json_validate(json_cmd)
Expand Down Expand Up @@ -190,73 +250,10 @@ def add_rule(self, rule: SChainRule) -> None:
]
)

rc, output, error = self.run_json_cmd(json_cmd)
rc, _, error = self.run_json_cmd(json_cmd)
if rc != 0:
raise NFTablesCmdFailedError(f'Failed to add allow rule: {error}')

Check warning on line 255 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L253-L255

Added lines #L253 - L255 were not covered by tests

@classmethod
def rule_to_expr(cls, rule: SChainRule, counter: bool = True) -> list:
expr = []

if rule.first_ip:
if rule.last_ip == rule.first_ip:
expr.append(
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': f'{rule.first_ip}',
}
}
)
else:
expr.append(
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'ip', 'field': 'saddr'}},
'right': {'range': [f'{rule.first_ip}', f'{rule.last_ip}']},
}
}
)

if rule.port:
expr.append(
{
'match': {
'op': '==',
'left': {'payload': {'protocol': 'tcp', 'field': 'dport'}},
'right': rule.port,
}
}
)

if counter:
expr.append({'counter': None})

expr.append({'accept': None})
return expr

@classmethod
def expr_to_rule(self, expr: list) -> None:
port, first_ip, last_ip = None, None, None
for item in expr:
if 'match' in item:
match = item['match']

if match.get('left', {}).get('payload', {}).get('field') == 'dport':
port = match.get('right')

if match.get('left', {}).get('payload', {}).get('field') == 'saddr':
right = match.get('right')
if isinstance(right, str):
first_ip = right
else:
first_ip, last_ip = right['range']

if any([port, first_ip, last_ip]):
return SChainRule(port=port, first_ip=first_ip, last_ip=last_ip)

def remove_rule(self, rule: SChainRule) -> None:
if self.has_rule(rule):
expr = self.rule_to_expr(rule, counter=False)

Check warning on line 259 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L258-L259

Added lines #L258 - L259 were not covered by tests
Expand Down Expand Up @@ -297,7 +294,7 @@ def remove_rule(self, rule: SChainRule) -> None:
]
)

rc, output, error = self.run_json_cmd(json_cmd)
rc, _, error = self.run_json_cmd(json_cmd)
if rc != 0:
raise NFTablesCmdFailedError(f'Failed to delete rule: {error}')

Check warning on line 299 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L297-L299

Added lines #L297 - L299 were not covered by tests

Expand Down Expand Up @@ -328,14 +325,6 @@ def get_rules_by_policy(self, policy: str) -> list[SChainRule]:
logger.debug('Rules for policy %s: %s', policy, rules)
return rules

Check warning on line 326 in core/schains/firewall/nftables.py

View check run for this annotation

Codecov / codecov/patch

core/schains/firewall/nftables.py#L317-L326

Added lines #L317 - L326 were not covered by tests

@classmethod
def from_ip_network(cls, ip: str) -> str:
return str(ipaddress.ip_network(ip).hosts()[0])

@classmethod
def to_ip_network(cls, ip: str) -> str:
return str(ipaddress.ip_network(ip))

def get_plain_chain_rules(self) -> str:
self.nft.set_json_output(False)
output = ''
Expand Down

0 comments on commit 71e1389

Please sign in to comment.