-
Notifications
You must be signed in to change notification settings - Fork 600
/
Copy pathRefFormat.py
94 lines (79 loc) · 3.17 KB
/
RefFormat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any
from cfnlint.jsonschema import ValidationResult, Validator
from cfnlint.jsonschema._utils import Unset
from cfnlint.rules.formats._schema_comparer import compare_schemas
from cfnlint.rules.jsonschema import CfnLintKeyword
from cfnlint.schema import PROVIDER_SCHEMA_MANAGER
class RefFormat(CfnLintKeyword):
id = "E1041"
shortdesc = "Check if Ref matches destination format"
description = (
"When source and destination format exists validate that they match in a Ref"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/best-practices.html#parmtypes"
tags = ["functions", "ref"]
def __init__(self):
super().__init__(["*"])
self.parent_rules = ["E1020"]
def _filter_schema(
self, validator: Validator, type_name: str, id: str, schema: dict[str, Any]
) -> dict[str, Any]:
if type_name != "AWS::EC2::SecurityGroup":
return schema
items = list(
validator.cfn.get_cfn_path(
["Resources", id, "Properties", "VpcId"], validator.context
)
)
if items:
# VpcId is specified and will have a value which means the returned value is
# "AWS::EC2::SecurityGroup.Id"
schema = deepcopy(schema)
schema.pop("anyOf")
schema["format"] = "AWS::EC2::SecurityGroup.Id"
return schema
# VpcId being None means it wasn't specified and the value is a Name
schema = deepcopy(schema)
schema.pop("anyOf")
schema["format"] = "AWS::EC2::SecurityGroup.Name"
return schema
def validate(
self, validator: Validator, _, instance: Any, schema: Any
) -> ValidationResult:
if instance in validator.context.parameters:
return
if instance not in validator.context.resources:
return
t = validator.context.resources[instance].type
for (
regions,
resource_schema,
) in PROVIDER_SCHEMA_MANAGER.get_resource_schemas_by_regions(
t, validator.context.regions
):
region = regions[0]
ref_schema = validator.context.resources[instance].ref(region)
ref_schema = self._filter_schema(validator, t, instance, ref_schema)
err = compare_schemas(schema, ref_schema)
if err:
if err.instance:
err.message = (
f"{{'Ref': {instance!r}}} with formats {err.instance!r} "
"does not match destination format of "
f"{err.schema.get('format')!r}"
)
else:
err.message = (
f"{{'Ref': {instance!r}}} does not match "
f"destination format of {err.schema.get('format')!r}"
)
err.instance = Unset()
err.schema = Unset()
err.rule = self
yield err