# mysql_s3.py
#
# Runs a MySQL query and uploads the returned rows to S3 as files of
# newline-delimited JSON objects, one {"Item": {...}} object per row.
import datetime
import decimal
import json

import boto3
import mysql.connector
# Query whose result rows are exported; one JSON object is written per row.
sql = (
    "SELECT * FROM customer_features.v_features "
    "WHERE record_type = 'events'"
)

# Destination bucket and key prefix for the generated objects.
s3_bucket = "s3-import-demo"
s3_path = "demo/"

# Region handed to the boto3 S3 client.
region = "us-east-2"

# Maximum number of rows written into a single S3 object.
items_per_file = 5
def main():
    """Export the rows selected by ``sql`` to S3 in batches.

    Every row becomes one line of the form ``{"Item":{<attrs>}}`` where
    ``<attrs>`` is the comma-joined output of ``parse_attr`` for each column
    whose value is not ``None``.  Lines are grouped ``items_per_file`` at a
    time and each group is uploaded through ``write_s3`` under the key
    ``data_upto_<n>.json``, ``n`` being the number of rows processed so far.

    Nothing is uploaded when the query returns no rows.
    """
    # NOTE(review): host, user and password are hard-coded in source; they
    # should come from configuration or a secrets store instead.
    mydb = mysql.connector.connect(
        host="my-endpoint-host.us-east-1.rds.amazonaws.com",
        user="admin",
        password="mriA6p5M7eH",
    )
    # Release the cursor and the connection even if the query fails.
    try:
        cur = mydb.cursor(buffered=True, dictionary=True)
        try:
            cur.execute(sql)
            res = cur.fetchall()
        finally:
            cur.close()
    finally:
        mydb.close()

    rowcount = 0
    pending = []  # serialized lines not yet uploaded
    for rowcount, row in enumerate(res, start=1):
        attrs = [
            parse_attr(key, value)
            for key, value in row.items()
            if value is not None
        ]
        # Joining (rather than trimming a trailing comma) keeps the output
        # well-formed even when every column of the row is NULL.
        pending.append('{"Item":{' + ','.join(attrs) + '}}\n')

        if rowcount % items_per_file == 0:
            write_s3(s3_bucket, s3_path, f'data_upto_{rowcount}.json', ''.join(pending))
            pending = []

    # Upload the final, partially filled batch (if any).
    if pending:
        write_s3(s3_bucket, s3_path, f'data_upto_{rowcount}.json', ''.join(pending))
def write_s3(bucket, path, objname, obj, acl='public-read'):
    """Upload ``obj`` to S3 under the key ``path + objname`` and report the status.

    Args:
        bucket: Name of the destination S3 bucket.
        path: Key prefix; it is concatenated with ``objname`` verbatim, so it
            must already contain any separator it needs.
        objname: Object name appended to ``path``.
        obj: Payload to store.  A ``str`` is encoded as UTF-8 before upload;
            any other value is handed to boto3 unchanged.
        acl: Canned ACL applied to the object.  Defaults to ``'public-read'``,
            which is what this function has always sent; pass ``None`` to
            send no ACL at all.

    Returns:
        The string ``'ok'``.  No exception is caught here, so upload errors
        propagate to the caller.
    """
    client = boto3.client('s3', region_name=region)
    fullpath = path + objname

    body = obj.encode('utf-8') if isinstance(obj, str) else obj
    params = {'Body': body, 'Bucket': bucket, 'Key': fullpath}
    if acl is not None:
        # NOTE(review): the default makes every uploaded object world-readable;
        # confirm that is intended for this data.
        params['ACL'] = acl

    res = client.put_object(**params)
    print(f'HTTP {res["ResponseMetadata"]["HTTPStatusCode"]} for S3 object s3://{bucket}/{fullpath}')
    return 'ok'
def parse_attr(key, value):
    """Serialize one column as a ``"key":{"<type>":"<value>"}`` JSON fragment.

    The type tag is ``N`` for ``int``, ``float`` and ``decimal.Decimal``
    values and ``S`` for everything else.  The value is always emitted as a
    JSON string.

    Args:
        key: Column name.
        value: Column value.  ``bool`` is written as ``1``/``0``; ``bytes``
            and ``bytearray`` are decoded as UTF-8 (undecodable bytes are
            replaced); any other non-string value is converted with ``str()``.

    Returns:
        The fragment as a ``str``, with key and value JSON-escaped so that
        quotes, backslashes and control characters cannot break the document.
    """
    if isinstance(value, bool):
        # bool is a subclass of int; str(True) would yield the non-numeric "True".
        rtype, rvalue = 'N', str(int(value))
    elif isinstance(value, (int, float, decimal.Decimal)):
        rtype, rvalue = 'N', str(value)
    elif isinstance(value, (bytes, bytearray)):
        rtype, rvalue = 'S', bytes(value).decode('utf-8', errors='replace')
    else:
        # Covers str as well as datetime/date/time/timedelta and anything else.
        rtype, rvalue = 'S', str(value)

    # ensure_ascii=False keeps non-ASCII text as-is instead of \uXXXX escapes.
    return (
        json.dumps(str(key), ensure_ascii=False)
        + ':{"' + rtype + '":'
        + json.dumps(rvalue, ensure_ascii=False)
        + '}'
    )
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()