-
Notifications
You must be signed in to change notification settings - Fork 598
/
Copy pathStateMachineDefinition.py
121 lines (104 loc) · 4.35 KB
/
StateMachineDefinition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations
import json
from typing import Any
import cfnlint.data.schemas.other.resources
import cfnlint.data.schemas.other.step_functions
import cfnlint.helpers
from cfnlint.helpers import is_function
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.rules.jsonschema.CfnLintJsonSchema import CfnLintJsonSchema, SchemaDetails
from cfnlint.schema.resolver import RefResolver
class StateMachineDefinition(CfnLintJsonSchema):
id = "E3601"
shortdesc = "Validate the structure of a StateMachine definition"
description = (
"Validate the Definition or DefinitionString inside a "
"AWS::StepFunctions::StateMachine resource"
)
source_url = "https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-state-machine-structure.html"
tags = ["resources", "statemachine"]
def __init__(self):
super().__init__(
keywords=[
"Resources/AWS::StepFunctions::StateMachine/Properties",
# https://github.com/aws-cloudformation/cfn-lint/issues/3518
# Resources/AWS::StepFunctions::StateMachine/Properties/DefinitionString
],
schema_details=SchemaDetails(
cfnlint.data.schemas.other.step_functions, "statemachine.json"
),
all_matches=True,
)
store = {
"definition": self.schema,
}
self.resolver = RefResolver.from_schema(self.schema, store=store)
def _fix_message(self, err: ValidationError) -> ValidationError:
if len(err.path) > 1:
err.message = f"{err.message} at {'/'.join(err.path)!r}"
for i, c_err in enumerate(err.context):
err.context[i] = self._fix_message(c_err)
return err
def validate(
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:
definition_keys = ["Definition"]
if not validator.cfn.has_serverless_transform():
definition_keys.append("DefinitionString")
# First time child rules are configured against the rule
# so we can run this now
for k in definition_keys:
value = instance.get(k)
fn_name, _ = is_function(value)
if fn_name:
continue
if not value:
continue
add_path_to_message = False
if validator.is_type(value, "string"):
try:
step_validator = validator.evolve(
context=validator.context.evolve(
functions=[],
),
resolver=self.resolver,
schema=self.schema,
)
value = json.loads(value)
add_path_to_message = True
except json.JSONDecodeError:
return
else:
step_validator = validator.evolve(
resolver=self.resolver,
schema=self.schema,
)
substitutions = []
props_substitutions = instance.get("DefinitionSubstitutions", {})
if validator.is_type(props_substitutions, "object"):
substitutions = list(props_substitutions.keys())
for err in step_validator.iter_errors(value):
if validator.is_type(err.instance, "string"):
if (
err.instance.replace("${", "").replace("}", "").strip()
in substitutions
):
continue
if add_path_to_message:
err = self._fix_message(err)
err.path.appendleft(k)
if err.schema in [True, False]:
err.message = (
f"Additional properties are not allowed ({err.path[-1]!r} "
"was unexpected)"
)
if not err.validator or (
not err.validator.startswith("fn_")
and err.validator not in ["cfnLint"]
):
err.rule = self
yield self._clean_error(err)