From e9368a800f0270e8354daf1161e692b217646345 Mon Sep 17 00:00:00 2001
From: Sergey Rybakov
Date: Tue, 5 Dec 2023 12:33:29 +0200
Subject: [PATCH] VArrayWriteLock does not acquire lock on json; DekerMemoryError message improved

---
 deker/ABC/base_schemas.py          |  4 +---
 deker/client.py                    | 12 ++++++++----
 deker/locks.py                     | 19 +++++++++++++++++--
 deker/schemas.py                   |  7 +++++--
 deker/tools/array.py               | 25 +++++++++++++++++++++----
 tests/parameters/schemas_params.py |  4 ++--
 6 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/deker/ABC/base_schemas.py b/deker/ABC/base_schemas.py
index 95f0310..1ee0009 100644
--- a/deker/ABC/base_schemas.py
+++ b/deker/ABC/base_schemas.py
@@ -177,9 +177,7 @@ def as_dict(self) -> dict:
             return {
                 "dimensions": tuple(dim.as_dict for dim in self.dimensions),
                 "dtype": dtype,
-                "attributes": tuple(attr.as_dict for attr in self.attributes)
-                if self.attributes is not None
-                else tuple(),
+                "attributes": tuple(attr.as_dict for attr in self.attributes),
                 "fill_value": fill_value,
             }
         except (KeyError, ValueError) as e:
diff --git a/deker/client.py b/deker/client.py
index 75ac531..06c6897 100644
--- a/deker/client.py
+++ b/deker/client.py
@@ -214,16 +214,20 @@ def __init__(
         try:
             set_logging_level(loglevel.upper())
             self.__get_plugins()
-            mem_limit = convert_human_memory_to_bytes(memory_limit)
+            total_available_mem = virtual_memory().total + swap_memory().total
+            memory_limit = convert_human_memory_to_bytes(memory_limit)
+            if memory_limit >= total_available_mem or memory_limit <= 0:
+                mem_limit = total_available_mem
+            else:
+                mem_limit = memory_limit
+
             self.__config = DekerConfig(  # type: ignore[call-arg]
                 uri=uri,
                 workers=workers if workers is not None else cpu_count() + 4,
                 write_lock_timeout=write_lock_timeout,
                 write_lock_check_interval=write_lock_check_interval,
                 loglevel=loglevel.upper(),
-                memory_limit=(
-                    virtual_memory().total + swap_memory().total if mem_limit <= 0 else mem_limit
-                ),
+                memory_limit=mem_limit,
             )
             self.__uri: Uri = Uri.create(self.__config.uri)
             self.__is_closed: bool = True
diff --git a/deker/locks.py b/deker/locks.py
index 212f666..eba4461 100644
--- a/deker/locks.py
+++ b/deker/locks.py
@@ -181,7 +181,13 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
 
 
 class WriteVarrayLock(BaseLock):
-    """Write lock for VArrays."""
+    """Write lock for VArrays.
+
+    A VArray itself shall not be locked when writing data;
+    only its inner Arrays shall be locked.
+    If the updated subsets do not intersect, they can be written concurrently;
+    otherwise only the first writer that obtains all the Array locks proceeds.
+    """
 
     ALLOWED_TYPES = ["VSubset"]
 
@@ -310,18 +316,27 @@ def release(self, e: Optional[Exception] = None) -> None:  # noqa[ARG002]
             Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True)
         super().release()
 
+    def acquire(self, path: Optional[Path]) -> None:
+        """Do nothing: a VArray itself shall not be locked.
+
+        :param path: path to the file to be locked
+        """
+        pass
+
     @staticmethod
     def _inner_method_logic(
        lock: "WriteVarrayLock", args: Sequence, kwargs: Dict, func: Callable
     ) -> Any:
         """Logic of acquiring lock and getting result.
 
+        Applied when writing data into a VArray.
+
         :param lock: The lock that will be acquired
         :param func: decorated function
         :param args: arguments of decorated function
         :param kwargs: keyword arguments of decorated function
         """
-        # If we want to skip logic of lock (e.g when we use server adapters)
+        # If we want to skip logic of lock (e.g. when we use server adapters)
         if lock.skip_lock:
             return lock.get_result(func, args, kwargs)
         return super()._inner_method_logic(lock, args, kwargs, func)
