-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalidator.py
661 lines (551 loc) · 22.3 KB
/
validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
import copy
import ipaddress
import operator
import re
from functools import total_ordering, wraps
from urllib.parse import urlsplit, urlunsplit
class Promise:
    """
    Marker base class for the proxy objects built inside lazy().

    It adds no behaviour of its own; its purpose is to let code detect a
    lazy value with an isinstance() check.
    """
def _lazy_proxy_unpickle(func, args, kwargs, *resultclasses):
    """
    Rebuild a lazy proxy from the pieces a proxy's __reduce__() stored.

    A fresh lazy wrapper for ``func`` is created and immediately called with
    the saved positional and keyword arguments.
    """
    lazy_func = lazy(func, *resultclasses)
    return lazy_func(*args, **kwargs)
def lazy(func, *resultclasses):
    """
    Return a wrapper around ``func`` that defers the actual call.

    Calling the wrapper does not run ``func``; it returns a proxy that only
    remembers the arguments. ``func`` runs when a method copied from one of
    ``resultclasses`` (or one of the proxy's own special methods) is invoked
    on the proxy. At least one result class is needed for any method to be
    forwarded. Nothing is cached: each access calls ``func`` again.
    """

    @total_ordering
    class __proxy__(Promise):
        """
        Remember the arguments of a pending call to ``func`` and forward
        method calls to whatever ``func`` returns. The call is made only
        when one of those methods is used.
        """

        # Becomes True on the class once the forwarding methods are installed.
        __prepared = False

        def __init__(self, args, kw):
            self.__args = args
            self.__kw = kw
            if not self.__prepared:
                self.__prepare_class__()
            self.__class__.__prepared = True

        def __reduce__(self):
            # Pickle as "call _lazy_proxy_unpickle with these arguments".
            state = (func, self.__args, self.__kw) + resultclasses
            return (_lazy_proxy_unpickle, state)

        def __repr__(self):
            return repr(self.__cast())

        @classmethod
        def __prepare_class__(cls):
            for resultclass in resultclasses:
                for type_ in resultclass.mro():
                    for name in type_.__dict__:
                        # Every forwarded name gets the same kind of wrapper;
                        # the real implementation is looked up at call time.
                        # Names the proxy already has are left alone.
                        if not hasattr(cls, name):
                            setattr(cls, name, cls.__promise__(name))
            delegates_bytes = bytes in resultclasses
            delegates_text = str in resultclasses
            cls._delegate_bytes = delegates_bytes
            cls._delegate_text = delegates_text
            if delegates_bytes and delegates_text:
                raise ValueError(
                    "Cannot call lazy() with both bytes and text return types."
                )
            if delegates_text:
                cls.__str__ = cls.__text_cast
            elif delegates_bytes:
                cls.__bytes__ = cls.__bytes_cast

        @classmethod
        def __promise__(cls, method_name):
            # Build the forwarding wrapper for one method name.
            def __wrapper__(self, *args, **kw):
                # Evaluate the lazy value now, then call the requested
                # method on the result.
                result = func(*self.__args, **self.__kw)
                return getattr(result, method_name)(*args, **kw)

            return __wrapper__

        def __text_cast(self):
            return func(*self.__args, **self.__kw)

        def __bytes_cast(self):
            return bytes(func(*self.__args, **self.__kw))

        def __bytes_cast_encoded(self):
            return func(*self.__args, **self.__kw).encode()

        def __cast(self):
            if self._delegate_bytes:
                return self.__bytes_cast()
            if self._delegate_text:
                return self.__text_cast()
            return func(*self.__args, **self.__kw)

        def __str__(self):
            # object already provides __str__(), so __prepare_class__() never
            # installs the proxied class's __str__(); handle it here.
            return str(self.__cast())

        def __eq__(self, other):
            rhs = other.__cast() if isinstance(other, Promise) else other
            return self.__cast() == rhs

        def __lt__(self, other):
            rhs = other.__cast() if isinstance(other, Promise) else other
            return self.__cast() < rhs

        def __hash__(self):
            return hash(self.__cast())

        def __mod__(self, rhs):
            if self._delegate_text:
                return str(self) % rhs
            return self.__cast() % rhs

        def __add__(self, other):
            return self.__cast() + other

        def __radd__(self, other):
            return other + self.__cast()

        def __deepcopy__(self, memo):
            # A proxy only holds a function and its arguments and is treated
            # as immutable, so a deep copy is the proxy itself.
            memo[id(self)] = self
            return self

    @wraps(func)
    def __wrapper__(*args, **kw):
        # Hand back the proxy rather than the computed value.
        return __proxy__(args, kw)

    return __wrapper__
