From f3040c7486f7d1f8db1a59804260d0dd7ceb202f Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Tue, 9 Apr 2024 13:58:25 +0200 Subject: [PATCH 1/9] Changes before moving --- deker/ABC/base_locks.py | 14 +- deker/errors.py | 3 + deker/locks.py | 276 ++++++++++-------- deker/tools/path.py | 1 + deker/types/private/enums.py | 2 + .../test_client/test_client_methods.py | 24 +- tests/test_cases/test_locking/test_locks.py | 28 +- 7 files changed, 205 insertions(+), 143 deletions(-) diff --git a/deker/ABC/base_locks.py b/deker/ABC/base_locks.py index 590a94f..cc2ea60 100644 --- a/deker/ABC/base_locks.py +++ b/deker/ABC/base_locks.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """Abstract interfaces for locks.""" - +# TODO: MOVED from abc import ABC, abstractmethod from functools import wraps from pathlib import Path @@ -31,16 +31,15 @@ class BaseLock(SelfLoggerMixin, ABC): ALLOWED_TYPES: List[str] = [] lock: Optional[Union[Flock, Path]] = None instance: Optional[Any] = None + args: Optional[List[Any]] = None + kwargs: Optional[Dict] = None @abstractmethod - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Optional[Path]: + def get_path(self) -> Optional[Path]: """Get path to the lock file. For Arrays it shall be .arrlock (array read lock) or path to the file (array write lock) For VArrays there are no specific locks for reading, for writing - lock on .json - - :param func_args: Arguments of method call - :param func_kwargs: Keyword arguments of method call """ pass @@ -98,7 +97,7 @@ def _inner_method_logic(lock: "BaseLock", args: Sequence, kwargs: Dict, func: Ca :param kwargs: Keyword arguments of decorated function """ lock.check_existing_lock(args, kwargs) - path = lock.get_path(args, kwargs) + path = lock.get_path() lock.acquire(path) try: # Logic is in the separate method, so we can override its behavior @@ -129,7 +128,8 @@ def inner(*args: Sequence, **kwargs: Dict[str, Any]) -> Any: lock = self.__class__() # as we wrap methods, we should have access to 'self' objects lock.instance = kwargs.get("self") or args[0] - + lock.args = args + lock.kwargs = kwargs # Check that we don't have lock on anything that besides methods that require lock lock.check_type() diff --git a/deker/errors.py b/deker/errors.py index cae065f..0801ff0 100644 --- a/deker/errors.py +++ b/deker/errors.py @@ -15,6 +15,7 @@ # along with this program. If not, see . +# TODO: MOVED class DekerBaseApplicationError(Exception): """Base attribute exception.""" @@ -83,6 +84,7 @@ class DekerIntegrityError(DekerBaseApplicationError): pass +# TODO: MOVED class DekerLockError(DekerBaseApplicationError): """If a Collection or a Array or VArray instance is locked.""" @@ -110,5 +112,6 @@ class DekerVSubsetError(DekerSubsetError): """If something goes wrong while VSubset managing.""" +# TODO: MOVED class DekerMemoryError(DekerBaseApplicationError, MemoryError): """Early memory overflow exception.""" diff --git a/deker/locks.py b/deker/locks.py index 5aaa723..4bbb708 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -19,11 +19,24 @@ import fcntl import os import time +from contextlib import contextmanager from pathlib import Path from threading import get_native_id from time import sleep -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Sequence, + Union, + Tuple, + TypeVar, + Generic, +) from uuid import uuid4 from deker.ABC.base_locks import BaseLock @@ -40,40 +53,109 @@ from deker.types.private.classes import ArrayPositionedData META_DIVIDER = ":" +ArrayFromArgs = Union[Path, Union["Array", "VArray"]] +T = TypeVar("T") -class ReadArrayLock(BaseLock): +def _get_lock_filename(id_: str, lock_ext: LocksExtensions) -> str: + """Get filename for lockfile + + :param id_: ID of array + :param lock_ext: Extension of lock + :return: + """ + name = META_DIVIDER.join([f"{id_}", f"{uuid4()}", f"{os.getpid()}", f"{get_native_id()}"]) + return str(name + lock_ext.value) + + +def _check_write_locks(dir_path: Path, id_: str) -> bool: + """Checks write locks from VArrays that differs from current. + + :param dir_path: Dir where locks are stored (the one with hdf file) + """ + for file in dir_path.iterdir(): + # Skip lock from current process. + # Used when you have to read meta inside .update operation of varray + if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"): + return True + # If we've found another varray lock, that not from current process. + if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore + raise DekerLockError(f"Array {id_} is locked with {file.name}") + return False + + +class LockWithArrayMixin(Generic[T]): + """Base class with getter of array.""" + + args: Optional[List[Any]] + kwargs: Optional[Dict] + instance: Optional[Any] + is_locked_with_varray: bool = False + + @property + def array_id(self) -> str: + """Return id of an Array""" + return self.array.id + + @property + def array(self) -> T: + """Parse array from args and save ref to it.""" + array = self.kwargs.get("array") or self.args[1] # zero arg is 'self' + return array + + @property + def dir_path(self) -> Path: + """Path to directory with main file.""" + return get_main_path(self.array_id, self.instance.collection_path / self.instance.data_dir) + + +def wait_for_unlock(check_func: Callable, check_func_args: tuple, timeout, interval: float) -> bool: + """Waiting while there is no locks + + :param instance: + :param check_func: + :param check_func_args: + :return: + """ + start_time = time.monotonic() + while (time.monotonic() - start_time) <= timeout: + if check_func(*check_func_args): + return True + sleep(interval) + return False + + +class ReadArrayLock(LockWithArrayMixin[ArrayFromArgs], BaseLock): """Read lock for Array.""" ALLOWED_TYPES = ["LocalArrayAdapter"] - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: + @property + def array_id(self) -> str: + """Get if from Array, or Path to the array.""" + # Get instance of the array + if isinstance(self.array, Path): + path = self.array + id_ = path.name.split(".")[0] + else: + id_ = self.array.id + return id_ + + def get_path(self) -> Path: """Get path to read-lock file. It's only the case for arrays, varrays don't have read locks. :param func_args: arguments of method call :param func_kwargs: keyword arguments of method call """ - # Get instance of the array - array: Union[Path, Union[Array, VArray]] = ( - func_kwargs.get("array") or func_args[1] - ) # zero arg is 'self' - if isinstance(array, Path): - path = array - id_ = path.name.split(".")[0] - else: - id_ = array.id # Get file directory - dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir) - filename = ( - META_DIVIDER.join([f"{id_}", f"{uuid4()}", f"{os.getpid()}", f"{get_native_id()}"]) - + LocksExtensions.array_read_lock.value - ) + filename = _get_lock_filename(self.array_id, LocksExtensions.array_read_lock) + # Create read lock file path - path = dir_path / f"{filename}" + path = self.dir_path / f"{filename}" - self.logger.debug(f"Got path for array.id {id_} lock file: {path}") + self.logger.debug(f"Got path for array.id {self.array_id} lock file: {path}") return path def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: @@ -83,32 +165,22 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: :param func_args: arguments for called function. :param func_kwargs: keyword arguments for called function. """ - array: Union[Path, Union[Array, VArray]] = ( - func_kwargs.get("array") or func_args[1] - ) # zero arg is 'self' - if isinstance(array, Path): - path = array - id_ = path.name.split(".")[0] - else: - id_ = array.id - - dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir) - path = dir_path / (id_ + self.instance.file_ext) - for file in dir_path.iterdir(): - # Skip lock from current process. - if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"): - self.is_locked_with_varray = True - return - # If we've found another varray lock, that not from current process. - if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore - raise DekerLockError(f"Array {array} is locked with {file.name}") + # Check write locks + if _check_write_locks(self.dir_path, self.array_id): + self.is_locked_with_varray = True + # File was locked with VArray from current process. + return + # No write locks found + path = self.dir_path / (self.array_id + self.instance.file_ext) try: with open(path, "r") as f: fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB) self.logger.debug(f"Set shared flock for {path}") except BlockingIOError: - raise DekerLockError(f"Array {id_} is locked for update operation, cannot be read.") + raise DekerLockError( + f"Array {self.array_id} is locked for update operation, cannot be read." + ) def acquire(self, path: Union[str, Path]) -> Any: """Read files will not be flocked - only created. @@ -118,7 +190,7 @@ def acquire(self, path: Union[str, Path]) -> Any: # Create lock file self.lock = path open(path, "a").close() - self.logger.debug(f"Acquired lock for {self.lock}") + self.logger.debug(f"Acquired read lock for {self.lock}") def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002] """Release lock by deleting file. @@ -127,27 +199,19 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002] """ if self.lock and self.lock.exists(): self.lock.unlink() + self.logger.debug(f"Releasing read lock for {self.lock}") self.lock = None - self.logger.debug(f"Released lock for {self.lock}") -class WriteArrayLock(BaseLock): +class WriteArrayLock(LockWithArrayMixin["Array"], BaseLock): """Write lock for arrays.""" ALLOWED_TYPES = ["LocalArrayAdapter"] - is_locked_with_varray: bool = False - - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: - """Get path to the file for locking. - - :param func_args: arguments of method call - :param func_kwargs: keyword arguments of method call - """ - array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' - dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) - path = dir_path / (array.id + self.instance.file_ext) - self.logger.debug(f"Got path for array.id {array.id} lock file: {path}") + def get_path(self) -> Path: + """Get path to the file for locking.""" + path = self.dir_path / (self.array.id + self.instance.file_ext) + self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}") return path def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: @@ -156,38 +220,29 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: :param func_args: arguments of method call :param func_kwargs: keyword arguments of method call """ - # Check current read locks - array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' - dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) - # If array belongs to varray, we should check if varray is also locked - if array._vid: - for file in dir_path.iterdir(): - # Skip lock from current process. - if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"): - self.is_locked_with_varray = True - return - # If we've found another varray lock, that not from current process. - if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore - raise DekerLockError(f"Array {array} is locked with {file.name}") + if self.array._vid: + self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array.id) # Increment write lock, to prevent more read locks coming. - self.acquire(self.get_path(func_args, func_kwargs)) + self.acquire(self.get_path()) # Pattern that has to find any read locks - glob_pattern = f"{array.id}:*{LocksExtensions.array_read_lock.value}" + glob_pattern = f"{self.array.id}:*{LocksExtensions.array_read_lock.value}" # Wait till there are no more read locks - start_time = time.monotonic() - while (time.monotonic() - start_time) <= self.instance.ctx.config.write_lock_timeout: - if not list(dir_path.rglob(glob_pattern)): - return - - sleep(self.instance.ctx.config.write_lock_check_interval) + if wait_for_unlock( + lambda path, pattern: not list(path.rglob(pattern)), + (self.dir_path, glob_pattern), + self.instance.ctx.config.write_lock_timeout, + self.instance.ctx.config.write_lock_check_interval, + ): + # If all locks are released, go further + return # If we hit the timeout, release write lock and raise DekerLockError self.release() - raise DekerLockError(f"Array {array} is locked with read locks") + raise DekerLockError(f"Array {self.array} is locked with read locks") def release(self, e: Optional[Exception] = None) -> None: """Release Flock. @@ -237,7 +292,7 @@ def check_type(self) -> None: if not is_running_on_local: self.skip_lock = True - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Optional[Path]: # noqa[ARG002] + def get_path(self) -> Optional[Path]: # noqa[ARG002] """Path of json Varray file. :param func_args: arguments of the function that has been called. @@ -273,7 +328,6 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock: def check_arrays_locks( self, - arrays_positions: List[ArrayPositionedData], adapter: LocalArrayAdapter, varray: VArray, ) -> List[Path]: @@ -286,7 +340,7 @@ def check_arrays_locks( currently_locked = [] collection = varray._VArray__collection # type: ignore[attr-defined] - for array_position in arrays_positions: + for array_position in self.instance._VSubset__arrays: filename = adapter._get_symlink_filename( varray.id, array_position.vposition, @@ -314,25 +368,25 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: adapter = self.instance._VSubset__array_adapter varray = self.instance._VSubset__array - arrays_positions: List[ArrayPositionedData] = self.instance._VSubset__arrays - # Clear links to locks self.locks = [] # Locks that have been acquired by third party process # Iterate over Arrays in VArray and try to lock them. If locking fails - wait. # If it fails again - release all locks. - currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray) + currently_locked = self.check_arrays_locks(adapter, varray) if not currently_locked: # Release array locks return # Wait till there are no more read locks - start_time = time.monotonic() - while (time.monotonic() - start_time) <= adapter.ctx.config.write_lock_timeout: - if not self.check_arrays_locks(arrays_positions, adapter, varray): - return - sleep(adapter.ctx.config.write_lock_check_interval) + if wait_for_unlock( + check_func=lambda: not self.check_arrays_locks(adapter, varray), + check_func_args=tuple(), + timeout=adapter.ctx.config.write_lock_timeout, + interval=adapter.ctx.config.write_lock_check_interval, + ): + return # Release all locks self.release() raise DekerLockError(f"VArray {varray} is locked") @@ -376,39 +430,25 @@ def _inner_method_logic( return super()._inner_method_logic(lock, args, kwargs, func) -class CreateArrayLock(BaseLock): +class CreateArrayLock(LockWithArrayMixin[Union["Array", "VArray", dict]], BaseLock): """Lock that we set when we want to create an array.""" ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"] path: Optional[Path] = None - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: - """Return path to the file that should be locked. - - :param func_args: arguments for called method - :param func_kwargs: keyword arguments for called method - :return: - """ - array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' - + def get_path(self) -> Path: + """Return path to the file that should be locked.""" # Get file directory path dir_path = self.instance.collection_path - filename = META_DIVIDER.join( - [ - f"{array.id}", - f"{uuid4()}", - f"{os.getpid()}", - f"{get_native_id()}", - ] - ) + # Create lock file - path = dir_path / f"{filename}{LocksExtensions.array_lock.value}" + path = dir_path / _get_lock_filename(self.array.id, LocksExtensions.array_lock) if not path.exists(): path.open("w").close() self.path = path - self.logger.debug(f"got path for array.id {array.id} lock file: {path}") + self.logger.debug(f"got path for array.id {self.array.id} lock file: {path}") return path def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: @@ -419,8 +459,8 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: """ from deker.arrays import Array, VArray + array = self.array # Check current read locks - array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' if isinstance(array, dict): adapter = array["adapter"].__class__.__name__ if adapter not in self.ALLOWED_TYPES: @@ -435,7 +475,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: # Pattern that has to find any create locks glob_pattern = f"{array.id}:*{LocksExtensions.array_lock.value}" for _ in dir_path.rglob(glob_pattern): - raise DekerLockError(f"Array {array} is locked for creating") + raise DekerLockError(f"Array {array.id} is locked for creating") func_kwargs["array"] = array @@ -464,24 +504,24 @@ def release(self, e: Optional[Exception] = None) -> None: super().release(e) -class UpdateMetaAttributeLock(BaseLock): +class UpdateMetaAttributeLock(LockWithArrayMixin[Union["Array", "VArray"]], BaseLock): """Lock for updating meta.""" ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"] - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: + def get_path(self) -> Path: """Return path to the file that should be locked. :param func_args: arguments for called method :param func_kwargs: keyword arguments for called method :return: """ - array = func_kwargs.get("array") or func_args[1] # zero arg is 'self' - # Get directory where file locates - dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir) - path = dir_path / f"{array.id}{self.instance.file_ext}" - self.logger.debug(f"Got path for array.id {array.id} lock file: {path}") + dir_path = get_main_path( + self.array.id, self.instance.collection_path / self.instance.data_dir + ) + path = dir_path / f"{self.array.id}{self.instance.file_ext}" + self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}") return path @@ -490,13 +530,13 @@ class CollectionLock(BaseLock): ALLOWED_TYPES = ["LocalCollectionAdapter"] - def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: # noqa[ARG002] + def get_path(self) -> Path: # noqa[ARG002] """Return path to collection lock file. :param func_args: arguments for called method :param func_kwargs: keyword arguments for called method """ - collection = func_args[1] + collection = self.args[1] path = self.instance.collections_resource / (collection.name + ".lock") self.logger.debug(f"Got path for collection {collection.name} lock file: {path}") return path diff --git a/deker/tools/path.py b/deker/tools/path.py index 58ad76d..8deac53 100644 --- a/deker/tools/path.py +++ b/deker/tools/path.py @@ -54,6 +54,7 @@ def get_symlink_path( return symlink_path +# TODO: MOVED def get_main_path(array_id: str, data_directory: Path) -> Path: """Generate main path for the given array id by its type. diff --git a/deker/types/private/enums.py b/deker/types/private/enums.py index 882f6ed..14d3acb 100644 --- a/deker/types/private/enums.py +++ b/deker/types/private/enums.py @@ -88,6 +88,7 @@ class DimensionType(str, Enum): time = "time" +# TODO: MOVED class LocksExtensions(str, Enum): """Extensions for lock files.""" @@ -97,6 +98,7 @@ class LocksExtensions(str, Enum): varray_lock = ".varraylock" +# TODO: MOVED class LocksTypes(str, Enum): """Locks enum.""" diff --git a/tests/test_cases/test_client/test_client_methods.py b/tests/test_cases/test_client/test_client_methods.py index 2eb2921..8d63cc7 100644 --- a/tests/test_cases/test_client/test_client_methods.py +++ b/tests/test_cases/test_client/test_client_methods.py @@ -382,7 +382,10 @@ def test_client_check_integrity_collection( try: client.check_integrity(2, stop_on_error=False, collection=collection_1.name) except Exception as e: - assert str(e) == f"Collection \"{collection_1.name}\" metadata is invalid/corrupted: 'test'" + assert ( + str(e) + == f"Collection \"{collection_1.name}\" metadata is invalid/corrupted: 'test'" + ) errors = capsys.readouterr().out collection_1.delete() collection_2.delete() @@ -771,7 +774,9 @@ def test_client_get_locks_array_read_lock( self, client: Client, params, read_array_lock, inserted_array ): client.create_collection(**params) - file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.args = [] + read_array_lock.kwargs = {"array": inserted_array} + file = read_array_lock.get_path() file.touch() meta = file.name.split(":") assert { @@ -796,7 +801,9 @@ def test_client_get_locks_type_array_read_lock( self, client: Client, params, read_array_lock, inserted_array ): client.create_collection(**params) - file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.args = [] + read_array_lock.kwargs = {"array": inserted_array} + file = read_array_lock.get_path() file.touch() meta = file.name.split(":") assert [ @@ -823,7 +830,9 @@ def test_client_get_locks_skip_lock( self, client: Client, params, read_array_lock, inserted_array ): client.create_collection(**params) - file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.args = [] + read_array_lock.kwargs = {"array": inserted_array} + file = read_array_lock.get_path() open(f"{file}:{os.getpid()}{LocksExtensions.varray_lock.value}", "w").close() assert not client._get_locks(lock_type=LocksTypes.varray_lock) @@ -849,7 +858,9 @@ def test_client_clear_locks_collection_empty(self, client: Client, params): def test_client_clear_locks_inserted_array( self, client: Client, root_path, read_array_lock, inserted_array ): - filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.args = [] + read_array_lock.kwargs = {"array": inserted_array} + filepath = read_array_lock.get_path() filepath.touch() assert list(Path.rglob(root_path, f"*{LocksExtensions.array_read_lock.value}")) client.clear_locks() @@ -858,7 +869,8 @@ def test_client_clear_locks_inserted_array( def test_client_clear_locks_inserted_array_multiple_locks( self, client: Client, root_path, read_array_lock, inserted_array ): - filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.kwargs = {"array": inserted_array} + filepath = read_array_lock.get_path() open(f"{filepath}:{os.getpid()}{LocksExtensions.varray_lock.value}", "w").close() filepath.touch() assert list(Path.rglob(root_path, f"*{LocksExtensions.array_read_lock.value}")) diff --git a/tests/test_cases/test_locking/test_locks.py b/tests/test_cases/test_locking/test_locks.py index 34f19fb..f9b2a9a 100644 --- a/tests/test_cases/test_locking/test_locks.py +++ b/tests/test_cases/test_locking/test_locks.py @@ -93,9 +93,9 @@ def test_read_array_lock_path( ) ) ) - assert pattern.match( - str(read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})) - ) + read_array_lock.args = [] + read_array_lock.kwargs = {"array": inserted_array} + assert pattern.match(str(read_array_lock.get_path())) def test_read_array_check_existing_lock( self, @@ -110,6 +110,7 @@ def test_read_array_check_existing_lock( hdf_path = dir_path / (inserted_array.id + local_array_adapter.file_ext) with Flock(hdf_path): with pytest.raises(DekerLockError): + read_array_lock.kwargs = {"array": inserted_array} read_array_lock.check_existing_lock( func_args=[], func_kwargs={"array": inserted_array} ) @@ -128,7 +129,8 @@ def test_read_array_locks_create_files( mocker.patch("deker.locks.uuid4", lambda: uuid) # Get path of the file that should be created - filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + read_array_lock.kwargs = {"array": inserted_array} + filepath = read_array_lock.get_path() # Mocked method to test def check_file(self, *args, **kwargs): @@ -175,10 +177,8 @@ def test_write_array_lock_lock_path( inserted_array.id, local_array_adapter.collection_path / local_array_adapter.data_dir ) hdf_path = dir_path / (inserted_array.id + local_array_adapter.file_ext) - - assert hdf_path == write_array_lock.get_path( - func_args=[], func_kwargs={"array": inserted_array} - ) + write_array_lock.kwargs = {"array": inserted_array} + assert hdf_path == write_array_lock.get_path() def test_write_array_lock_check_existing_lock_no_read_locks( self, @@ -191,7 +191,7 @@ def test_write_array_lock_check_existing_lock_no_read_locks( # Mock acquire as we do not want to lock file here. acquire = mocker.patch.object(write_array_lock, "acquire", autospec=True) release = mocker.patch.object(write_array_lock, "release", autospec=True) - + write_array_lock.kwargs = {"array": inserted_array} write_array_lock.check_existing_lock(func_args=[], func_kwargs={"array": inserted_array}) acquire.assert_called() release.assert_not_called() @@ -213,7 +213,8 @@ def test_write_array_lock_check_existing_lock_read_locks_success( release = mocker.patch.object(write_array_lock, "release", autospec=True) # Make read lock - filepath = write_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + write_array_lock.kwargs = {"array": inserted_array} + filepath = write_array_lock.get_path() process = Process(target=make_read_lock, args=(filepath, inserted_array.id, file_created)) process.start() @@ -242,12 +243,14 @@ def test_write_array_lock_check_existing_lock_read_locks_fail( release = mocker.patch.object(write_array_lock, "release", autospec=True) # Make read lock - filepath = write_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}) + write_array_lock.kwargs = {"array": inserted_array} + filepath = write_array_lock.get_path() process = Process(target=make_read_lock, args=(filepath, inserted_array.id, file_created)) process.start() # Call check existing with pytest.raises(DekerLockError): + write_array_lock.kwargs = {"array": inserted_array} write_array_lock.check_existing_lock( func_args=[], func_kwargs={"array": inserted_array} ) @@ -308,7 +311,8 @@ def test_check_get_path( path = get_main_path( inserted_varray.id, varray_collection.path / local_varray_adapter.data_dir ) / (inserted_varray.id + local_varray_adapter.file_ext) - assert path == write_varray_lock.get_path([], {}) + write_varray_lock.kwargs = {"array": inserted_varray} + assert path == write_varray_lock.get_path() def test_check_existing_locks_fail( self, From 3aacf3dd8a224092791bdd835763362ac23509b3 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 15 Apr 2024 14:50:59 +0200 Subject: [PATCH 2/9] Locks and ordered dict --- deker/ABC/base_array.py | 39 ++++++++++++------- deker/locks.py | 34 +++++----------- .../test_arrays/test_array_methods.py | 39 ++++++++++++++++++- 3 files changed, 73 insertions(+), 39 deletions(-) diff --git a/deker/ABC/base_array.py b/deker/ABC/base_array.py index a435d59..e8adaa5 100644 --- a/deker/ABC/base_array.py +++ b/deker/ABC/base_array.py @@ -470,34 +470,45 @@ def _create_from_meta( attrs_schema = collection.varray_schema.attributes else: attrs_schema = collection.array_schema.attributes + try: - for attr in attrs_schema: - attributes = ( - meta["primary_attributes"] if attr.primary else meta["custom_attributes"] - ) + # To ensure the order of attributes + primary_attributes, custom_attributes = OrderedDict(), OrderedDict() + + # Iterate over every attribute in schema: + for attr_schema in attrs_schema: + if attr_schema.primary: + attributes_from_meta = meta["primary_attributes"] + result_attributes = primary_attributes + else: + attributes_from_meta = meta["custom_attributes"] + result_attributes = custom_attributes - value = attributes[attr.name] + value = attributes_from_meta[attr_schema.name] - if attr.dtype == datetime: - attributes[attr.name] = get_utc(value) - if attr.dtype == tuple: + if attr_schema.dtype == datetime: + result_attributes[attr_schema.name] = get_utc(value) + elif attr_schema.dtype == tuple: if ( - attr.primary or (not attr.primary and value is not None) + attr_schema.primary or (not attr_schema.primary and value is not None) ) and not isinstance(value, list): raise DekerMetaDataError( f"Collection '{collection.name}' metadata is corrupted: " - f"attribute '{attr.name}' has invalid type '{type(value)}'; '{attr.dtype}' expected" + f"attribute '{attr_schema.name}' has invalid type '{type(value)}';" + f"'{attr_schema.dtype}' expected" ) - if attr.primary or (not attr.primary and value is not None): - attributes[attr.name] = tuple(value) + if attr_schema.primary or (not attr_schema.primary and value is not None): + result_attributes[attr_schema.name] = tuple(value) + else: + result_attributes[attr_schema.name] = value arr_params = { "collection": collection, "adapter": array_adapter, "id_": meta["id"], - "primary_attributes": meta.get("primary_attributes"), - "custom_attributes": meta.get("custom_attributes"), + "primary_attributes": primary_attributes, + "custom_attributes": custom_attributes, } if varray_adapter: arr_params.update({"adapter": varray_adapter, "array_adapter": array_adapter}) diff --git a/deker/locks.py b/deker/locks.py index 4bbb708..054273d 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -19,7 +19,6 @@ import fcntl import os import time -from contextlib import contextmanager from pathlib import Path from threading import get_native_id @@ -50,7 +49,6 @@ from deker_local_adapters import LocalArrayAdapter from deker.arrays import Array, VArray - from deker.types.private.classes import ArrayPositionedData META_DIVIDER = ":" ArrayFromArgs = Union[Path, Union["Array", "VArray"]] @@ -94,8 +92,14 @@ class LockWithArrayMixin(Generic[T]): @property def array_id(self) -> str: - """Return id of an Array""" - return self.array.id + """Get if from Array, or Path to the array.""" + # Get instance of the array + if isinstance(self.array, Path): + path = self.array + id_ = path.name.split(".")[0] + else: + id_ = self.array.id + return id_ @property def array(self) -> T: @@ -128,19 +132,6 @@ def wait_for_unlock(check_func: Callable, check_func_args: tuple, timeout, inter class ReadArrayLock(LockWithArrayMixin[ArrayFromArgs], BaseLock): """Read lock for Array.""" - ALLOWED_TYPES = ["LocalArrayAdapter"] - - @property - def array_id(self) -> str: - """Get if from Array, or Path to the array.""" - # Get instance of the array - if isinstance(self.array, Path): - path = self.array - id_ = path.name.split(".")[0] - else: - id_ = self.array.id - return id_ - def get_path(self) -> Path: """Get path to read-lock file. @@ -206,11 +197,9 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002] class WriteArrayLock(LockWithArrayMixin["Array"], BaseLock): """Write lock for arrays.""" - ALLOWED_TYPES = ["LocalArrayAdapter"] - def get_path(self) -> Path: """Get path to the file for locking.""" - path = self.dir_path / (self.array.id + self.instance.file_ext) + path = self.dir_path / (self.array_id + self.instance.file_ext) self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}") return path @@ -221,8 +210,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None: :param func_kwargs: keyword arguments of method call """ # If array belongs to varray, we should check if varray is also locked - if self.array._vid: - self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array.id) + self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array_id) # Increment write lock, to prevent more read locks coming. self.acquire(self.get_path()) @@ -275,8 +263,6 @@ class WriteVarrayLock(BaseLock): which managed to obtain all Array locks, will survive. """ - ALLOWED_TYPES = ["VSubset"] - # Locks that have been acquired by varray locks: List[Tuple[Flock, Path]] = [] skip_lock: bool = False # shows that we must skip this lock (e.g server adapters for subset) diff --git a/tests/test_cases/test_arrays/test_array_methods.py b/tests/test_cases/test_arrays/test_array_methods.py index c5b400c..a45a1f5 100644 --- a/tests/test_cases/test_arrays/test_array_methods.py +++ b/tests/test_cases/test_arrays/test_array_methods.py @@ -1,17 +1,20 @@ import os import string +from copy import deepcopy from datetime import datetime, timedelta, timezone from pathlib import Path +from random import shuffle from typing import Any import numpy as np import pytest -from deker_local_adapters import HDF5StorageAdapter +from deker_local_adapters import HDF5StorageAdapter, LocalArrayAdapter, LocalVArrayAdapter from deker_local_adapters.factory import AdaptersFactory from numpy import ndarray +from deker.types import ArrayMeta from tests.parameters.array_params import attributes_validation_params from tests.parameters.index_exp_params import invalid_index_params, valid_index_exp_params from tests.parameters.uri import embedded_uri @@ -717,6 +720,40 @@ def test_step_validator(self, array: Array, index_exp): with pytest.raises(IndexError): array[index_exp] + def test_create_from_meta_ordered( + self, + array_collection_with_attributes: Collection, + local_array_adapter: LocalArrayAdapter, + local_varray_adapter: LocalVArrayAdapter, + array_with_attributes: Array, + ): + meta: ArrayMeta = array_with_attributes.as_dict + + primary_attribute_keys = list(meta["primary_attributes"].keys()) + shuffle(primary_attribute_keys) + + custom_attribute_keys = list(meta["custom_attributes"].keys()) + shuffle(custom_attribute_keys) + + primary_attributes, custom_attributes = {}, {} + for key in primary_attribute_keys: + primary_attributes[key] = meta["primary_attributes"][key] + + for key in custom_attribute_keys: + custom_attributes[key] = meta["custom_attributes"][key] + + meta["primary_attributes"] = primary_attributes + meta["custom_attributes"] = custom_attributes + + array = Array._create_from_meta( + array_collection_with_attributes, + meta=meta, + array_adapter=local_array_adapter, + varray_adapter=None, + ) + assert array.primary_attributes == array_with_attributes.primary_attributes + assert array.custom_attributes == array_with_attributes.custom_attributes + if __name__ == "__main__": pytest.main() From ffc4dea39ba40c6e41e999a0b15bb78a6191f768 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Mon, 15 Apr 2024 22:12:23 +0200 Subject: [PATCH 3/9] tests & locks fix --- deker/locks.py | 4 ++ .../test_arrays/test_varray_methods.py | 38 ++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/deker/locks.py b/deker/locks.py index ef00b70..da34768 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -132,6 +132,8 @@ def wait_for_unlock(check_func: Callable, check_func_args: tuple, timeout, inter class ReadArrayLock(LockWithArrayMixin[ArrayFromArgs], BaseLock): """Read lock for Array.""" + ALLOWED_TYPES = ["LocalArrayAdapter"] + def get_path(self) -> Path: """Get path to read-lock file. @@ -197,6 +199,8 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002] class WriteArrayLock(LockWithArrayMixin["Array"], BaseLock): """Write lock for arrays.""" + ALLOWED_TYPES = ["LocalArrayAdapter"] + def get_path(self) -> Path: """Get path to the file for locking.""" path = self.dir_path / (self.array_id + self.instance.file_ext) diff --git a/tests/test_cases/test_arrays/test_varray_methods.py b/tests/test_cases/test_arrays/test_varray_methods.py index 69871c4..7fc6807 100644 --- a/tests/test_cases/test_arrays/test_varray_methods.py +++ b/tests/test_cases/test_arrays/test_varray_methods.py @@ -3,16 +3,18 @@ from datetime import datetime, timedelta, timezone from pathlib import Path +from random import shuffle from typing import Any import numpy as np import pytest -from deker_local_adapters import HDF5StorageAdapter +from deker_local_adapters import HDF5StorageAdapter, LocalArrayAdapter, LocalVArrayAdapter from deker_local_adapters.factory import AdaptersFactory from deker_tools.path import is_empty from numpy import ndarray +from deker.types import ArrayMeta from tests.parameters.array_params import attributes_validation_params from tests.parameters.index_exp_params import invalid_index_params, valid_index_exp_params from tests.parameters.uri import embedded_uri @@ -783,6 +785,40 @@ def test_step_validaor(self, varray: VArray, index_exp): with pytest.raises(IndexError): varray[index_exp] + def test_create_from_meta_ordered( + self, + varray_collection_with_attributes: Collection, + local_array_adapter: LocalArrayAdapter, + local_varray_adapter: LocalVArrayAdapter, + varray_with_attributes: VArray, + ): + meta: ArrayMeta = varray_with_attributes.as_dict + + primary_attribute_keys = list(meta["primary_attributes"].keys()) + shuffle(primary_attribute_keys) + + custom_attribute_keys = list(meta["custom_attributes"].keys()) + shuffle(custom_attribute_keys) + + primary_attributes, custom_attributes = {}, {} + for key in primary_attribute_keys: + primary_attributes[key] = meta["primary_attributes"][key] + + for key in custom_attribute_keys: + custom_attributes[key] = meta["custom_attributes"][key] + + meta["primary_attributes"] = primary_attributes + meta["custom_attributes"] = custom_attributes + + array = VArray._create_from_meta( + varray_collection_with_attributes, + meta=meta, + array_adapter=local_array_adapter, + varray_adapter=local_varray_adapter, + ) + assert array.primary_attributes == varray_with_attributes.primary_attributes + assert array.custom_attributes == varray_with_attributes.custom_attributes + if __name__ == "__main__": pytest.main() From 3f15040bf326f66eb6a6972b99e9146f1bcd6fcf Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Tue, 16 Apr 2024 09:23:40 +0200 Subject: [PATCH 4/9] linters fix --- deker/locks.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/deker/locks.py b/deker/locks.py index da34768..2dc0fb3 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -56,7 +56,7 @@ def _get_lock_filename(id_: str, lock_ext: LocksExtensions) -> str: - """Get filename for lockfile + """Get filename for lockfile. :param id_: ID of array :param lock_ext: Extension of lock @@ -70,6 +70,7 @@ def _check_write_locks(dir_path: Path, id_: str) -> bool: """Checks write locks from VArrays that differs from current. :param dir_path: Dir where locks are stored (the one with hdf file) + :param id_: Id of array """ for file in dir_path.iterdir(): # Skip lock from current process. @@ -113,12 +114,15 @@ def dir_path(self) -> Path: return get_main_path(self.array_id, self.instance.collection_path / self.instance.data_dir) -def wait_for_unlock(check_func: Callable, check_func_args: tuple, timeout, interval: float) -> bool: +def wait_for_unlock( + check_func: Callable, check_func_args: tuple, timeout: int, interval: float +) -> bool: """Waiting while there is no locks - :param instance: - :param check_func: - :param check_func_args: + :param check_func: Func that check if lock has been releases + :param check_func_args: Args for func + :param timeout: For how long we should wait lock release + :param interval: How often we check locks :return: """ start_time = time.monotonic() @@ -138,10 +142,7 @@ def get_path(self) -> Path: """Get path to read-lock file. It's only the case for arrays, varrays don't have read locks. - :param func_args: arguments of method call - :param func_kwargs: keyword arguments of method call """ - # Get file directory filename = _get_lock_filename(self.array_id, LocksExtensions.array_read_lock) From 3c1fcc18c6bd3f24a1c7c93fab44eda4deafcedd Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Tue, 16 Apr 2024 09:35:58 +0200 Subject: [PATCH 5/9] linters fix --- deker/ABC/base_array.py | 3 ++- deker/locks.py | 29 +++++++---------------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/deker/ABC/base_array.py b/deker/ABC/base_array.py index d6ebeef..e73366c 100644 --- a/deker/ABC/base_array.py +++ b/deker/ABC/base_array.py @@ -462,7 +462,8 @@ def _create_from_meta( try: # To ensure the order of attributes - primary_attributes, custom_attributes = OrderedDict(), OrderedDict() + primary_attributes: OrderedDict = OrderedDict() + custom_attributes: OrderedDict = OrderedDict() # Iterate over every attribute in schema: for attr_schema in attrs_schema: diff --git a/deker/locks.py b/deker/locks.py index 2dc0fb3..1aeca16 100644 --- a/deker/locks.py +++ b/deker/locks.py @@ -28,13 +28,13 @@ Any, Callable, Dict, + Generic, List, Optional, Sequence, - Union, Tuple, TypeVar, - Generic, + Union, ) from uuid import uuid4 @@ -99,7 +99,7 @@ def array_id(self) -> str: path = self.array id_ = path.name.split(".")[0] else: - id_ = self.array.id + id_ = self.array.id # type: ignore[attr-defined] return id_ @property @@ -117,7 +117,7 @@ def dir_path(self) -> Path: def wait_for_unlock( check_func: Callable, check_func_args: tuple, timeout: int, interval: float ) -> bool: - """Waiting while there is no locks + """Waiting while there is no locks. :param check_func: Func that check if lock has been releases :param check_func_args: Args for func @@ -286,11 +286,7 @@ def check_type(self) -> None: self.skip_lock = True def get_path(self) -> Optional[Path]: # noqa[ARG002] - """Path of json Varray file. - - :param func_args: arguments of the function that has been called. - :param func_kwargs: keyword arguments of the function that has been called. - """ + """Path of json Varray file.""" array = self.instance._VSubset__array adapter = self.instance._VSubset__adapter path = get_main_path( @@ -303,7 +299,6 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock: """Check if there is no read lock. :param filename: Path to file that should be flocked - :return: """ # Check read lock first array_id = filename.name.split(".")[0] @@ -326,7 +321,6 @@ def check_arrays_locks( ) -> List[Path]: """Check all Arrays that are in current VArray. - :param arrays_positions: Arrays' positions in VArray :param adapter: Array Adapter instance :param varray: VArray """ @@ -503,12 +497,7 @@ class UpdateMetaAttributeLock(LockWithArrayMixin[Union["Array", "VArray"]], Base ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"] def get_path(self) -> Path: - """Return path to the file that should be locked. - - :param func_args: arguments for called method - :param func_kwargs: keyword arguments for called method - :return: - """ + """Return path to the file that should be locked.""" # Get directory where file locates dir_path = get_main_path( self.array.id, self.instance.collection_path / self.instance.data_dir @@ -524,11 +513,7 @@ class CollectionLock(BaseLock): ALLOWED_TYPES = ["LocalCollectionAdapter"] def get_path(self) -> Path: # noqa[ARG002] - """Return path to collection lock file. - - :param func_args: arguments for called method - :param func_kwargs: keyword arguments for called method - """ + """Return path to collection lock file.""" collection = self.args[1] path = self.instance.collections_resource / (collection.name + ".lock") self.logger.debug(f"Got path for collection {collection.name} lock file: {path}") From afd511d542095078f29d5c6edcc38151db78d745 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Tue, 16 Apr 2024 11:42:37 +0200 Subject: [PATCH 6/9] todos removed & dict returned to ordered dict --- deker/ABC/base_array.py | 4 ++-- deker/ABC/base_locks.py | 1 - deker/errors.py | 3 --- deker/tools/path.py | 1 - deker/types/private/enums.py | 2 -- 5 files changed, 2 insertions(+), 9 deletions(-) diff --git a/deker/ABC/base_array.py b/deker/ABC/base_array.py index e73366c..d395cdd 100644 --- a/deker/ABC/base_array.py +++ b/deker/ABC/base_array.py @@ -462,8 +462,8 @@ def _create_from_meta( try: # To ensure the order of attributes - primary_attributes: OrderedDict = OrderedDict() - custom_attributes: OrderedDict = OrderedDict() + primary_attributes: dict = dict() + custom_attributes: dict = dict() # Iterate over every attribute in schema: for attr_schema in attrs_schema: diff --git a/deker/ABC/base_locks.py b/deker/ABC/base_locks.py index cc2ea60..2d90782 100644 --- a/deker/ABC/base_locks.py +++ b/deker/ABC/base_locks.py @@ -15,7 +15,6 @@ # along with this program. If not, see . """Abstract interfaces for locks.""" -# TODO: MOVED from abc import ABC, abstractmethod from functools import wraps from pathlib import Path diff --git a/deker/errors.py b/deker/errors.py index 8991fd5..2b169aa 100644 --- a/deker/errors.py +++ b/deker/errors.py @@ -16,7 +16,6 @@ from typing import List -# TODO: MOVED class DekerBaseApplicationError(Exception): """Base attribute exception.""" @@ -85,7 +84,6 @@ class DekerIntegrityError(DekerBaseApplicationError): pass -# TODO: MOVED class DekerLockError(DekerBaseApplicationError): """If a Collection or a Array or VArray instance is locked.""" @@ -144,6 +142,5 @@ def __str__(self) -> str: return f"{self.message}; exceptions:\n\n{joined} " -# TODO: MOVED class DekerMemoryError(DekerBaseApplicationError, MemoryError): """Early memory overflow exception.""" diff --git a/deker/tools/path.py b/deker/tools/path.py index 8deac53..58ad76d 100644 --- a/deker/tools/path.py +++ b/deker/tools/path.py @@ -54,7 +54,6 @@ def get_symlink_path( return symlink_path -# TODO: MOVED def get_main_path(array_id: str, data_directory: Path) -> Path: """Generate main path for the given array id by its type. diff --git a/deker/types/private/enums.py b/deker/types/private/enums.py index 14d3acb..882f6ed 100644 --- a/deker/types/private/enums.py +++ b/deker/types/private/enums.py @@ -88,7 +88,6 @@ class DimensionType(str, Enum): time = "time" -# TODO: MOVED class LocksExtensions(str, Enum): """Extensions for lock files.""" @@ -98,7 +97,6 @@ class LocksExtensions(str, Enum): varray_lock = ".varraylock" -# TODO: MOVED class LocksTypes(str, Enum): """Locks enum.""" From a996ad53f0017292ad0bfedf9a54f8fe1486f891 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Wed, 17 Apr 2024 09:36:32 +0200 Subject: [PATCH 7/9] todos removed & dict returned to ordered dict --- deker/ABC/base_array.py | 33 ++++++----------------- deker/errors.py | 20 ++++++++------ deker/tools/attributes.py | 43 +++++++++++++++++++++++++++++- tests/parameters/schemas_params.py | 2 +- 4 files changed, 63 insertions(+), 35 deletions(-) diff --git a/deker/ABC/base_array.py b/deker/ABC/base_array.py index d395cdd..dce3819 100644 --- a/deker/ABC/base_array.py +++ b/deker/ABC/base_array.py @@ -19,7 +19,6 @@ import json from abc import ABC, abstractmethod -from collections import OrderedDict from copy import deepcopy from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Type, Union @@ -35,7 +34,7 @@ from deker.schemas import ArraySchema, VArraySchema from deker.subset import Subset, VSubset from deker.tools.array import check_memory, get_id -from deker.tools.attributes import deserialize_attribute_value, serialize_attribute_value +from deker.tools.attributes import make_ordered_dict, serialize_attribute_value from deker.tools.schema import create_dimensions from deker.types.private.classes import ArrayMeta, Serializer from deker.types.private.typings import FancySlice, Numeric, Slice @@ -296,10 +295,9 @@ def __init__( self.schema, primary_attributes, custom_attributes ) - self.primary_attributes: OrderedDict = ( - OrderedDict({**primary_attributes}) if primary_attributes else OrderedDict() + self.primary_attributes, self.custom_attributes = make_ordered_dict( + primary_attributes, custom_attributes, self.schema.attributes # type: ignore[arg-type] ) - self.custom_attributes: dict = custom_attributes if custom_attributes else {} def __del__(self) -> None: del self.__adapter @@ -462,26 +460,11 @@ def _create_from_meta( try: # To ensure the order of attributes - primary_attributes: dict = dict() - custom_attributes: dict = dict() - - # Iterate over every attribute in schema: - for attr_schema in attrs_schema: - if attr_schema.primary: - attributes_from_meta = meta["primary_attributes"] - result_attributes = primary_attributes - else: - attributes_from_meta = meta["custom_attributes"] - result_attributes = custom_attributes - - value = attributes_from_meta[attr_schema.name] - if value is None and not attr_schema.primary: - result_attributes[attr_schema.name] = value - continue - - result_attributes[attr_schema.name] = deserialize_attribute_value( - value, attr_schema.dtype, False - ) + primary_attributes, custom_attributes = make_ordered_dict( + meta["primary_attributes"], + meta["custom_attributes"], + attrs_schema, # type: ignore[arg-type] + ) arr_params = { "collection": collection, diff --git a/deker/errors.py b/deker/errors.py index 2b169aa..596dcc1 100644 --- a/deker/errors.py +++ b/deker/errors.py @@ -107,15 +107,10 @@ class DekerSubsetError(DekerArrayError): """If something goes wrong while Subset managing.""" -class DekerVSubsetError(DekerSubsetError): - """If something goes wrong while VSubset managing. +class DekerExceptionGroup(DekerBaseApplicationError): + """If one or more threads finished with any exception. - Regarding that VSubset's inner Subsets' processing - is made in an Executor, this exception actually is - an `exceptions messages group`. - - If one or more threads finished with any exception, - name, message and reasonable tracebacks of all + Name, message and reasonable tracebacks of all of these exceptions shall be collected in a list and passed to this class along with some message. @@ -144,3 +139,12 @@ def __str__(self) -> str: class DekerMemoryError(DekerBaseApplicationError, MemoryError): """Early memory overflow exception.""" + + +class DekerVSubsetError(DekerSubsetError, DekerExceptionGroup): + """If something goes wrong while VSubset managing. + + Regarding that VSubset's inner Subsets' processing + is made in an Executor, this exception actually is + an `exceptions messages group`. + """ diff --git a/deker/tools/attributes.py b/deker/tools/attributes.py index 91f2d68..8689d38 100644 --- a/deker/tools/attributes.py +++ b/deker/tools/attributes.py @@ -15,14 +15,19 @@ # along with this program. If not, see . import re +from collections import OrderedDict from datetime import datetime -from typing import Any, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Type, Union import numpy as np from deker_tools.time import get_utc +if TYPE_CHECKING: + from deker import AttributeSchema + + def serialize_attribute_value( val: Any, ) -> Union[Tuple[str, int, float, tuple], str, int, float, tuple]: @@ -112,3 +117,39 @@ def deserialize_attribute_nested_tuples(value: Tuple[Any, ...]) -> Tuple[Any, .. value = deserialize_attribute_value(el, type(el), True) deserialized.append(value) return tuple(deserialized) + + +def make_ordered_dict( + primary_attributes: Optional[dict], + custom_attributes: Optional[dict], + attrs_schema: Union[List["AttributeSchema"], Tuple[AttributeSchema, ...]], +) -> Tuple[OrderedDict, OrderedDict]: + """Ensure that attributes in dict are located in correct order (Based on schema). + + :param primary_attributes: Primary attributes dict + :param custom_attributes: Custom attributes dict + :param attrs_schema: Schema of attributes to get order + """ + # To ensure the order of attributes + ordered_primary_attributes: OrderedDict = OrderedDict() + ordered_custom_attributes: OrderedDict = OrderedDict() + + # Iterate over every attribute in schema: + for attr_schema in attrs_schema: + if attr_schema.primary: + attributes_from_meta = primary_attributes + result_attributes = ordered_primary_attributes + else: + attributes_from_meta = custom_attributes + result_attributes = ordered_custom_attributes + + value = attributes_from_meta[attr_schema.name] + if value is None and not attr_schema.primary: + result_attributes[attr_schema.name] = value + continue + + result_attributes[attr_schema.name] = deserialize_attribute_value( + value, attr_schema.dtype, False + ) + + return ordered_primary_attributes, ordered_custom_attributes diff --git a/tests/parameters/schemas_params.py b/tests/parameters/schemas_params.py index 039c6d2..1263a13 100644 --- a/tests/parameters/schemas_params.py +++ b/tests/parameters/schemas_params.py @@ -569,7 +569,7 @@ def WRONG_params_dataclass_raises(cls) -> List[Any]: *cls._generate_types( base_dict={"dtype": dtype, "dimensions": dimensions, "vgrid": vgrid}, key="attributes", - exception_types=[tuple, list], + exception_types=[tuple, list, None], ), # wrong vgrid *cls._generate_types( From 2af076174e81cb1b733611c1f05954f963daa685 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Wed, 17 Apr 2024 09:55:54 +0200 Subject: [PATCH 8/9] skip "Error" suffix --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 869a727..de50866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -229,6 +229,7 @@ ignore = [ "I202", "N807", "N813", + "N818", "W503", "W504", ] From 98b22722e37b598606fa00de734851adaa2c1205 Mon Sep 17 00:00:00 2001 From: matveyvarg Date: Wed, 17 Apr 2024 10:08:07 +0200 Subject: [PATCH 9/9] tests typing fix --- deker/tools/attributes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deker/tools/attributes.py b/deker/tools/attributes.py index 8689d38..54f6957 100644 --- a/deker/tools/attributes.py +++ b/deker/tools/attributes.py @@ -122,7 +122,7 @@ def deserialize_attribute_nested_tuples(value: Tuple[Any, ...]) -> Tuple[Any, .. def make_ordered_dict( primary_attributes: Optional[dict], custom_attributes: Optional[dict], - attrs_schema: Union[List["AttributeSchema"], Tuple[AttributeSchema, ...]], + attrs_schema: Union[List["AttributeSchema"], Tuple["AttributeSchema", ...]], ) -> Tuple[OrderedDict, OrderedDict]: """Ensure that attributes in dict are located in correct order (Based on schema).