Skip to content

Commit

Permalink
Merge branch 'master' into missing-allowable-types
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmlee authored Apr 24, 2020
2 parents 9a73566 + 3b706b2 commit 96925a1
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 17 deletions.
79 changes: 77 additions & 2 deletions src/cfnlint/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import fnmatch
import json
import hashlib
import os
import datetime
import logging
Expand All @@ -18,7 +19,7 @@
from cfnlint.decode.node import dict_node, list_node, str_node
from cfnlint.data import CloudSpecs
try:
from urllib.request import urlopen
from urllib.request import urlopen, Request
except ImportError:
from urllib2 import urlopen
try:
Expand Down Expand Up @@ -166,10 +167,65 @@
'AWS::Redshift::Cluster'
]

def get_metadata_filename(url):
    """Return the filename of the metadata file associated with a remote resource.

    The URL is hashed with SHA-256 so the cache filename is deterministic,
    unique per URL, and safe to use as a path component.
    """
    caching_dir = os.path.join(os.path.dirname(__file__), 'data', 'DownloadsMetadata')
    encoded_url = hashlib.sha256(url.encode()).hexdigest()
    metadata_filename = os.path.join(caching_dir, encoded_url + '.meta.json')

    return metadata_filename

def url_has_newer_version(url):
    """Check whether a newer version of the resource at *url* is available.

    Returns True (forcing a download) when:
    - running on Python 2.7, where ``Request`` was never imported so the
      HEAD request raises NameError (urllib2 lacks HEAD support), or
    - no ETag is cached for the local copy of the resource, or
    - the remote ETag differs from (or is missing compared to) the cached one.
    Returns False only when the cached ETag matches the remote ETag.
    """
    metadata_filename = get_metadata_filename(url)

    # Load in the cache
    metadata = load_metadata(metadata_filename)

    # Etag is a caching identifier used by S3 and Cloudfront
    if 'etag' in metadata:
        cached_etag = metadata['etag']
    else:
        # If we don't know the etag of the local version, we should force an update
        return True

    # Need to wrap this in a try, as URLLib2 in Python2 doesn't support HEAD requests
    try:
        # Make an initial HEAD request
        req = Request(url, method='HEAD')
        res = urlopen(req)
    except NameError:
        # Request is undefined on Python 2: we should force an update
        return True

    # Read the ETag once and close the response so the connection is not leaked
    try:
        remote_etag = res.info().get('ETag')
    finally:
        res.close()

    # If we have an ETag value stored and it matches the returned one,
    # then we already have a copy of the most recent version of the
    # resource, so don't bother fetching it again
    if cached_etag and remote_etag and cached_etag == remote_etag:
        LOGGER.debug('We already have a cached version of url %s with ETag value of %s', url, cached_etag)
        return False

    # The ETag value of the remote resource does not match the local one, so a newer version is available
    return True

def get_url_content(url, caching=False):
"""Get the contents of a spec file"""

res = urlopen(url)

if caching and res.info().get('ETag'):
metadata_filename = get_metadata_filename(url)
# Load in all existing values
metadata = load_metadata(metadata_filename)
metadata['etag'] = res.info().get('ETag')
metadata['url'] = url # To make it obvious which url the Tag relates to
save_metadata(metadata, metadata_filename)

# Continue to handle the file download normally
if res.info().get('Content-Encoding') == 'gzip':
buf = BytesIO(res.read())
f = gzip.GzipFile(fileobj=buf)
Expand All @@ -180,6 +236,25 @@ def get_url_content(url):
return content


def load_metadata(filename):
    """Get the contents of the download metadata file.

    Returns an empty dict when the file does not exist or does not contain
    valid JSON; callers treat an empty dict as "no cached version info",
    which forces a fresh download rather than crashing on a corrupt cache.
    """
    metadata = {}
    if os.path.exists(filename):
        try:
            with open(filename, 'r') as metadata_file:
                metadata = json.load(metadata_file)
        except ValueError:
            # Corrupt cache metadata is not fatal; behave as if none exists
            metadata = {}
    return metadata