def punycode(domain):
    """
    Encode ``domain`` with the "idna" codec and return the result as str.

    A pure-ASCII domain comes back unchanged; non-ASCII labels come back in
    their ``xn--`` form.
    """
    ace_bytes = domain.encode("idna")
    return ace_bytes.decode("ascii")
def is_iterable(x):
    """Return True if ``iter(x)`` succeeds, False if it raises TypeError."""
    try:
        iter(x)
        return True
    except TypeError:
        return False
def is_valid_ipv6_address(ip_str):
    """
    Tell whether ``ip_str`` is accepted by ``ipaddress.IPv6Address``.

    Returns True when the constructor succeeds and False when it raises
    ValueError.
    """
    try:
        ipaddress.IPv6Address(ip_str)
        return True
    except ValueError:
        return False
def validate_ipv6_address(value):
    """
    Raise ValidationError (code "invalid") unless ``value`` passes
    is_valid_ipv6_address(); return None otherwise.
    """
    if is_valid_ipv6_address(value):
        return
    raise ValidationError(
        "Enter a valid IPv6 address.", code="invalid", params={"value": value}
    )
def make_hashable(value):
    """
    Return a hashable stand-in for ``value`` or raise TypeError.

    Dicts become a tuple of ``(key, converted_value)`` pairs sorted by item,
    so equal dicts produce equal results. Values that already hash are
    returned untouched. Unhashable iterables become tuples of converted
    items. Anything else re-raises the TypeError from hash().
    """
    if isinstance(value, dict):
        return tuple(
            (key, make_hashable(nested_value))
            for key, nested_value in sorted(value.items())
        )
    # Probe with hash() first so that an already-hashable iterable (a string,
    # a frozenset, ...) is not needlessly turned into a tuple.
    try:
        hash(value)
    except TypeError:
        if not is_iterable(value):
            # Neither hashable nor iterable: nothing we can do.
            raise
        return tuple(make_hashable(item) for item in value)
    return value
