From e3230cb13c8e44fb74e7159bac29f6df38796af0 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 18 Dec 2023 10:56:17 +0100 Subject: [PATCH 1/4] Fixed Locks behaviour. - Fixed WriteVarrayLock - Fixed CreateArrayLock - Fixed related tests --- deker/locks.py | 71 ++++++++++++++----- .../test_concurrency/test_in_processes.py | 41 +++++++---- 2 files changed, 80 insertions(+), 32 deletions(-) diff --git a/deker/locks.py b/deker/locks.py index 01887c3..c822d44 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -23,7 +23,7 @@ from pathlib import Path from threading import get_native_id from time import sleep -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union, Tuple from uuid import uuid4 from deker.ABC.base_locks import BaseLock @@ -94,7 +94,14 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir) path = dir_path / (id_ + self.instance.file_ext) - + for file in dir_path.iterdir(): + # Skip lock from current process. + if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"): + self.is_locked_with_varray = True + return + # If we've found another varray lock, that not from current process. + if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore + raise DekerLockError(f"Array {array} is locked with {file.name}") try: with open(path, "r") as f: fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB) @@ -129,6 +136,8 @@ class WriteArrayLock(BaseLock): ALLOWED_TYPES = ["LocalArrayAdapter"] + is_locked_with_varray: bool = False + def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: """Get path to the file for locking. @@ -147,9 +156,6 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: :param func_args: arguments of method call :param func_kwargs: keyword arguments of method call """ - # Increment write lock, to prevent more read locks coming. - self.acquire(self.get_path(func_args, func_kwargs)) - # Check current read locks array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) @@ -159,11 +165,15 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: for file in dir_path.iterdir(): # Skip lock from current process. if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"): - continue + self.is_locked_with_varray = True + return # If we've found another varray lock, that not from current process. if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore raise DekerLockError(f"Array {array} is locked with {file.name}") + # Increment write lock, to prevent more read locks coming. + self.acquire(self.get_path(func_args, func_kwargs)) + # Pattern that has to find any read locks glob_pattern = f"{array.id}:*{LocksExtensions.array_read_lock.value}" @@ -179,6 +189,17 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: self.release() raise DekerLockError(f"Array {array} is locked with read locks") + def release(self, e: Optional[Exception] = None) -> None: + if self.is_locked_with_varray: + return + + super().release(e) + + def acquire(self, path: Optional[Path]) -> None: + if self.is_locked_with_varray: + return + super().acquire(path) + class WriteVarrayLock(BaseLock): """Write lock for VArrays. @@ -192,7 +213,7 @@ class WriteVarrayLock(BaseLock): ALLOWED_TYPES = ["VSubset"] # Locks that have been acquired by varray - locks: List[Path] = [] + locks: List[Tuple[Flock, Path]] = [] skip_lock: bool = False # shows that we must skip this lock (e.g server adapters for subset) def check_type(self) -> None: @@ -228,9 +249,9 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock: """ # Check read lock first array_id = filename.name.split(".")[0] - glob_pattern = f"{array_id}:*{LocksExtensions.array_read_lock.value}" + glob_pattern = f"{array_id}:*" for _ in filename.parent.rglob(glob_pattern): - raise DekerLockError(f"Array {array_id} is locked on read") + raise DekerLockError(f"Array {array_id} is locked") # Check write lock and set it lock = Flock(filename) @@ -267,8 +288,8 @@ def check_arrays_locks( # Path to the main file (not symlink) filename = filename.resolve() try: - self.check_locks_for_array_and_set_flock(filename) - self.locks.append(filename) + lock = self.check_locks_for_array_and_set_flock(filename) + self.locks.append((lock, filename)) except DekerLockError: currently_locked.append(filename) @@ -292,7 +313,8 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: # Iterate over Arrays in VArray and try to lock them. If locking fails - wait. # If it fails again - release all locks. currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray) - if not currently_locked and len(self.locks) == len(arrays_positions): + if not currently_locked and (len(self.locks) == len(arrays_positions)): + # Release array locks return # Wait till there are no more read locks @@ -311,9 +333,11 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002] :param e: Exception that may have been raised. """ # Release array locks - for lock in self.locks: - Flock(lock).release() - Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True) + for lock, filename in self.locks: + lock.release() + Path(f"{filename}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink( + missing_ok=True + ) super().release() def acquire(self, path: Optional[Path]) -> None: @@ -346,7 +370,8 @@ class CreateArrayLock(BaseLock): """Lock that we set when we want to create an array.""" ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"] - + + path: Optional[Path] = None def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: """Return path to the file that should be locked. @@ -357,7 +382,7 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' # Get file directory path - dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) + dir_path = self.instance.collection_path filename = META_DIVIDER.join( [ f"{array.id}", @@ -366,8 +391,12 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: f"{get_native_id()}", ] ) - # Create read lock file + # Create lock file path = dir_path / f"{filename}{LocksExtensions.array_lock.value}" + if not path.exists(): + path.open("w").close() + + self.path = path self.logger.debug(f"got path for array.id {array.id} lock file: {path}") return path @@ -390,7 +419,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: array_type = Array if adapter == self.ALLOWED_TYPES[0] else VArray array = array_type(**array) - dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) + dir_path = self.instance.collection_path # Pattern that has to find any create locks glob_pattern = f"{array.id}:*{LocksExtensions.array_lock.value}" @@ -414,6 +443,10 @@ def get_result(self, func: Callable, args: Any, kwargs: Any) -> Any: else: result = func(*args, **kwargs) return result + + def release(self, e: Optional[Exception] = None) -> None: + self.path.unlink(missing_ok=True) + super().release(e) class UpdateMetaAttributeLock(BaseLock): diff --git a/tests/test_cases/test_concurrency/test_in_processes.py b/tests/test_cases/test_concurrency/test_in_processes.py index ca1fa8e..b70ba6b 100644 --- a/tests/test_cases/test_concurrency/test_in_processes.py +++ b/tests/test_cases/test_concurrency/test_in_processes.py @@ -12,8 +12,10 @@ from multiprocessing import Event, Manager, Process, cpu_count from multiprocessing.pool import Pool from pathlib import Path +from threading import get_native_id from typing import Callable, Dict, Literal from unittest.mock import patch +from uuid import uuid4 import h5py import numpy as np @@ -131,7 +133,8 @@ def call_array_method( lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait) ): data = np.ndarray( - shape=create_shape_from_slice(schema.shape, subset_slice), dtype=schema.dtype + shape=create_shape_from_slice(schema.shape, subset_slice), + dtype=schema.dtype, ) try: if data.shape == (): @@ -325,7 +328,7 @@ def test_varray_lock_wait_till_read_timeout( def test_varray_locks_inner_arrays( self, inserted_varray: VArray, root_path, varray_collection: Collection ): - """Test that as we lock varray, inner varrays also locked.""" + """Test that as we lock varray, inner arrays also locked.""" manager = Manager() lock_set = manager.Event() func_finished = manager.Event() @@ -346,14 +349,10 @@ def test_varray_locks_inner_arrays( ) proc.start() lock_set.wait() - try: - for array in varray_collection.arrays: - with pytest.raises(DekerLockError): - array[:].update(np.zeros(shape=varray_collection.array_schema.shape)) - func_finished.set() - except Exception: - proc.kill() - raise + with pytest.raises(DekerLockError): + for _ in varray_collection.arrays: + pass + func_finished.set() def test_non_intersecting_vsubsets_update_OK( self, inserted_varray: VArray, root_path, varray_collection: Collection @@ -465,8 +464,6 @@ def test_intersecting_vsubsets_update_fail( raise finally: func_finished.set() - proc.kill() - print(111111111, result, flush=True) try: assert result.count(DekerLockError) == len(slices) - 1 check_data[blocking_slice].fill(blocking_value) @@ -659,10 +656,27 @@ def test_array_with_attributes_create_multiple_processes( manager = Manager() lock_set = manager.Event() func_finished = manager.Event() + proc = Process( + target=call_array_method, + args=( + array_with_attributes.collection, + str(embedded_uri(root_path)), + array_with_attributes.id, + "create", + lock_set, + func_finished, + True, + False, + array_with_attributes.primary_attributes, + array_with_attributes.custom_attributes, + ), + ) + proc.start() + lock_set.wait() methods = ["create"] * 3 with Pool(WORKERS - 1) as pool: - pool.starmap( + result = pool.starmap( call_array_method, [ ( @@ -681,6 +695,7 @@ def test_array_with_attributes_create_multiple_processes( ], ) lock_set.wait() + assert result.count(DekerLockError) == len(methods) func_finished.set() paths = get_paths( From 11e500960ff104cc8033ce58f17e6c51f93b2b0f Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 18 Dec 2023 10:57:10 +0100 Subject: [PATCH 2/4] Fixed Locks behaviour. - Fixed WriteVarrayLock - Fixed CreateArrayLock - Fixed related tests --- deker/locks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/deker/locks.py b/deker/locks.py index c822d44..3df05ce 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -370,8 +370,9 @@ class CreateArrayLock(BaseLock): """Lock that we set when we want to create an array.""" ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"] - - path: Optional[Path] = None + + path: Optional[Path] = None + def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: """Return path to the file that should be locked. @@ -395,7 +396,7 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: path = dir_path / f"{filename}{LocksExtensions.array_lock.value}" if not path.exists(): path.open("w").close() - + self.path = path self.logger.debug(f"got path for array.id {array.id} lock file: {path}") return path @@ -443,7 +444,7 @@ def get_result(self, func: Callable, args: Any, kwargs: Any) -> Any: else: result = func(*args, **kwargs) return result - + def release(self, e: Optional[Exception] = None) -> None: self.path.unlink(missing_ok=True) super().release(e) From b9dd3922d52aeab40c4865e2c434df1eceb68a0d Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 18 Dec 2023 11:11:15 +0100 Subject: [PATCH 3/4] Linters --- deker/locks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/deker/locks.py b/deker/locks.py index 3df05ce..1a789bf 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -190,12 +190,22 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: raise DekerLockError(f"Array {array} is locked with read locks") def release(self, e: Optional[Exception] = None) -> None: + """Release Flock. + + If array is locked from Varary from current Process, do nothing + :param e: exception that might have been raised + """ if self.is_locked_with_varray: return super().release(e) def acquire(self, path: Optional[Path]) -> None: + """Make lock using flock. + + If array is locked from Varary from current Process, do nothing + :param path: Path to file that should be flocked + """ if self.is_locked_with_varray: return super().acquire(path) From 64acb85842e687dc4e2fa8b500f14e7b832c2b23 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 18 Dec 2023 13:41:59 +0100 Subject: [PATCH 4/4] Linters & Create Tests --- deker/locks.py | 4 ++++ tests/test_cases/test_concurrency/test_in_processes.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/deker/locks.py b/deker/locks.py index 1a789bf..ac1053a 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -456,6 +456,10 @@ def get_result(self, func: Callable, args: Any, kwargs: Any) -> Any: return result def release(self, e: Optional[Exception] = None) -> None: + """Release Flock. + + :param e: exception that might have been raised + """ self.path.unlink(missing_ok=True) super().release(e) diff --git a/tests/test_cases/test_concurrency/test_in_processes.py b/tests/test_cases/test_concurrency/test_in_processes.py index b70ba6b..ccb0e5b 100644 --- a/tests/test_cases/test_concurrency/test_in_processes.py +++ b/tests/test_cases/test_concurrency/test_in_processes.py @@ -36,6 +36,7 @@ UpdateMetaAttributeLock, WriteArrayLock, WriteVarrayLock, + CreateArrayLock, ) from deker.schemas import ArraySchema, DimensionSchema, VArraySchema from deker.tools import get_paths @@ -104,7 +105,9 @@ def call_array_method( # Get Array object if method == "create": with patch.object( - Flock, "release", wait_unlock(Flock.release, lock_set, funcs_finished, wait) + CreateArrayLock, + "release", + wait_unlock(CreateArrayLock.release, lock_set, funcs_finished, wait), ): with patch("deker.ABC.base_array.get_id", lambda *a: id_): try: