diff --git a/deker/ABC/base_array.py b/deker/ABC/base_array.py
index c9907eb..dce3819 100644
--- a/deker/ABC/base_array.py
+++ b/deker/ABC/base_array.py
@@ -19,7 +19,6 @@
import json
from abc import ABC, abstractmethod
-from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Type, Union
@@ -35,7 +34,7 @@
from deker.schemas import ArraySchema, VArraySchema
from deker.subset import Subset, VSubset
from deker.tools.array import check_memory, get_id
-from deker.tools.attributes import deserialize_attribute_value, serialize_attribute_value
+from deker.tools.attributes import make_ordered_dict, serialize_attribute_value
from deker.tools.schema import create_dimensions
from deker.types.private.classes import ArrayMeta, Serializer
from deker.types.private.typings import FancySlice, Numeric, Slice
@@ -296,10 +295,9 @@ def __init__(
self.schema, primary_attributes, custom_attributes
)
- self.primary_attributes: OrderedDict = (
- OrderedDict({**primary_attributes}) if primary_attributes else OrderedDict()
+ self.primary_attributes, self.custom_attributes = make_ordered_dict(
+ primary_attributes, custom_attributes, self.schema.attributes # type: ignore[arg-type]
)
- self.custom_attributes: dict = custom_attributes if custom_attributes else {}
def __del__(self) -> None:
del self.__adapter
@@ -459,25 +457,21 @@ def _create_from_meta(
attrs_schema = collection.varray_schema.attributes
else:
attrs_schema = collection.array_schema.attributes
- try:
- for attr in attrs_schema:
- attributes = (
- meta["primary_attributes"] if attr.primary else meta["custom_attributes"]
- )
-
- value = attributes[attr.name]
- if value is None and not attr.primary:
- attributes[attr.name] = value
- continue
- attributes[attr.name] = deserialize_attribute_value(value, attr.dtype, False)
+ try:
+ # To ensure the order of attributes
+ primary_attributes, custom_attributes = make_ordered_dict(
+ meta["primary_attributes"],
+ meta["custom_attributes"],
+ attrs_schema, # type: ignore[arg-type]
+ )
arr_params = {
"collection": collection,
"adapter": array_adapter,
"id_": meta["id"],
- "primary_attributes": meta.get("primary_attributes"),
- "custom_attributes": meta.get("custom_attributes"),
+ "primary_attributes": primary_attributes,
+ "custom_attributes": custom_attributes,
}
if varray_adapter:
arr_params.update({"adapter": varray_adapter, "array_adapter": array_adapter})
diff --git a/deker/ABC/base_locks.py b/deker/ABC/base_locks.py
index 590a94f..2d90782 100644
--- a/deker/ABC/base_locks.py
+++ b/deker/ABC/base_locks.py
@@ -15,7 +15,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Abstract interfaces for locks."""
-
from abc import ABC, abstractmethod
from functools import wraps
from pathlib import Path
@@ -31,16 +30,15 @@ class BaseLock(SelfLoggerMixin, ABC):
ALLOWED_TYPES: List[str] = []
lock: Optional[Union[Flock, Path]] = None
instance: Optional[Any] = None
+ args: Optional[List[Any]] = None
+ kwargs: Optional[Dict] = None
@abstractmethod
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Optional[Path]:
+ def get_path(self) -> Optional[Path]:
"""Get path to the lock file.
For Arrays it shall be .arrlock (array read lock) or path to the file (array write lock)
For VArrays there are no specific locks for reading, for writing - lock on .json
-
- :param func_args: Arguments of method call
- :param func_kwargs: Keyword arguments of method call
"""
pass
@@ -98,7 +96,7 @@ def _inner_method_logic(lock: "BaseLock", args: Sequence, kwargs: Dict, func: Ca
:param kwargs: Keyword arguments of decorated function
"""
lock.check_existing_lock(args, kwargs)
- path = lock.get_path(args, kwargs)
+ path = lock.get_path()
lock.acquire(path)
try:
# Logic is in the separate method, so we can override its behavior
@@ -129,7 +127,8 @@ def inner(*args: Sequence, **kwargs: Dict[str, Any]) -> Any:
lock = self.__class__()
# as we wrap methods, we should have access to 'self' objects
lock.instance = kwargs.get("self") or args[0]
-
+ lock.args = args
+ lock.kwargs = kwargs
# Check that we don't have lock on anything that besides methods that require lock
lock.check_type()
diff --git a/deker/errors.py b/deker/errors.py
index 2b169aa..596dcc1 100644
--- a/deker/errors.py
+++ b/deker/errors.py
@@ -107,15 +107,10 @@ class DekerSubsetError(DekerArrayError):
"""If something goes wrong while Subset managing."""
-class DekerVSubsetError(DekerSubsetError):
- """If something goes wrong while VSubset managing.
+class DekerExceptionGroup(DekerBaseApplicationError):
+ """If one or more threads finished with any exception.
- Regarding that VSubset's inner Subsets' processing
- is made in an Executor, this exception actually is
- an `exceptions messages group`.
-
- If one or more threads finished with any exception,
- name, message and reasonable tracebacks of all
+ Name, message and reasonable tracebacks of all
of these exceptions shall be collected in a list
and passed to this class along with some message.
@@ -144,3 +139,12 @@ def __str__(self) -> str:
class DekerMemoryError(DekerBaseApplicationError, MemoryError):
"""Early memory overflow exception."""
+
+
+class DekerVSubsetError(DekerSubsetError, DekerExceptionGroup):
+    """If something goes wrong while VSubset managing.
+
+    Since the processing of a VSubset's inner Subsets
+    is done in an Executor, this exception is actually
+    a group of exception messages (it also inherits `DekerExceptionGroup`).
+    """
diff --git a/deker/locks.py b/deker/locks.py
index 6319f34..1aeca16 100644
--- a/deker/locks.py
+++ b/deker/locks.py
@@ -23,7 +23,19 @@
from pathlib import Path
from threading import get_native_id
from time import sleep
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ Generic,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ TypeVar,
+ Union,
+)
from uuid import uuid4
from deker.ABC.base_locks import BaseLock
@@ -37,43 +49,107 @@
from deker_local_adapters import LocalArrayAdapter
from deker.arrays import Array, VArray
- from deker.types.private.classes import ArrayPositionedData
META_DIVIDER = ":"
+ArrayFromArgs = Union[Path, Union["Array", "VArray"]]
+T = TypeVar("T")
-class ReadArrayLock(BaseLock):
+def _get_lock_filename(id_: str, lock_ext: LocksExtensions) -> str:
+    """Build the name of a lock file for the given array.
+
+    The name consists of the array ID, a fresh UUID4, the current process ID
+    and the native ID of the current thread joined with ``META_DIVIDER``,
+    followed by the value of the lock extension.
+
+    :param id_: ID of array
+    :param lock_ext: Extension of lock
+    :return: name of the lock file
+    """
+    parts = (id_, uuid4(), os.getpid(), get_native_id())
+    stem = META_DIVIDER.join(str(part) for part in parts)
+    return str(stem + lock_ext.value)
+
+
+def _check_write_locks(dir_path: Path, id_: str) -> bool:
+    """Look for VArray write locks in the directory of an array.
+
+    Files are inspected in the order ``dir_path.iterdir()`` yields them;
+    the first file carrying the VArray lock extension decides the outcome.
+
+    :param dir_path: Dir where locks are stored (the one with hdf file)
+    :param id_: Id of array (used in the error message only)
+    :return: True if a VArray lock whose name ends with the current process ID
+        and the VArray lock extension is found, False if no VArray lock is found
+    :raises DekerLockError: if the first VArray lock found has any other name
+    """
+    varray_ext = LocksExtensions.varray_lock.value
+    # Lock from current process.
+    # Used when you have to read meta inside .update operation of varray
+    # NOTE(review): a plain suffix match also accepts a lock of another process
+    # whose PID merely ends with the same digits (e.g. 12 vs 112) - confirm
+    # whether the divider should be part of the suffix.
+    own_suffix = f"{os.getpid()}{varray_ext}"
+    for entry in dir_path.iterdir():
+        name = entry.name
+        if name.endswith(own_suffix):
+            return True
+        # Another varray lock, that is not from current process.
+        if name.endswith(varray_ext):  # type: ignore
+            raise DekerLockError(f"Array {id_} is locked with {entry.name}")
+    return False
+
+
+class LockWithArrayMixin(Generic[T]):
+    """Mixin that resolves the target array of a lock from the wrapped call.
+
+    Reads ``args`` and ``kwargs`` of the decorated method call and
+    the ``instance`` the method belongs to; nothing is cached.
+    """
+
+    args: Optional[List[Any]]
+    kwargs: Optional[Dict]
+    instance: Optional[Any]
+    is_locked_with_varray: bool = False
+
+    @property
+    def array_id(self) -> str:
+        """Get ID from the Array, or from the name of the Path to the array."""
+        # Resolve the array once instead of re-parsing the call arguments
+        array = self.array
+        if isinstance(array, Path):
+            # ID is the part of the file name before the first dot
+            return array.name.split(".")[0]
+        return array.id  # type: ignore[attr-defined]
+
+    @property
+    def array(self) -> T:
+        """Get array from ``kwargs["array"]`` or from the second positional argument.
+
+        The call arguments are parsed on every access.
+        """
+        # kwargs is declared Optional: treat a missing mapping as an empty one,
+        # so that an array passed positionally is still found
+        kwargs = self.kwargs or {}
+        return kwargs.get("array") or self.args[1]  # zero arg is 'self'
+
+    @property
+    def dir_path(self) -> Path:
+        """Path to directory with main file."""
+        return get_main_path(self.array_id, self.instance.collection_path / self.instance.data_dir)
+
+
+def wait_for_unlock(
+    check_func: Callable, check_func_args: tuple, timeout: int, interval: float
+) -> bool:
+    """Poll ``check_func`` until it reports success or the timeout expires.
+
+    :param check_func: Func that returns a truthy value once the locks have been released
+    :param check_func_args: Positional args passed to ``check_func``
+    :param timeout: For how long we should wait for the lock release
+    :param interval: Pause between two consecutive checks
+    :return: True as soon as ``check_func`` returns a truthy value,
+        False if the timeout is exceeded first
+    """
+    started_at = time.monotonic()
+    while True:
+        elapsed = time.monotonic() - started_at
+        if not (elapsed <= timeout):
+            # Timeout exceeded, locks are still there
+            return False
+        if check_func(*check_func_args):
+            return True
+        sleep(interval)
+
+
+class ReadArrayLock(LockWithArrayMixin[ArrayFromArgs], BaseLock):
"""Read lock for Array."""
ALLOWED_TYPES = ["LocalArrayAdapter"]
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
+ def get_path(self) -> Path:
"""Get path to read-lock file.
It's only the case for arrays, varrays don't have read locks.
- :param func_args: arguments of method call
- :param func_kwargs: keyword arguments of method call
"""
- # Get instance of the array
- array: Union[Path, Union[Array, VArray]] = (
- func_kwargs.get("array") or func_args[1]
- ) # zero arg is 'self'
- if isinstance(array, Path):
- path = array
- id_ = path.name.split(".")[0]
- else:
- id_ = array.id
-
# Get file directory
- dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir)
- filename = (
- META_DIVIDER.join([f"{id_}", f"{uuid4()}", f"{os.getpid()}", f"{get_native_id()}"])
- + LocksExtensions.array_read_lock.value
- )
+ filename = _get_lock_filename(self.array_id, LocksExtensions.array_read_lock)
+
# Create read lock file path
- path = dir_path / f"{filename}"
+ path = self.dir_path / f"{filename}"
- self.logger.debug(f"Got path for array.id {id_} lock file: {path}")
+ self.logger.debug(f"Got path for array.id {self.array_id} lock file: {path}")
return path
def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
@@ -83,32 +159,22 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
:param func_args: arguments for called function.
:param func_kwargs: keyword arguments for called function.
"""
- array: Union[Path, Union[Array, VArray]] = (
- func_kwargs.get("array") or func_args[1]
- ) # zero arg is 'self'
- if isinstance(array, Path):
- path = array
- id_ = path.name.split(".")[0]
- else:
- id_ = array.id
-
- dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir)
- path = dir_path / (id_ + self.instance.file_ext)
- for file in dir_path.iterdir():
- # Skip lock from current process.
- if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
- self.is_locked_with_varray = True
- return
- # If we've found another varray lock, that not from current process.
- if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
- raise DekerLockError(f"Array {array} is locked with {file.name}")
+ # Check write locks
+ if _check_write_locks(self.dir_path, self.array_id):
+ self.is_locked_with_varray = True
+ # File was locked with VArray from current process.
+ return
+ # No write locks found
+ path = self.dir_path / (self.array_id + self.instance.file_ext)
try:
with open(path, "r") as f:
fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
self.logger.debug(f"Set shared flock for {path}")
except BlockingIOError:
- raise DekerLockError(f"Array {id_} is locked for update operation, cannot be read.")
+ raise DekerLockError(
+ f"Array {self.array_id} is locked for update operation, cannot be read."
+ )
def acquire(self, path: Union[str, Path]) -> Any:
"""Read files will not be flocked - only created.
@@ -118,7 +184,7 @@ def acquire(self, path: Union[str, Path]) -> Any:
# Create lock file
self.lock = path
open(path, "a").close()
- self.logger.debug(f"Acquired lock for {self.lock}")
+ self.logger.debug(f"Acquired read lock for {self.lock}")
def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
"""Release lock by deleting file.
@@ -127,27 +193,19 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
"""
if self.lock and self.lock.exists():
self.lock.unlink()
+ self.logger.debug(f"Releasing read lock for {self.lock}")
self.lock = None
- self.logger.debug(f"Released lock for {self.lock}")
-class WriteArrayLock(BaseLock):
+class WriteArrayLock(LockWithArrayMixin["Array"], BaseLock):
"""Write lock for arrays."""
ALLOWED_TYPES = ["LocalArrayAdapter"]
- is_locked_with_varray: bool = False
-
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
- """Get path to the file for locking.
-
- :param func_args: arguments of method call
- :param func_kwargs: keyword arguments of method call
- """
- array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
- dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
- path = dir_path / (array.id + self.instance.file_ext)
- self.logger.debug(f"Got path for array.id {array.id} lock file: {path}")
+ def get_path(self) -> Path:
+ """Get path to the file for locking."""
+ path = self.dir_path / (self.array_id + self.instance.file_ext)
+ self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}")
return path
def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
@@ -156,38 +214,28 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
:param func_args: arguments of method call
:param func_kwargs: keyword arguments of method call
"""
- # Check current read locks
- array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
- dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
-
# If array belongs to varray, we should check if varray is also locked
- if array._vid:
- for file in dir_path.iterdir():
- # Skip lock from current process.
- if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
- self.is_locked_with_varray = True
- return
- # If we've found another varray lock, that not from current process.
- if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
- raise DekerLockError(f"Array {array} is locked with {file.name}")
+ self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array_id)
# Increment write lock, to prevent more read locks coming.
- self.acquire(self.get_path(func_args, func_kwargs))
+ self.acquire(self.get_path())
# Pattern that has to find any read locks
- glob_pattern = f"{array.id}:*{LocksExtensions.array_read_lock.value}"
+ glob_pattern = f"{self.array.id}:*{LocksExtensions.array_read_lock.value}"
# Wait till there are no more read locks
- start_time = time.monotonic()
- while (time.monotonic() - start_time) <= self.instance.ctx.config.write_lock_timeout:
- if not list(dir_path.rglob(glob_pattern)):
- return
-
- sleep(self.instance.ctx.config.write_lock_check_interval)
+ if wait_for_unlock(
+ lambda path, pattern: not list(path.rglob(pattern)),
+ (self.dir_path, glob_pattern),
+ self.instance.ctx.config.write_lock_timeout,
+ self.instance.ctx.config.write_lock_check_interval,
+ ):
+ # If all locks are released, go further
+ return
# If we hit the timeout, release write lock and raise DekerLockError
self.release()
- raise DekerLockError(f"Array {array} is locked with read locks")
+ raise DekerLockError(f"Array {self.array} is locked with read locks")
def release(self, e: Optional[Exception] = None) -> None:
"""Release Flock.
@@ -237,12 +285,8 @@ def check_type(self) -> None:
if not is_running_on_local:
self.skip_lock = True
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Optional[Path]: # noqa[ARG002]
- """Path of json Varray file.
-
- :param func_args: arguments of the function that has been called.
- :param func_kwargs: keyword arguments of the function that has been called.
- """
+ def get_path(self) -> Optional[Path]: # noqa[ARG002]
+ """Path of json Varray file."""
array = self.instance._VSubset__array
adapter = self.instance._VSubset__adapter
path = get_main_path(
@@ -255,7 +299,6 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock:
"""Check if there is no read lock.
:param filename: Path to file that should be flocked
- :return:
"""
# Check read lock first
array_id = filename.name.split(".")[0]
@@ -273,20 +316,18 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock:
def check_arrays_locks(
self,
- arrays_positions: List[ArrayPositionedData],
adapter: LocalArrayAdapter,
varray: VArray,
) -> List[Path]:
"""Check all Arrays that are in current VArray.
- :param arrays_positions: Arrays' positions in VArray
:param adapter: Array Adapter instance
:param varray: VArray
"""
currently_locked = []
collection = varray._VArray__collection # type: ignore[attr-defined]
- for array_position in arrays_positions:
+ for array_position in self.instance._VSubset__arrays:
filename = adapter._get_symlink_filename(
varray.id,
array_position.vposition,
@@ -314,25 +355,25 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
adapter = self.instance._VSubset__array_adapter
varray = self.instance._VSubset__array
- arrays_positions: List[ArrayPositionedData] = self.instance._VSubset__arrays
-
# Clear links to locks
self.locks = []
# Locks that have been acquired by third party process
# Iterate over Arrays in VArray and try to lock them. If locking fails - wait.
# If it fails again - release all locks.
- currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray)
+ currently_locked = self.check_arrays_locks(adapter, varray)
if not currently_locked:
# Release array locks
return
# Wait till there are no more read locks
- start_time = time.monotonic()
- while (time.monotonic() - start_time) <= adapter.ctx.config.write_lock_timeout:
- if not self.check_arrays_locks(arrays_positions, adapter, varray):
- return
- sleep(adapter.ctx.config.write_lock_check_interval)
+ if wait_for_unlock(
+ check_func=lambda: not self.check_arrays_locks(adapter, varray),
+ check_func_args=tuple(),
+ timeout=adapter.ctx.config.write_lock_timeout,
+ interval=adapter.ctx.config.write_lock_check_interval,
+ ):
+ return
# Release all locks
self.release()
raise DekerLockError(f"VArray {varray} is locked")
@@ -376,39 +417,25 @@ def _inner_method_logic(
return super()._inner_method_logic(lock, args, kwargs, func)
-class CreateArrayLock(BaseLock):
+class CreateArrayLock(LockWithArrayMixin[Union["Array", "VArray", dict]], BaseLock):
"""Lock that we set when we want to create an array."""
ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"]
path: Optional[Path] = None
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
- """Return path to the file that should be locked.
-
- :param func_args: arguments for called method
- :param func_kwargs: keyword arguments for called method
- :return:
- """
- array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
-
+ def get_path(self) -> Path:
+ """Return path to the file that should be locked."""
# Get file directory path
dir_path = self.instance.collection_path
- filename = META_DIVIDER.join(
- [
- f"{array.id}",
- f"{uuid4()}",
- f"{os.getpid()}",
- f"{get_native_id()}",
- ]
- )
+
# Create lock file
- path = dir_path / f"{filename}{LocksExtensions.array_lock.value}"
+ path = dir_path / _get_lock_filename(self.array.id, LocksExtensions.array_lock)
if not path.exists():
path.open("w").close()
self.path = path
- self.logger.debug(f"got path for array.id {array.id} lock file: {path}")
+ self.logger.debug(f"got path for array.id {self.array.id} lock file: {path}")
return path
def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
@@ -419,8 +446,8 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
"""
from deker.arrays import Array, VArray
+ array = self.array
# Check current read locks
- array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
if isinstance(array, dict):
adapter = array["adapter"].__class__.__name__
if adapter not in self.ALLOWED_TYPES:
@@ -435,7 +462,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
# Pattern that has to find any create locks
glob_pattern = f"{array.id}:*{LocksExtensions.array_lock.value}"
for _ in dir_path.rglob(glob_pattern):
- raise DekerLockError(f"Array {array} is locked for creating")
+ raise DekerLockError(f"Array {array.id} is locked for creating")
func_kwargs["array"] = array
@@ -464,24 +491,19 @@ def release(self, e: Optional[Exception] = None) -> None:
super().release(e)
-class UpdateMetaAttributeLock(BaseLock):
+class UpdateMetaAttributeLock(LockWithArrayMixin[Union["Array", "VArray"]], BaseLock):
"""Lock for updating meta."""
ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"]
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
- """Return path to the file that should be locked.
-
- :param func_args: arguments for called method
- :param func_kwargs: keyword arguments for called method
- :return:
- """
- array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
-
+ def get_path(self) -> Path:
+ """Return path to the file that should be locked."""
# Get directory where file locates
- dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
- path = dir_path / f"{array.id}{self.instance.file_ext}"
- self.logger.debug(f"Got path for array.id {array.id} lock file: {path}")
+ dir_path = get_main_path(
+ self.array.id, self.instance.collection_path / self.instance.data_dir
+ )
+ path = dir_path / f"{self.array.id}{self.instance.file_ext}"
+ self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}")
return path
@@ -490,13 +512,9 @@ class CollectionLock(BaseLock):
ALLOWED_TYPES = ["LocalCollectionAdapter"]
- def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path: # noqa[ARG002]
- """Return path to collection lock file.
-
- :param func_args: arguments for called method
- :param func_kwargs: keyword arguments for called method
- """
- collection = func_args[1]
+ def get_path(self) -> Path: # noqa[ARG002]
+ """Return path to collection lock file."""
+ collection = self.args[1]
path = self.instance.collections_resource / (collection.name + ".lock")
self.logger.debug(f"Got path for collection {collection.name} lock file: {path}")
return path
diff --git a/deker/tools/attributes.py b/deker/tools/attributes.py
index 91f2d68..54f6957 100644
--- a/deker/tools/attributes.py
+++ b/deker/tools/attributes.py
@@ -15,14 +15,19 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import re
+from collections import OrderedDict
from datetime import datetime
-from typing import Any, Tuple, Type, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Type, Union
import numpy as np
from deker_tools.time import get_utc
+if TYPE_CHECKING:
+ from deker import AttributeSchema
+
+
def serialize_attribute_value(
val: Any,
) -> Union[Tuple[str, int, float, tuple], str, int, float, tuple]:
@@ -112,3 +117,39 @@ def deserialize_attribute_nested_tuples(value: Tuple[Any, ...]) -> Tuple[Any, ..
value = deserialize_attribute_value(el, type(el), True)
deserialized.append(value)
return tuple(deserialized)
+
+
+def make_ordered_dict(
+    primary_attributes: Optional[dict],
+    custom_attributes: Optional[dict],
+    attrs_schema: Union[List["AttributeSchema"], Tuple["AttributeSchema", ...]],
+) -> Tuple[OrderedDict, OrderedDict]:
+    """Rebuild attributes dicts in the order of the schema, deserializing their values.
+
+    Every attribute of the schema is looked up by name in the primary or in the
+    custom dict, depending on its ``primary`` flag. A custom attribute equal to None
+    is kept as is; any other value goes through ``deserialize_attribute_value``.
+
+    :param primary_attributes: Primary attributes dict
+    :param custom_attributes: Custom attributes dict
+    :param attrs_schema: Schema of attributes to get order
+    :return: ordered primary attributes and ordered custom attributes
+    """
+    ordered_primary: OrderedDict = OrderedDict()
+    ordered_custom: OrderedDict = OrderedDict()
+
+    for schema_item in attrs_schema:
+        is_primary = schema_item.primary
+        source, target = (
+            (primary_attributes, ordered_primary)
+            if is_primary
+            else (custom_attributes, ordered_custom)
+        )
+        name = schema_item.name
+        # NOTE(review): both dicts are annotated Optional, but are subscripted
+        # without a None check - confirm that callers never pass None here
+        # when the schema declares attributes of that kind.
+        raw_value = source[name]
+        if raw_value is None and not is_primary:
+            # Custom attributes are allowed to stay empty
+            target[name] = raw_value
+        else:
+            target[name] = deserialize_attribute_value(raw_value, schema_item.dtype, False)
+
+    return ordered_primary, ordered_custom
diff --git a/pyproject.toml b/pyproject.toml
index 869a727..de50866 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -229,6 +229,7 @@ ignore = [
"I202",
"N807",
"N813",
+ "N818",
"W503",
"W504",
]
diff --git a/tests/parameters/schemas_params.py b/tests/parameters/schemas_params.py
index 039c6d2..1263a13 100644
--- a/tests/parameters/schemas_params.py
+++ b/tests/parameters/schemas_params.py
@@ -569,7 +569,7 @@ def WRONG_params_dataclass_raises(cls) -> List[Any]:
*cls._generate_types(
base_dict={"dtype": dtype, "dimensions": dimensions, "vgrid": vgrid},
key="attributes",
- exception_types=[tuple, list],
+ exception_types=[tuple, list, None],
),
# wrong vgrid
*cls._generate_types(
diff --git a/tests/test_cases/test_arrays/test_array_methods.py b/tests/test_cases/test_arrays/test_array_methods.py
index 4c77993..080b8d2 100644
--- a/tests/test_cases/test_arrays/test_array_methods.py
+++ b/tests/test_cases/test_arrays/test_array_methods.py
@@ -1,17 +1,20 @@
import os
import string
+from copy import deepcopy
from datetime import datetime, timedelta, timezone
from pathlib import Path
+from random import shuffle
from typing import Any
import numpy as np
import pytest
-from deker_local_adapters import HDF5StorageAdapter
+from deker_local_adapters import HDF5StorageAdapter, LocalArrayAdapter, LocalVArrayAdapter
from deker_local_adapters.factory import AdaptersFactory
from numpy import ndarray
+from deker.types import ArrayMeta
from tests.parameters.array_params import attributes_validation_params
from tests.parameters.index_exp_params import invalid_index_params, valid_index_exp_params
from tests.parameters.uri import embedded_uri
@@ -784,6 +787,40 @@ def test_step_validator(self, array: Array, index_exp):
with pytest.raises(IndexError):
array[index_exp]
+ def test_create_from_meta_ordered(
+     self,
+     array_collection_with_attributes: Collection,
+     local_array_adapter: LocalArrayAdapter,
+     local_varray_adapter: LocalVArrayAdapter,
+     array_with_attributes: Array,
+ ):
+     """Array created from meta with shuffled attributes keys has equal attributes."""
+     meta: ArrayMeta = array_with_attributes.as_dict
+
+     # Shuffle the keys of both attributes dicts (primary first, then custom)
+     shuffled_keys = {}
+     for section in ("primary_attributes", "custom_attributes"):
+         keys = list(meta[section].keys())
+         shuffle(keys)
+         shuffled_keys[section] = keys
+
+     # Rebuild the dicts in the shuffled order and put them back into meta
+     for section, keys in shuffled_keys.items():
+         meta[section] = {key: meta[section][key] for key in keys}
+
+     restored = Array._create_from_meta(
+         array_collection_with_attributes,
+         meta=meta,
+         array_adapter=local_array_adapter,
+         varray_adapter=None,
+     )
+     assert restored.primary_attributes == array_with_attributes.primary_attributes
+     assert restored.custom_attributes == array_with_attributes.custom_attributes
+
+
if __name__ == "__main__":
pytest.main()
diff --git a/tests/test_cases/test_arrays/test_varray_methods.py b/tests/test_cases/test_arrays/test_varray_methods.py
index 69871c4..7fc6807 100644
--- a/tests/test_cases/test_arrays/test_varray_methods.py
+++ b/tests/test_cases/test_arrays/test_varray_methods.py
@@ -3,16 +3,18 @@
from datetime import datetime, timedelta, timezone
from pathlib import Path
+from random import shuffle
from typing import Any
import numpy as np
import pytest
-from deker_local_adapters import HDF5StorageAdapter
+from deker_local_adapters import HDF5StorageAdapter, LocalArrayAdapter, LocalVArrayAdapter
from deker_local_adapters.factory import AdaptersFactory
from deker_tools.path import is_empty
from numpy import ndarray
+from deker.types import ArrayMeta
from tests.parameters.array_params import attributes_validation_params
from tests.parameters.index_exp_params import invalid_index_params, valid_index_exp_params
from tests.parameters.uri import embedded_uri
@@ -783,6 +785,40 @@ def test_step_validaor(self, varray: VArray, index_exp):
with pytest.raises(IndexError):
varray[index_exp]
+ def test_create_from_meta_ordered(
+     self,
+     varray_collection_with_attributes: Collection,
+     local_array_adapter: LocalArrayAdapter,
+     local_varray_adapter: LocalVArrayAdapter,
+     varray_with_attributes: VArray,
+ ):
+     """VArray created from meta with shuffled attributes keys has equal attributes."""
+     meta: ArrayMeta = varray_with_attributes.as_dict
+
+     # Shuffle the keys of both attributes dicts (primary first, then custom)
+     shuffled_keys = {}
+     for section in ("primary_attributes", "custom_attributes"):
+         keys = list(meta[section].keys())
+         shuffle(keys)
+         shuffled_keys[section] = keys
+
+     # Rebuild the dicts in the shuffled order and put them back into meta
+     for section, keys in shuffled_keys.items():
+         meta[section] = {key: meta[section][key] for key in keys}
+
+     restored = VArray._create_from_meta(
+         varray_collection_with_attributes,
+         meta=meta,
+         array_adapter=local_array_adapter,
+         varray_adapter=local_varray_adapter,
+     )
+     assert restored.primary_attributes == varray_with_attributes.primary_attributes
+     assert restored.custom_attributes == varray_with_attributes.custom_attributes
+
+
if __name__ == "__main__":
pytest.main()
diff --git a/tests/test_cases/test_client/test_client_methods.py b/tests/test_cases/test_client/test_client_methods.py
index f5b5182..8d63cc7 100644
--- a/tests/test_cases/test_client/test_client_methods.py
+++ b/tests/test_cases/test_client/test_client_methods.py
@@ -774,7 +774,9 @@ def test_client_get_locks_array_read_lock(
self, client: Client, params, read_array_lock, inserted_array
):
client.create_collection(**params)
- file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.args = []
+ read_array_lock.kwargs = {"array": inserted_array}
+ file = read_array_lock.get_path()
file.touch()
meta = file.name.split(":")
assert {
@@ -799,7 +801,9 @@ def test_client_get_locks_type_array_read_lock(
self, client: Client, params, read_array_lock, inserted_array
):
client.create_collection(**params)
- file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.args = []
+ read_array_lock.kwargs = {"array": inserted_array}
+ file = read_array_lock.get_path()
file.touch()
meta = file.name.split(":")
assert [
@@ -826,7 +830,9 @@ def test_client_get_locks_skip_lock(
self, client: Client, params, read_array_lock, inserted_array
):
client.create_collection(**params)
- file = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.args = []
+ read_array_lock.kwargs = {"array": inserted_array}
+ file = read_array_lock.get_path()
open(f"{file}:{os.getpid()}{LocksExtensions.varray_lock.value}", "w").close()
assert not client._get_locks(lock_type=LocksTypes.varray_lock)
@@ -852,7 +858,9 @@ def test_client_clear_locks_collection_empty(self, client: Client, params):
def test_client_clear_locks_inserted_array(
self, client: Client, root_path, read_array_lock, inserted_array
):
- filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.args = []
+ read_array_lock.kwargs = {"array": inserted_array}
+ filepath = read_array_lock.get_path()
filepath.touch()
assert list(Path.rglob(root_path, f"*{LocksExtensions.array_read_lock.value}"))
client.clear_locks()
@@ -861,7 +869,8 @@ def test_client_clear_locks_inserted_array(
def test_client_clear_locks_inserted_array_multiple_locks(
self, client: Client, root_path, read_array_lock, inserted_array
):
- filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.kwargs = {"array": inserted_array}
+ filepath = read_array_lock.get_path()
open(f"{filepath}:{os.getpid()}{LocksExtensions.varray_lock.value}", "w").close()
filepath.touch()
assert list(Path.rglob(root_path, f"*{LocksExtensions.array_read_lock.value}"))
diff --git a/tests/test_cases/test_locking/test_locks.py b/tests/test_cases/test_locking/test_locks.py
index 34f19fb..f9b2a9a 100644
--- a/tests/test_cases/test_locking/test_locks.py
+++ b/tests/test_cases/test_locking/test_locks.py
@@ -93,9 +93,9 @@ def test_read_array_lock_path(
)
)
)
- assert pattern.match(
- str(read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array}))
- )
+ read_array_lock.args = []
+ read_array_lock.kwargs = {"array": inserted_array}
+ assert pattern.match(str(read_array_lock.get_path()))
def test_read_array_check_existing_lock(
self,
@@ -110,6 +110,7 @@ def test_read_array_check_existing_lock(
hdf_path = dir_path / (inserted_array.id + local_array_adapter.file_ext)
with Flock(hdf_path):
with pytest.raises(DekerLockError):
+ read_array_lock.kwargs = {"array": inserted_array}
read_array_lock.check_existing_lock(
func_args=[], func_kwargs={"array": inserted_array}
)
@@ -128,7 +129,8 @@ def test_read_array_locks_create_files(
mocker.patch("deker.locks.uuid4", lambda: uuid)
# Get path of the file that should be created
- filepath = read_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ read_array_lock.kwargs = {"array": inserted_array}
+ filepath = read_array_lock.get_path()
# Mocked method to test
def check_file(self, *args, **kwargs):
@@ -175,10 +177,8 @@ def test_write_array_lock_lock_path(
inserted_array.id, local_array_adapter.collection_path / local_array_adapter.data_dir
)
hdf_path = dir_path / (inserted_array.id + local_array_adapter.file_ext)
-
- assert hdf_path == write_array_lock.get_path(
- func_args=[], func_kwargs={"array": inserted_array}
- )
+ write_array_lock.kwargs = {"array": inserted_array}
+ assert hdf_path == write_array_lock.get_path()
def test_write_array_lock_check_existing_lock_no_read_locks(
self,
@@ -191,7 +191,7 @@ def test_write_array_lock_check_existing_lock_no_read_locks(
# Mock acquire as we do not want to lock file here.
acquire = mocker.patch.object(write_array_lock, "acquire", autospec=True)
release = mocker.patch.object(write_array_lock, "release", autospec=True)
-
+ write_array_lock.kwargs = {"array": inserted_array}
write_array_lock.check_existing_lock(func_args=[], func_kwargs={"array": inserted_array})
acquire.assert_called()
release.assert_not_called()
@@ -213,7 +213,8 @@ def test_write_array_lock_check_existing_lock_read_locks_success(
release = mocker.patch.object(write_array_lock, "release", autospec=True)
# Make read lock
- filepath = write_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ write_array_lock.kwargs = {"array": inserted_array}
+ filepath = write_array_lock.get_path()
process = Process(target=make_read_lock, args=(filepath, inserted_array.id, file_created))
process.start()
@@ -242,12 +243,14 @@ def test_write_array_lock_check_existing_lock_read_locks_fail(
release = mocker.patch.object(write_array_lock, "release", autospec=True)
# Make read lock
- filepath = write_array_lock.get_path(func_args=[], func_kwargs={"array": inserted_array})
+ write_array_lock.kwargs = {"array": inserted_array}
+ filepath = write_array_lock.get_path()
process = Process(target=make_read_lock, args=(filepath, inserted_array.id, file_created))
process.start()
# Call check existing
with pytest.raises(DekerLockError):
+ write_array_lock.kwargs = {"array": inserted_array}
write_array_lock.check_existing_lock(
func_args=[], func_kwargs={"array": inserted_array}
)
@@ -308,7 +311,8 @@ def test_check_get_path(
path = get_main_path(
inserted_varray.id, varray_collection.path / local_varray_adapter.data_dir
) / (inserted_varray.id + local_varray_adapter.file_ext)
- assert path == write_varray_lock.get_path([], {})
+ write_varray_lock.kwargs = {"array": inserted_varray}
+ assert path == write_varray_lock.get_path()
def test_check_existing_locks_fail(
self,