Skip to content

Commit

Permalink
impr(UTAPI-97): Add config option to reindex only latest version in o…
Browse files Browse the repository at this point in the history
…bject locked buckets
  • Loading branch information
tmacro committed Nov 16, 2023
1 parent 9f36624 commit 64e5668
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
7 changes: 7 additions & 0 deletions lib/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}

if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}

return config;
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/UtapiReindex.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}

if (config && config.onlyCountLatestWhenObjectLocked) {
this._onlyCountLatestWhenObjectLocked = true;
}

this._requestLogger = this._log.newRequestLogger();
}

Expand Down Expand Up @@ -97,6 +101,7 @@ class UtapiReindex {
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}

/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
Expand All @@ -105,6 +110,10 @@ class UtapiReindex {
opts.push(name);
opts.push(flags[flag]);
});

if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}

Expand Down
67 changes: 54 additions & 13 deletions lib/reindex/s3_bucketd.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def get_options():
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
return parser.parse_args()

def chunks(iterable, size):
Expand All @@ -49,7 +51,7 @@ def inner(*args, **kwargs):
return urllib.parse.quote(val.encode('utf-8'))
return inner

Bucket = namedtuple('Bucket', ['userid', 'name'])
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_locked'], defaults=[False])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])

Expand All @@ -64,12 +66,14 @@ def __init__(self, bucket):
class BucketDClient:

'''Performs Listing calls against bucketd'''
__url_format = '{addr}/default/bucket/{bucket}'
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}

def __init__(self, bucketd_addr=None, max_retries=2):
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()

def _do_req(self, url, check_500=True, **kwargs):
Expand Down Expand Up @@ -101,7 +105,7 @@ def _list_bucket(self, bucket, **kwargs):
parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
Expand All @@ -114,6 +118,9 @@ def _list_bucket(self, bucket, **kwargs):
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
Expand All @@ -135,6 +142,26 @@ def _list_bucket(self, bucket, **kwargs):
else:
is_truncated = len(payload) > 0

def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise InvalidListing(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise

def list_buckets(self, name = None):

def get_next_marker(p):
Expand All @@ -149,11 +176,17 @@ def get_next_marker(p):
}
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload['Contents']:
for result in payload.get('Contents', []):
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups())
if name is None or bucket.name == name:
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
obj_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_locked=obj_lock_enabled)
buckets.append(bucket)

if buckets:
yield buckets
if name is not None:
Expand Down Expand Up @@ -204,10 +237,9 @@ def _sum_objects(self, bucket, listing):
for status_code, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
if contents is None:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
raise InvalidListing(bucket)
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
raise InvalidListing(bucket.name)
for obj in contents:
count += 1
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
Expand All @@ -216,6 +248,12 @@ def _sum_objects(self, bucket, listing):
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)

if self._only_latest_when_locked and bucket.object_locked and '\x00' in obj['key']:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue

count += 1
total_size += size

# If versioned, subtract the size of the master to avoid double counting
Expand Down Expand Up @@ -248,15 +286,16 @@ def get_next_marker(p):
'gt': get_next_marker,
}

count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
count, total_size = self._sum_objects(bucket, self._list_bucket(bucket.name, **params))
return BucketContents(
bucket=bucket,
obj_count=count,
total_size=total_size
)

def count_mpu_parts(self, mpu):
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)

def get_prefix(p):
if p is None:
Expand All @@ -276,9 +315,9 @@ def get_next_marker(p):
'listingType': 'Delimiter',
}

count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
count, total_size = self._sum_objects(shadow_bucket, self._list_bucket(shadow_bucket_name, **params))
return BucketContents(
bucket=mpu.bucket._replace(name=_bucket),
bucket=shadow_bucket,
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
Expand Down Expand Up @@ -361,7 +400,9 @@ def log_report(resource, name, obj_count, total_size):
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
Expand Down

0 comments on commit 64e5668

Please sign in to comment.