forked from aws-samples/awscli-profile-credential-helpers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws-refresh-credentials
executable file
·152 lines (119 loc) · 6.05 KB
/
aws-refresh-credentials
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
import configparser
import datetime
import json
import os
import pathlib
import subprocess
import sys
import time
profile_filter = sys.argv[1:]
profile_variable = os.environ.get('AWS_PROFILE')
if profile_variable:
profile_filter.append(profile_variable)
home_folder = pathlib.Path.home()
midway_cookie_filename = f'{home_folder}/.midway/cookie'
cli_cache_folder = f'{home_folder}/.aws/cli/cache'
sso_cache_folder = f'{home_folder}/.aws/sso/cache'
aws_config_filename = f'{home_folder}/.aws/config'
aws_credentials_filename = f'{home_folder}/.aws/credentials'
def get_config(config_filename):
config = configparser.ConfigParser()
config.read(config_filename)
return config
def get_profile(section):
profile = section.replace('profile ', '')
okta_profile = aws_config.get(section, 'okta_profile', fallback=None)
okta_account_id = aws_config.get(section, 'okta_account_id', fallback=None)
okta_role_name = aws_config.get(section, 'okta_role_name', fallback=None)
sso_account_id = aws_config.get(section, 'sso_account_id', fallback=None)
sso_role_name = aws_config.get(section, 'sso_role_name', fallback=None)
sso_start_url = aws_config.get(section, 'sso_start_url', fallback=None)
source_profile = aws_config.get(section, 'source_profile', fallback=None)
role_arn = aws_config.get(section, 'role_arn', fallback=None)
if okta_profile and okta_account_id and okta_role_name:
return get_okta_credentials, profile, okta_account_id, okta_role_name, okta_profile
if sso_account_id and sso_role_name and sso_start_url:
return get_sso_credentials, profile, sso_account_id, sso_role_name
if source_profile and role_arn:
arn_components = role_arn.split(':')
account_id = arn_components[4]
role_name = arn_components[5].replace('role/', '')
return get_vault_credentials, profile, account_id, role_name, source_profile
def run_sso_refresh_command(profile):
command = ('aws', 'sts', '--profile', profile, 'get-caller-identity')
subprocess.run(command, stdout=subprocess.PIPE, check=True)
def run_sso_login_command(profile):
command = ('aws', 'sso', '--profile', profile, 'login')
subprocess.run(command, stdout=subprocess.PIPE, check=True)
def get_okta_credentials(profile, account, role, extra):
okta_profile = extra
role_arn = f'arn:aws:iam::{account}:role/{role}'
command = ('gimme-aws-creds', '--profile', okta_profile, '--roles', role_arn)
output = subprocess.run(command, stdout=subprocess.PIPE, check=True)
output_credentials = json.loads(output.stdout)['credentials']
expiration = datetime.datetime.utcnow() + datetime.timedelta(hours=1)
credentials = {
'accessKeyId': output_credentials['aws_access_key_id'],
'secretAccessKey': output_credentials['aws_secret_access_key'],
'sessionToken': output_credentials['aws_session_token'],
'expiration': expiration.isoformat() + 'Z'
}
return [(profile, credentials)]
def get_sso_credentials(profile, account, role, extra):
try:
run_sso_refresh_command(profile)
except subprocess.CalledProcessError:
run_sso_login_command(profile)
run_sso_refresh_command(profile)
time.sleep(2.0)
cache_filenames = (os.path.join(cli_cache_folder, f) for f in os.listdir(cli_cache_folder))
cache_credentials = [json.load(open(f)) for f in cache_filenames]
newest_credentials = sorted(cache_credentials, key=lambda c: c['Credentials']['Expiration'])[-1]
credentials = {
'accessKeyId': newest_credentials['Credentials']['AccessKeyId'],
'secretAccessKey': newest_credentials['Credentials']['SecretAccessKey'],
'sessionToken': newest_credentials['Credentials']['SessionToken'],
'expiration': newest_credentials['Credentials']['Expiration'].replace('UTC', 'Z'),
}
return [(profile, credentials)]
def get_vault_credentials(profile, account, role, extra):
source_profile = extra
command = ('aws-vault', 'exec', source_profile, 'env')
vault_process = subprocess.run(command, stdout=subprocess.PIPE, encoding='utf-8')
split_lines = (l.strip().split('=') for l in vault_process.stdout.split())
variable_lines = ((l[0], '='.join(l[1:])) for l in split_lines)
aws_variable_lines = (l for l in variable_lines if l and l[0].startswith('AWS'))
variables = dict(aws_variable_lines)
expiration = datetime.datetime.utcnow() + datetime.timedelta(hours=12)
credentials = {
'accessKeyId': variables['AWS_ACCESS_KEY_ID'],
'secretAccessKey': variables['AWS_SECRET_ACCESS_KEY'],
'sessionToken': variables['AWS_SESSION_TOKEN'],
'expiration': expiration.isoformat() + 'Z'
}
return [(source_profile, credentials), (profile, credentials)]
def get_credentials(process, profile, account, role, extra=None):
print(f'Getting credentials for profile {profile} (arn:aws:iam::{account}:role/{role})')
return process(profile, account, role, extra)
def set_credentials(profile, credentials):
if not aws_credentials.has_section(profile):
aws_credentials.add_section(profile)
aws_credentials.set(profile, 'aws_access_key_id', credentials['accessKeyId'])
aws_credentials.set(profile, 'aws_secret_access_key', credentials['secretAccessKey'])
aws_credentials.set(profile, 'aws_session_token', credentials['sessionToken'])
aws_credentials.set(profile, 'expires', credentials['expiration'])
print(f'Saved credentials for profile {profile}')
def process_profile(params):
profile_credential_list = get_credentials(*params)
for profile, credentials in profile_credential_list:
set_credentials(profile, credentials)
aws_config = get_config(aws_config_filename)
aws_credentials = get_config(aws_credentials_filename)
profiles = (get_profile(s) for s in aws_config.sections())
profiles = (p for p in profiles if p)
profiles = (p for p in profiles if p[1] in profile_filter or not profile_filter)
for profile in profiles:
process_profile(profile)
with open(aws_credentials_filename, 'w') as credentials_file:
aws_credentials.write(credentials_file)