class ValidationError(Exception):
    """An error while validating data."""

    def __init__(self, message, code=None, params=None):
        """
        Build the error from one of three shapes of ``message``:

        * a single error, stored as ``message``/``code``/``params`` with
          ``error_list`` set to ``[self]``;
        * a list of errors, flattened into ``error_list``;
        * a dict mapping field names to errors, stored as ``error_dict``
          (field name -> list of ValidationError).

        An "error" is either a plain value (typically a string) or another
        ValidationError; a ValidationError passed as ``message`` is unpacked
        according to which of the three shapes it carries.
        """
        super().__init__(message, code, params)

        if isinstance(message, ValidationError):
            # Unwrap an existing error into the raw shape it represents.
            if hasattr(message, "error_dict"):
                message = message.error_dict
            elif not hasattr(message, "message"):
                message = message.error_list
            else:
                message, code, params = message.message, message.code, message.params

        if isinstance(message, dict):
            self.error_dict = {}
            for field, messages in message.items():
                if not isinstance(messages, ValidationError):
                    messages = ValidationError(messages)
                self.error_dict[field] = messages.error_list
        elif isinstance(message, list):
            self.error_list = []
            for item in message:
                # Plain values are wrapped so every entry is a ValidationError.
                if isinstance(item, ValidationError):
                    error = item
                else:
                    error = ValidationError(item)
                if hasattr(error, "error_dict"):
                    self.error_list.extend(sum(error.error_dict.values(), []))
                else:
                    self.error_list.extend(error.error_list)
        else:
            self.message = message
            self.code = code
            self.params = params
            self.error_list = [self]

    @property
    def message_dict(self):
        # Plain attribute access on purpose: it raises AttributeError when
        # this error was not built from a dict.
        self.error_dict  # noqa: B018
        return dict(self)

    @property
    def messages(self):
        if not hasattr(self, "error_dict"):
            return list(self)
        return sum(dict(self).values(), [])

    def update_error_dict(self, error_dict):
        """
        Merge this error into ``error_dict`` (field -> list) and return it.

        Errors without per-field information are filed under "__all__".
        """
        if hasattr(self, "error_dict"):
            for field, field_errors in self.error_dict.items():
                error_dict.setdefault(field, []).extend(field_errors)
            return error_dict
        error_dict.setdefault("__all__", []).extend(self.error_list)
        return error_dict

    def __iter__(self):
        if hasattr(self, "error_dict"):
            # Dict form: yield (field, [rendered messages]) pairs.
            for field, errors in self.error_dict.items():
                yield field, list(ValidationError(errors))
            return
        # List form: yield each message with its params interpolated.
        for error in self.error_list:
            text = error.message
            if error.params:
                text = text % error.params
            yield str(text)

    def __str__(self):
        rendered = dict(self) if hasattr(self, "error_dict") else list(self)
        return repr(rendered)

    def __repr__(self):
        return "ValidationError(%s)" % self

    def __eq__(self, other):
        if isinstance(other, ValidationError):
            return hash(self) == hash(other)
        return NotImplemented

    def __hash__(self):
        if hasattr(self, "message"):
            key = (self.message, self.code, make_hashable(self.params))
            return hash(key)
        if hasattr(self, "error_dict"):
            return hash(make_hashable(self.error_dict))
        ordered = sorted(self.error_list, key=operator.attrgetter("message"))
        return hash(tuple(ordered))
class RegexValidator:
    """
    Callable validator that searches a value for a regular expression.

    The class attributes are defaults; each can be overridden per instance
    through the constructor argument of the same name.
    """

    regex = ""
    message = ""
    code = "invalid"
    inverse_match = False
    flags = 0

    def __init__(
        self, regex=None, message=None, code=None, inverse_match=None, flags=None
    ):
        """
        Store any non-None overrides, then wrap ``regex`` in a lazily
        compiled pattern.

        Raises TypeError when flags are set but ``regex`` is not a string.
        """
        overrides = (
            ("regex", regex),
            ("message", message),
            ("code", code),
            ("inverse_match", inverse_match),
            ("flags", flags),
        )
        for attr_name, attr_value in overrides:
            if attr_value is not None:
                setattr(self, attr_name, attr_value)

        if self.flags and not isinstance(self.regex, str):
            raise TypeError(
                "If the flags are set, regex must be a regular expression string."
            )

        self.regex = _lazy_re_compile(self.regex, self.flags)

    def __call__(self, value):
        """
        Validate that the input contains (or does *not* contain, if
        inverse_match is True) a match for the regular expression.

        Raises ValidationError with this validator's message and code when
        the check fails.
        """
        match = self.regex.search(str(value))
        if self.inverse_match:
            is_invalid = bool(match)
        else:
            is_invalid = not match
        if is_invalid:
            raise ValidationError(self.message, code=self.code, params={"value": value})

    def __eq__(self, other):
        # Two validators are equal when pattern, flags, message, code and
        # match direction all agree.
        if not isinstance(other, RegexValidator):
            return False
        return (
            self.regex.pattern == other.regex.pattern
            and self.regex.flags == other.regex.flags
            and (self.message == other.message)
            and (self.code == other.code)
            and (self.inverse_match == other.inverse_match)
        )
# Sentinel meaning "the wrapped object has not been created yet".
empty = object()


