-
Notifications
You must be signed in to change notification settings - Fork 598
/
Copy pathServiceFargate.py
147 lines (133 loc) · 5.19 KB
/
ServiceFargate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations
from collections import deque
from typing import Any, Iterator
from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword
class ServiceFargate(CfnLintKeyword):
    """Rule E3054: a Fargate ECS service must use a Fargate-capable task definition.

    For every ``AWS::ECS::Service`` whose ``LaunchType`` resolves to
    ``FARGATE``, the ``AWS::ECS::TaskDefinition`` it points at (through
    ``Ref`` or ``Fn::GetAtt``) is inspected and an error is reported when
    ``RequiresCompatibilities`` is missing or does not list ``FARGATE``.
    """

    id = "E3054"
    shortdesc = (
        "Validate ECS service using Fargate uses TaskDefinition that allows Fargate"
    )
    description = (
        "When using an ECS service with 'LaunchType' of 'FARGATE' "
        "the associated task definition must have 'RequiresCompatibilities' "
        "specified with 'FARGATE' listed"
    )
    tags = ["resources", "ecs"]

    def __init__(self) -> None:
        """Register the rule against the properties of ``AWS::ECS::Service``."""
        super().__init__(
            keywords=["Resources/AWS::ECS::Service/Properties"],
        )

    def _filter_resource_name(self, instance: Any) -> str | None:
        """Return the logical resource name referenced by *instance*.

        Args:
            instance: The raw ``TaskDefinition`` value from the template.

        Returns:
            The referenced name when *instance* is a ``Ref`` to a string, or
            an ``Fn::GetAtt`` whose target (a ``"Name.Attr"`` string or the
            first element of a list) is a string; ``None`` for anything else,
            including malformed ``Fn::GetAtt`` values.
        """
        fn_k, fn_v = is_function(instance)
        if fn_k is None:
            return None
        if fn_k == "Ref":
            if isinstance(fn_v, str):
                return fn_v
        elif fn_k == "Fn::GetAtt":
            targets = ensure_list(fn_v)
            # Check before splitting: an empty list or a non-string first
            # element (e.g. a nested intrinsic) must not raise from this rule.
            if targets and isinstance(targets[0], str):
                # Both "Name.Attr" and ["Name", "Attr"] reduce to "Name".
                return targets[0].split(".")[0]
        return None

    def _get_service_properties(
        self, validator: Validator, instance: Any
    ) -> Iterator[tuple[str, str, Validator]]:
        """Yield the task definition name and launch type for a service.

        Args:
            validator: Validator positioned at the service properties.
            instance: The service ``Properties`` object.

        Yields:
            ``(task_definition_resource_name, launch_type, validator)`` for
            every combination of ``TaskDefinition`` and ``LaunchType`` values
            produced by ``get_value_from_path``. ``TaskDefinition`` values
            that are not a usable ``Ref``/``Fn::GetAtt`` are skipped.
        """
        for task_definition_id, task_definition_validator in get_value_from_path(
            validator, instance, deque(["TaskDefinition"])
        ):
            task_definition_resource_name = self._filter_resource_name(
                task_definition_id
            )
            if task_definition_resource_name is None:
                continue
            # LaunchType is looked up with the validator returned for
            # TaskDefinition, so both values come from the same validator chain.
            for (
                launch_type,
                launch_type_validator,
            ) in get_value_from_path(
                task_definition_validator, instance, deque(["LaunchType"])
            ):
                yield (
                    task_definition_resource_name,
                    launch_type,
                    launch_type_validator,
                )

    def _get_task_definition_properties(
        self, validator: Validator, resource_name: Any
    ) -> Iterator[tuple[list[Any] | None, Validator]]:
        """Yield ``RequiresCompatibilities`` values that do not allow Fargate.

        Args:
            validator: Validator carried over from the service lookup.
            resource_name: Logical name of the referenced task definition.

        Yields:
            ``(capabilities, capabilities_validator)`` where *capabilities* is
            ``None`` (property absent) or a list in which no item is
            ``"FARGATE"`` or a dict. Nothing is yielded when the resource is
            not found as an ``AWS::ECS::TaskDefinition``.
        """
        task_definition, task_definition_validator = get_resource_by_name(
            validator, resource_name, ["AWS::ECS::TaskDefinition"]
        )
        if not task_definition:
            return
        for capabilities, capabilities_validator in get_value_from_path(
            task_definition_validator,
            task_definition,
            path=deque(["Properties", "RequiresCompatibilities"]),
        ):
            for network_mode, network_mode_validator in get_value_from_path(
                capabilities_validator,
                task_definition,
                path=deque(["Properties", "NetworkMode"]),
            ):
                # Combinations where NetworkMode is "awsvpc" or an object
                # are not reported by this rule.
                if network_mode == "awsvpc" or network_mode_validator.is_type(
                    network_mode, "object"
                ):
                    continue
                if capabilities is None:
                    yield capabilities, capabilities_validator
                    continue
                if not isinstance(capabilities, list):
                    continue
                for capability, _ in get_value_from_path(
                    network_mode_validator,
                    capabilities,
                    path=deque(["*"]),
                ):
                    # A dict item cannot be compared to "FARGATE" here, so it
                    # is treated as satisfying the requirement.
                    if isinstance(capability, dict) or capability == "FARGATE":
                        break
                else:
                    # Loop finished without a match: FARGATE is not listed.
                    yield capabilities, capabilities_validator

    def validate(
        self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
    ) -> ValidationResult:
        """Yield errors for Fargate services whose task definition lacks FARGATE.

        Args:
            validator: Validator positioned at the service properties.
            _: Unused keyword value.
            instance: The service ``Properties`` object.
            schema: Unused schema for the keyword.

        Yields:
            A ``required`` error when ``RequiresCompatibilities`` is absent,
            or a ``contains`` error when it is present without ``FARGATE``.
        """
        for (
            task_definition_resource_name,
            launch_type,
            service_validator,
        ) in self._get_service_properties(
            validator,
            instance,
        ):
            if launch_type != "FARGATE":
                continue
            for (
                capabilities,
                capabilities_validator,
            ) in self._get_task_definition_properties(
                service_validator,
                task_definition_resource_name,
            ):
                if capabilities is None:
                    yield ValidationError(
                        "'RequiresCompatibilities' is a required property",
                        validator="required",
                        rule=self,
                        # Drop the last path element so the error points at
                        # the parent of the missing property.
                        path_override=deque(
                            list(capabilities_validator.context.path.path)[:-1]
                        ),
                    )
                    continue
                yield ValidationError(
                    f"{capabilities!r} does not contain items matching 'FARGATE'",
                    validator="contains",
                    rule=self,
                    path_override=capabilities_validator.context.path.path,
                )