Commit 37ed272: Script to create release JSON schemas

kddejong committed Aug 21, 2024 (parent 439806b)

Showing 3 changed files with 247 additions and 5 deletions.
95 changes: 95 additions & 0 deletions scripts/release_schemas/_translator.py
@@ -0,0 +1,95 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

# Translate cfn-lint unique keywords into JSON Schema keywords
import logging
from collections import deque
from typing import Any, Iterator

from cfnlint.schema import PROVIDER_SCHEMA_MANAGER

logger = logging.getLogger(__name__)


def required_xor(properties: list[str]) -> dict[str, list[Any]]:
    """Translate 'requiredXor' into a 'oneOf' of single-property 'required' clauses."""
    return {"oneOf": [{"required": [p]} for p in properties]}


def dependent_excluded(properties: dict[str, list[str]]) -> dict[str, Any]:
    """Translate 'dependentExcluded' into 'dependencies' that forbid the excluded properties."""
    dependencies: dict[str, Any] = {"dependencies": {}}
    for prop, exclusions in properties.items():
        dependencies["dependencies"][prop] = {"not": {"anyOf": []}}
        for exclusion in exclusions:
            dependencies["dependencies"][prop]["not"]["anyOf"].append(
                # "required" takes an array of property names
                {"required": [exclusion]}
            )

    return dependencies
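# For example (hypothetical input), dependent_excluded({"A": ["B", "C"]})
# produces:
#
#   {"dependencies": {"A": {"not": {"anyOf": [
#       {"required": ["B"]}, {"required": ["C"]}]}}}}
#
# i.e. whenever "A" is present, neither "B" nor "C" may be.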


# Registry mapping cfn-lint keywords to their JSON Schema translators
_keywords = {
    "requiredXor": required_xor,
    "dependentExcluded": dependent_excluded,
}


def _find_keywords(schema: Any) -> Iterator[deque[str | int]]:
    """Recursively walk the schema and yield the path to each cfn-lint keyword.

    Each yielded deque ends with the keyword name followed by its value.
    """
    if isinstance(schema, list):
        for i, item in enumerate(schema):
            for path in _find_keywords(item):
                path.appendleft(i)
                yield path
    elif isinstance(schema, dict):
        for key, value in schema.items():
            if key in _keywords:
                yield deque([key, value])
            else:
                for path in _find_keywords(value):
                    path.appendleft(key)
                    yield path
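# For example (hypothetical schema), given
#
#   {"definitions": {"Foo": {"requiredXor": ["A", "B"]}}}
#
# _find_keywords yields a single path:
#
#   deque(["definitions", "Foo", "requiredXor", ["A", "B"]])
#
# The last two entries are the keyword and its value; the leading entries
# are the pointer segments to the object that holds the keyword.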


def translator(resource_type: str, region: str) -> None:
    """Replace cfn-lint keywords in a resource schema with 'allOf' JSON patches."""
    keywords = list(
        _find_keywords(
            PROVIDER_SCHEMA_MANAGER.get_resource_schema(
                region=region, resource_type=resource_type
            ).schema
        )
    )

    for keyword in keywords:
        value = keyword.pop()
        key = keyword.pop()
        if not keyword:
            path = ""
        else:
            path = f"/{'/'.join(str(k) for k in keyword)}"

        # First patch: create an empty "allOf" alongside the keyword
        patch = [
            {
                "op": "add",
                "path": f"{path}/allOf",
                "value": [],
            }
        ]

        logger.info(f"Patch {resource_type} add allOf for {key}")
        PROVIDER_SCHEMA_MANAGER._schemas[region][resource_type].patch(patches=patch)

        # Second patch: remove the cfn-lint keyword and append its translation
        patch = [
            {
                "op": "remove",
                "path": f"{path}/{key}",
            },
            {"op": "add", "path": f"{path}/allOf/-", "value": _keywords[key](value)},  # type: ignore
        ]

        logger.info(f"Patch {resource_type} replace for {key}")
        PROVIDER_SCHEMA_MANAGER._schemas[region][resource_type].patch(patches=patch)
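
Taken together, the module rewrites cfn-lint extension keywords into standard JSON Schema. A minimal before/after sketch (the property names follow the Subnets/SubnetMappings example elsewhere in this commit; the surrounding schema is illustrative):

    # Before: the schema root carries a cfn-lint specific keyword
    {
        "properties": {"Subnets": {}, "SubnetMappings": {}},
        "requiredXor": ["Subnets", "SubnetMappings"]
    }

    # After translator(resource_type, region) has patched it
    {
        "properties": {"Subnets": {}, "SubnetMappings": {}},
        "allOf": [
            {"oneOf": [{"required": ["Subnets"]}, {"required": ["SubnetMappings"]}]}
        ]
    }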
151 changes: 151 additions & 0 deletions scripts/release_schemas/generator.py
@@ -0,0 +1,151 @@
#!/usr/bin/env python
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import logging
from collections import deque
from pathlib import Path

import _translator

from cfnlint.helpers import REGIONS, ToPy, format_json_string, load_plugins
from cfnlint.schema import PROVIDER_SCHEMA_MANAGER

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def _get_schema_path(schema, path):
    """Convert a rule keyword path into a JSON pointer path within the schema."""
    s = schema.schema
    schema_path = deque([])
    while path:
        key = path.popleft()
        if key == "*":
            schema_path.append("items")
            s = s["items"]
        else:
            s = s["properties"][key]
            schema_path.extend(["properties", key])

        pointer = s.get("$ref")
        if pointer:
            _, s = schema.resolver.resolve(pointer)
            schema_path = deque(pointer.split("/")[1:])

    return schema_path
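# For example (hypothetical keyword path), walking deque(["Subnets", "*"])
# through a schema whose "Subnets" property is
#
#   {"type": "array", "items": {"$ref": "#/definitions/Subnet"}}
#
# returns deque(["definitions", "Subnet"]): a "*" segment steps into
# "items", and a "$ref" restarts the pointer at the referenced definition.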


def _build_patch(path, patch):
    """Build two JSON patches: one creating an empty 'allOf', one appending to it."""
    if not path:
        path_str = "/allOf"
    else:
        path_str = f"/{'/'.join(path)}/allOf"

    return (
        [
            {
                "op": "add",
                "path": path_str,
                "value": [],
            }
        ],
        [
            {
                "op": "add",
                "path": f"{path_str}/-",
                "value": patch,
            }
        ],
    )
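# For example (hypothetical arguments),
#
#   _build_patch(deque(["definitions", "Subnet"]), {"minItems": 2})
#
# returns the pair:
#
#   ([{"op": "add", "path": "/definitions/Subnet/allOf", "value": []}],
#    [{"op": "add", "path": "/definitions/Subnet/allOf/-", "value": {"minItems": 2}}])
#
# The first patch creates the empty "allOf"; the second appends the rule
# schema to it.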


# Map of resource type -> the first region in which its schema was found
schemas = {}

##########################
#
# Build the definitive list of all resource types across all regions
#
##########################

# us-east-1 goes first so it becomes the canonical region when available
for region in ["us-east-1"] + list(set(REGIONS) - set(["us-east-1"])):
    for resource_type in PROVIDER_SCHEMA_MANAGER.get_resource_types(region):
        if resource_type in ["AWS::CDK::Metadata", "Module"]:
            continue
        if resource_type not in schemas:
            schemas[resource_type] = region


##########################
#
# Merge rule schemas into the resource schemas
#
##########################

rules_folder = Path("src") / "cfnlint" / "rules"

rules = load_plugins(
    rules_folder,
    name="CfnLintJsonSchema",
    modules=(
        "cfnlint.rules.jsonschema.CfnLintJsonSchema",
        "cfnlint.rules.jsonschema.CfnLintJsonSchema.CfnLintJsonSchema",
    ),
)

for rule in rules:
    base = rule.__class__.__base__
    # Skip region-aware rules; compare by qualified name, since __base__ is a
    # class object and would never compare equal to a bare string
    if f"{base.__module__}.{base.__name__}" == (
        "cfnlint.rules.jsonschema."
        "CfnLintJsonSchemaRegional.CfnLintJsonSchemaRegional"
    ):
        continue
    if not rule.id or rule.schema == {}:
        continue

    for keyword in rule.keywords:
        if not keyword.startswith("Resources/"):
            continue
        path = deque(keyword.split("/"))

        if len(path) < 3:
            continue

        path.popleft()  # "Resources"
        resource_type = path.popleft()
        resource_properties = path.popleft()
        # Skip keywords for unknown resource types or paths outside "Properties"
        if resource_type not in schemas or resource_properties != "Properties":
            continue

        schema_path = _get_schema_path(
            PROVIDER_SCHEMA_MANAGER.get_resource_schema(
                schemas[resource_type], resource_type
            ),
            path,
        )
        all_of_patch, schema_patch = _build_patch(schema_path, rule.schema)

        PROVIDER_SCHEMA_MANAGER._schemas[schemas[resource_type]][resource_type].patch(
            patches=all_of_patch
        )
        PROVIDER_SCHEMA_MANAGER._schemas[schemas[resource_type]][resource_type].patch(
            patches=schema_patch
        )

        logger.info(f"Patch {rule.id} for {resource_type} in {schemas[resource_type]}")


for resource_type, region in schemas.items():
    rt_py = ToPy(resource_type)

    _translator.translator(resource_type, region)

    with open(f"local/release_schemas/{rt_py.py}.json", "w") as f:
        f.write(
            format_json_string(
                PROVIDER_SCHEMA_MANAGER.get_resource_schema(
                    region, resource_type
                ).schema
            )
        )
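
Assuming ToPy converts a resource type to its snake_case module form (e.g., AWS::S3::Bucket to aws_s3_bucket, matching the naming of cfn-lint's schema folders), a run writes one patched schema per resource type:

    local/release_schemas/aws_s3_bucket.json
    local/release_schemas/aws_elasticloadbalancingv2_loadbalancer.json

Note that open(..., "w") does not create directories, so local/release_schemas must exist before the script runs.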
6 changes: 1 addition & 5 deletions
@@ -26,10 +26,6 @@
       "Subnets": {
         "minItems": 2
       }
-    },
-    "requiredXor": [
-      "Subnets",
-      "SubnetMappings"
-    ]
+    }
   }
 }