diff --git a/deker/locks.py b/deker/locks.py index 448e881..01887c3 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -294,12 +294,12 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray) if not currently_locked and len(self.locks) == len(arrays_positions): return + # Wait till there are no more read locks start_time = time.monotonic() while (time.monotonic() - start_time) <= adapter.ctx.config.write_lock_timeout: if not self.check_arrays_locks(arrays_positions, adapter, varray): return - sleep(adapter.ctx.config.write_lock_check_interval) # Release all locks self.release() diff --git a/tests/test_cases/test_concurrency/test_in_processes.py b/tests/test_cases/test_concurrency/test_in_processes.py index 5372394..1951583 100644 --- a/tests/test_cases/test_concurrency/test_in_processes.py +++ b/tests/test_cases/test_concurrency/test_in_processes.py @@ -6,6 +6,7 @@ correctly. """ import os +import time import traceback from multiprocessing import Event, Manager, Process, cpu_count @@ -18,12 +19,14 @@ import numpy as np import pytest +from deker_tools.slices import create_shape_from_slice, slice_converter + from tests.parameters.uri import embedded_uri from deker.arrays import Array, VArray from deker.client import Client from deker.collection import Collection -from deker.errors import DekerLockError +from deker.errors import DekerLockError, DekerVSubsetError from deker.locks import ( CollectionLock, Flock, @@ -34,7 +37,7 @@ ) from deker.schemas import ArraySchema, DimensionSchema, VArraySchema from deker.tools import get_paths -from deker.types import LocksExtensions +from deker.types import LocksExtensions, Numeric UPDATED_CUSTOM_ATTR = 20 @@ -74,6 +77,8 @@ def call_array_method( is_virtual: bool = False, primary_attributes: Dict = None, custom_attributes: Dict = None, + subset_slice: str = "[:]", + value: Numeric = 0, ): """Call a method on an Array object. @@ -87,9 +92,11 @@ def call_array_method( :param is_virtual: VArray flag :param primary_attributes: primary attributes dict :param custom_attributes: custom attributes dict + :param subset_slice: slice string from slice_converter to make a subset from array """ client = Client(uri, write_lock_timeout=1, loglevel="ERROR") collection = client.get_collection(collection_name) + subset_slice = slice_converter[subset_slice] # Get Array object if method == "create": @@ -109,6 +116,9 @@ def call_array_method( array = collection.filter({"id": id_}).last() except DekerLockError: return DekerLockError + except DekerVSubsetError: + return DekerVSubsetError + if is_virtual: lock = WriteVarrayLock schema = collection.varray_schema @@ -121,13 +131,17 @@ def call_array_method( with patch.object( lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait) ): - array[:].update(np.zeros(shape=schema.shape)) + data = np.ndarray( + shape=create_shape_from_slice(schema.shape, subset_slice), dtype=schema.dtype + ) + data.fill(value) + array[subset_slice].update(data) elif method == "clear": with patch.object( lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait) ): - array[:].clear() + array[subset_slice].clear() # Read used in varray only. elif method == "read": @@ -138,7 +152,7 @@ def call_array_method( wait_unlock(ReadArrayLock.release, lock_set, funcs_finished, wait), ): # patch timeout time - array[:].read() + array[subset_slice].read() elif method == "update_meta_custom_attributes": if wait: @@ -334,6 +348,149 @@ def test_varray_locks_inner_arrays( func_finished.set() except Exception: proc.kill() + raise + + def test_non_intersecting_vsubsets_update_OK( + self, inserted_varray: VArray, root_path, varray_collection: Collection + ): + """Test that as we lock varray, inner varrays also locked.""" + manager = Manager() + lock_set = manager.Event() + func_finished = manager.Event() + inserted_varray[:].clear() + value = -9999.9 + proc1 = Process( + target=call_array_method, + args=( + inserted_varray.collection, + str(embedded_uri(root_path)), + inserted_varray.id, + "update", + lock_set, + func_finished, + False, + True, + None, + None, + slice_converter[:2, :2, :2], + value, + ), + ) + proc2 = Process( + target=call_array_method, + args=( + inserted_varray.collection, + str(embedded_uri(root_path)), + inserted_varray.id, + "update", + lock_set, + func_finished, + False, + True, + None, + None, + slice_converter[8:, 8:, 8:], + value, + ), + ) + proc1.start() + proc2.start() + proc1.join() + proc2.join() + + try: + for slice_ in (np.index_exp[:2, :2, :2], np.index_exp[8:, 8:, 8:]): + subset = inserted_varray[slice_] + data = np.ndarray(shape=subset.shape, dtype=subset.dtype) + data.fill(value) + result = subset.read() + assert not proc1.is_alive() + assert not proc2.is_alive() + assert np.all(result == data) + assert not np.all(inserted_varray[:].read() == value) + except AssertionError: + raise + except Exception: + proc1.kill() + proc2.kill() + raise + finally: + print(inserted_varray[:].read(), flush=True) + + def test_intersecting_vsubsets_update_fail( + self, inserted_varray: VArray, root_path, varray_collection: Collection + ): + """Test that as we lock varray, inner varrays also locked.""" + manager = Manager() + lock_set = manager.Event() + func_finished = manager.Event() + inserted_varray[:].clear() + value = -9999.9 + slices = ( + np.index_exp[1:4, 1:2, 1:4], + np.index_exp[:2, 0:2, 0:2], + np.index_exp[3, 2, 1], + np.index_exp[:6, 0:6, 0:6], + ) + # Call read process to lock arrays for reading + proc = Process( + target=call_array_method, + args=( + inserted_varray.collection, + str(embedded_uri(root_path)), + inserted_varray.id, + "update", + lock_set, + func_finished, + True, + True, + None, + None, + slice_converter[:], + value, + ), + ) + # try: + proc.start() + # proc.join() + + lock_set.wait() + time.sleep(1) + try: + result = None + with Pool(WORKERS) as pool: + result = pool.starmap( + call_array_method, + [ + ( + inserted_varray.collection, + str(embedded_uri(root_path)), + inserted_varray.id, + "update", + lock_set, + func_finished, + False, + True, + None, + None, + slice_converter[subset_slice], + 100, + ) + for subset_slice in slices + ], + ) + print(111111111, result, flush=True) + # assert result.count(DekerLockError) == len(slices) + assert result.count(DekerVSubsetError) == len(slices) + func_finished.set() + except AssertionError: + raise + # except DekerVSubsetError: + # pass + finally: + print(inserted_varray[:].read()) + func_finished.set() + proc.kill() def test_varray_locks_release_arrays( self,