From 36419bc3c9f4f3b3d5b75c38ab816814fb6af511 Mon Sep 17 00:00:00 2001 From: Johanna England Date: Wed, 16 Aug 2023 12:27:31 +0200 Subject: [PATCH 1/3] Fix typos --- python/nav/web/alertprofiles/forms.py | 6 +++--- python/nav/web/alertprofiles/views.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/nav/web/alertprofiles/forms.py b/python/nav/web/alertprofiles/forms.py index 75105ac0a4..2f38b3c775 100644 --- a/python/nav/web/alertprofiles/forms.py +++ b/python/nav/web/alertprofiles/forms.py @@ -559,7 +559,7 @@ def __init__(self, *args, **kwargs): # Values are selected from a multiple choice list. # Populate that list with possible choices. - # MatcField stores which table and column alert engine should + # MatchField stores which table and column alert engine should # watch, as well as a table and column for "friendly" names in # the GUI and how we should sort the fields in the GUI (if we # are displaying a list) @@ -605,8 +605,8 @@ def __init__(self, *args, **kwargs): choices = [] for obj in model_objects: - # ID is what is acctually used in the expression that will - # be evaluted by alert engine + # ID is what is actually used in the expression that will + # be evaluated by alert engine ident = getattr(obj, attname) if model == name_model: diff --git a/python/nav/web/alertprofiles/views.py b/python/nav/web/alertprofiles/views.py index 829aba4a02..72abd2189c 100644 --- a/python/nav/web/alertprofiles/views.py +++ b/python/nav/web/alertprofiles/views.py @@ -1513,7 +1513,7 @@ def filter_remove(request): @requires_post('alertprofiles-filters', ('id', 'matchfield')) def filter_addexpression(request): - """Shows the form to add en expression to a filter""" + """Shows the form to add an expression to a filter""" try: filtr = Filter.objects.get(pk=request.POST.get('id')) except Filter.DoesNotExist: From a57959a6d4b725e7156221f13d2924b68d07dd2b Mon Sep 17 00:00:00 2001 From: Johanna England Date: Wed, 2 Aug 2023 15:45:38 +0200 Subject: [PATCH 2/3] Move validation of expressions to form Add validation of IP addresses --- python/nav/web/alertprofiles/forms.py | 65 ++++++++++++++++++++++++--- python/nav/web/alertprofiles/views.py | 53 +++++++++++----------- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/python/nav/web/alertprofiles/forms.py b/python/nav/web/alertprofiles/forms.py index 2f38b3c775..9c35286be7 100644 --- a/python/nav/web/alertprofiles/forms.py +++ b/python/nav/web/alertprofiles/forms.py @@ -18,6 +18,8 @@ # pylint: disable=R0903 +from typing import Any, Dict + from django import forms from django.db.models import Q @@ -27,10 +29,10 @@ from nav.alertengine.dispatchers.email_dispatcher import Email from nav.alertengine.dispatchers.slack_dispatcher import Slack from nav.alertengine.dispatchers.sms_dispatcher import Sms - -from nav.models.profiles import MatchField, Filter, Expression, FilterGroup +from nav.models.profiles import Expression, Filter, FilterGroup, MatchField, Operator from nav.models.profiles import AlertProfile, TimePeriod, AlertSubscription from nav.models.profiles import AlertAddress, AlertSender +from nav.util import is_valid_cidr, is_valid_ip from nav.web.crispyforms import HelpField _ = lambda a: a # gettext variable (for future implementations) @@ -536,19 +538,31 @@ class ExpressionForm(forms.ModelForm): create expressions that can be used in a filter. """ - filter = forms.IntegerField(widget=forms.widgets.HiddenInput) - match_field = forms.IntegerField(widget=forms.widgets.HiddenInput) - value = forms.CharField(required=True) + filter = forms.ModelChoiceField( + queryset=Filter.objects.all(), widget=forms.widgets.HiddenInput + ) + match_field = forms.ModelChoiceField( + queryset=MatchField.objects.all(), widget=forms.widgets.HiddenInput + ) class Meta(object): model = Expression fields = '__all__' def __init__(self, *args, **kwargs): - match_field = kwargs.pop('match_field', None) + match_field = kwargs.pop('match_field', None) # add_expression + if not match_field: + match_field = args[0].get('match_field', None) # save_expression + self.match_field = match_field super(ExpressionForm, self).__init__(*args, **kwargs) - if isinstance(match_field, MatchField): + if not match_field: + return + + if not isinstance(match_field, MatchField): + match_field = MatchField.objects.get(pk=match_field) + + if True: # maintain indent for the sake off smaller diff! # Get all operators and make a choice field operators = match_field.operator_set.all() self.fields['operator'] = forms.models.ChoiceField( @@ -625,3 +639,40 @@ def __init__(self, *args, **kwargs): # At last we acctually add the multiple choice field. self.fields['value'] = forms.MultipleChoiceField(choices=choices) + else: + self.fields['value'] = forms.CharField(required=True) + + def clean(self) -> Dict[str, Any]: + validated_data = super().clean() + + match_field = validated_data["match_field"] + operator_type = int(validated_data["operator"]) + value = validated_data["value"] + + if match_field.data_type == MatchField.IP: + if operator_type == Operator.IN: + ip_list = value.split() + else: + ip_list = [value] + validated_ip_addresses = [] + for ip in ip_list: + if not is_valid_ip(ip=ip, strict=True) and not is_valid_cidr(cidr=ip): + self.add_error( + field="value", + error=forms.ValidationError(("Invalid IP address: %s" % ip)), + ) + else: + validated_ip_addresses.append(str(ip)) + # Bring ip address back into original format to be processed below + value = " ".join(validated_ip_addresses) + + if operator_type == Operator.IN: + """If input was a multiple choice list we have to join each option in one + string, where each option is separated by a | (pipe). + If input was a IP adress we should replace space with | (pipe).""" + if match_field.data_type == MatchField.IP: + validated_data["value"] = value.replace(' ', '|') + else: + validated_data["value"] = "|".join(value) + + return validated_data diff --git a/python/nav/web/alertprofiles/views.py b/python/nav/web/alertprofiles/views.py index 72abd2189c..ff554a21b2 100644 --- a/python/nav/web/alertprofiles/views.py +++ b/python/nav/web/alertprofiles/views.py @@ -24,7 +24,7 @@ # TODO Filter/filter_groups have owners, check that the account that performs # the operation is the owner -from django.http import HttpResponseRedirect +from django.http import HttpResponseRedirect, QueryDict from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q from django.shortcuts import render @@ -1563,40 +1563,37 @@ def filter_addexpression(request): @requires_post('alertprofiles-filters') def filter_saveexpression(request): """Saves an expression to a filter""" - # Get the MatchField, Filter and Operator objects associated with the - # input POST-data - filtr = Filter.objects.get(pk=request.POST.get('filter')) - type_ = request.POST.get('operator') - match_field = MatchField.objects.get(pk=request.POST.get('match_field')) - operator = Operator.objects.get(type=type_, match_field=match_field.pk) + if request.POST.get('id'): + existing_expression = Expression.objects.get(pk=request.POST.get('id')) + form = ExpressionForm(request.POST, instance=existing_expression) + else: + form = ExpressionForm(request.POST) + + if not form.is_valid(): + dictionary = { + 'id': str(form.cleaned_data["filter"].pk), + 'matchfield': str(form.cleaned_data["match_field"].pk), + } + qdict = QueryDict("", mutable=True) + qdict.update(dictionary) + request.POST = qdict + new_message( + request, + form.errors, + Messages.ERROR, + ) + + return filter_addexpression(request=request) + + filtr = form.cleaned_data['filter'] if not account_owns_filters(get_account(request), filtr): return alertprofiles_response_forbidden( request, _('You do not own this filter.') ) - # Get the value - if operator.type == Operator.IN: - # If input was a multiple choice list we have to join each option - # in one string, where each option is separated by a | (pipe). - # If input was a IP adress we should replace space with | (pipe). - # FIXME We might want some data checks here - if match_field.data_type == MatchField.IP: - # FIXME We might want to check that it is a valid IP adress. - # If we do so, we need to remember both IPv4 and IPv6 - value = request.POST.get('value').replace(' ', '|') - else: - value = "|".join([value for value in request.POST.getlist('value')]) - else: - value = request.POST.get('value') + form.save() - expression = Expression( - filter=filtr, - match_field=match_field, - operator=operator.type, - value=value, - ) - expression.save() new_message( request, _('Added expression to filter %(name)s') % {'name': filtr.name}, From ef272377ed1929510452f8d34de1c418fdc47d44 Mon Sep 17 00:00:00 2001 From: Johanna England Date: Fri, 4 Aug 2023 13:32:01 +0200 Subject: [PATCH 3/3] Add tests for adding expression to filters --- tests/integration/web/alertprofiles_test.py | 197 ++++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/tests/integration/web/alertprofiles_test.py b/tests/integration/web/alertprofiles_test.py index 3e924cad6f..1cb5a6a493 100644 --- a/tests/integration/web/alertprofiles_test.py +++ b/tests/integration/web/alertprofiles_test.py @@ -14,6 +14,10 @@ AlertPreference, AlertProfile, AlertSender, + Expression, + Filter, + MatchField, + Operator, ) from nav.web.alertprofiles.views import set_active_profile @@ -162,6 +166,192 @@ def test_alertprofiles_add_public_filter_should_succeed(client): assert response.status_code == 200 +def test_alertprofiles_add_expression_with_valid_ipv4_address_should_succeed( + client, dummy_filter +): + """Tests that an expression with a valid IPv4 address can be added""" + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.EQUALS, + "value": "172.0.0.1", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.EQUALS, + value=data["value"], + ).exists() + assert f"Added expression to filter {dummy_filter}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_valid_ipv6_address_should_succeed( + client, dummy_filter +): + """Tests that an expression with a valid IPv6 address can be added""" + url = reverse("alertprofiles-filters-saveexpression") + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.EQUALS, + "value": "2001:db8:3333:4444:5555:6666:7777:8888", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.EQUALS, + value=data["value"], + ).exists() + assert f"Added expression to filter {dummy_filter}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_valid_cidr_address_should_succeed( + client, dummy_filter +): + """Tests that an expression with a valid CIDR address can be added""" + url = reverse("alertprofiles-filters-saveexpression") + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.EQUALS, + "value": "129.241.190.0/24", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.EQUALS, + value=data["value"], + ).exists() + assert f"Added expression to filter {dummy_filter}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_non_valid_ip_address_should_fail( + client, dummy_filter +): + """Tests that an expression with a not valid IP address cannot be added""" + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.EQUALS, + "value": "wrong", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert not Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.EQUALS, + value=data["value"], + ).exists() + assert f"Invalid IP address: {data['value']}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_non_valid_cidr_address_should_fail( + client, dummy_filter +): + """Tests that an expression with a not valid CIDR address cannot be added""" + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.EQUALS, + "value": "10.0.2.1/28", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert not Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.EQUALS, + value=data["value"], + ).exists() + assert f"Invalid IP address: {data['value']}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_multiple_valid_ip_addresses_should_succeed( + client, dummy_filter +): + """Tests that an expression with multiple valid IP addresses can be added""" + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.IN, + "value": "172.0.0.1 2001:db8:3333:4444:5555:6666:7777:8888 129.241.190.0/24", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.IN, + value=data["value"].replace(' ', '|'), + ).exists() + assert f"Added expression to filter {dummy_filter}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_multiple_non_valid_ip_addresses_should_fail( + client, dummy_filter +): + """Tests that an expression with a not valid IP address cannot be added""" + ip_match_field = MatchField.objects.get(data_type=MatchField.IP) + valid_ip = "172.0.0.1" + invalid_ip = "wrong" + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": ip_match_field.pk, + "operator": Operator.IN, + "value": f"{valid_ip} {invalid_ip}", + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert not Expression.objects.filter( + filter=dummy_filter, + match_field=ip_match_field, + operator=Operator.IN, + value=data["value"], + ).exists() + assert f"Invalid IP address: {invalid_ip}" in smart_str(response.content) + + +def test_alertprofiles_add_expression_with_multiple_alert_types_should_succeed( + client, dummy_filter +): + """Tests that an expression with multiple alert types can be added""" + string_match_field = MatchField.objects.get(name="Alert type") + url = reverse("alertprofiles-filters-saveexpression") + data = { + "filter": dummy_filter.pk, + "match_field": string_match_field.pk, + "operator": Operator.IN, + "value": ["apDown", "apUp"], + } + response = client.post(url, data=data, follow=True) + assert response.status_code == 200 + assert Expression.objects.filter( + filter=dummy_filter, + match_field=string_match_field, + operator=Operator.IN, + value="|".join(data["value"]), + ).exists() + assert f"Added expression to filter {dummy_filter}" in smart_str(response.content) + + def test_set_accountgroup_permissions_should_not_crash(db, client): """Regression test for #2281""" url = reverse('alertprofiles-permissions-save') @@ -252,3 +442,10 @@ def activated_dummy_profile(dummy_profile): ) preference.save() return dummy_profile + + +@pytest.fixture(scope="function") +def dummy_filter(): + filtr = Filter(name="dummy", owner=Account.objects.get(id=Account.ADMIN_ACCOUNT)) + filtr.save() + return filtr