diff --git a/deker/schemas.py b/deker/schemas.py
index 53972ec..c83369b 100644
--- a/deker/schemas.py
+++ b/deker/schemas.py
@@ -265,6 +265,9 @@ def __common_arrays_attributes_post_init__(self: BaseArraysSchema) -> None:  # n
 
     :param self: ArraySchema or VArraySchema instance
     """
+    if self.attributes is None:
+        self.attributes = tuple()
+
     if not isinstance(self.attributes, (tuple, list)):
         raise DekerValidationError("Attributes shall be a list or tuple of AttributeSchema")
     if any(not isinstance(a, AttributeSchema) for a in self.attributes):
@@ -333,7 +336,7 @@ class VArraySchema(SelfLoggerMixin, BaseArraysSchema):
     vgrid: Optional[Union[List[int], Tuple[int, ...]]] = None
     arrays_shape: Optional[Union[List[int], Tuple[int, ...]]] = None
     fill_value: Union[Numeric, type(np.nan), None] = None  # type: ignore[valid-type]
-    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
+    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None
 
     def __attrs_post_init__(self) -> None:
         """Validate schema, convert `vgrid` or `arrays_shape` to tuple and calculate the other grid splitter."""
@@ -423,7 +426,7 @@ class ArraySchema(SelfLoggerMixin, BaseArraysSchema):
     """
 
     fill_value: Union[Numeric, type(np.nan), None] = None  # type: ignore[valid-type]
-    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
+    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None
 
     def __attrs_post_init__(self) -> None:
         """Validate schema."""
diff --git a/deker/tools/array.py b/deker/tools/array.py
index d122a43..59f33c4 100644
--- a/deker/tools/array.py
+++ b/deker/tools/array.py
@@ -43,10 +43,10 @@ def convert_human_memory_to_bytes(memory_limit: Union[int, str]) -> int:
     :param memory_limit: memory limit provided by the user
     """
     bytes_ = 1024
-    mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3}
+    mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3, "t": bytes_**4}
     error = (
         f"invalid memory_limit value: {memory_limit}; expected `int` or `str` in format [number][unit] "
-        f'where unit is one of ["k", "K", "m", "M", "g", "G"], e.g. "8G" or "512m"'
+        f'where unit is one of ["k", "K", "m", "M", "g", "G", "t", "T"], e.g. "8G" or "512m"'
     )
     if not isinstance(memory_limit, (int, str)):
         raise DekerValidationError(error)
@@ -81,15 +81,32 @@ def check_memory(shape: tuple, dtype: type, mem_limit_from_settings: int) -> Non
     array_size_bytes = np.dtype(dtype).itemsize * array_values
     array_size_human = convert_size_to_human(array_size_bytes)
 
+    total_machine_mem = virtual_memory().total + swap_memory().total
+    total_human_mem = convert_size_to_human(total_machine_mem)
     current_limit = virtual_memory().available + swap_memory().free
     limit = min(mem_limit_from_settings, current_limit)
     limit_human = convert_size_to_human(limit)
+    config_human_limit = convert_size_to_human(mem_limit_from_settings)
+
+    deker_limit = f"{config_human_limit} ({mem_limit_from_settings} bytes). "
+    limit_message = (
+        "Deker Client memory usage is limited to the total memory available on this machine: "
+        + deker_limit
+    )
+    advise = ""
+    if total_machine_mem > mem_limit_from_settings:
+        limit_message = (
+            f"Total memory available on this machine is {total_human_mem} ({total_machine_mem} bytes). "
+            f"Deker Client memory usage is manually limited to " + deker_limit
+        )
+        advise = " Also, you may try to increase the Deker Client memory limit."
 
     if array_size_bytes > limit:
         raise DekerMemoryError(
             f"Can not allocate {array_size_human} for array/subset with shape {shape} and dtype {dtype}. "
-            f"Current Deker limit per array/subset is {limit_human}. Value in config: {mem_limit_from_settings} bytes. "
-            f"Reduce shape or dtype of your array/subset or increase Deker RAM limit."
+            f"{limit_message}"
+            f"Current available free memory per array/subset is {limit_human} ({limit} bytes). "
+            f"Reduce your schema or the `shape` of your subset, or revise other processes' memory usage.{advise}"
         )
diff --git a/tests/parameters/schemas_params.py b/tests/parameters/schemas_params.py
index c3e7f05..b22914a 100644
--- a/tests/parameters/schemas_params.py
+++ b/tests/parameters/schemas_params.py
@@ -520,7 +520,7 @@ def _generate_types(
             datetime.now(),
             Scale(0.1, 0.2),
         ]:
-            if exception_types and type(item) in exception_types:
+            if exception_types and type(item) in exception_types or item is None:
                 continue
             result.append({**base_dict.copy(), key: item})
         if key == "scale":
@@ -545,7 +545,7 @@ def WRONG_params_dataclass_raises(cls) -> List[Any]:
             *cls._generate_types(
                 base_dict={"dtype": dtype, "dimensions": dimensions},
                 key="attributes",
-                exception_types=[tuple, list],
+                exception_types=[tuple, list, NoneType],
            ),
         ]