Skip to content

Commit

Permalink
Merge pull request #32 from openweathermap/feature/locs-fix
Browse files Browse the repository at this point in the history
Locks fix
  • Loading branch information
SerGeRybakov authored Dec 18, 2023
2 parents b14d7a7 + 64acb85 commit 3175cf1
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 32 deletions.
84 changes: 66 additions & 18 deletions deker/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pathlib import Path
from threading import get_native_id
from time import sleep
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union, Tuple
from uuid import uuid4

from deker.ABC.base_locks import BaseLock
Expand Down Expand Up @@ -94,7 +94,14 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:

dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir)
path = dir_path / (id_ + self.instance.file_ext)

for file in dir_path.iterdir():
# Skip lock from current process.
if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
self.is_locked_with_varray = True
return
# If we've found another varray lock, that not from current process.
if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
raise DekerLockError(f"Array {array} is locked with {file.name}")
try:
with open(path, "r") as f:
fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
Expand Down Expand Up @@ -129,6 +136,8 @@ class WriteArrayLock(BaseLock):

ALLOWED_TYPES = ["LocalArrayAdapter"]

is_locked_with_varray: bool = False

def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
"""Get path to the file for locking.
Expand All @@ -147,9 +156,6 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
:param func_args: arguments of method call
:param func_kwargs: keyword arguments of method call
"""
# Increment write lock, to prevent more read locks coming.
self.acquire(self.get_path(func_args, func_kwargs))

# Check current read locks
array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
Expand All @@ -159,11 +165,15 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
for file in dir_path.iterdir():
# Skip lock from current process.
if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
continue
self.is_locked_with_varray = True
return
# If we've found another varray lock, that not from current process.
if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
raise DekerLockError(f"Array {array} is locked with {file.name}")

# Increment write lock, to prevent more read locks coming.
self.acquire(self.get_path(func_args, func_kwargs))

# Pattern that has to find any read locks
glob_pattern = f"{array.id}:*{LocksExtensions.array_read_lock.value}"

Expand All @@ -179,6 +189,27 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
self.release()
raise DekerLockError(f"Array {array} is locked with read locks")

def release(self, e: Optional[Exception] = None) -> None:
"""Release Flock.
If array is locked from Varary from current Process, do nothing
:param e: exception that might have been raised
"""
if self.is_locked_with_varray:
return

super().release(e)

def acquire(self, path: Optional[Path]) -> None:
"""Make lock using flock.
If array is locked from Varary from current Process, do nothing
:param path: Path to file that should be flocked
"""
if self.is_locked_with_varray:
return
super().acquire(path)


class WriteVarrayLock(BaseLock):
"""Write lock for VArrays.
Expand All @@ -192,7 +223,7 @@ class WriteVarrayLock(BaseLock):
ALLOWED_TYPES = ["VSubset"]

# Locks that have been acquired by varray
locks: List[Path] = []
locks: List[Tuple[Flock, Path]] = []
skip_lock: bool = False # shows that we must skip this lock (e.g server adapters for subset)

def check_type(self) -> None:
Expand Down Expand Up @@ -228,9 +259,9 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock:
"""
# Check read lock first
array_id = filename.name.split(".")[0]
glob_pattern = f"{array_id}:*{LocksExtensions.array_read_lock.value}"
glob_pattern = f"{array_id}:*"
for _ in filename.parent.rglob(glob_pattern):
raise DekerLockError(f"Array {array_id} is locked on read")
raise DekerLockError(f"Array {array_id} is locked")

# Check write lock and set it
lock = Flock(filename)
Expand Down Expand Up @@ -267,8 +298,8 @@ def check_arrays_locks(
# Path to the main file (not symlink)
filename = filename.resolve()
try:
self.check_locks_for_array_and_set_flock(filename)
self.locks.append(filename)
lock = self.check_locks_for_array_and_set_flock(filename)
self.locks.append((lock, filename))
except DekerLockError:
currently_locked.append(filename)

Expand All @@ -292,7 +323,8 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
# Iterate over Arrays in VArray and try to lock them. If locking fails - wait.
# If it fails again - release all locks.
currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray)
if not currently_locked and len(self.locks) == len(arrays_positions):
if not currently_locked and (len(self.locks) == len(arrays_positions)):
# Release array locks
return

# Wait till there are no more read locks
Expand All @@ -311,9 +343,11 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
:param e: Exception that may have been raised.
"""
# Release array locks
for lock in self.locks:
Flock(lock).release()
Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True)
for lock, filename in self.locks:
lock.release()
Path(f"{filename}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(
missing_ok=True
)
super().release()

def acquire(self, path: Optional[Path]) -> None:
Expand Down Expand Up @@ -347,6 +381,8 @@ class CreateArrayLock(BaseLock):

ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"]

path: Optional[Path] = None

def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
"""Return path to the file that should be locked.
Expand All @@ -357,7 +393,7 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'

