Merge pull request #2 from ConvertKit/feature/s3_ia_storage_class
Upgrade to boto3 and add support for defining a custom S3 storage class
ivanmp91 authored Aug 5, 2019
2 parents ea902e9 + 2482e92 commit dcfdef5
Showing 8 changed files with 207 additions and 128 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG
@@ -1,3 +1,11 @@
1.2.0
* Add support for boto3
* Add support to upload objects on S3 with a custom storage class

1.1.0
* Add file timestamps attributes to S3 object metadata on each sstable
* Improved restoration process

1.0.1
* add support for cn-north-1 region
* add support for password authentication
2 changes: 1 addition & 1 deletion cassandra_snapshotter/__init__.py
@@ -1,3 +1,3 @@
__version__ = '1.1.0'
__version__ = '1.2.0'
__maintainer__ = 'Tommaso Barbugli'
__email__ = '[email protected]'
103 changes: 39 additions & 64 deletions cassandra_snapshotter/agent.py
@@ -1,9 +1,9 @@
from __future__ import (absolute_import, print_function)

import boto
from boto.s3.connection import S3Connection
from yaml import load

import boto3
from botocore.exceptions import ClientError

try:
# LibYAML based parser and emitter
from yaml import CLoader as Loader
@@ -17,10 +17,11 @@
from multiprocessing.dummy import Pool

from cassandra_snapshotter import logging_helper
from cassandra_snapshotter.s3_multipart_upload import S3MultipartUpload
from cassandra_snapshotter.timeout import timeout
from cassandra_snapshotter.utils import (add_s3_arguments, base_parser,
map_wrap, get_s3_connection_host,
check_lzop, check_pv, compressed_pipe)
map_wrap, check_lzop,
check_pv, compressed_pipe)

DEFAULT_CONCURRENCY = max(multiprocessing.cpu_count() - 1, 1)
BUFFER_SIZE = 64 # Default bufsize is 64M
@@ -30,28 +31,29 @@
SLEEP_MULTIPLIER = 3
UPLOAD_TIMEOUT = 600
DEFAULT_REDUCED_REDUNDANCY=False
DEFAULT_INFREQUENT_ACCESS=False

logging_helper.configure(
format='%(name)-12s %(levelname)-8s %(message)s')

logger = logging_helper.CassandraSnapshotterLogger('cassandra_snapshotter.agent')
boto.set_stream_logger('boto', logging.WARNING)
boto3.set_stream_logger('boto', logging.WARNING)


def get_bucket(
s3_bucket, aws_access_key_id,
aws_secret_access_key, s3_connection_host):
connection = S3Connection(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
host=s3_connection_host
aws_secret_access_key, s3_bucket_region):
s3 = boto3.client('s3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=s3_bucket_region
)
return connection.get_bucket(s3_bucket, validate=False)
return s3.bucket(s3_bucket)
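
As committed, get_bucket builds a low-level boto3 client but returns s3.bucket(s3_bucket); the client API exposes no such method, and an object-style bucket handle normally comes from boto3's resource layer instead. A minimal sketch (editorial, not part of the commit) of the resource-based equivalent of the old boto get_bucket, assuming the same credential and region arguments:

import boto3

def get_bucket_resource(s3_bucket, aws_access_key_id,
                        aws_secret_access_key, s3_bucket_region):
    # The resource API hands back a Bucket object without a round trip,
    # much like boto's get_bucket(..., validate=False) did.
    s3 = boto3.resource(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=s3_bucket_region,
    )
    return s3.Bucket(s3_bucket)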


def destination_path(s3_base_path, file_path, compressed=True):
suffix = compressed and '.lzo' or ''
return '/'.join([s3_base_path, file_path + suffix])
return s3_base_path + file_path + suffix
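
Note that the rewritten destination_path concatenates the pieces instead of joining them with '/', so it relies on the manifest entries being absolute paths (or on s3_base_path carrying its own trailing separator). A quick illustration with hypothetical inputs:

base = 'bucket/backups'                    # hypothetical base path
path = '/data/ks/tbl/mc-1-big-Data.db'     # hypothetical manifest entry

print('/'.join([base, path + '.lzo']))  # old: bucket/backups//data/ks/tbl/mc-1-big-Data.db.lzo
print(base + path + '.lzo')             # new: bucket/backups/data/ks/tbl/mc-1-big-Data.db.lzo
# A relative path would now run the two segments together ('bucket/backupsdata/ks/tbl/mc-1-big-Data.db.lzo').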


def s3_progress_update_callback(*args):
@@ -60,46 +62,51 @@ def s3_progress_update_callback(*args):


@map_wrap
def upload_file(bucket, source, destination, s3_ssenc, bufsize, reduced_redundancy, rate_limit, quiet):
def upload_file(s3_bucket, aws_access_key_id, aws_secret_access_key, s3_bucket_region, source, destination, s3_ssenc, bufsize, rate_limit, storage_class, quiet):
mp = None
retry_count = 0
sleep_time = SLEEP_TIME
while True:
try:
if mp is None:
# Initiate the multi-part upload.
try:
mtime_epoch = os.path.getmtime(source)
atime_epoch = os.path.getatime(source)
ctime_epoch = os.path.getctime(source)
file_mtime = time.strftime('%Y-%m-%d:%H:%M:%S:%Z', time.localtime(mtime_epoch))
mp = bucket.initiate_multipart_upload(destination, encrypt_key=s3_ssenc, reduced_redundancy=reduced_redundancy, metadata={'modified': file_mtime, 'mtime': mtime_epoch, 'atime': atime_epoch, 'ctime': ctime_epoch})
# Initiate the multi-part upload.
mp = S3MultipartUpload(
bucket=s3_bucket,
key=destination,
local_path=source,
region_name=s3_bucket_region,
logger=logger,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
metadata={'modified': str(file_mtime), 'mtime': str(mtime_epoch), 'atime': str(atime_epoch), 'ctime': str(ctime_epoch)},
storage_class=storage_class
)
mpu_id = mp.create()
logger.info("Initialized multipart upload for file {!s} to {!s}".format(source, destination))
except Exception as exc:
logger.error("Error while initializing multipart upload for file {!s} to {!s}".format(source, destination))
logger.error(exc.message)
raise

try:
for i, chunk in enumerate(compressed_pipe(source, bufsize, rate_limit, quiet)):
mp.upload_part_from_file(chunk, i + 1, cb=s3_progress_update_callback)
parts = mp.upload(mpu_id, bufsize, rate_limit, quiet)
except Exception as exc:
logger.error("Error uploading file {!s} to {!s}".format(source, destination))
logger.error(exc.message)
raise

try:
mp.complete_upload()
result=mp.complete(mpu_id, parts)
logger.info("Finished multipart upload for file {!s} to {!s}".format(source,result['Key']))
except Exception as exc:
logger.error("Error completing multipart file upload for file {!s} to {!s}".format(source, destination))
logger.error(exc.message)
# The multi-part object may be in a bad state. Extract an error
# message if we can, then discard it.
try:
logger.error(mp.to_xml())
except Exception as exc:
pass
cancel_upload(bucket, mp, destination)
mp.abort_all()
mp = None
raise
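
The compress-and-upload loop now delegates to the S3MultipartUpload helper (added elsewhere in this commit; its module is not shown in this excerpt), which presumably wraps boto3's multipart primitives. A minimal sketch of that create/upload/complete/abort lifecycle with the low-level client, using hypothetical bucket, key and chunk values:

import boto3

s3 = boto3.client('s3')
bucket, key = 'my-backup-bucket', 'backups/example.lzo'   # hypothetical values
chunks = [b'\x00' * (5 * 1024 * 1024)]                    # hypothetical single 5 MB part

# create(): returns the UploadId that every later call must reference
mpu = s3.create_multipart_upload(
    Bucket=bucket, Key=key,
    ServerSideEncryption='AES256',       # server-side encryption, as the s3_ssenc flag requests
    StorageClass='STANDARD_IA',          # the new storage_class option
    Metadata={'mtime': '1565000000.0'},  # stringified timestamps, as in the metadata dict above
)
mpu_id = mpu['UploadId']

try:
    # upload(): stream the lzop-compressed chunks as numbered parts
    parts = []
    for i, chunk in enumerate(chunks, start=1):
        resp = s3.upload_part(Bucket=bucket, Key=key, UploadId=mpu_id,
                              PartNumber=i, Body=chunk)
        parts.append({'PartNumber': i, 'ETag': resp['ETag']})
    # complete(): needs the collected part numbers and ETags
    s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu_id,
                                 MultipartUpload={'Parts': parts})
except Exception:
    # abort on failure so no orphaned parts are left behind (cf. mp.abort_all() above)
    s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu_id)
    raise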

@@ -112,7 +119,7 @@ def upload_file(bucket, source, destination, s3_ssenc, bufsize, reduced_redundan
logger.error("Retried too many times uploading file {!s}".format(source))
# Abort the multi-part upload if it was ever initiated.
if mp is not None:
cancel_upload(bucket, mp, destination)
mp.abort_all()
return None
else:
logger.info("Sleeping before retry")
@@ -126,48 +133,23 @@ def upload_file(bucket, source, destination, s3_ssenc, bufsize, reduced_redundan
def upload_chunk(mp, chunk, index):
mp.upload_part_from_file(chunk, index)


def cancel_upload(bucket, mp, remote_path):
"""
Safe way to cancel a multipart upload
sleeps SLEEP_TIME seconds and then makes sure that there are not parts left
in storage
"""
attempts = 0
while attempts < 5:
try:
time.sleep(SLEEP_TIME)
mp.cancel_upload()
time.sleep(SLEEP_TIME)
for mp in bucket.list_multipart_uploads():
if mp.key_name == remote_path:
mp.cancel_upload()
return
except Exception:
logger.error("Error while cancelling multipart upload")
attempts += 1


def put_from_manifest(
s3_bucket, s3_connection_host, s3_ssenc, s3_base_path,
s3_bucket, s3_bucket_region, s3_ssenc, s3_base_path,
aws_access_key_id, aws_secret_access_key, manifest,
bufsize, reduced_redundancy, rate_limit, quiet,
bufsize, rate_limit, storage_class, quiet,
concurrency=None, incremental_backups=False):
"""
Uploads files listed in a manifest to amazon S3
to support larger than 5GB files multipart upload is used (chunks of 60MB)
files are uploaded compressed with lzop, the .lzo suffix is appended
"""
exit_code = 0
bucket = get_bucket(
s3_bucket, aws_access_key_id,
aws_secret_access_key, s3_connection_host)
manifest_fp = open(manifest, 'r')
buffer_size = int(bufsize * MBFACTOR)
files = manifest_fp.read().splitlines()
pool = Pool(concurrency)
for f in pool.imap(upload_file,
((bucket, f, destination_path(s3_base_path, f), s3_ssenc, buffer_size, reduced_redundancy, rate_limit, quiet)
((s3_bucket, aws_access_key_id, aws_secret_access_key, s3_bucket_region,f, destination_path(s3_base_path, f), s3_ssenc, buffer_size, rate_limit, storage_class, quiet)
for f in files if f)):
if f is None:
# Upload failed.
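
Because pool.imap passes each item as a single argument, upload_file receives its parameters as one tuple; the map_wrap decorator from cassandra_snapshotter.utils (not shown in this excerpt) presumably unpacks it along these lines:

from functools import wraps

def map_wrap(f):
    # Unpack the argument tuple that Pool.imap hands to the worker.
    @wraps(f)
    def wrapper(args):
        return f(*args)
    return wrapper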
@@ -265,13 +247,6 @@ def main():
type=int,
help="Compress and upload concurrent processes")

put_parser.add_argument(
'--reduced-redundancy',
required=False,
default=DEFAULT_REDUCED_REDUNDANCY,
action="store_true",
help="Compress and upload concurrent processes")

put_parser.add_argument(
'--rate-limit',
required=False,
@@ -317,15 +292,15 @@ def main():

put_from_manifest(
args.s3_bucket_name,
get_s3_connection_host(args.s3_bucket_region),
args.s3_bucket_region,
args.s3_ssenc,
args.s3_base_path,
args.aws_access_key_id,
args.aws_secret_access_key,
args.manifest,
args.bufsize,
args.reduced_redundancy,
args.rate_limit,
args.storage_class,
args.quiet,
args.concurrency,
args.incremental_backups
20 changes: 10 additions & 10 deletions cassandra_snapshotter/main.py
@@ -10,7 +10,7 @@
# From package
from .snapshotting import (BackupWorker, RestoreWorker,
Snapshot, SnapshotCollection)
from .utils import (add_s3_arguments, get_s3_connection_host)
from .utils import add_s3_arguments
from .utils import base_parser as _base_parser

env.use_ssh_config = True
@@ -40,7 +40,7 @@ def run_backup(args):
args.aws_secret_access_key,
args.s3_base_path,
args.s3_bucket_name,
get_s3_connection_host(args.s3_bucket_region)
args.s3_bucket_region
).get_snapshot_for(
hosts=env.hosts,
keyspaces=env.keyspaces,
@@ -53,7 +53,6 @@ def run_backup(args):
aws_secret_access_key=args.aws_secret_access_key,
s3_bucket_region=args.s3_bucket_region,
s3_ssenc=args.s3_ssenc,
s3_connection_host=get_s3_connection_host(args.s3_bucket_region),
cassandra_conf_path=args.cassandra_conf_path,
nodetool_path=args.nodetool_path,
cassandra_bin_dir=args.cassandra_bin_dir,
@@ -64,8 +63,8 @@ def run_backup(args):
use_sudo=args.use_sudo,
connection_pool_size=args.connection_pool_size,
exclude_tables=args.exclude_tables,
reduced_redundancy=args.reduced_redundancy,
rate_limit=args.rate_limit,
storage_class=args.storage_class,
quiet=args.quiet
)

@@ -91,7 +90,7 @@ def list_backups(args):
args.aws_secret_access_key,
args.s3_base_path,
args.s3_bucket_name,
get_s3_connection_host(args.s3_bucket_region)
args.s3_bucket_region
)
path_snapshots = defaultdict(list)

@@ -113,7 +112,7 @@ def restore_backup(args):
args.aws_secret_access_key,
args.s3_base_path,
args.s3_bucket_name,
get_s3_connection_host(args.s3_bucket_region),
args.s3_bucket_region,
)
if args.snapshot_name == 'LATEST':
snapshot = snapshots.get_latest()
@@ -127,7 +126,8 @@ def restore_backup(args):
restore_dir=args.restore_dir,
no_sstableloader=args.no_sstableloader,
local_restore=args.local_restore,
s3_connection_host=get_s3_connection_host(args.s3_bucket_region))
s3_bucket_region=args.s3_bucket_region,
s3_bucket_name=args.s3_bucket_name)

if args.hosts:
hosts = args.hosts.split(',')
@@ -253,9 +253,9 @@ def main():
help="Number of simultaneous connections to cassandra nodes")

backup_parser.add_argument(
'--reduced-redundancy',
action='store_true',
help="Use S3 reduced redundancy")
'--storage-class',
default='STANDARD',
help="S3 storage class used to store objects")

backup_parser.add_argument(
'--rate-limit',
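
The backup CLI now takes --storage-class (default STANDARD) in place of the removed --reduced-redundancy switch, and the feature branch name points at STANDARD_IA (Infrequent Access) as the intended target. The flag is free-form here; a small, hypothetical sketch of how it could be validated with argparse choices (the class list is illustrative, not from the commit):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--storage-class',
    default='STANDARD',
    choices=['STANDARD', 'STANDARD_IA', 'ONEZONE_IA',
             'INTELLIGENT_TIERING', 'REDUCED_REDUNDANCY'],
    help="S3 storage class used to store objects")

args = parser.parse_args(['--storage-class', 'STANDARD_IA'])
print(args.storage_class)   # -> STANDARD_IA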
