From 47efe61033454de98d2453d22ec279ee91b47bc4 Mon Sep 17 00:00:00 2001 From: Michael Bonfils Date: Mon, 12 Nov 2018 17:20:19 +0100 Subject: [PATCH] CH: fix container name used for MPU This commit stores the parts of a MultiPartUpload (MPU) in the container dedicated to the object's directory. Previously, a MultiPartUpload to /bucket/dir/dir2/object stored its parts in the /bucket/dir/dir2/object/uploadid/ container, as objects named 1, 2, 3... With this fix, the parts are stored in the /bucket/dir/dir2/ container, as objects named object/uploadid/1, object/uploadid/2, ... --- .../common/middleware/container_hierarchy.py | 139 +++++++++++++----- .../middleware/test_container_hierarchy.py | 119 +++++++++++++-- 2 files changed, 209 insertions(+), 49 deletions(-) diff --git a/oioswift/common/middleware/container_hierarchy.py b/oioswift/common/middleware/container_hierarchy.py index 4a690127..b587650a 100644 --- a/oioswift/common/middleware/container_hierarchy.py +++ b/oioswift/common/middleware/container_hierarchy.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import base64 import json import importlib from paste.deploy import loadwsgi @@ -183,13 +184,13 @@ def _create_key(self, req, account, container, mode, path): LOG.warn("%s: failed to create key %s", self.SWIFT_SOURCE, key) def _remove_key(self, req, account, container, mode, path): - key = self.key(account, container, mode, path) + key = self.key(account, container, mode, path) + '/' if mode == CNT: # remove container key only if empty empty = not any(self._list_objects( req.environ.copy(), account, - [container] + path.split('/')[:-1], + [container] + path.split('/'), None, limit=1)) @@ -201,10 +202,50 @@ def _remove_key(self, req, account, container, mode, path): if not res: LOG.warn("%s: failed to remove key %s", self.SWIFT_SOURCE, key) + def _build_object_listing_mpu(self, start_response, env, + account, container, prefix, + limit=None, + recursive=False, marker=None): + """Implement specific listing for MPU""" + + LOG.debug("%s: MPU listing with %s %s %s %s %s %s", + self.SWIFT_SOURCE, account, container, prefix, limit, + recursive, marker) + + # FIXME(mb): should use marker for big MPU + # but it doesn't seems to manage truncated MPU listing in swift3 + prefix = prefix[0] if prefix else '' + path, _ = self._path(prefix.split('/'), True) + mpu_prefix = prefix[len(path):].lstrip('/') + + def header_cb(header_dict): + oheaders.update(header_dict) + + oheaders = dict() + ct_parts = [container] + if path: + ct_parts += path.strip('/').split('/') + ret = self._list_objects(env, account, ct_parts, header_cb, + mpu_prefix, limit=DEFAULT_LIMIT, + marker=marker) + + all_objs = list(ret) + all_objs.sort(key=lambda entry: entry.get('name', entry.get('subdir'))) + body = json.dumps(all_objs) + + oheaders['Content-Length'] = len(body) + start_response("200 OK", oheaders.items()) + return [body] + def _build_object_listing(self, start_response, env, account, container, prefix, limit=None, - recursive=False, marker=None): + recursive=False, marker=None, is_mpu=False): + + 
if is_mpu: + return self._build_object_listing_mpu(start_response, env, account, + container, prefix, limit, + recursive, marker) LOG.debug("%s: listing with %s %s %s %s %s %s", self.SWIFT_SOURCE, account, container, prefix, limit, @@ -216,7 +257,7 @@ def header_cb(header_dict): oheaders = dict() all_objs = [] - prefix = prefix[0] + prefix = prefix[0] if prefix else '' # have we to parse root container ? # / must be absent from prefix AND marker @@ -337,7 +378,25 @@ def header_cb(header_dict): return [body] - def _fake_container_and_obj(self, container, obj_parts, is_listing=False): + def _path(self, obj_parts, is_mpu, sep=None): + if not sep: + sep = self.DELIMITER + + # FIXME(mbo): a proper HEADER should be added in swift3 controller + # to help recognize manifest / part + if is_mpu: + for i, item in reversed(list(enumerate(obj_parts))): + if len(item) > 32: + try: + base64.b64decode(item) + return sep.join(obj_parts[:i-1]), i + except TypeError: + pass + LOG.error("MPU fails to detect UploadId") + return sep.join(obj_parts[:-1]), len(obj_parts) + + def _fake_container_and_obj(self, container, obj_parts, is_listing=False, + is_mpu=False): """ Aggregate object parts (except the last) into the container name. 
@@ -348,9 +407,12 @@ def _fake_container_and_obj(self, container, obj_parts, is_listing=False): [container] + obj_parts[:-2]) obj = obj_parts[-2] + self.DELIMITER else: - container = self.ENCODED_DELIMITER.join( - [container] + obj_parts[:-1]) - obj = obj_parts[-1] if obj_parts else '' + cnt, idx = self._path(obj_parts, is_mpu, + sep=self.ENCODED_DELIMITER) + if cnt: + container += self.ENCODED_DELIMITER + cnt + obj = obj_parts[idx-1:] if obj_parts else '' + obj = '/'.join(obj) return container, obj def _list_objects(self, env, account, ct_parts, header_cb, @@ -369,7 +431,7 @@ def _list_objects(self, env, account, ct_parts, header_cb, body='', swift_source=self.SWIFT_SOURCE) params = sub_req.params - params['delimiter'] = self.DELIMITER + params.pop('delimiter', None) # allow list-multipart-uploads params['limit'] = str(limit) # FIXME: why is it str? params['prefix'] = prefix params['format'] = 'json' @@ -377,6 +439,7 @@ def _list_objects(self, env, account, ct_parts, header_cb, params['marker'] = marker else: params.pop('marker', None) + sub_req.params = params resp = sub_req.get_response(self.app) obj_prefix = '' @@ -402,6 +465,16 @@ def should_bypass(self, env): return (env.get('REQUEST_METHOD') == 'TEST' or super(ContainerHierarchyMiddleware, self).should_bypass(env)) + def update_copy_headers(self, req, env2): + if 'Oio-Copy-From' in req.headers and req.method == 'PUT': + # TODO(mb): check if MPU is used here (with upload-part-copy) + _, c_container, c_obj = req.headers['Oio-Copy-From'].split('/', 2) + c_container, c_obj = \ + self._fake_container_and_obj(c_container, c_obj.split('/')) + # update Headers + req.headers['Oio-Copy-From'] = '/' + c_container + '/' + c_obj + env2['HTTP_OIO_COPY_FROM'] = '/' + c_container + '/' + c_obj + def __call__(self, env, start_response): if self.should_bypass(env): return self.app(env, start_response) @@ -416,31 +489,27 @@ def __call__(self, env, start_response): env2 = env.copy() qs = parse_qs(req.query_string or '') prefix 
= qs.get('prefix') # returns a list or None - if not prefix: - prefix = [''] marker = qs.get('marker') limit = qs.get('limit') + + # if obj and prefix are None with container+segments, we want the + # normal listing because it is the list-multipart-uploads operation + is_mpu = container.endswith("+segments") and (obj is not None + or prefix is not None) + LOG.debug("%s: Got %s request for container=%s, " - "obj=%s, prefix=%s marker=%s", - self.SWIFT_SOURCE, req.method, container, obj, prefix, - marker) + "obj=%s, prefix=%s marker=%s is_mpu=%d", + self.SWIFT_SOURCE, req.method, container, obj, + prefix, marker, is_mpu) must_recurse = False obj_parts = () - if 'Oio-Copy-From' in req.headers and req.method == 'PUT': - _, c_container, c_obj = req.headers['Oio-Copy-From'].split('/', 2) - c_container, c_obj = \ - self._fake_container_and_obj(c_container, c_obj.split('/')) - # update Headers - req.headers['Oio-Copy-From'] = '/' + c_container + '/' + c_obj - env2['HTTP_OIO_COPY_FROM'] = '/' + c_container + '/' + c_obj + self.update_copy_headers(req, env2) if obj is None: LOG.debug("%s: -> is a listing request", self.SWIFT_SOURCE) must_recurse = req.method == 'GET' and 'delimiter' not in qs - if not marker: - marker = None - else: + if marker: marker = marker[0] if not limit: limit = DEFAULT_LIMIT @@ -452,13 +521,14 @@ def __call__(self, env, start_response): LOG.debug("%s: -> is NOT listing request", self.SWIFT_SOURCE) obj_parts = obj.split(self.DELIMITER) if len(obj_parts) > 1: - path = self.DELIMITER.join(obj_parts[:-1]) + self.DELIMITER + # path = self.DELIMITER.join(obj_parts[:-1]) + self.DELIMITER + path, _ = self._path(obj_parts, is_mpu) is_dir = obj.endswith(self.DELIMITER) # TODO (MBO) we should accept create key d1/d2/ only # for empty object if req.method == 'PUT': self._create_key(req, account, container, - OBJ if is_dir else CNT, path) + OBJ if is_dir else CNT, path + '/') if is_dir: oheaders = {'Content-Length': 0, 'Etag': 'd41d8cd98f00b204e9800998ecf8427e'} 
@@ -473,20 +543,17 @@ def __call__(self, env, start_response): return [''] container2, obj2 = self._fake_container_and_obj(container, - obj_parts) + obj_parts, + is_mpu=is_mpu) LOG.debug("%s: Converted to container=%s, obj=%s, qs=%s", self.SWIFT_SOURCE, container2, obj2, qs) - if must_recurse: - res = self._build_object_listing(start_response, env, - account, container2, prefix, - limit=limit, recursive=True, - marker=marker) - elif qs.get('prefix') or qs.get('delimiter'): + if must_recurse or prefix or qs.get('delimiter'): res = self._build_object_listing(start_response, env, account, container2, prefix, - limit=limit, - recursive=False, marker=marker) + limit=limit, marker=marker, + recursive=must_recurse, + is_mpu=is_mpu) else: # should be other operation that listing if obj: @@ -496,7 +563,7 @@ def __call__(self, env, start_response): env2['PATH_INFO'] = "/v1/%s/%s" % (account, container2) res = self.app(env2, start_response) - if req.method == 'DELETE' and len(obj_parts) > 1: + if req.method == 'DELETE' and not is_mpu and len(obj_parts) > 1: # only remove marker self._remove_key(req, account, container, CNT, path) return res diff --git a/tests/unit/common/middleware/test_container_hierarchy.py b/tests/unit/common/middleware/test_container_hierarchy.py index cc550e84..b9fb9bfe 100644 --- a/tests/unit/common/middleware/test_container_hierarchy.py +++ b/tests/unit/common/middleware/test_container_hierarchy.py @@ -92,7 +92,7 @@ def test_recursive_listing(self): self.ch.conn.keys = mock.MagicMock(return_value=['CS:a:cnt:d1/d2/d3/']) self.app.register( 'GET', - '/v1/a/c%2Fd1%2Fd2%2Fd3?prefix=&limit=10000&delimiter=%2F&format=json', # noqa + '/v1/a/c%2Fd1%2Fd2%2Fd3?prefix=&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -109,7 +109,7 @@ def test_listing_with_space(self): self.ch.conn.keys = mock.MagicMock(return_value=['CS:a:cnt:d 1/d2/']) self.app.register( 'GET', - 
'/v1/a/c%2Fd 1%2Fd2?prefix=&limit=10000&delimiter=%2F&format=json', # noqa + '/v1/a/c%2Fd 1%2Fd2?prefix=&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -140,7 +140,7 @@ def test_delete_object(self): self.assertIn('CS:a:c:cnt:d1/d2/d3/', self.ch.conn._keys) self.app.register( - 'GET', '/v1/a/c%2Fd1%2Fd2%2Fd3?delimiter=%2F&limit=1&prefix=&format=json', # noqa + 'GET', '/v1/a/c%2Fd1%2Fd2%2Fd3?prefix=&limit=1&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -151,26 +151,28 @@ def test_delete_object(self): req = Request.blank('/v1/a/c/d1/d2/d3/o', method='DELETE') resp = self.call_ch(req) + self.assertEqual(resp[0], '204 No Content') self.assertIn('CS:a:c:cnt:d1/d2/d3/', self.ch.conn._keys) self.app.register( - 'GET', '/v1/a/c%2Fd1%2Fd2%2Fd3?delimiter=%2F&limit=1&prefix=&format=json', # noqa + 'GET', '/v1/a/c%2Fd1%2Fd2%2Fd3?prefix=&limit=1&format=json', swob.HTTPOk, {}, json.dumps([])) req = Request.blank('/v1/a/c/d1/d2/d3/o', method='DELETE') resp = self.call_ch(req) self.assertEqual(resp[0], '204 No Content') + self.assertNotIn('CS:a:c:cnt:d1/d2/d3/', self.ch.conn._keys) def test_fake_directory(self): - req = Request.blank('/v1/a/d1/d2/d3/', method='PUT') + req = Request.blank('/v1/a/container/d2/d3/', method='PUT') resp = self.call_ch(req) - self.assertIn('CS:a:d1:obj:d2/d3/', self.ch.conn._keys) - req = Request.blank('/v1/a/d1/d2/d3/', method='DELETE') + self.assertIn('CS:a:container:obj:d2/d3/', self.ch.conn._keys) + req = Request.blank('/v1/a/container/d2/d3/', method='DELETE') resp = self.call_ch(req) self.assertEqual(resp[0], "204 No Content") - self.assertNotIn('CS:a:d1:obj:d2/d3/', self.ch.conn._keys) + self.assertNotIn('CS:a:container:obj:d2/d3/', self.ch.conn._keys) def _listing(self, is_recursive): self.ch.conn.keys = mock.MagicMock( @@ -178,7 +180,7 @@ def 
_listing(self, is_recursive): self.ch.conn.exist = mock.MagicMock(return_value=True) self.app.register( 'GET', - '/v1/a/bucket%2Fd1?prefix=d&limit=10000&delimiter=%2F&format=json', # noqa + '/v1/a/bucket%2Fd1?prefix=d&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -187,7 +189,7 @@ def _listing(self, is_recursive): if is_recursive: self.app.register( 'GET', - '/v1/a/bucket%2Fd1%2Fd2?prefix=&limit=10000&delimiter=%2F&format=json', # noqa + '/v1/a/bucket%2Fd1%2Fd2?prefix=&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -217,7 +219,7 @@ def test_listing_root_container(self): return_value=['CS:a:bucket:cnt:d1/']) self.app.register( 'GET', - '/v1/a/bucket?prefix=d&limit=10000&delimiter=%2F&format=json', # noqa + '/v1/a/bucket?prefix=d&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -253,7 +255,7 @@ def test_listing_with_marker_multi_container(self): # with marker aa (as we inspect d1/) self.app.register( 'GET', - '/v1/a/bucket%2Fd1?marker=aa&delimiter=%2F&limit=10000&prefix=&format=json', # noqa + '/v1/a/bucket%2Fd1?marker=aa&prefix=&limit=10000&format=json', # noqa swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -262,7 +264,7 @@ def test_listing_with_marker_multi_container(self): # without marker on second container self.app.register( 'GET', - '/v1/a/bucket%2Fd2?delimiter=%2F&limit=10000&prefix=&format=json', # noqa + '/v1/a/bucket%2Fd2?prefix=&limit=10000&format=json', swob.HTTPOk, {}, json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", "last_modified": "2018-04-20T09:40:59.000000", @@ -306,3 +308,94 @@ def test_invalid_path(self): with self.assertRaises(HTTPException) as cm: 
self.call_ch(req) self.assertEqual(cm.exception.status, "400 Bad Request") + + def test_path(self): + cont = 'bucket' + + path = 'dir1/dir2/object' + res = self.ch._fake_container_and_obj(cont, path.split('/')) + self.assertEqual(res, (cont + '%2Fdir1%2Fdir2', 'object')) + + path = 'object' + res = self.ch._fake_container_and_obj(cont, path.split('/')) + self.assertEqual(res, (cont, 'object')) + + def test_mpu_path(self): + cont = 'bucket+segments' + uploadid = 'MzNkYWZlNjItNjg3Yy00ZmIyLWIwOGYtOTA2OGVlZTA2MzA5' + + path = ('dir1/dir2/object/%s/1' % uploadid).split('/') + res = self.ch._fake_container_and_obj(cont, path, is_mpu=True) + self.assertEqual(res, (cont + '%2Fdir1%2Fdir2', + 'object/%s/1' % uploadid)) + + path = ('dir1/dir2/object/' + uploadid).split('/') + res = self.ch._fake_container_and_obj(cont, path, is_mpu=True) + self.assertEqual(res, (cont + '%2Fdir1%2Fdir2', + 'object/' + uploadid)) + + path = ('object/%s/1' % uploadid).split('/') + res = self.ch._fake_container_and_obj(cont, path, is_mpu=True) + self.assertEqual(res, (cont, + 'object/%s/1' % uploadid)) + + path = ('object/' + uploadid).split('/') + res = self.ch._fake_container_and_obj(cont, path, is_mpu=True) + self.assertEqual(res, (cont, + 'object/' + uploadid)) + + def test_upload_in_progress(self): + self.ch.conn.keys = mock.MagicMock( + return_value=['CS:a:bucket+segments:cnt:d1/d2/d3/']) + upload = ["obj/YmYwY2I1ZDYtNjMyYi00OGNiLWEzMzEtZDdhYTk0ODZkNWU2", + "root/MzNkYWZlNjItNjg3Yy00ZmIyLWIwOGYtOTA2OGVlZTA2MzA5"] + self.app.register( + 'GET', + '/v1/a/bucket+segments%2Fd1%2Fd2%2Fd3?prefix=&limit=10000&format=json', # noqa + swob.HTTPOk, {}, + json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", + "last_modified": "2018-04-20T09:40:59.000000", + "bytes": 0, "name": upload[0], + "content_type": "application/octet-stream"}, + {"hash": "d41d8cd98f00b204e9800998ecf8427e", + "last_modified": "2018-04-20T09:40:59.000000", + "bytes": 400, "name": upload[0] + '/1', + "content_type": 
"application/octet-stream"}])) + self.app.register( + 'GET', + '/v1/a/bucket+segments?prefix=&limit=10000&format=json', + swob.HTTPOk, {}, + json.dumps([{"hash": "d41d8cd98f00b204e9800998ecf8427e", + "last_modified": "2018-04-20T09:40:59.000000", + "bytes": 0, "name": upload[1], + "content_type": "application/octet-stream"}, + {"hash": "d41d8cd98f00b204e9800998ecf8427e", + "last_modified": "2018-04-20T09:40:59.000000", + "bytes": 400, "name": upload[1] + '/1', + "content_type": "application/octet-stream"}])) + req = Request.blank('/v1/a/bucket+segments', + method='GET') + resp = self.call_ch(req) + + names = [item.get('name', item.get('subdir')) + for item in json.loads(resp[2])] + self.assertEqual(names, + ['d1/d2/d3/' + upload[0], + 'd1/d2/d3/' + upload[0] + '/1', + upload[1], + upload[1] + '/1']) + + def test_copy_headers(self): + self.app.register( + 'PUT', '/v1/a/bucket%2Fdir1/target', + swob.HTTPNoContent, {}, + ) + req = Request.blank( + '/v1/a/bucket/dir1/target', + method='PUT', + headers={'Oio-Copy-From': '/v1/a/bucket/sub1/source'}) + + resp = self.call_ch(req) + self.assertEqual(resp[0], '204 No Content') + self.assertEqual(self.app.headers[0]['Oio-Copy-From'], + "/v1%2Fa%2Fbucket%2Fsub1/source")