Skip to content

Commit

Permalink
tests for intersecting vsubsets
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Rybakov committed Dec 6, 2023
1 parent f11825d commit d5eca85
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 6 deletions.
2 changes: 1 addition & 1 deletion deker/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray)
if not currently_locked and len(self.locks) == len(arrays_positions):
return

# Wait till there are no more read locks
start_time = time.monotonic()
while (time.monotonic() - start_time) <= adapter.ctx.config.write_lock_timeout:
if not self.check_arrays_locks(arrays_positions, adapter, varray):
return

sleep(adapter.ctx.config.write_lock_check_interval)
# Release all locks
self.release()
Expand Down
167 changes: 162 additions & 5 deletions tests/test_cases/test_concurrency/test_in_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
correctly.
"""
import os
import time
import traceback

from multiprocessing import Event, Manager, Process, cpu_count
Expand All @@ -18,12 +19,14 @@
import numpy as np
import pytest

from deker_tools.slices import create_shape_from_slice, slice_converter

from tests.parameters.uri import embedded_uri

from deker.arrays import Array, VArray
from deker.client import Client
from deker.collection import Collection
from deker.errors import DekerLockError
from deker.errors import DekerLockError, DekerVSubsetError
from deker.locks import (
CollectionLock,
Flock,
Expand All @@ -34,7 +37,7 @@
)
from deker.schemas import ArraySchema, DimensionSchema, VArraySchema
from deker.tools import get_paths
from deker.types import LocksExtensions
from deker.types import LocksExtensions, Numeric


UPDATED_CUSTOM_ATTR = 20
Expand Down Expand Up @@ -74,6 +77,8 @@ def call_array_method(
is_virtual: bool = False,
primary_attributes: Dict = None,
custom_attributes: Dict = None,
subset_slice: str = "[:]",
value: Numeric = 0,
):
"""Call a method on an Array object.
Expand All @@ -87,9 +92,11 @@ def call_array_method(
:param is_virtual: VArray flag
:param primary_attributes: primary attributes dict
:param custom_attributes: custom attributes dict
:param subset_slice: slice string from slice_converter to make a subset from array
"""
client = Client(uri, write_lock_timeout=1, loglevel="ERROR")
collection = client.get_collection(collection_name)
subset_slice = slice_converter[subset_slice]

# Get Array object
if method == "create":
Expand All @@ -109,6 +116,9 @@ def call_array_method(
array = collection.filter({"id": id_}).last()
except DekerLockError:
return DekerLockError
except DekerVSubsetError:
return DekerVSubsetError

if is_virtual:
lock = WriteVarrayLock
schema = collection.varray_schema
Expand All @@ -121,13 +131,17 @@ def call_array_method(
with patch.object(
lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait)
):
array[:].update(np.zeros(shape=schema.shape))
data = np.ndarray(
shape=create_shape_from_slice(schema.shape, subset_slice), dtype=schema.dtype
)
data.fill(value)
array[subset_slice].update(data)

elif method == "clear":
with patch.object(
lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait)
):
array[:].clear()
array[subset_slice].clear()

# Read used in varray only.
elif method == "read":
Expand All @@ -138,7 +152,7 @@ def call_array_method(
wait_unlock(ReadArrayLock.release, lock_set, funcs_finished, wait),
):
# patch timeout time
array[:].read()
array[subset_slice].read()

elif method == "update_meta_custom_attributes":
if wait:
Expand Down Expand Up @@ -334,6 +348,149 @@ def test_varray_locks_inner_arrays(
func_finished.set()
except Exception:
proc.kill()
raise

def test_non_intersecting_vsubsets_update_OK(
self, inserted_varray: VArray, root_path, varray_collection: Collection
):
"""Test that as we lock varray, inner varrays also locked."""
manager = Manager()
lock_set = manager.Event()
func_finished = manager.Event()
inserted_varray[:].clear()
value = -9999.9
proc1 = Process(
target=call_array_method,
args=(
inserted_varray.collection,
str(embedded_uri(root_path)),
inserted_varray.id,
"update",
lock_set,
func_finished,
False,
True,
None,
None,
slice_converter[:2, :2, :2],
value,
),
)
proc2 = Process(
target=call_array_method,
args=(
inserted_varray.collection,
str(embedded_uri(root_path)),
inserted_varray.id,
"update",
lock_set,
func_finished,
False,
True,
None,
None,
slice_converter[8:, 8:, 8:],
value,
),
)
proc1.start()
proc2.start()
proc1.join()
proc2.join()

try:
for slice_ in (np.index_exp[:2, :2, :2], np.index_exp[8:, 8:, 8:]):
subset = inserted_varray[slice_]
data = np.ndarray(shape=subset.shape, dtype=subset.dtype)
data.fill(value)
result = subset.read()
assert not proc1.is_alive()
assert not proc2.is_alive()
assert np.all(result == data)
assert not np.all(inserted_varray[:].read() == value)
except AssertionError:
raise
except Exception:
proc1.kill()
proc2.kill()
raise
finally:
print(inserted_varray[:].read(), flush=True)

def test_intersecting_vsubsets_update_fail(
self, inserted_varray: VArray, root_path, varray_collection: Collection
):
"""Test that as we lock varray, inner varrays also locked."""
manager = Manager()
lock_set = manager.Event()
func_finished = manager.Event()
inserted_varray[:].clear()
value = -9999.9
slices = (
np.index_exp[1:4, 1:2, 1:4],
np.index_exp[:2, 0:2, 0:2],
np.index_exp[3, 2, 1],
np.index_exp[:6, 0:6, 0:6],
)
# Call read process to lock arrays for reading
proc = Process(
target=call_array_method,
args=(
inserted_varray.collection,
str(embedded_uri(root_path)),
inserted_varray.id,
"update",
lock_set,
func_finished,
True,
True,
None,
None,
slice_converter[:],
value,
),
)
# try:
proc.start()
# proc.join()

lock_set.wait()
time.sleep(1)
try:
result = None
with Pool(WORKERS) as pool:
result = pool.starmap(
call_array_method,
[
(
inserted_varray.collection,
str(embedded_uri(root_path)),
inserted_varray.id,
"update",
lock_set,
func_finished,
False,
True,
None,
None,
slice_converter[subset_slice],
100,
)
for subset_slice in slices
],
)
print(111111111, result, flush=True)
# assert result.count(DekerLockError) == len(slices)
assert result.count(DekerVSubsetError) == len(slices)
func_finished.set()
except AssertionError:
raise
# except DekerVSubsetError:
# pass
finally:
print(inserted_varray[:].read())
func_finished.set()
proc.kill()

def test_varray_locks_release_arrays(
self,
Expand Down

0 comments on commit d5eca85

Please sign in to comment.