forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 2
/
StateMachineDefinition.py
115 lines (98 loc) · 4.08 KB
/
StateMachineDefinition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations
import json
from typing import Any
import cfnlint.data.schemas.other.resources
import cfnlint.data.schemas.other.step_functions
import cfnlint.helpers
from cfnlint.helpers import is_function
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema, SchemaDetails
from cfnlint.schema.resolver import RefResolver
class StateMachineDefinition(CfnLintJsonSchema):
    """Rule E3601: check a StateMachine definition against a JSON schema.

    The rule is registered on the ``Properties`` of
    ``AWS::StepFunctions::StateMachine`` resources and validates the
    ``Definition`` property (and ``DefinitionString`` when the template does
    not use the serverless transform) with the ``statemachine.json`` schema.
    """

    id = "E3601"
    shortdesc = "Validate the structure of a StateMachine definition"
    description = (
        "Validate the Definition or DefinitionString inside a "
        "AWS::StepFunctions::StateMachine resource"
    )
    source_url = "https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-state-machine-structure.html"
    tags = ["resources", "statemachine"]

    def __init__(self):
        """Register the rule keywords, load the schema and build a resolver."""
        super().__init__(
            keywords=[
                "Resources/AWS::StepFunctions::StateMachine/Properties",
                # https://github.com/aws-cloudformation/cfn-lint/issues/3518
                # Resources/AWS::StepFunctions::StateMachine/Properties/DefinitionString
            ],
            schema_details=SchemaDetails(
                cfnlint.data.schemas.other.step_functions, "statemachine.json"
            ),
            all_matches=True,
        )

        # Expose the loaded schema under the "definition" key of the resolver
        # store (presumably so "$ref"s to that name resolve - confirm against
        # statemachine.json).
        store = {
            "definition": self.schema,
        }
        self.resolver = RefResolver.from_schema(self.schema, store=store)

    def _fix_message(self, err: ValidationError) -> ValidationError:
        """Append the error's path to its message, recursing into ``context``.

        Only errors whose path has more than one element are rewritten. The
        error is modified in place and also returned.

        Args:
            err: The validation error to annotate.

        Returns:
            The same ``err`` object, with its message (and the messages of
            its context errors) updated.
        """
        if len(err.path) > 1:
            # Path elements may be integers (array indices), which
            # ``str.join`` rejects with a TypeError, so stringify them first.
            location = "/".join(str(part) for part in err.path)
            err.message = f"{err.message} at {location!r}"
        for i, c_err in enumerate(err.context):
            err.context[i] = self._fix_message(c_err)
        return err

    def validate(
        self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
    ) -> ValidationResult:
        """Yield schema errors for the state machine definition properties.

        ``Definition`` is always inspected; ``DefinitionString`` is inspected
        only when the template has no serverless transform. A value is
        skipped when it is an intrinsic function, is empty, or is a string
        that does not parse as JSON.

        Args:
            validator: The validator the rule is invoked from; evolved with
                this rule's schema and resolver for the actual check.
            keywords: Unused here.
            instance: The resource ``Properties`` mapping.
            schema: Unused here; this rule's own schema is applied instead.

        Yields:
            Cleaned validation errors whose path starts with the property
            name that was validated.
        """
        definition_keys = ["Definition"]
        if not validator.cfn.has_serverless_transform():
            definition_keys.append("DefinitionString")

        # Names declared under DefinitionSubstitutions. This is the same for
        # every key, so it is computed once up front.
        substitutions: set[str] = set()
        props_substitutions = instance.get("DefinitionSubstitutions", {})
        if validator.is_type(props_substitutions, "object"):
            substitutions = set(props_substitutions)

        # First time child rules are configured against the rule
        # so we can run this now
        for k in definition_keys:
            value = instance.get(k)
            fn_name, _ = is_function(value)
            if fn_name:
                continue
            if not value:
                continue

            add_path_to_message = False
            if validator.is_type(value, "string"):
                try:
                    value = json.loads(value)
                except json.JSONDecodeError:
                    # An unparseable string only disqualifies this key; the
                    # remaining keys are still validated.
                    continue
                # The definition came from a JSON string, so validate it with
                # an empty list of functions in the context.
                step_validator = validator.evolve(
                    context=validator.context.evolve(
                        functions=[],
                    ),
                    resolver=self.resolver,
                    schema=self.schema,
                )
                add_path_to_message = True
            else:
                step_validator = validator.evolve(
                    resolver=self.resolver,
                    schema=self.schema,
                )

            for err in step_validator.iter_errors(value):
                if validator.is_type(err.instance, "string"):
                    # Ignore errors on a string that, once "${" and "}" are
                    # removed, is the name of a declared substitution.
                    placeholder = (
                        err.instance.replace("${", "").replace("}", "").strip()
                    )
                    if placeholder in substitutions:
                        continue
                if add_path_to_message:
                    err = self._fix_message(err)

                err.path.appendleft(k)
                # Claim the error for this rule unless it was produced by an
                # "fn_*" or "cfnLint" validator keyword.
                if not err.validator.startswith("fn_") and err.validator not in [
                    "cfnLint"
                ]:
                    err.rule = self

                yield self._clean_error(err)