From 4bd7dfe9a9dd00b2ccd84fcd1b53665c87aa8884 Mon Sep 17 00:00:00 2001 From: Johanna England Date: Wed, 2 Aug 2023 15:45:38 +0200 Subject: [PATCH] Move validation of expressions to form Add validation of IP addresses --- python/nav/web/alertprofiles/forms.py | 65 ++++++++++++++++++++++++--- python/nav/web/alertprofiles/views.py | 53 +++++++++++----------- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/python/nav/web/alertprofiles/forms.py b/python/nav/web/alertprofiles/forms.py index 2f38b3c775..9c35286be7 100644 --- a/python/nav/web/alertprofiles/forms.py +++ b/python/nav/web/alertprofiles/forms.py @@ -18,6 +18,8 @@ # pylint: disable=R0903 +from typing import Any, Dict + from django import forms from django.db.models import Q @@ -27,10 +29,10 @@ from nav.alertengine.dispatchers.email_dispatcher import Email from nav.alertengine.dispatchers.slack_dispatcher import Slack from nav.alertengine.dispatchers.sms_dispatcher import Sms - -from nav.models.profiles import MatchField, Filter, Expression, FilterGroup +from nav.models.profiles import Expression, Filter, FilterGroup, MatchField, Operator from nav.models.profiles import AlertProfile, TimePeriod, AlertSubscription from nav.models.profiles import AlertAddress, AlertSender +from nav.util import is_valid_cidr, is_valid_ip from nav.web.crispyforms import HelpField _ = lambda a: a # gettext variable (for future implementations) @@ -536,19 +538,31 @@ class ExpressionForm(forms.ModelForm): create expressions that can be used in a filter. """ - filter = forms.IntegerField(widget=forms.widgets.HiddenInput) - match_field = forms.IntegerField(widget=forms.widgets.HiddenInput) - value = forms.CharField(required=True) + filter = forms.ModelChoiceField( + queryset=Filter.objects.all(), widget=forms.widgets.HiddenInput + ) + match_field = forms.ModelChoiceField( + queryset=MatchField.objects.all(), widget=forms.widgets.HiddenInput + ) class Meta(object): model = Expression fields = '__all__' def __init__(self, *args, **kwargs): - match_field = kwargs.pop('match_field', None) + match_field = kwargs.pop('match_field', None) # add_expression + if not match_field: + match_field = args[0].get('match_field', None) # save_expression + self.match_field = match_field super(ExpressionForm, self).__init__(*args, **kwargs) - if isinstance(match_field, MatchField): + if not match_field: + return + + if not isinstance(match_field, MatchField): + match_field = MatchField.objects.get(pk=match_field) + + if True: # maintain indent for the sake off smaller diff! # Get all operators and make a choice field operators = match_field.operator_set.all() self.fields['operator'] = forms.models.ChoiceField( @@ -625,3 +639,40 @@ def __init__(self, *args, **kwargs): # At last we acctually add the multiple choice field. self.fields['value'] = forms.MultipleChoiceField(choices=choices) + else: + self.fields['value'] = forms.CharField(required=True) + + def clean(self) -> Dict[str, Any]: + validated_data = super().clean() + + match_field = validated_data["match_field"] + operator_type = int(validated_data["operator"]) + value = validated_data["value"] + + if match_field.data_type == MatchField.IP: + if operator_type == Operator.IN: + ip_list = value.split() + else: + ip_list = [value] + validated_ip_addresses = [] + for ip in ip_list: + if not is_valid_ip(ip=ip, strict=True) and not is_valid_cidr(cidr=ip): + self.add_error( + field="value", + error=forms.ValidationError(("Invalid IP address: %s" % ip)), + ) + else: + validated_ip_addresses.append(str(ip)) + # Bring ip address back into original format to be processed below + value = " ".join(validated_ip_addresses) + + if operator_type == Operator.IN: + """If input was a multiple choice list we have to join each option in one + string, where each option is separated by a | (pipe). + If input was a IP adress we should replace space with | (pipe).""" + if match_field.data_type == MatchField.IP: + validated_data["value"] = value.replace(' ', '|') + else: + validated_data["value"] = "|".join(value) + + return validated_data diff --git a/python/nav/web/alertprofiles/views.py b/python/nav/web/alertprofiles/views.py index 72abd2189c..ff554a21b2 100644 --- a/python/nav/web/alertprofiles/views.py +++ b/python/nav/web/alertprofiles/views.py @@ -24,7 +24,7 @@ # TODO Filter/filter_groups have owners, check that the account that performs # the operation is the owner -from django.http import HttpResponseRedirect +from django.http import HttpResponseRedirect, QueryDict from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q from django.shortcuts import render @@ -1563,40 +1563,37 @@ def filter_addexpression(request): @requires_post('alertprofiles-filters') def filter_saveexpression(request): """Saves an expression to a filter""" - # Get the MatchField, Filter and Operator objects associated with the - # input POST-data - filtr = Filter.objects.get(pk=request.POST.get('filter')) - type_ = request.POST.get('operator') - match_field = MatchField.objects.get(pk=request.POST.get('match_field')) - operator = Operator.objects.get(type=type_, match_field=match_field.pk) + if request.POST.get('id'): + existing_expression = Expression.objects.get(pk=request.POST.get('id')) + form = ExpressionForm(request.POST, instance=existing_expression) + else: + form = ExpressionForm(request.POST) + + if not form.is_valid(): + dictionary = { + 'id': str(form.cleaned_data["filter"].pk), + 'matchfield': str(form.cleaned_data["match_field"].pk), + } + qdict = QueryDict("", mutable=True) + qdict.update(dictionary) + request.POST = qdict + new_message( + request, + form.errors, + Messages.ERROR, + ) + + return filter_addexpression(request=request) + + filtr = form.cleaned_data['filter'] if not account_owns_filters(get_account(request), filtr): return alertprofiles_response_forbidden( request, _('You do not own this filter.') ) - # Get the value - if operator.type == Operator.IN: - # If input was a multiple choice list we have to join each option - # in one string, where each option is separated by a | (pipe). - # If input was a IP adress we should replace space with | (pipe). - # FIXME We might want some data checks here - if match_field.data_type == MatchField.IP: - # FIXME We might want to check that it is a valid IP adress. - # If we do so, we need to remember both IPv4 and IPv6 - value = request.POST.get('value').replace(' ', '|') - else: - value = "|".join([value for value in request.POST.getlist('value')]) - else: - value = request.POST.get('value') + form.save() - expression = Expression( - filter=filtr, - match_field=match_field, - operator=operator.type, - value=value, - ) - expression.save() new_message( request, _('Added expression to filter %(name)s') % {'name': filtr.name},