main.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Watch a local photobooth folder and synchronize new pictures to an S3 bucket."""
import logging
import argparse
import sys
import os
from pathlib import Path
import re
import inotify.adapters
import boto3
import requests

PICTURES_FOLDER_REGEX = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
RESYNC_INTERVAL_COUNT = 100


class Synchronizer:
    def __init__(self, base_folder, tracker_file, bucket_name, bucket_prefix,
                 aws_profile, s3_endpoint_url, server_refresh_url):
        self.base_folder = base_folder
        self.tracker_file = tracker_file
        self.bucket_name = bucket_name
        self.bucket_prefix = bucket_prefix
        self.aws_profile = aws_profile
        self.s3_endpoint_url = s3_endpoint_url
        self.server_refresh_url = server_refresh_url
        self._init_tracker()
        self.synced_files = self._get_already_sync()
        self._s3_connect()

    def _init_tracker(self):
        """
        Initialize the tracker file if it does not exist
        """
        if not os.path.exists(self.tracker_file):
            with open(self.tracker_file, "w") as fd:
                fd.write("")

    def _update_tracker(self, new_uploads):
        """
        Update the tracker file with newly uploaded files
        """
        self.synced_files += new_uploads
        with open(self.tracker_file, "w") as fd:
            fd.write("\n".join(self.synced_files))

    def _s3_connect(self):
        """
        Initialize an S3 session
        """
        self.s3 = boto3.session.Session(
            profile_name=self.aws_profile
        ).resource(
            's3',
            endpoint_url=self.s3_endpoint_url
        )

    def _get_already_sync(self):
        """
        Get the list of files already synced to S3
        """
        with open(self.tracker_file, "r") as fd:
            files = fd.read().splitlines()
        return files

    def _get_new_files(self, pics_folder):
        """
        List files from a specific folder that have not yet been synced to S3
        """
        # Get the list of local files
        folder_files = sorted(os.listdir(pics_folder))
        # Keep only ".jpg" files
        filtered_files = [file for file in folder_files if Path(file).suffix == '.jpg']
        # Diff between local and already-synced files
        new_files = list(set(filtered_files) - set(self.synced_files))
        return new_files

    def _send_file(self, file_path):
        """
        Send a file to the S3 bucket.
        """
        # Construct the object path
        obj_name = os.path.join(self.bucket_prefix, Path(file_path).name)
        logging.info('Sending picture %s to bucket %s', obj_name, self.bucket_name)
        # Upload to S3
        self.s3.meta.client.upload_file(file_path, self.bucket_name, obj_name)

    def _trigger_server_refresh(self):
        """
        Ask the backend server to refresh
        """
        try:
            requests.post(self.server_refresh_url)
        except Exception as err:
            logging.error("Unable to refresh backend server: %s", err)

    def sync(self, pics_folder):
        """
        Synchronize a local folder with S3
        """
        new_files = self._get_new_files(pics_folder)
        # Track successful uploads
        successful_uploads = []
        # Loop over files to upload
        for file in new_files:
            src_path = os.path.join(pics_folder, file)
            try:
                self._send_file(src_path)
            except Exception as err:
                logging.error("Unable to send picture %s to S3: %s", src_path, err)
            else:
                successful_uploads.append(file)
        # Trigger a server-side refresh
        self._trigger_server_refresh()
        # Update the synced file list
        self._update_tracker(successful_uploads)

    def run(self):
        """
        Start watching the local folder for new files to sync
        """
        # Watch the photobooth folder
        watcher = inotify.adapters.InotifyTree(self.base_folder)
        # Count processed files to trigger a periodic full resync
        file_counter = 0
        processed_folders = []
        # Loop over events
        for event in watcher.event_gen():
            if event is not None:
                (header, type_names, watch_path, filename) = event
                # Only send a file once it is closed, located in a folder matching
                # the date pattern, and carrying a .jpg extension
                if "IN_CLOSE_WRITE" in type_names and \
                        bool(re.match(PICTURES_FOLDER_REGEX, Path(watch_path).name)) and \
                        Path(filename).suffix == '.jpg':
                    # Get the full file path
                    src_path = os.path.join(watch_path, filename)
                    try:
                        # Send the file
                        self._send_file(src_path)
                    except Exception as err:
                        logging.error("Unable to send picture %s to S3: %s", src_path, err)
                    else:
                        # Trigger a server-side refresh unless it is a "shot" picture
                        if 'shot' not in str(Path(filename)):
                            self._trigger_server_refresh()
                        # Add the file to the synced list
                        self._update_tracker([filename])
                    # Update the counter and folder tracker
                    file_counter += 1
                    if watch_path not in processed_folders:
                        processed_folders.append(watch_path)
                    # After enough files have been processed, trigger a full resync
                    if file_counter >= RESYNC_INTERVAL_COUNT:
                        file_counter = 0
                        for folder in processed_folders:
                            self.sync(folder)


def parse_args(argv):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--debug',
        action='store_true',
        help='enable additional debug output'
    )
    parser.add_argument(
        '--photobooth-folder',
        help="Base folder where photobooth is running",
        default="/home/photobooth/photobooth"
    )
    # WARNING: do not use --folder-resync when files from another event share the same date
    parser.add_argument(
        '--folder-resync',
        help='Force resync of a folder (value must be the folder full path)',
        default=None
    )
    parser.add_argument(
        '--tracker-file',
        help="File used to track pictures already synced with S3",
        default="synced-files.txt"
    )
    parser.add_argument(
        '--bucket-name',
        help="Name of the S3 bucket to sync with",
        default="photobooth"
    )
    parser.add_argument(
        '--bucket-prefix',
        help="Prefix for uploaded files inside the bucket",
        default="input/"
    )
    parser.add_argument(
        '--aws-profile',
        help="AWS profile to use for S3",
        default="default"
    )
    parser.add_argument(
        '--s3-endpoint-url',
        help="Endpoint URL for S3",
        default="https://s3.fr-par.scw.cloud"
    )
    parser.add_argument(
        '--server-refresh-url',
        help="Endpoint URL for server refresh",
        default="https://zalex.fr/refresh"
    )
    return parser.parse_known_args(argv[1:])


def main(argv):
    # Parse command line arguments
    parsed_args, unparsed_args = parse_args(argv)
    argv = argv[:1] + unparsed_args
    # Set up log level and format
    if parsed_args.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y/%m/%d %H:%M:%S',
        level=log_level
    )
    # Check arguments
    if parsed_args.photobooth_folder is None and parsed_args.folder_resync is None:
        logging.error("At least one of --photobooth-folder or --folder-resync is required")
        sys.exit(-1)
    synchronizer = Synchronizer(
        parsed_args.photobooth_folder,
        parsed_args.tracker_file,
        parsed_args.bucket_name,
        parsed_args.bucket_prefix,
        parsed_args.aws_profile,
        parsed_args.s3_endpoint_url,
        parsed_args.server_refresh_url
    )
    if parsed_args.folder_resync is not None:
        synchronizer.sync(parsed_args.folder_resync)
    else:
        synchronizer.run()


if __name__ == '__main__':
    main(sys.argv)
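
# Example invocations (illustrative sketch only: the flags and their defaults come
# from parse_args() above, but the concrete folder paths are hypothetical and should
# be replaced with whatever your photobooth actually writes to).
#
# Watch the photobooth folder and upload new pictures as they are written:
#
#   python3 main.py --photobooth-folder /home/photobooth/photobooth \
#       --bucket-name photobooth \
#       --bucket-prefix input/ \
#       --aws-profile default \
#       --s3-endpoint-url https://s3.fr-par.scw.cloud
#
# Force a one-off resync of a single day's folder (name matching PICTURES_FOLDER_REGEX)
# instead of watching for events:
#
#   python3 main.py --folder-resync /home/photobooth/photobooth/2023-06-10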