diff --git a/requirements.txt b/requirements.txt index dfb5267..9fa63cf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ pyinotify -boto>=2.6.0 +boto3>=1.4.7 +boto>=2.48.0 argparse python-dateutil eventlet==0.17.4 diff --git a/setup.py b/setup.py index dfb97e8..4edf1e4 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,8 @@ ], install_requires=[ 'pyinotify', - 'boto>=2.6.0', + 'boto3>=1.4.7', + 'boto>=2.48.0', 'argparse', 'python-dateutil', ], diff --git a/tablesnap b/tablesnap index 1d516e0..2b20928 100755 --- a/tablesnap +++ b/tablesnap @@ -1,23 +1,28 @@ #!/usr/bin/env python -import pyinotify -import boto +from __future__ import division import argparse -from traceback import format_exc -from threading import Thread -from Queue import Queue -import logging -import logging.handlers -import os.path -import socket +import grp +import hashlib import json -import sys +import logging.handlers import os +import os.path import pwd -import grp import re import signal -import StringIO +import socket +import subprocess +import sys +from Queue import Queue +from threading import Thread +from traceback import format_exc + +import boto +import boto3 +import pyinotify +from boto3.s3.transfer import TransferConfig, S3Transfer +from botocore.exceptions import ClientError default_log = logging.getLogger('tablesnap') if os.environ.get('TABLESNAP_SYSLOG', False): @@ -52,9 +57,22 @@ default_chunk_size = 256 # Default inotify event to listen on default_listen_events = ['IN_MOVED_TO', 'IN_CLOSE_WRITE'] + +class ProgressPercentage(object): + def __init__(self, filename, filesize, log): + self._filename = filename + self._filesize = filesize + self._seen_so_far = 0 + self.log = log + + def __call__(self, bytes_amount): + self._seen_so_far += bytes_amount + percentage = round((self._seen_so_far / self._filesize) * 100, 2) + self.log.info('Uploading {}. {} out of {} done. 
Percentage complete: {} %'
+                      .format(self._filename, self._seen_so_far, self._filesize, percentage))
+
 
 class UploadHandler(pyinotify.ProcessEvent):
 
-    def my_init(self, threads=None, key=None, secret=None, token=None, region=None,
-                bucket_name=None,
+    def my_init(self, threads=None, key=None, secret=None, token=None, region=None, bucket=None,
                 prefix=None, name=None,
                 max_size=None, chunk_size=None,
                 include=None,
                 reduced_redundancy=False,
@@ -64,12 +82,13 @@ class UploadHandler(pyinotify.ProcessEvent):
                 delete_on_backup=False,
                 log=default_log,
                 md5_on_start=False,
-                retries=1):
+                retries=1,
+                with_tokens=None):
         self.key = key
         self.secret = secret
         self.token = token
         self.region = region
-        self.bucket_name = bucket_name
+        self.bucket = bucket
         self.prefix = prefix
         self.name = name or socket.getfqdn()
         self.keyname_separator = keyname_separator
@@ -78,9 +97,15 @@ class UploadHandler(pyinotify.ProcessEvent):
         self.include = include
         self.reduced_redundancy = reduced_redundancy
         self.with_index = with_index
-        self.with_sse = with_sse
+        if with_sse == 'AES256':
+            self.sse = 'AES256'
+            self.kmskey = None
+        else:
+            self.sse = 'aws:kms'
+            self.kmskey = with_sse
         self.delete_on_backup = delete_on_backup
         self.md5_on_start = md5_on_start
+        self.with_tokens = with_tokens
 
         if max_size:
             self.max_size = max_size * 2**20
@@ -89,8 +114,9 @@ class UploadHandler(pyinotify.ProcessEvent):
 
         if chunk_size:
             self.chunk_size = chunk_size * 2**20
-        else:
-            self.chunk_size = None
+
+        if with_tokens:
+            self.cassandra_tokens = []
 
         self.fileq = Queue()
         for i in range(int(threads)):
@@ -109,28 +135,25 @@ class UploadHandler(pyinotify.ProcessEvent):
         else:
             self.log.info('Skipping %s due to exclusion rule' % filename)
 
-    def get_bucket(self):
+    def get_s3(self):
         # Reconnect to S3
         if self.token:
-            s3 = boto.s3.connect_to_region(self.region,
-                aws_access_key_id=self.key,
-                aws_secret_access_key=self.secret,
-                security_token=self.token)
+            s3 = boto3.client('s3', aws_access_key_id=self.key,
+                              aws_secret_access_key=self.secret,
+                              aws_session_token=self.token)
         else:
-            s3 = boto.s3.connect_to_region(self.region,
-                aws_access_key_id=self.key,
-                aws_secret_access_key=self.secret)
-
-        return s3.get_bucket(self.bucket_name)
+            s3 = boto3.client('s3', aws_access_key_id=self.key,
+                              aws_secret_access_key=self.secret)
+        return s3
 
     def worker(self):
-        bucket = self.get_bucket()
+        s3 = self.get_s3()
         while True:
             f = self.fileq.get()
             keyname = self.build_keyname(f)
             try:
-                self.upload_sstable(bucket, keyname, f)
+                self.upload_sstable(s3, self.bucket, keyname, f)
             except:
                 self.log.critical("Failed uploading %s. Aborting.\n%s" %
                                   (f, format_exc()))
@@ -153,28 +176,28 @@ class UploadHandler(pyinotify.ProcessEvent):
     # exist, but that the MD5 sum is the same -- this protects against
     # partial or corrupt uploads. 
IF you enable md5 at start # - def key_exists(self, bucket, keyname, filename, stat): + + def key_exists(self, s3, bucket, keyname, filename, stat): key = None for r in range(self.retries): try: - key = bucket.get_key(keyname) - if key == None: + key = s3.head_object(Bucket=bucket, Key=keyname) + self.log.debug('Found key %s' % (keyname,)) + break + except ClientError as e: + if e.response['Error']['Code'] == '404': self.log.debug('Key %s does not exist' % (keyname,)) return False - else: - self.log.debug('Found key %s' % (keyname,)) - break - except: - bucket = self.get_bucket() + s3 = self.get_s3() continue - if key == None: + if key is None: self.log.critical("Failed to lookup keyname %s after %d" " retries\n%s" % (keyname, self.retries, format_exc())) raise - if key.size != stat.st_size: + if key['ContentLength'] != stat.st_size: self.log.warning('ATTENTION: your source (%s) and target (%s) ' 'sizes differ, you should take a look. As immutable files ' 'never change, one must assume the local file got corrupted ' @@ -188,7 +211,7 @@ class UploadHandler(pyinotify.ProcessEvent): else: # Compute MD5 sum of file try: - fp = open(filename, "r") + etag = calculate_s3_etag(filename, self.chunk_size) except IOError as (errno, strerror): if errno == 2: # The file was removed, return True to skip this file. @@ -198,25 +221,13 @@ class UploadHandler(pyinotify.ProcessEvent): (filename, strerror, format_exc(),)) raise - md5 = key.compute_md5(fp) - fp.close() - self.log.debug('Computed md5: %s' % (md5,)) + self.log.debug('Computed etag: %s' % (etag,)) - meta = key.get_metadata('md5sum') + meta = key['ETag'][1:-1] - if meta: - self.log.debug('MD5 metadata comparison: %s == %s? : %s' % - (md5[0], meta, (md5[0] == meta))) - result = (md5[0] == meta) - else: - self.log.debug('ETag comparison: %s == %s? : %s' % - (md5[0], key.etag.strip('"'), - (md5[0] == key.etag.strip('"')))) - result = (md5[0] == key.etag.strip('"')) - if result: - self.log.debug('Setting missing md5sum metadata for %s' % - (keyname,)) - key.set_metadata('md5sum', md5[0]) + self.log.debug('MD5 metadata comparison: %s == %s? : %s' % + (etag, meta, (etag == meta))) + result = (etag == meta) if result: self.log.info("Keyname %s already exists, skipping upload" @@ -231,47 +242,55 @@ class UploadHandler(pyinotify.ProcessEvent): return result - def get_free_memory_in_kb(self): - f = open('/proc/meminfo', 'r') - memlines = f.readlines() - f.close() - lines = [] - for line in memlines: - ml = line.rstrip(' kB\n').split(':') - lines.append((ml[0], int(ml[1].strip()))) - d = dict(lines) - return d['Cached'] + d['MemFree'] + d['Buffers'] - - def split_sstable(self, filename): - free = self.get_free_memory_in_kb() * 1024 - self.log.debug('Free memory check: %d < %d ? : %s' % - (free, self.chunk_size, (free < self.chunk_size))) - if free < self.chunk_size: - self.log.warn('Your system is low on memory, ' - 'reading in smaller chunks') - chunk_size = free / 20 - else: - chunk_size = self.chunk_size - self.log.debug('Reading %s in %d byte sized chunks' % - (filename, chunk_size)) - f = open(filename, 'rb') - while True: - chunk = f.read(chunk_size) - if chunk: - yield StringIO.StringIO(chunk) - else: - break - if f and not f.closed: - f.close() - def should_create_index(self, filename): """ Should we upload a JSON index for this file? Cassandra SSTables are composed of six files, so only generate a JSON file when we upload a .Data.db file. 
""" return ("Data.db" in filename) - def upload_sstable(self, bucket, keyname, filename): + def upload_utility_file(self, s3, bucket, key, body): + for r in range(self.retries): + try: + if self.kmskey: + s3.put_object(Bucket=bucket, + Key=key, + Body=body, + ServerSideEncryption=self.sse, + StorageClass='REDUCED_REDUNDANCY' if self.reduced_redundancy else 'STANDARD', + SSEKMSKeyId=self.kmskey) + else: + s3.put_object(Bucket=bucket, + Key=key, + Body=body, + ServerSideEncryption=self.sse, + StorageClass='REDUCED_REDUNDANCY' if self.reduced_redundancy else 'STANDARD') + self.log.info("Uploaded %s" % key) + except: + if r == self.retries - 1: + self.log.critical("Failed to upload key %s" % key) + raise + s3 = self.get_s3() + continue + + def generate_tokens(self, ip): + """Generate a list of tokens for the current node. + """ + if self.cassandra_tokens: + return self.cassandra_tokens + tokens = [] + try: + p = subprocess.Popen(['nodetool', 'ring'], stdout=subprocess.PIPE) + for line in p.stdout: + if ip in line: + tokens.append(line.split()[7]) + self.log.info('Generated tokens file') + self.cassandra_tokens = ','.join(tokens) + return self.cassandra_tokens + except: + self.log.warn('Unable to obtain tokens from Cassandra.') + return tokens + def upload_sstable(self, s3, bucket, keyname, filename): # Include the file system metadata so that we have the # option of using it to restore the file modes correctly. # @@ -281,17 +300,15 @@ class UploadHandler(pyinotify.ProcessEvent): # File removed? return - if self.key_exists(bucket, keyname, filename, stat): + if self.key_exists(s3, bucket, keyname, filename, stat): return else: try: - fp = open(filename, 'rb') + etag = calculate_s3_etag(filename, self.chunk_size) except IOError: # File removed? return - md5 = boto.utils.compute_md5(fp) - self.log.debug('Computed md5sum before upload is: %s' % (md5,)) - fp.close() + self.log.debug('Computed etag before upload is: %s' % (etag,)) def progress(sent, total): if sent == total: @@ -304,29 +321,14 @@ class UploadHandler(pyinotify.ProcessEvent): try: dirname = os.path.dirname(filename) + listdir = [] if self.with_index and self.should_create_index(filename): - listdir = [] for listfile in os.listdir(dirname): if self.include is None or (self.include is not None and self.include(listfile)): listdir.append(listfile) json_str = json.dumps({dirname: listdir}) - for r in range(self.retries): - try: - key = bucket.new_key('%s-listdir.json' % keyname) - key.set_contents_from_string(json_str, - headers={'Content-Type': 'application/json'}, - replace=True, - reduced_redundancy=self.reduced_redundancy, - encrypt_key=self.with_sse) - break - except: - if r == self.retries - 1: - self.log.critical("Failed to upload directory " - "listing.") - raise - bucket = self.get_bucket() - continue + self.upload_utility_file(s3, bucket, '%s-listdir.json' % keyname, json_str) meta = {'uid': stat.st_uid, 'gid': stat.st_gid, @@ -353,46 +355,25 @@ class UploadHandler(pyinotify.ProcessEvent): (stat.st_size, self.max_size, (stat.st_size > self.max_size),)) if stat.st_size > self.max_size: - self.log.info('Performing multipart upload for %s' % + self.log.debug('Performing multipart upload for %s' % (filename)) - mp = bucket.initiate_multipart_upload(keyname, - reduced_redundancy=self.reduced_redundancy, - metadata={'stat': meta, 'md5sum': md5[0]}, - encrypt_key=self.with_sse) - part = 1 - chunk = None - try: - for chunk in self.split_sstable(filename): - self.log.debug('Uploading part #%d ' - '(size: %d)' % - (part, chunk.len,)) - 
mp.upload_part_from_file(chunk, part) - chunk.close() - part += 1 - part -= 1 - except Exception as e: - self.log.debug(e) - self.log.info('Error uploading part %d' % (part,)) - mp.cancel_upload() - if chunk: - chunk.close() - raise - self.log.debug('Uploaded %d parts, ' - 'completing upload' % (part,)) - mp.complete_upload() - progress(100, 100) - delete_if_enabled() else: self.log.debug('Performing monolithic upload') - key = bucket.new_key(keyname) - key.set_metadata('stat', meta) - key.set_metadata('md5sum', md5[0]) - key.set_contents_from_filename(filename, replace=True, - cb=progress, num_cb=1, - md5=md5, - reduced_redundancy=self.reduced_redundancy, - encrypt_key=self.with_sse) - delete_if_enabled() + config = TransferConfig(multipart_threshold=self.max_size, + multipart_chunksize=self.chunk_size) + transfer = S3Transfer(s3, config=config) + progress_cb = ProgressPercentage(filename=filename, filesize=stat.st_size, log=self.log) + extra_args = {'Metadata': { + 'stat': meta + }, + 'ServerSideEncryption': self.sse, + 'StorageClass': 'REDUCED_REDUNDANCY' if self.reduced_redundancy else 'STANDARD', + } + if self.kmskey: + extra_args['SSEKMSKeyId'] = self.kmskey + transfer.upload_file(filename=filename, bucket=bucket, key=keyname, callback=progress_cb, + extra_args=extra_args) + delete_if_enabled() break except: if not os.path.exists(filename): @@ -402,12 +383,17 @@ class UploadHandler(pyinotify.ProcessEvent): if r == self.retries - 1: self.log.critical("Failed to upload file contents.") raise - bucket = self.get_bucket() + s3 = self.get_s3() continue + if self.with_tokens is not None and self.should_create_index(keyname): + tokens = self.generate_tokens(self.with_tokens) + self.upload_utility_file(s3, bucket, self.build_keyname('/tokens.yaml'), tokens) + except: self.log.error('Error uploading %s\n%s' % (keyname, format_exc())) raise + def get_mask(listen_events): if not listen_events: listen_events = default_listen_events @@ -448,6 +434,24 @@ def backup_files(handler, paths, recurse, include, log=default_log): return 0 +def calculate_s3_etag(file_path, chunk_size=8 * 1024 * 1024): + md5s = [] + + with open(file_path, 'rb') as fp: + while True: + data = fp.read(chunk_size) + if not data: + break + md5s.append(hashlib.md5(data)) + + if len(md5s) == 1: + return '{}'.format(md5s[0].hexdigest()) + + digests = b''.join(m.digest() for m in md5s) + digests_md5 = hashlib.md5(digests) + return '{}-{}'.format(digests_md5.hexdigest(), len(md5s)) + + def main(): parser = argparse.ArgumentParser(description='Tablesnap is a script that ' 'uses inotify to monitor a directory for events and reacts to them by ' @@ -483,8 +487,11 @@ def main(): parser.add_argument('--without-index', action='store_true', default=False, help='Do not store a JSON representation of the current directory ' 'listing in S3 when uploading a file to S3.') - parser.add_argument('--with-sse', action='store_true', default=False, - help='Enable server-side encryption for all uploads to S3.') + parser.add_argument('--with-sse', nargs='?', const='AES256', + help='Enable server-side encryption for all uploads to S3. 
Provide either a '
+             'KMS key ID or else leave it empty to use AES256 (default)')
+    parser.add_argument('--with-tokens',
+        help='Get a list of tokens for the node as well based on provided IP')
     parser.add_argument('--keyname-separator', default=':',
         help='Separator for the keyname between name and path.')
     parser.add_argument('-t', '--threads', default=default_threads,
@@ -512,18 +519,15 @@ def main():
         'WARNING: If neither exclude nor include are defined, then all '
         'files matching "-tmp" are excluded. This option may be used '
         'more than once.')
-
     parser.add_argument('--max-upload-size', default=max_file_size,
         help='Max size for files to be uploaded before doing multipart '
             '(default %dM)' % max_file_size)
     parser.add_argument('--multipart-chunk-size', default=default_chunk_size,
        help='Chunk size for multipart uploads (default: %dM or 10%%%% of '
            'free memory if default is not available)' % default_chunk_size)
-
     parser.add_argument('--retries', default=0, type=int,
        help='If a file upload fails, retry N times before throwing an'
-           'exception (default: no retrying)')
-
+           'exception, backing off linearly between each retry (default: no retrying)')
     args = parser.parse_args()
 
     # For backwards-compatibility: If neither exclude nor include are set,
@@ -538,20 +542,16 @@ def main():
     # potential thread-safety problems.
 
     if args.aws_token:
-        s3 = boto.s3.connect_to_region(args.aws_region,
-            aws_access_key_id=args.aws_key,
-            aws_secret_access_key=args.aws_secret,
-            security_token=args.aws_token)
+        boto3.client('s3', aws_access_key_id=args.aws_key,
+                     aws_secret_access_key=args.aws_secret,
+                     aws_session_token=args.aws_token)
     else:
-        s3 = boto.s3.connect_to_region(args.aws_region,
-            aws_access_key_id=args.aws_key,
-            aws_secret_access_key=args.aws_secret)
-    bucket = s3.get_bucket(args.bucket)
+        boto3.client('s3', aws_access_key_id=args.aws_key,
+                     aws_secret_access_key=args.aws_secret)
 
     handler = UploadHandler(threads=args.threads, key=args.aws_key,
                             secret=args.aws_secret, token=args.aws_token,
-                            region=args.aws_region,
-                            bucket_name=bucket,
+                            region=args.aws_region, bucket=args.bucket,
                             prefix=args.prefix, name=args.name,
                             include=include,
                             reduced_redundancy=args.reduced_redundancy,
@@ -562,7 +562,8 @@ def main():
                             max_size=int(args.max_upload_size),
                             chunk_size=int(args.multipart_chunk_size),
                             md5_on_start=args.md5_on_start,
-                            retries=(args.retries + 1))
+                            retries=(args.retries + 1),
+                            with_tokens=args.with_tokens)
 
     wm = pyinotify.WatchManager()
     notifier = pyinotify.Notifier(wm, handler)
@@ -582,4 +583,4 @@ def main():
     notifier.loop()
 
 if __name__ == '__main__':
-    sys.exit(main())
+    sys.exit(main())
\ No newline at end of file
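
Note on the boto3 client construction used in get_s3() and main(): boto's connect_to_region() took a security_token argument, but boto3.client() calls the same credential aws_session_token and accepts the region as region_name. A minimal helper in that spirit (the helper name is illustrative, not part of this change) could look like:

    import boto3

    def make_s3_client(key, secret, token=None, region=None):
        # boto3 treats None as "not supplied", so a single call covers both the
        # token and no-token cases and still honours --aws-region.
        return boto3.client('s3',
                            region_name=region,
                            aws_access_key_id=key,
                            aws_secret_access_key=secret,
                            aws_session_token=token)

Passing every keyword unconditionally keeps the if self.token / else branches from duplicating the call.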
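The reworked --with-sse flag relies on argparse's nargs='?' with const='AES256', so the parsed value distinguishes "flag absent", "flag given bare", and "flag given with a KMS key id". A quick self-contained check (the key id string is a placeholder):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--with-sse', nargs='?', const='AES256')

    print(parser.parse_args([]).with_sse)                         # None: flag not passed
    print(parser.parse_args(['--with-sse']).with_sse)             # 'AES256'
    print(parser.parse_args(['--with-sse', 'my-kms-key-id']).with_sse)  # 'my-kms-key-id'

The None case is worth keeping in mind, since my_init() currently maps it to the 'aws:kms' branch with an empty key id.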
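Uploads now go through boto3's managed transfer layer: TransferConfig(multipart_threshold=..., multipart_chunksize=...) decides when S3Transfer switches to multipart, replacing the hand-rolled split_sstable()/initiate_multipart_upload() path. A sketch of how the ExtraArgs could be assembled so that encryption headers are only sent when requested (the helper and the optional-SSE handling are assumptions for illustration, not what the diff does verbatim):

    from boto3.s3.transfer import S3Transfer, TransferConfig

    def upload_file_via_transfer(s3, bucket, keyname, filename,
                                 max_size, chunk_size,
                                 sse=None, kmskey=None,
                                 reduced_redundancy=False, callback=None):
        extra_args = {
            'StorageClass': 'REDUCED_REDUNDANCY' if reduced_redundancy else 'STANDARD',
        }
        if sse:
            extra_args['ServerSideEncryption'] = sse   # 'AES256' or 'aws:kms'
        if kmskey:
            extra_args['SSEKMSKeyId'] = kmskey         # only meaningful with 'aws:kms'

        config = TransferConfig(multipart_threshold=max_size,
                                multipart_chunksize=chunk_size)
        transfer = S3Transfer(s3, config=config)
        # callback receives the number of bytes transferred since the previous call
        transfer.upload_file(filename, bucket, keyname,
                             callback=callback, extra_args=extra_args)

ProgressPercentage from the diff fits the callback slot, since it is invoked with a bytes_amount per chunk.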
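upload_utility_file() wraps put_object() in a for r in range(self.retries) loop; with a loop of that shape it pays to break out explicitly once the put succeeds, otherwise the listing/tokens objects are re-uploaded once per configured retry. A hedged sketch (argument plumbing simplified relative to the diff):

    def put_object_with_retries(s3, bucket, key, body, retries, log, **extra):
        # `extra` stands in for ServerSideEncryption / SSEKMSKeyId / StorageClass,
        # mirroring what upload_utility_file() passes.
        for attempt in range(retries):
            try:
                s3.put_object(Bucket=bucket, Key=key, Body=body, **extra)
                log.info('Uploaded %s' % key)
                break                     # success -- stop retrying
            except Exception:
                if attempt == retries - 1:
                    log.critical('Failed to upload key %s' % key)
                    raise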
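calculate_s3_etag() mirrors how S3 derives ETags: a plain MD5 hex digest for single-part objects, and an MD5-of-part-MD5s with a -<part count> suffix for multipart uploads. The comparison in key_exists() therefore only lines up when the local chunk size matches the part size the object was originally uploaded with, and SSE-KMS encrypted objects do not carry MD5-based ETags at all. A small usage sketch with placeholder bucket, key and path values:

    import boto3

    s3 = boto3.client('s3')
    local = calculate_s3_etag('/path/to/sstable-Data.db', chunk_size=256 * 2**20)
    remote = s3.head_object(Bucket='example-bucket',
                            Key='example-host:/path/to/sstable-Data.db')['ETag'].strip('"')
    if local == remote:
        print('checksums match, upload can be skipped')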