# Get file directory path
dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
dir_path = self.instance.collection_path
filename = META_DIVIDER.join(
[
f"{array.id}",
Expand All @@ -366,8 +402,12 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
f"{get_native_id()}",
]
)
# Create read lock file
# Create lock file
path = dir_path / f"{filename}{LocksExtensions.array_lock.value}"
if not path.exists():
path.open("w").close()

self.path = path
self.logger.debug(f"got path for array.id {array.id} lock file: {path}")
return path

Expand All @@ -390,7 +430,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
array_type = Array if adapter == self.ALLOWED_TYPES[0] else VArray
array = array_type(**array)

dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
dir_path = self.instance.collection_path

# Pattern that has to find any create locks
glob_pattern = f"{array.id}:*{LocksExtensions.array_lock.value}"
Expand All @@ -415,6 +455,14 @@ def get_result(self, func: Callable, args: Any, kwargs: Any) -> Any:
result = func(*args, **kwargs)
return result

def release(self, e: Optional[Exception] = None) -> None:
"""Release Flock.
:param e: exception that might have been raised
"""
self.path.unlink(missing_ok=True)
super().release(e)


class UpdateMetaAttributeLock(BaseLock):
"""Lock for updating meta."""
Expand Down
46 changes: 32 additions & 14 deletions tests/test_cases/test_concurrency/test_in_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
from multiprocessing import Event, Manager, Process, cpu_count
from multiprocessing.pool import Pool
from pathlib import Path
from threading import get_native_id
from typing import Callable, Dict, Literal
from unittest.mock import patch
from uuid import uuid4

import h5py
import numpy as np
Expand All @@ -34,6 +36,7 @@
UpdateMetaAttributeLock,
WriteArrayLock,
WriteVarrayLock,
CreateArrayLock,
)
from deker.schemas import ArraySchema, DimensionSchema, VArraySchema
from deker.tools import get_paths
Expand Down Expand Up @@ -102,7 +105,9 @@ def call_array_method(
# Get Array object
if method == "create":
with patch.object(
Flock, "release", wait_unlock(Flock.release, lock_set, funcs_finished, wait)
CreateArrayLock,
"release",
wait_unlock(CreateArrayLock.release, lock_set, funcs_finished, wait),
):
with patch("deker.ABC.base_array.get_id", lambda *a: id_):
try:
Expand Down Expand Up @@ -131,7 +136,8 @@ def call_array_method(
lock, "release", wait_unlock(lock.release, lock_set, funcs_finished, wait)
):
data = np.ndarray(
shape=create_shape_from_slice(schema.shape, subset_slice), dtype=schema.dtype
shape=create_shape_from_slice(schema.shape, subset_slice),
dtype=schema.dtype,
)
try:
if data.shape == ():
Expand Down Expand Up @@ -325,7 +331,7 @@ def test_varray_lock_wait_till_read_timeout(
def test_varray_locks_inner_arrays(
self, inserted_varray: VArray, root_path, varray_collection: Collection
):
"""Test that as we lock varray, inner varrays also locked."""
"""Test that as we lock varray, inner arrays also locked."""
manager = Manager()
lock_set = manager.Event()
func_finished = manager.Event()
Expand All @@ -346,14 +352,10 @@ def test_varray_locks_inner_arrays(
)
proc.start()
lock_set.wait()
try:
for array in varray_collection.arrays:
with pytest.raises(DekerLockError):
array[:].update(np.zeros(shape=varray_collection.array_schema.shape))
func_finished.set()
except Exception:
proc.kill()
raise
with pytest.raises(DekerLockError):
for _ in varray_collection.arrays:
pass
func_finished.set()

def test_non_intersecting_vsubsets_update_OK(
self, inserted_varray: VArray, root_path, varray_collection: Collection
Expand Down Expand Up @@ -465,8 +467,6 @@ def test_intersecting_vsubsets_update_fail(
raise
finally:
func_finished.set()
proc.kill()
print(111111111, result, flush=True)
try:
assert result.count(DekerLockError) == len(slices) - 1
check_data[blocking_slice].fill(blocking_value)
Expand Down Expand Up @@ -659,10 +659,27 @@ def test_array_with_attributes_create_multiple_processes(
manager = Manager()
lock_set = manager.Event()
func_finished = manager.Event()
proc = Process(
target=call_array_method,
args=(
array_with_attributes.collection,
str(embedded_uri(root_path)),
array_with_attributes.id,
"create",
lock_set,
func_finished,
True,
False,
array_with_attributes.primary_attributes,
array_with_attributes.custom_attributes,
),
)
proc.start()
lock_set.wait()

methods = ["create"] * 3
with Pool(WORKERS - 1) as pool:
pool.starmap(
result = pool.starmap(
call_array_method,
[
(
Expand All @@ -681,6 +698,7 @@ def test_array_with_attributes_create_multiple_processes(
],
)
lock_set.wait()
assert result.count(DekerLockError) == len(methods)
func_finished.set()

paths = get_paths(
Expand Down

0 comments on commit 3175cf1

Please sign in to comment.