-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3.py
78 lines (61 loc) · 2.1 KB
/
s3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import boto3
from datetime import datetime
from os import path
import re
class S3UploadType:
    """Namespace of string constants naming the kind of upload.

    The values are plain strings; the class only groups them and refuses
    to be instantiated.
    """

    GENERIC_ASSET, DB_DUMP, FILE_DUMP = "asset", "database", "files"

    def __init__(self):
        # Constants-only holder: creating an instance is always an error.
        raise Exception("Enum class should not be instantiated.")
class S3RotationPolicy:
    """Describes which date/time components make up a rotation marker.

    Each enabled flag contributes one ``strftime`` component to the string
    returned by :meth:`path_marker`; components are joined with ``glue``.
    """

    def __init__(self, month=False, weekday=False, day=False, hour=False, glue="_"):
        """
        :param month: include the abbreviated month name (``%b``)
        :param weekday: include the abbreviated weekday name (``%a``)
        :param day: include the zero-padded day of the month (``%d``)
        :param hour: include the zero-padded 24-hour clock hour (``%H``)
        :param glue: separator placed between the enabled components
        """
        self.glue = glue
        self.month = month
        self.weekday = weekday
        self.day = day
        self.hour = hour

    @classmethod
    def weekly(cls):
        """Return a policy that enables only the weekday component.

        Built through ``cls`` so that subclasses get an instance of their
        own type rather than of the base class.
        """
        return cls(weekday=True)

    def path_marker(self, now=None):
        """Build the lower-cased marker string for the enabled components.

        :param now: ``datetime`` to format; defaults to ``datetime.now()``
            when omitted, which is the original behaviour.
        :return: enabled components in the fixed order month, weekday, day,
            hour, joined by ``glue``; an empty string if none are enabled.
        """
        if now is None:
            now = datetime.now()

        # (enabled flag, strftime directive) in output order.
        components = (
            (self.month, '%b'),
            (self.weekday, '%a'),
            (self.day, '%d'),
            (self.hour, '%H'),
        )
        parts = [now.strftime(fmt) for enabled, fmt in components if enabled]
        # NOTE: lower() is applied to the joined string, so the glue is
        # lower-cased as well (unchanged from the original behaviour).
        return self.glue.join(parts).lower()
class S3Handler:
    """Uploads local files to a single S3 bucket through a boto3 resource."""

    def __init__(self, bucket_name):
        """
        :type bucket_name: string
        """
        self.bucket_name = bucket_name
        self.s3 = boto3.resource('s3')

    def upload(self, upload_package):
        """Upload the package's file to its full destination path in the bucket.

        Failures are reported on stdout and not re-raised, so a failed
        upload does not abort the caller.

        :type upload_package: S3UploadPackage
        """
        try:
            destination = upload_package.full_destination_path()
            # The context manager closes the file handle whether or not the
            # upload succeeds; the original left it open.
            with open(upload_package.path, 'rb') as body:
                self.s3.Object(self.bucket_name, destination).put(Body=body)
            print(upload_package.path + " has been uploaded.")
        except Exception as e:
            print("Upload failed.")
            # Exceptions have no ``.message`` attribute on Python 3; printing
            # the exception itself works on every version.
            print(e)
class S3UploadPackage:
    """A local file together with the information needed to build its S3 key."""

    def __init__(self, file_path, project, upload_type=S3UploadType.GENERIC_ASSET,
                 rotation_policy=None):
        """
        :param file_path: path of the local file to upload
        :param project: first segment of the destination key
        :param upload_type: second segment of the destination key; one of
            the ``S3UploadType`` constants
        :param rotation_policy: object providing ``path_marker()``; when
            omitted (or ``None``) a fresh ``S3RotationPolicy.weekly()`` is used
        """
        # Resolve the default per call: a default built in the signature is
        # evaluated once and would be shared by every package instance.
        if rotation_policy is None:
            rotation_policy = S3RotationPolicy.weekly()
        self.project = project
        self.rotation_policy = rotation_policy
        self.upload_type = upload_type
        self.path = file_path

    def full_destination_path(self):
        """Return the key ``<project>/<upload_type>/<marker>__<file name>``.

        ``<marker>`` is whatever the rotation policy's ``path_marker()``
        returns and ``<file name>`` is the base name of the local path.
        """
        file_name = path.basename(self.path)
        marked_name = self.rotation_policy.path_marker() + "__" + file_name
        return "/".join([self.project, self.upload_type, marked_name])