Skip to content

Commit

Permalink
Fix concurrent update
Browse files Browse the repository at this point in the history
Gateways will fail when unable to store requests in the config state object.
Resolve the issue of the gateway is not up to date with the latest changes in the config object,
i.e. its local version is different from the config object version in Ceph

https://ibm.ent.box.com/file/1262684610455?s=t2pbpvjr2082u5bvur8xwwaof1brw2al

Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Sep 10, 2023
1 parent 69db8d8 commit a04b692
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 44 deletions.
74 changes: 44 additions & 30 deletions control/omap.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,43 +80,57 @@ def _notify(self) -> None:
except Exception as ex:
self.logger.info(f"Failed to notify.")

def _update(self) -> None:
self.cached = self.get()
self.version = int(self.cached[OmapObject.OMAP_VERSION_KEY])

def add_key(self, key: str, val: str) -> None:
"""Adds key and value to the OMAP."""
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause write failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.set_omap(write_op, (key,), (val,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.debug(f"omap_key generated: {key}")
except Exception:
self.logger.exception(f"Unable to add {key=} {val=} to omap:")
raise
while True:
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause write failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.set_omap(write_op, (key,), (val,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.info(f"omap object {self.name} add_key: {key}")
break
except rados.OSError:
# this exception happens due to object being out of date,
# for instance due to update from another gateway.
# read new object's version and retry
self.logger.debug(f"omap object {self.name} failed to remove_key {key=}: ")
self._update()

self._notify()

def remove_key(self, key: str) -> None:
"""Removes key from the OMAP."""
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause remove failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.remove_omap_keys(write_op, (key,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.debug(f"omap_key removed: {key}")
except Exception:
self.logger.exception(f"Unable to remove key from omap:")
raise
while True:
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause remove failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.remove_omap_keys(write_op, (key,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.info(f"omap object {self.name} remove_key: {key}")
break
except rados.OSError:
# this exception happens due to object being out of date,
# for instance due to update from another gateway.
# read new object's version and retry
self.logger.debug(f"omap object {self.name} failed to remove_key {key=}: ")
self._update()

self._notify()

Expand Down
21 changes: 7 additions & 14 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,11 @@ def controllerid_range(self, gateway_name: str) -> (int, int):
start, end = gateway_range[0]
return (int(start), int(end))

while True:
try:
keys = [key for key in self.obj.get().keys() if key != self.obj.OMAP_VERSION_KEY]
ranges = [self._parse_range(range) for range in keys]
start, end = self._calculate_non_overlapping_range(ranges, range_length)
self.obj.add_key(f"{GatewayState.RANGE_PREFIX}{start}_{end}", gateway_name)
return ( start, end )
except rados.OSError:
self.logger.exception(f"Failed to add range for {gateway_name=}, {start=} {end=}, retrying: ")
self.obj.cached = self.obj.get()
self.obj.version = int(self.obj.cached[OmapObject.OMAP_VERSION_KEY])
continue
except Exception:
raise
keys = [key for key in self.obj.cached.keys() if key != self.obj.OMAP_VERSION_KEY]
ranges = [self._parse_range(range) for range in keys]
start, end = self._calculate_non_overlapping_range(ranges, range_length)
self.obj.add_key(f"{GatewayState.RANGE_PREFIX}{start}_{end}", gateway_name)
return ( start, end )

def _parse_range(self, range: str) -> (int, int):
start, end = range.removeprefix(GatewayState.RANGE_PREFIX).split("_")
Expand Down Expand Up @@ -238,6 +229,8 @@ def __init__(self, config: GatewayConfig, state: OmapGatewayState,
self.update_interval = 1
self.use_notify = self.config.getboolean("gateway",
"state_update_notify")
# set this instance of GatewayStateHandler as updater for the spdk object
self.state.spdk.obj._update = self.update

def start_update(self) -> None:
"""Initiates periodic polling and watch/notify for updates."""
Expand Down

0 comments on commit a04b692

Please sign in to comment.