diff --git a/lib/Config.js b/lib/Config.js index 8611f3ac..30f4b90a 100644 --- a/lib/Config.js +++ b/lib/Config.js @@ -172,6 +172,13 @@ class Config { + 'expireMetrics must be a boolean'); this.expireMetrics = config.expireMetrics; } + + if (config.onlyCountLatestWhenObjectLocked !== undefined) { + assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean', + 'bad config: onlyCountLatestWhenObjectLocked must be a boolean'); + this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked; + } + return config; } } diff --git a/lib/UtapiReindex.js b/lib/UtapiReindex.js index b3786da8..8c98648c 100644 --- a/lib/UtapiReindex.js +++ b/lib/UtapiReindex.js @@ -61,6 +61,8 @@ class UtapiReindex { this._log = new werelogs.Logger('UtapiReindex', { level, dump }); } + this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true); + this._requestLogger = this._log.newRequestLogger(); } @@ -97,6 +99,7 @@ class UtapiReindex { if (this._sentinel.sentinelPassword) { flags.redis_password = this._sentinel.sentinelPassword; } + /* eslint-enable camelcase */ const opts = []; Object.keys(flags) @@ -105,6 +108,10 @@ class UtapiReindex { opts.push(name); opts.push(flags[flag]); }); + + if (this._onlyCountLatestWhenObjectLocked) { + opts.push('--only-latest-when-locked'); + } return opts; } diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py index 4a4b238b..30ba4829 100644 --- a/lib/reindex/s3_bucketd.py +++ b/lib/reindex/s3_bucketd.py @@ -10,7 +10,6 @@ import urllib from collections import defaultdict, namedtuple from concurrent.futures import ThreadPoolExecutor -from pprint import pprint import redis import requests @@ -34,6 +33,8 @@ def get_options(): parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers") parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed") parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request") + parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy") + parser.add_argument("--debug", action='store_true', help="Enable debug logging") return parser.parse_args() def chunks(iterable, size): @@ -49,7 +50,7 @@ def inner(*args, **kwargs): return urllib.parse.quote(val.encode('utf-8')) return inner -Bucket = namedtuple('Bucket', ['userid', 'name']) +Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'], defaults=[False]) MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id']) BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size']) @@ -64,12 +65,14 @@ def __init__(self, bucket): class BucketDClient: '''Performs Listing calls against bucketd''' - __url_format = '{addr}/default/bucket/{bucket}' + __url_attribute_format = '{addr}/default/attributes/{bucket}' + __url_bucket_format = '{addr}/default/bucket/{bucket}' __headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"} - def __init__(self, bucketd_addr=None, max_retries=2): + def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False): self._bucketd_addr = bucketd_addr self._max_retries = max_retries + self._only_latest_when_locked = only_latest_when_locked self._session = requests.Session() def _do_req(self, url, check_500=True, **kwargs): @@ -101,7 +104,7 @@ def _list_bucket(self, bucket, **kwargs): parameters value. On the first request the function will be called with `None` and should return its initial value. Return `None` for the param to be excluded. ''' - url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket) + url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket) static_params = {k: v for k, v in kwargs.items() if not callable(v)} dynamic_params = {k: v for k, v in kwargs.items() if callable(v)} is_truncated = True # Set to True for first loop @@ -114,6 +117,9 @@ def _list_bucket(self, bucket, **kwargs): _log.debug('listing bucket bucket: %s params: %s'%( bucket, ', '.join('%s=%s'%p for p in params.items()))) resp = self._do_req(url, params=params) + if resp.status_code == 404: + _log.debug('Bucket not found bucket: %s'%bucket) + return if resp.status_code == 200: payload = resp.json() except ValueError as e: @@ -135,6 +141,27 @@ def _list_bucket(self, bucket, **kwargs): else: is_truncated = len(payload) > 0 + def _get_bucket_attributes(self, name): + url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name) + try: + resp = self._do_req(url) + if resp.status_code == 200: + return resp.json() + else: + _log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code)) + raise InvalidListing(name) + except ValueError as e: + _log.exception(e) + _log.error('Invalid attributes response body! bucket:%s'%name) + raise + except MaxRetriesReached: + _log.error('Max retries reached getting bucket attributes bucket:%s'%name) + raise + except Exception as e: + _log.exception(e) + _log.error('Unhandled exception getting bucket attributes bucket:%s'%name) + raise + def list_buckets(self, name = None): def get_next_marker(p): @@ -149,11 +176,17 @@ def get_next_marker(p): } for _, payload in self._list_bucket(USERS_BUCKET, **params): buckets = [] - for result in payload['Contents']: + for result in payload.get('Contents', []): match = re.match("(\w+)..\|..(\w+.*)", result['key']) bucket = Bucket(*match.groups()) if name is None or bucket.name == name: + # We need to get the attributes for each bucket to determine if it is locked + if self._only_latest_when_locked: + bucket_attrs = self._get_bucket_attributes(bucket.name) + object_lock_enabled = bucket_attrs.get('objectLockEnabled', False) + bucket = bucket._replace(object_lock_enabled=object_lock_enabled) buckets.append(bucket) + if buckets: yield buckets if name is not None: @@ -196,18 +229,12 @@ def get_next_upload_id(p): upload_id=key['value']['UploadId'])) return keys - def _sum_objects(self, bucket, listing): + def _sum_objects(self, bucket, listing, only_latest_when_locked = False): count = 0 total_size = 0 - last_master = None - last_size = None - for status_code, payload in listing: - contents = payload['Contents'] if isinstance(payload, dict) else payload - if contents is None: - _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code)) - raise InvalidListing(bucket) - for obj in contents: - count += 1 + last_key = None + try: + for obj in listing: if isinstance(obj['value'], dict): # bucketd v6 returns a dict: data = obj.get('value', {}) @@ -216,39 +243,51 @@ def _sum_objects(self, bucket, listing): # bucketd v7 returns an encoded string data = json.loads(obj['value']) size = data.get('content-length', 0) - total_size += size - # If versioned, subtract the size of the master to avoid double counting - if last_master is not None and obj['key'].startswith(last_master + '\x00'): - _log.debug('Detected versioned key: %s - subtracting master size: %i'% ( - obj['key'], - last_size, - )) - total_size -= last_size - count -= 1 - last_master = None - - # Only save master versions - elif '\x00' not in obj['key']: - last_master = obj['key'] - last_size = size + is_latest = obj['key'] != last_key + last_key = obj['key'] + + if only_latest_when_locked and bucket.object_lock_enabled and not is_latest: + _log.debug('Skipping versioned key: %s'%obj['key']) + continue + count += 1 + total_size += size + + except InvalidListing: + _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code)) + raise InvalidListing(bucket.name) return count, total_size + def _extract_listing(self, key, listing): + for status_code, payload in listing: + contents = payload[key] if isinstance(payload, dict) else payload + if contents is None: + raise InvalidListing('') + for obj in contents: + yield obj + def count_bucket_contents(self, bucket): - def get_next_marker(p): - if p is None or len(p) == 0: + def get_key_marker(p): + if p is None: + return '' + return p.get('NextKeyMarker', '') + + def get_vid_marker(p): + if p is None: return '' - return p[-1].get('key', '') + return p.get('NextVersionIdMarker', '') params = { - 'listingType': 'Basic', + 'listingType': 'DelimiterVersions', 'maxKeys': 1000, - 'gt': get_next_marker, + 'keyMarker': get_key_marker, + 'versionIdMarker': get_vid_marker, } - count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params)) + listing = self._list_bucket(bucket.name, **params) + count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked) return BucketContents( bucket=bucket, obj_count=count, @@ -256,7 +295,8 @@ def get_next_marker(p): ) def count_mpu_parts(self, mpu): - _bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name + shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name + shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name) def get_prefix(p): if p is None: @@ -276,9 +316,10 @@ def get_next_marker(p): 'listingType': 'Delimiter', } - count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params)) + listing = self._list_bucket(shadow_bucket_name, **params) + count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing)) return BucketContents( - bucket=mpu.bucket._replace(name=_bucket), + bucket=shadow_bucket, obj_count=0, # MPU parts are not counted towards numberOfObjects total_size=total_size ) @@ -361,7 +402,9 @@ def log_report(resource, name, obj_count, total_size): if options.bucket is not None and not options.bucket.strip(): print('You must provide a bucket name with the --bucket flag') sys.exit(1) - bucket_client = BucketDClient(options.bucketd_addr, options.max_retries) + if options.debug: + _log.setLevel(logging.DEBUG) + bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked) redis_client = get_redis_client(options) account_reports = {} observed_buckets = set() diff --git a/package.json b/package.json index 39417cad..2e29ed8f 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "engines": { "node": ">=16" }, - "version": "7.70.2", + "version": "7.70.3", "description": "API for tracking resource utilization and reporting metrics", "main": "index.js", "repository": { diff --git a/tests/utils/mock/BucketD.js b/tests/utils/mock/BucketD.js index bd65d325..302f7c20 100644 --- a/tests/utils/mock/BucketD.js +++ b/tests/utils/mock/BucketD.js @@ -112,6 +112,17 @@ class BucketD { return body; } + _getBucketVersionResponse(bucketName) { + const body = { + CommonPrefixes: [], + IsTruncated: false, + Versions: (this._bucketContent[bucketName] || []) + // patch in a versionId to more closely match the real response + .map(entry => ({ ...entry, versionId: 'null' })), + }; + return body; + } + _getShadowBucketOverviewResponse(bucketName) { const mpus = (this._bucketContent[bucketName] || []).map(o => ({ key: o.key, @@ -137,6 +148,8 @@ class BucketD { || req.query.listingType === 'Delimiter' ) { req.body = this._getBucketResponse(bucketName); + } else if (req.query.listingType === 'DelimiterVersions') { + req.body = this._getBucketVersionResponse(bucketName); } // v2 reindex uses `Basic` listing type for everything