def save_metadata(metadata, filename):
    """Write the download metadata dict to *filename* as JSON.

    The directory holding *filename* is created first when it is missing.
    """
    target_dir = os.path.dirname(filename)
    if not os.path.exists(target_dir):
        os.mkdir(target_dir)

    with open(filename, 'w') as out_file:
        json.dump(metadata, out_file)


def load_resource(package, filename='us-east-1.json'):
"""Load CloudSpec resources
:param filename: filename to load
Expand Down
46 changes: 35 additions & 11 deletions src/cfnlint/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import fnmatch
import json
import logging
import multiprocessing
import os
import jsonpointer
import jsonpatch
import cfnlint
from cfnlint.helpers import get_url_content
from cfnlint.helpers import get_url_content, url_has_newer_version
from cfnlint.helpers import SPEC_REGIONS
import cfnlint.data.ExtendedSpecs
import cfnlint.data.AdditionalSpecs
Expand All @@ -21,19 +22,42 @@
def update_resource_specs():
    """ Update Resource Specs

    Downloads and patches the CloudFormation spec for every region in
    SPEC_REGIONS, in parallel where possible.
    """

    # Pool() uses cpu count if no number of processors is specified
    # Pool() only implements the Context Manager protocol from Python3.3 onwards,
    # so it will fail Python2.7 style linting, as well as throw AttributeError
    try:
        # pylint: disable=not-context-manager
        with multiprocessing.Pool() as pool:
            pool.starmap(update_resource_spec, SPEC_REGIONS.items())
    except AttributeError:

        # Do it the long, slow way: one region at a time
        for region, url in SPEC_REGIONS.items():
            update_resource_spec(region, url)
def update_resource_spec(region, url):
    """ Update a single resource spec

    Intended to run inside a multiprocessing worker, hence the
    multiprocessing-aware stderr logger. Skips the download entirely when
    the cached copy is already current.
    """
    filename = os.path.join(os.path.dirname(cfnlint.__file__), 'data/CloudSpecs/%s.json' % region)

    logger = multiprocessing.log_to_stderr()
    logger.debug('Downloading template %s into %s', url, filename)

    # Check to see if we already have the latest version, and if so stop
    if url_has_newer_version(url):
        raw_content = get_url_content(url, caching=True)
        logger.debug('A more recent version of %s was found, and will be downloaded to %s', url, filename)

        # Apply the generic patches first, then the region-specific ones
        patched = patch_spec(json.loads(raw_content), 'all')
        patched = patch_spec(patched, region)

        with open(filename, 'w') as spec_file:
            json.dump(patched, spec_file, indent=2, sort_keys=True, separators=(',', ': '))

def update_documentation(rules):
"""Generate documentation"""
Expand Down
75 changes: 75 additions & 0 deletions test/unit/module/helpers/test_downloads_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import sys
import os
from test.testlib.testcase import BaseTestCase
from mock import patch, MagicMock, mock_open
import cfnlint.helpers
import json


class TestDownloadsMetadata(BaseTestCase):
    """Unit tests for cfnlint.helpers.load_metadata / save_metadata"""

    @patch('cfnlint.helpers.os.path.exists')
    def test_no_file(self, mock_path_exists):
        """load_metadata returns an empty dict when the file is missing"""

        mock_path_exists.return_value = False

        filename = 'foo.bar'

        result = cfnlint.helpers.load_metadata(filename)

        self.assertEqual(result, {})

    @patch('cfnlint.helpers.os.path.exists')
    def test_load_metadata(self, mock_path_exists):
        """load_metadata parses the JSON contents of an existing file"""

        mock_path_exists.return_value = True

        filename = 'foo.bar'
        file_contents = {
            'etag': 'foo'
        }

        # open() lives in a different module on Python 2 vs Python 3
        if sys.version_info.major == 3:
            builtin_module_name = 'builtins'
        else:
            builtin_module_name = '__builtin__'

        mo = mock_open(read_data=json.dumps(file_contents))
        with patch('{}.open'.format(builtin_module_name), mo):
            result = cfnlint.helpers.load_metadata(filename)

        self.assertEqual(result, file_contents)

    @patch('cfnlint.helpers.os.path.exists')
    @patch('cfnlint.helpers.os.path.dirname')
    @patch('cfnlint.helpers.os.mkdir')
    @patch('cfnlint.helpers.json.dump')
    def test_save_download_metadata(self, mock_json_dump, mock_mkdir, mock_dirname, mock_path_exists):
        """save_metadata creates the cache dir and writes the JSON payload"""

        filename = 'foo.bar'
        filedir = 'foobardir'
        file_contents = {
            'etag': 'foo'
        }

        mock_path_exists.return_value = False
        mock_dirname.return_value = filedir

        if sys.version_info.major == 3:
            builtin_module_name = 'builtins'
        else:
            builtin_module_name = '__builtin__'

        mo = mock_open()
        with patch('{}.open'.format(builtin_module_name), mo):
            cfnlint.helpers.save_metadata(file_contents, filename)
            mock_mkdir.assert_called_with(filedir)
            # BUG FIX: the bare attribute access `assert_called_once` was a
            # silent no-op on a Mock; it must be called to actually assert
            mock_json_dump.assert_called_once()
92 changes: 92 additions & 0 deletions test/unit/module/helpers/test_get_url_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,95 @@ def test_get_url_content_zipped(self, mocked_urlopen):
result = cfnlint.helpers.get_url_content(url)
mocked_urlopen.assert_called_with(url)
self.assertEqual(result, '{"key": "value"}')

@patch('cfnlint.helpers.urlopen')
@patch('cfnlint.helpers.load_metadata')
@patch('cfnlint.helpers.save_metadata')
def test_get_url_content_zipped_cache_update(self, mock_save_metadata, mock_load_metadata, mocked_urlopen):
"""Test success run"""
input_buffer = '{"key": "value"}'
etag = 'ETAG_ONE'
url = 'http://foo.com'

mock_load_metadata.return_value = {}

cm = MagicMock()
cm.getcode.return_value = 200
cm.info.return_value = {
'Content-Encoding': 'gzip',
'ETag': etag
}

if sys.version_info.major == 3:
cm.read.return_value = gzip.compress(input_buffer.encode('utf-8'))
else:
string_buffer = StringIO.StringIO()
gzip_file = gzip.GzipFile(fileobj=string_buffer, mode='w', compresslevel=6)
gzip_file.write(input_buffer)
gzip_file.close()
cm.read.return_value = string_buffer.getvalue()

mocked_urlopen.return_value = cm

result = cfnlint.helpers.get_url_content(url, caching=True)
mocked_urlopen.assert_called_with(url)
mock_load_metadata.assert_called_once()
mock_save_metadata.assert_called_once()

self.assertEqual(result, '{"key": "value"}')

@patch('cfnlint.helpers.urlopen')
@patch('cfnlint.helpers.load_metadata')
def test_url_has_newer_version_affirmative(self, mock_load_metadata, mocked_urlopen):
"""Test success run"""

input_buffer = '{"key": "value"}'
etag = 'ETAG_ONE'
url = 'http://foo.com'

mock_load_metadata.return_value = {
'etag': etag
}

cm = MagicMock()
cm.getcode.return_value = 200
cm.info.return_value = {
'Content-Encoding': 'gzip',
'ETag': etag
}

mocked_urlopen.return_value = cm

result = cfnlint.helpers.url_has_newer_version(url)

# Python2 does not support caching, so will always return true
if sys.version_info.major == 2:
self.assertTrue(result)
else:
self.assertFalse(result)

@patch('cfnlint.helpers.urlopen')
@patch('cfnlint.helpers.load_metadata')
def test_url_has_newer_version_negative(self, mock_load_metadata, mocked_urlopen):
"""Test success run"""

input_buffer = '{"key": "value"}'
# Generate a random ETag to test with
etag = 'ETAG_ONE'
etag2 = 'ETAG_TWO'

url = 'http://foo.com'
mock_load_metadata.return_value = {
'etag': etag
}

cm = MagicMock()
cm.getcode.return_value = 200
cm.info.return_value = {
'Content-Encoding': 'gzip',
'ETag': etag2
}
mocked_urlopen.return_value = cm

result = cfnlint.helpers.url_has_newer_version(url)
self.assertTrue(result)
Loading

0 comments on commit 96925a1

Please sign in to comment.