Skip to content

Commit

Permalink
CH: fix container name used for MPU
Browse files Browse the repository at this point in the history
This commit will allow to store Parts into
container dedicated to directory

Previously, a MultiPartUpload to /bucket/dir/dir2/object
was storing parts into /bucket/dir/dir2/object/uploadid/
container with object name 1, 2, 3...

With this fix, parts will be stored in /bucket/dir/dir2/
with parts named object/uploadid/1, object/uploadid/2, ...
  • Loading branch information
murlock committed Nov 16, 2018
1 parent 8b3e4d7 commit 47efe61
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 49 deletions.
139 changes: 103 additions & 36 deletions oioswift/common/middleware/container_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import importlib
from paste.deploy import loadwsgi
Expand Down Expand Up @@ -183,13 +184,13 @@ def _create_key(self, req, account, container, mode, path):
LOG.warn("%s: failed to create key %s", self.SWIFT_SOURCE, key)

def _remove_key(self, req, account, container, mode, path):
key = self.key(account, container, mode, path)
key = self.key(account, container, mode, path) + '/'
if mode == CNT:
# remove container key only if empty
empty = not any(self._list_objects(
req.environ.copy(),
account,
[container] + path.split('/')[:-1],
[container] + path.split('/'),
None,
limit=1))

Expand All @@ -201,10 +202,50 @@ def _remove_key(self, req, account, container, mode, path):
if not res:
LOG.warn("%s: failed to remove key %s", self.SWIFT_SOURCE, key)

def _build_object_listing_mpu(self, start_response, env,
account, container, prefix,
limit=None,
recursive=False, marker=None):
"""Implement specific listing for MPU"""

LOG.debug("%s: MPU listing with %s %s %s %s %s %s",
self.SWIFT_SOURCE, account, container, prefix, limit,
recursive, marker)

# FIXME(mb): should use marker for big MPU
# but it doesn't seems to manage truncated MPU listing in swift3
prefix = prefix[0] if prefix else ''
path, _ = self._path(prefix.split('/'), True)
mpu_prefix = prefix[len(path):].lstrip('/')

def header_cb(header_dict):
oheaders.update(header_dict)

oheaders = dict()
ct_parts = [container]
if path:
ct_parts += path.strip('/').split('/')
ret = self._list_objects(env, account, ct_parts, header_cb,
mpu_prefix, limit=DEFAULT_LIMIT,
marker=marker)

all_objs = list(ret)
all_objs.sort(key=lambda entry: entry.get('name', entry.get('subdir')))
body = json.dumps(all_objs)

oheaders['Content-Length'] = len(body)
start_response("200 OK", oheaders.items())
return [body]

def _build_object_listing(self, start_response, env,
account, container, prefix,
limit=None,
recursive=False, marker=None):
recursive=False, marker=None, is_mpu=False):

if is_mpu:
return self._build_object_listing_mpu(start_response, env, account,
container, prefix, limit,
recursive, marker)

