# bucket.py
import boto3
import os
import pandas as pd
# Global variables
file = "rootkey.csv"
aws_region_name = "us-east-2"
BUCKET_NAME = "new-bucket-with-website-some5"
DOWNLOADS_DIR = "downloads"
# Connect to S3: call createSession(file=...) to create a boto3 session.
def createSession(file, aws_region_name="us-east-2"):
    """
    createSession: creates a boto3 session for working with S3.
    Parameters: file - the .csv file containing the access key ID and
                secret access key downloaded from AWS.
    """
    # Read AWS credentials from the CSV file
    key_file = pd.read_csv(file)
    access_key_id = key_file.loc[0, "Access key ID"]
    secret_access_key = key_file.loc[0, "Secret access key"]
    # Create a session with the provided credentials
    session = boto3.Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        region_name=aws_region_name
    )
    return session
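# The credentials CSV is assumed to carry the column headers read above
# ("Access key ID", "Secret access key") and one row of values, e.g.:
#
#   Access key ID,Secret access key
#   AKIA...,wJal...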
# Initialize session
session = createSession(file=file)
# List buckets
def list_buckets(prefix=None):
    # Create an S3 resource and client from the session
    s3_resource = session.resource("s3")
    s3_client = session.client("s3")
    try:
        buckets = list(s3_resource.buckets.all())
        if prefix:
            # Sort buckets so those starting with the prefix come first
            buckets_sorted = sorted(
                buckets,
                key=lambda bucket: (not bucket.name.lower().startswith(prefix.lower()), bucket.name)
            )
        else:
            buckets_sorted = buckets
        for bucket in buckets_sorted:
            bucket_name = bucket.name
            # Get the bucket's region; buckets in us-east-1 report None
            response = s3_client.get_bucket_location(Bucket=bucket_name)
            bucket_location = response["LocationConstraint"]
            print(f"Bucket: {bucket_name}, Location: {bucket_location}")
    except Exception as e:
        print(f"Error listing buckets: {e}")
# Create an S3 bucket
def createBucket(bucket_name, aws_region_name="us-east-2"):
    s3_resource = session.resource("s3")
    print(f"\n***Creating New Bucket: {bucket_name}")
    # Create the bucket in the given region
    bucket = s3_resource.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": aws_region_name}
    )
    bucket.wait_until_exists()
    print("\n****Bucket Created. List Buckets:\n")
    list_buckets()
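# One caveat worth noting: create_bucket rejects a LocationConstraint of
# "us-east-1" (the default region), so a region-aware sketch would branch:
#
#   if aws_region_name == "us-east-1":
#       bucket = s3_resource.create_bucket(Bucket=bucket_name)
#   else:
#       bucket = s3_resource.create_bucket(
#           Bucket=bucket_name,
#           CreateBucketConfiguration={"LocationConstraint": aws_region_name}
#       )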
# Delete an S3 bucket
def deleteBucket(bucket_name):
    s3_resource = session.resource("s3")
    bucket = s3_resource.Bucket(bucket_name)
    print(f"****Deleting Bucket: {bucket.name}...")
    # A bucket must be empty before deletion, so remove every object version first
    for version in bucket.object_versions.all():
        version.delete()
    bucket.delete()
    bucket.wait_until_not_exists()
    print("\n****Bucket Deleted. Remaining Buckets:\n")
    list_buckets()
# Bucket versioning support for AWS S3 buckets
def enable_Bucket_Versioning_Support(bucket_name):
    s3_resource = session.resource("s3")
    bucket_versioning = s3_resource.BucketVersioning(bucket_name)  # create the BucketVersioning resource
    bucket_versioning.enable()  # enable bucket versioning
    print(f"\n****Bucket versioning support: {bucket_versioning.status}")
# Upload files and folders to an S3 bucket
def uploadFiles(bucket_name, *file_paths):
    s3_resource = session.resource("s3")
    my_bucket = s3_resource.Bucket(bucket_name)
    for path in file_paths:
        try:
            if os.path.isdir(path):
                root_path = path  # local folder to upload
                # Walk the folder and mirror its structure under its base name
                for root, subdirs, files in os.walk(root_path):
                    folder_name = os.path.basename(root_path)
                    relative_root = os.path.relpath(root, root_path)
                    s3_folder_path = os.path.join(folder_name, relative_root).replace("\\", "/")
                    for file in files:
                        file_path = os.path.join(root, file)
                        s3_key = os.path.join(s3_folder_path, file).replace("\\", "/")
                        my_bucket.upload_file(file_path, s3_key)
                        print(f"Successfully uploaded {file_path} to {bucket_name}/{s3_key}")
            else:
                # Upload a single file under its base name
                file_name = os.path.basename(path)
                my_bucket.upload_file(path, file_name)
                print(f"Successfully uploaded {path} to {bucket_name}/{file_name}")
        except Exception as e:
            print(f"Failed to upload {path}: {e}")
# Write a DataFrame to an S3 bucket
def writeDataframe_to_S3(df, bucket_name, file_name="dataframe.csv"):
    try:
        # Write the DataFrame to a local CSV file first, then upload that file
        df.to_csv(file_name, index=False)
        uploadFiles(bucket_name, file_name)
    except Exception as e:
        print(f"Failed to upload DataFrame: {e}")
# List and filter objects in an S3 bucket
def listAndFilter_Bucket(bucket_name, prefix=None):
    objects = getS3_objectVersions(bucket_name, prefix)
    for obj in objects:
        print(f"-- {obj.key}")
# Get S3 objects (despite the name, this returns ObjectSummary items for the
# current objects, not their version history)
def getS3_objectVersions(bucket_name, prefix=None):
    s3_resource = session.resource("s3")
    bucket = s3_resource.Bucket(bucket_name)
    if not prefix:
        prefix = ""
    objects = bucket.objects.filter(Prefix=prefix)
    return objects
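# If the actual version history is needed, a sketch using the object_versions
# collection (each item carries an id identifier for the version):
#
#   bucket = session.resource("s3").Bucket(BUCKET_NAME)
#   for v in bucket.object_versions.filter(Prefix=""):
#       print(v.object_key, v.id)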
# Download S3 objects to the local machine
def downloadS3_Objects(bucket_name, prefix=None):
    try:
        objects = getS3_objectVersions(bucket_name=bucket_name, prefix=prefix)
        for obj_summary in objects:
            obj = obj_summary.Object()
            obj_key = obj.key  # the object's full key in the S3 bucket
            if obj_key.endswith("/"):
                continue  # skip zero-byte "folder" placeholder keys
            s3_subpath = obj_key.split("/")[:-1]  # the key's folder components, without the file name
            s3_file_name = obj_key.split("/")[-1]  # the file name itself
            local_subfolder = os.path.join(DOWNLOADS_DIR, *s3_subpath)  # mirror the S3 path locally
            if not os.path.exists(local_subfolder):
                os.makedirs(local_subfolder)  # create the local subfolder if it does not exist
            obj.download_file(os.path.join(local_subfolder, s3_file_name))  # download into the local subfolder
            print(f"Successfully downloaded {obj_key} to {local_subfolder}/{s3_file_name}")
    except Exception as e:
        print(f"Failed to download objects: {e}")
# Delete objects from an S3 bucket
def deleteObjects(bucket_name, prefix=None):
    s3_resource = session.resource("s3")
    bucket = s3_resource.Bucket(bucket_name)
    # Delete the current objects, optionally restricted to a prefix
    if prefix:
        bucket.objects.filter(Prefix=prefix).delete()
    else:
        bucket.objects.delete()
# Delete object versions
def deleteObjects_Versions(bucket_name, prefix=None):
    s3_resource = session.resource("s3")
    bucket = s3_resource.Bucket(bucket_name)
    # Delete the object versions, optionally restricted to a prefix
    if prefix:
        bucket.object_versions.filter(Prefix=prefix).delete()
    else:
        bucket.object_versions.delete()
# Empty an S3 bucket, i.e. delete its objects and their versions
def emptyS3Bucket(bucket_name, prefix=None):
    deleteObjects(bucket_name, prefix)
    deleteObjects_Versions(bucket_name, prefix)
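# Usage sketch: in a versioned bucket the objects only truly disappear once
# their versions are removed too, which is why both deletes run above:
#
#   emptyS3Bucket(BUCKET_NAME)            # empty the whole bucket
#   emptyS3Bucket(BUCKET_NAME, "logs/")   # or just a hypothetical prefix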
# Copy and move objects within an AWS S3 bucket
def copyingAndMovingObjects(bucket_name, source_object, destination_object):
    # Create the S3 resource from the session
    s3_resource = session.resource("s3")
    try:
        bucket = s3_resource.Bucket(bucket_name)
        # Copy the object to the destination key
        copy_source = {"Bucket": bucket_name, "Key": source_object}
        s3_resource.Object(bucket_name, destination_object).copy(copy_source)
        print(f"Successfully copied {source_object} to {destination_object}")
        # Delete the original object to complete the move
        bucket.Object(source_object).delete()
        print(f"Successfully moved {source_object} to {destination_object}")
    except Exception as e:
        print(f"Failed to copy and move {source_object}: {e}")
# Generate pre-signed URLs for objects in S3 buckets
def generatePreSignedURL(bucket_name, object_key):
    s3_client = session.client("s3")  # the client is what generates pre-signed URLs
    url = s3_client.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket_name, "Key": object_key},
        ExpiresIn=3600  # the URL stays valid for one hour
    )
    # To download from the URL in a PowerShell terminal, run:
    #   curl -o "object_key_or_file_name" "your_link"
    print(f"Download URL: {url}")
    return url
if __name__ == "__main__":
    listAndFilter_Bucket(bucket_name=BUCKET_NAME)
    # Example usage:
    copyingAndMovingObjects(
        bucket_name=BUCKET_NAME,
        source_object="Degree_Plan_and_Flowchart/./Data Science.BS.24flowchart.pdf",
        destination_object="new_data.csv"
    )
    listAndFilter_Bucket(bucket_name=BUCKET_NAME)
    generatePreSignedURL(bucket_name=BUCKET_NAME, object_key="Data Science.BS.24flowchart.pdf")