def new_method_proxy(func):
    """
    Build a method that applies ``func`` to the owner's wrapped object.

    The returned method calls ``self._setup()`` first if ``self._wrapped`` is
    still the ``empty`` sentinel, then returns
    ``func(self._wrapped, *args)``. The method is tagged with
    ``_mask_wrapped = False`` so it can be recognised as a proxy method.
    """

    def inner(self, *args):
        target = self._wrapped
        if target is empty:
            self._setup()
            target = self._wrapped
        return func(target, *args)

    inner._mask_wrapped = False
    return inner
def unpickle_lazyobject(wrapped):
    """
    Unpickling hook for lazy objects.

    The pickled payload is the wrapped object itself, so it is handed back
    unchanged.
    """
    return wrapped
class LazyObject:
    """
    Stand-in for another object whose creation is postponed until first use.

    Subclasses supply _setup() to create the wrapped object, which lets them
    intercept and alter the instantiation. When that is not needed,
    SimpleLazyObject is the ready-made variant.
    """

    # Avoid infinite recursion when tracing __init__ (#19456).
    _wrapped = None

    def __init__(self):
        # Note: a subclass that overrides __init__() will most likely have to
        # override __copy__() and __deepcopy__() too.
        self._wrapped = empty

    def __getattribute__(self, name):
        value = super().__getattribute__(name)
        # "_wrapped" is always returned directly to avoid recursion. Any other
        # attribute is returned unless it is a proxy method (tagged with a
        # false _mask_wrapped); those raise AttributeError so that
        # __getattr__() runs and the wrapped object's attribute is used.
        if name == "_wrapped" or getattr(value, "_mask_wrapped", True):
            return value
        raise AttributeError

    __getattr__ = new_method_proxy(getattr)

    def __setattr__(self, name, value):
        if name == "_wrapped":
            # Write straight into __dict__ to avoid an endless __setattr__
            # loop.
            self.__dict__["_wrapped"] = value
            return
        if self._wrapped is empty:
            self._setup()
        setattr(self._wrapped, name, value)

    def __delattr__(self, name):
        if name == "_wrapped":
            raise TypeError("can't delete _wrapped.")
        if self._wrapped is empty:
            self._setup()
        delattr(self._wrapped, name)

    def _setup(self):
        """
        Create the wrapped object and store it in ``_wrapped``.

        Subclasses must override this; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError(
            "subclasses of LazyObject must provide a _setup() method"
        )

    # __class__ is proxied further down, which confuses pickle about which
    # class is being pickled. Pickling requires the wrapped object to be
    # initialized anyway, so we pickle the wrapped object itself, since the
    # two are meant to behave alike.
    #
    # Simply impersonating the wrapped object does not work: pickle looks at
    # our id() and ends up treating us as a separate object that shares the
    # wrapped object's __dict__, which can cause problems (see #25389).
    #
    # Hence the custom __reduce__ and unpickler: the wrapped object is passed
    # as the unpickler's argument, pickle serializes it normally, and the
    # unpickler just returns that argument.
    def __reduce__(self):
        if self._wrapped is empty:
            self._setup()
        return (unpickle_lazyobject, (self._wrapped,))

    def __copy__(self):
        if self._wrapped is not empty:
            # Already initialized: copy the wrapped object instead.
            return copy.copy(self._wrapped)
        # Still uninitialized: make a new wrapper. type(self) is used rather
        # than self.__class__ because the latter is proxied.
        return type(self)()

    def __deepcopy__(self, memo):
        if self._wrapped is not empty:
            return copy.deepcopy(self._wrapped, memo)
        # type(self) is required here, not self.__class__, because the latter
        # is proxied.
        result = type(self)()
        memo[id(self)] = result
        return result

    __bytes__ = new_method_proxy(bytes)
    __str__ = new_method_proxy(str)
    __bool__ = new_method_proxy(bool)

    # Introspection support
    __dir__ = new_method_proxy(dir)

    # Report the wrapped object's class for callers that inspect it
    # (equality tests in particular).
    __class__ = property(new_method_proxy(operator.attrgetter("__class__")))
    __eq__ = new_method_proxy(operator.eq)
    __lt__ = new_method_proxy(operator.lt)
    __gt__ = new_method_proxy(operator.gt)
    __ne__ = new_method_proxy(operator.ne)
    __hash__ = new_method_proxy(hash)

    # List/Tuple/Dictionary methods support
    __getitem__ = new_method_proxy(operator.getitem)
    __setitem__ = new_method_proxy(operator.setitem)
    __delitem__ = new_method_proxy(operator.delitem)
    __iter__ = new_method_proxy(iter)
    __len__ = new_method_proxy(len)
    __contains__ = new_method_proxy(operator.contains)
class SimpleLazyObject(LazyObject):
    """
    A lazy object whose wrapped value comes from calling a function.

    Meant for compound objects of unknown type. For builtins or objects of
    known type, use lazy() instead.
    """

    def __init__(self, func):
        """
        Remember ``func``, a callable returning the object to wrap.

        Copying an uninitialized SimpleLazyObject creates a new wrapper around
        the same callable, so the callable must be safe to run more than once
        and must return the same value each time.
        """
        # Stored through __dict__ because normal attribute assignment is
        # forwarded to the wrapped object.
        self.__dict__["_setupfunc"] = func
        super().__init__()

    def _setup(self):
        self._wrapped = self._setupfunc()

    # Give a useful debugging representation without forcing evaluation of
    # the wrapped object.
    def __repr__(self):
        shown = self._setupfunc if self._wrapped is empty else self._wrapped
        return "<%s: %r>" % (type(self).__name__, shown)

    def __copy__(self):
        if self._wrapped is not empty:
            # Already initialized: copy the wrapped object instead.
            return copy.copy(self._wrapped)
        # Still uninitialized: wrap the same callable again. The class is
        # named explicitly because self.__class__ is proxied.
        return SimpleLazyObject(self._setupfunc)

    def __deepcopy__(self, memo):
        if self._wrapped is not empty:
            return copy.deepcopy(self._wrapped, memo)
        # The class is named explicitly because self.__class__ is proxied.
        result = SimpleLazyObject(self._setupfunc)
        memo[id(self)] = result
        return result

    __add__ = new_method_proxy(operator.add)

    @new_method_proxy
    def __radd__(self, other):
        # After proxying, ``self`` here is the wrapped object.
        return other + self
def _lazy_re_compile(regex, flags=0):
    """
    Return a SimpleLazyObject that yields a compiled pattern on first use.

    A str or bytes ``regex`` is compiled with ``flags`` at that point. Any
    other value is assumed to be pre-compiled and is returned as-is, in which
    case ``flags`` must be empty.
    """

    def _compile():
        if not isinstance(regex, (str, bytes)):
            # Pre-compiled pattern: flags cannot be applied after the fact.
            assert not flags, "flags must be empty if regex is passed pre-compiled"
            return regex
        return re.compile(regex, flags)

    return SimpleLazyObject(_compile)
class URLValidator(RegexValidator):
    """
    Validate that a value is a URL with an allowed scheme and a valid host.

    Class attributes:
        schemes: schemes accepted by default; overridable per instance.
        unsafe_chars: characters that make a value invalid outright.
        max_length: longest value that is examined at all. Longer input is
            rejected before the regular expression runs, because the pattern
            can backtrack heavily on very long hostile input. Subclasses may
            raise or lower the limit.
    """

    ul = "\u00a1-\uffff"  # Unicode letters range (must not be a raw string).

    # IP patterns
    ipv4_re = (
        r"(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)"
        r"(?:\.(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)){3}"
    )
    ipv6_re = r"\[[0-9a-f:.]+\]"  # (simple regex, validated later)

    # Host patterns
    hostname_re = (
        r"[a-z" + ul + r"0-9](?:[a-z" + ul + r"0-9-]{0,61}[a-z" + ul + r"0-9])?"
    )
    # Max length for domain name labels is 63 characters per RFC 1034 sec. 3.1
    domain_re = r"(?:\.(?!-)[a-z" + ul + r"0-9-]{1,63}(?<!-))*"
    tld_re = (
        r"\."  # dot
        r"(?!-)"  # can't start with a dash
        r"(?:[a-z" + ul + "-]{2,63}"  # domain label
        r"|xn--[a-z0-9]{1,59})"  # or punycode label
        r"(?<!-)"  # can't end with a dash
        r"\.?"  # may have a trailing dot
    )
    host_re = "(" + hostname_re + domain_re + tld_re + "|localhost)"

    regex = _lazy_re_compile(
        r"^(?:[a-z0-9.+-]*)://"  # scheme is validated separately
        r"(?:[^\s:@/]+(?::[^\s:@/]*)?@)?"  # user:pass authentication
        r"(?:" + ipv4_re + "|" + ipv6_re + "|" + host_re + ")"
        r"(?::[0-9]{1,5})?"  # port
        r"(?:[/?#][^\s]*)?"  # resource path
        r"\Z",
        re.IGNORECASE,
    )
    message = "Enter a valid URL."
    schemes = ["http", "https", "ftp", "ftps"]
    unsafe_chars = frozenset("\t\r\n")
    max_length = 2048

    def __init__(self, schemes=None, **kwargs):
        """
        Args:
            schemes: optional replacement for the default list of accepted
                schemes.
            **kwargs: forwarded unchanged to RegexValidator.__init__().
        """
        super().__init__(**kwargs)
        if schemes is not None:
            self.schemes = schemes

    def __call__(self, value):
        """
        Validate ``value`` and return None, or raise ValidationError.

        Checks, in order: the value is a str no longer than ``max_length``;
        it contains none of ``unsafe_chars``; its scheme is in
        ``self.schemes``; ``urlsplit`` can parse it; it matches the URL regex
        (retried once with a punycode-encoded netloc if the first match
        fails); a bracketed IPv6 host is a valid IPv6 address; and the host
        name is present and at most 253 characters long.

        Raises:
            ValidationError: with this validator's message and code when any
                check fails.
        """
        # Bound the input first so the regex never sees oversized values.
        if not isinstance(value, str) or len(value) > self.max_length:
            raise ValidationError(self.message, code=self.code, params={"value": value})
        if self.unsafe_chars.intersection(value):
            raise ValidationError(self.message, code=self.code, params={"value": value})
        # Check if the scheme is valid.
        scheme = value.split("://")[0].lower()
        if scheme not in self.schemes:
            raise ValidationError(self.message, code=self.code, params={"value": value})

        # Then check full URL
        try:
            splitted_url = urlsplit(value)
        except ValueError:
            raise ValidationError(self.message, code=self.code, params={"value": value})
        try:
            super().__call__(value)
        except ValidationError as e:
            # Trivial case failed. Try for possible IDN domain
            if value:
                scheme, netloc, path, query, fragment = splitted_url
                try:
                    netloc = punycode(netloc)  # IDN -> ACE
                except UnicodeError:  # invalid domain part
                    raise e
                url = urlunsplit((scheme, netloc, path, query, fragment))
                super().__call__(url)
            else:
                raise
        else:
            # Now verify IPv6 in the netloc part
            host_match = re.search(r"^\[(.+)\](?::[0-9]{1,5})?$", splitted_url.netloc)
            if host_match:
                potential_ip = host_match[1]
                try:
                    validate_ipv6_address(potential_ip)
                except ValidationError:
                    raise ValidationError(
                        self.message, code=self.code, params={"value": value}
                    )

        # The maximum length of a full host name is 253 characters per RFC 1034
        # section 3.1. It's defined to be 255 bytes or less, but this includes
        # one byte for the length of the name and one byte for the trailing dot
        # that's used to indicate absolute names in DNS.
        if splitted_url.hostname is None or len(splitted_url.hostname) > 253:
            raise ValidationError(self.message, code=self.code, params={"value": value})