LOG.debug("%s: listing with %s %s %s %s %s %s",
self.SWIFT_SOURCE, account, container, prefix, limit,
Expand All @@ -216,7 +257,7 @@ def header_cb(header_dict):
oheaders = dict()
all_objs = []

prefix = prefix[0]
prefix = prefix[0] if prefix else ''

# have we to parse root container ?
# / must be absent from prefix AND marker
Expand Down Expand Up @@ -337,7 +378,25 @@ def header_cb(header_dict):

return [body]

def _fake_container_and_obj(self, container, obj_parts, is_listing=False):
def _path(self, obj_parts, is_mpu, sep=None):
if not sep:
sep = self.DELIMITER

# FIXME(mbo): a proper HEADER should be added in swift3 controller
# to help recognize manifest / part
if is_mpu:
for i, item in reversed(list(enumerate(obj_parts))):
if len(item) > 32:
try:
base64.b64decode(item)
return sep.join(obj_parts[:i-1]), i
except TypeError:
pass
LOG.error("MPU fails to detect UploadId")
return sep.join(obj_parts[:-1]), len(obj_parts)

def _fake_container_and_obj(self, container, obj_parts, is_listing=False,
is_mpu=False):
"""
Aggregate object parts (except the last) into the container name.
Expand All @@ -348,9 +407,12 @@ def _fake_container_and_obj(self, container, obj_parts, is_listing=False):
[container] + obj_parts[:-2])
obj = obj_parts[-2] + self.DELIMITER
else:
container = self.ENCODED_DELIMITER.join(
[container] + obj_parts[:-1])
obj = obj_parts[-1] if obj_parts else ''
cnt, idx = self._path(obj_parts, is_mpu,
sep=self.ENCODED_DELIMITER)
if cnt:
container += self.ENCODED_DELIMITER + cnt
obj = obj_parts[idx-1:] if obj_parts else ''
obj = '/'.join(obj)
return container, obj

def _list_objects(self, env, account, ct_parts, header_cb,
Expand All @@ -369,14 +431,15 @@ def _list_objects(self, env, account, ct_parts, header_cb,
body='',
swift_source=self.SWIFT_SOURCE)
params = sub_req.params
params['delimiter'] = self.DELIMITER
params.pop('delimiter', None) # allow list-multipart-uploads
params['limit'] = str(limit) # FIXME: why is it str?
params['prefix'] = prefix
params['format'] = 'json'
if marker:
params['marker'] = marker
else:
params.pop('marker', None)

sub_req.params = params
resp = sub_req.get_response(self.app)
obj_prefix = ''
Expand All @@ -402,6 +465,16 @@ def should_bypass(self, env):
return (env.get('REQUEST_METHOD') == 'TEST' or
super(ContainerHierarchyMiddleware, self).should_bypass(env))

def update_copy_headers(self, req, env2):
if 'Oio-Copy-From' in req.headers and req.method == 'PUT':
# TODO(mb): check if MPU is used here (with upload-part-copy)
_, c_container, c_obj = req.headers['Oio-Copy-From'].split('/', 2)
c_container, c_obj = \
self._fake_container_and_obj(c_container, c_obj.split('/'))
# update Headers
req.headers['Oio-Copy-From'] = '/' + c_container + '/' + c_obj
env2['HTTP_OIO_COPY_FROM'] = '/' + c_container + '/' + c_obj

def __call__(self, env, start_response):
if self.should_bypass(env):
return self.app(env, start_response)
Expand All @@ -416,31 +489,27 @@ def __call__(self, env, start_response):
env2 = env.copy()
qs = parse_qs(req.query_string or '')
prefix = qs.get('prefix') # returns a list or None
if not prefix:
prefix = ['']
marker = qs.get('marker')
limit = qs.get('limit')

# if obj and prefix are None with container+segments, we want the
# normal listing because it is the list-multipart-uploads operation
is_mpu = container.endswith("+segments") and (obj is not None
or prefix is not None)

LOG.debug("%s: Got %s request for container=%s, "
"obj=%s, prefix=%s marker=%s",
self.SWIFT_SOURCE, req.method, container, obj, prefix,
marker)
"obj=%s, prefix=%s marker=%s is_mpu=%d",
self.SWIFT_SOURCE, req.method, container, obj,
prefix, marker, is_mpu)
must_recurse = False
obj_parts = ()

if 'Oio-Copy-From' in req.headers and req.method == 'PUT':
_, c_container, c_obj = req.headers['Oio-Copy-From'].split('/', 2)
c_container, c_obj = \
self._fake_container_and_obj(c_container, c_obj.split('/'))
# update Headers
req.headers['Oio-Copy-From'] = '/' + c_container + '/' + c_obj
env2['HTTP_OIO_COPY_FROM'] = '/' + c_container + '/' + c_obj
self.update_copy_headers(req, env2)

if obj is None:
LOG.debug("%s: -> is a listing request", self.SWIFT_SOURCE)
must_recurse = req.method == 'GET' and 'delimiter' not in qs
if not marker:
marker = None
else:
if marker:
marker = marker[0]
if not limit:
limit = DEFAULT_LIMIT
Expand All @@ -452,13 +521,14 @@ def __call__(self, env, start_response):
LOG.debug("%s: -> is NOT listing request", self.SWIFT_SOURCE)
obj_parts = obj.split(self.DELIMITER)
if len(obj_parts) > 1:
path = self.DELIMITER.join(obj_parts[:-1]) + self.DELIMITER
# path = self.DELIMITER.join(obj_parts[:-1]) + self.DELIMITER
path, _ = self._path(obj_parts, is_mpu)
is_dir = obj.endswith(self.DELIMITER)
# TODO (MBO) we should accept create key d1/d2/ only
# for empty object
if req.method == 'PUT':
self._create_key(req, account, container,
OBJ if is_dir else CNT, path)
OBJ if is_dir else CNT, path + '/')
if is_dir:
oheaders = {'Content-Length': 0,
'Etag': 'd41d8cd98f00b204e9800998ecf8427e'}
Expand All @@ -473,20 +543,17 @@ def __call__(self, env, start_response):
return ['']

container2, obj2 = self._fake_container_and_obj(container,
obj_parts)
obj_parts,
is_mpu=is_mpu)

LOG.debug("%s: Converted to container=%s, obj=%s, qs=%s",
self.SWIFT_SOURCE, container2, obj2, qs)
if must_recurse:
res = self._build_object_listing(start_response, env,
account, container2, prefix,
limit=limit, recursive=True,
marker=marker)
elif qs.get('prefix') or qs.get('delimiter'):
if must_recurse or prefix or qs.get('delimiter'):
res = self._build_object_listing(start_response, env,
account, container2, prefix,
limit=limit,
recursive=False, marker=marker)
limit=limit, marker=marker,
recursive=must_recurse,
is_mpu=is_mpu)
else:
# should be other operation that listing
if obj:
Expand All @@ -496,7 +563,7 @@ def __call__(self, env, start_response):
env2['PATH_INFO'] = "/v1/%s/%s" % (account, container2)
res = self.app(env2, start_response)

if req.method == 'DELETE' and len(obj_parts) > 1:
if req.method == 'DELETE' and not is_mpu and len(obj_parts) > 1:
# only remove marker
self._remove_key(req, account, container, CNT, path)
return res
Expand Down
Loading

0 comments on commit 47efe61

Please sign in to comment.