VArrayWriteLock does not acquire lock on json; DekerMemoryError message improved
Sergey Rybakov committed Dec 5, 2023
1 parent 33d4870 commit e9368a8
Showing 6 changed files with 54 additions and 17 deletions.
4 changes: 1 addition & 3 deletions deker/ABC/base_schemas.py
@@ -177,9 +177,7 @@ def as_dict(self) -> dict:
             return {
                 "dimensions": tuple(dim.as_dict for dim in self.dimensions),
                 "dtype": dtype,
-                "attributes": tuple(attr.as_dict for attr in self.attributes)
-                if self.attributes is not None
-                else tuple(),
+                "attributes": tuple(attr.as_dict for attr in self.attributes),
                 "fill_value": fill_value,
             }
         except (KeyError, ValueError) as e:
12 changes: 8 additions & 4 deletions deker/client.py
@@ -214,16 +214,20 @@ def __init__(
         try:
             set_logging_level(loglevel.upper())
             self.__get_plugins()
-            mem_limit = convert_human_memory_to_bytes(memory_limit)
+            total_available_mem = virtual_memory().total + swap_memory().total
+            memory_limit = convert_human_memory_to_bytes(memory_limit)
+            if memory_limit >= total_available_mem or memory_limit <= 0:
+                mem_limit = total_available_mem
+            else:
+                mem_limit = memory_limit
+
             self.__config = DekerConfig(  # type: ignore[call-arg]
                 uri=uri,
                 workers=workers if workers is not None else cpu_count() + 4,
                 write_lock_timeout=write_lock_timeout,
                 write_lock_check_interval=write_lock_check_interval,
                 loglevel=loglevel.upper(),
-                memory_limit=(
-                    virtual_memory().total + swap_memory().total if mem_limit <= 0 else mem_limit
-                ),
+                memory_limit=mem_limit,
             )
             self.__uri: Uri = Uri.create(self.__config.uri)
             self.__is_closed: bool = True
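The clamping introduced above caps a user-supplied limit at the machine's RAM plus swap and falls back to that total for non-positive values. A minimal standalone sketch of that rule (illustrative only; clamp_memory_limit is a made-up helper, not part of the Client, while the psutil calls are the same ones the diff uses):

# Minimal sketch of the clamping rule above, not the Client implementation.
from psutil import swap_memory, virtual_memory


def clamp_memory_limit(requested_bytes: int) -> int:
    """Cap a requested limit at total RAM + swap; fall back to that total for <= 0."""
    total_available_mem = virtual_memory().total + swap_memory().total
    if requested_bytes >= total_available_mem or requested_bytes <= 0:
        return total_available_mem
    return requested_bytes


# Example: clamp_memory_limit(8 * 1024**3) keeps 8 GiB on a 16 GiB machine, while
# clamp_memory_limit(-1) or an oversized request falls back to RAM + swap.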
19 changes: 17 additions & 2 deletions deker/locks.py
@@ -181,7 +181,13 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:


 class WriteVarrayLock(BaseLock):
-    """Write lock for VArrays."""
+    """Write lock for VArrays.
+
+    VArray shall not be locked itself when writing data.
+    Only inner Arrays shall be locked.
+    If updating subsets do not intersect - it's OK, otherwise the first,
+    which managed to obtain all Array locks, will survive.
+    """
 
     ALLOWED_TYPES = ["VSubset"]

@@ -310,18 +316,27 @@ def release(self, e: Optional[Exception] = None) -> None:  # noqa[ARG002]
Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True)
super().release()

def acquire(self, path: Optional[Path]) -> None:
"""VArray shall not be locked itself.
:param path: path to the file to be locked
"""
pass

@staticmethod
def _inner_method_logic(
lock: "WriteVarrayLock", args: Sequence, kwargs: Dict, func: Callable
) -> Any:
"""Logic of acquiring lock and getting result.
When writing in VArray
:param lock: The lock that will be acquired
:param func: decorated function
:param args: arguments of decorated function
:param kwargs: keyword arguments of decorated function
"""
# If we want to skip logic of lock (e.g when we use server adapters)
# If we want to skip logic of lock (e.g. when we use server adapters)
if lock.skip_lock:
return lock.get_result(func, args, kwargs)
return super()._inner_method_logic(lock, args, kwargs, func)
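The idea behind the new no-op acquire: a VArray write never locks the VArray's own file; concurrent writers are serialized only through the per-Array locks they must collect. A rough sketch of that pattern (hypothetical stand-in classes using threading locks, not Deker's BaseLock/WriteVarrayLock or its file-based locks):

# Rough sketch of the locking scheme described in the docstring above.
import threading
from typing import Dict, Iterable, List

_ARRAY_LOCKS: Dict[str, threading.Lock] = {}


def _array_lock(array_id: str) -> threading.Lock:
    """One lock per inner Array (in Deker these are lock files, not threading locks)."""
    return _ARRAY_LOCKS.setdefault(array_id, threading.Lock())


class SketchVarrayWriteLock:
    """VArray shall not be locked itself; only the affected inner Arrays are."""

    def acquire(self, path=None) -> None:
        # Deliberate no-op, mirroring the override added in the diff.
        pass

    def lock_arrays(self, array_ids: Iterable[str]) -> List[threading.Lock]:
        # Non-intersecting subsets take disjoint lock sets and proceed in parallel;
        # intersecting writers block here until the first one releases its Array locks.
        taken = []
        for array_id in array_ids:
            lock = _array_lock(array_id)
            lock.acquire()
            taken.append(lock)
        return taken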
7 changes: 5 additions & 2 deletions deker/schemas.py
@@ -265,6 +265,9 @@ def __common_arrays_attributes_post_init__(self: BaseArraysSchema) -> None:  # n
     :param self: ArraySchema or VArraySchema instance
     """
+    if self.attributes is None:
+        self.attributes = tuple()
+
     if not isinstance(self.attributes, (tuple, list)):
         raise DekerValidationError("Attributes shall be a list or tuple of AttributeSchema")
     if any(not isinstance(a, AttributeSchema) for a in self.attributes):
@@ -333,7 +336,7 @@ class VArraySchema(SelfLoggerMixin, BaseArraysSchema):
     vgrid: Optional[Union[List[int], Tuple[int, ...]]] = None
     arrays_shape: Optional[Union[List[int], Tuple[int, ...]]] = None
     fill_value: Union[Numeric, type(np.nan), None] = None  # type: ignore[valid-type]
-    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
+    attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None
 
     def __attrs_post_init__(self) -> None:
         """Validate schema, convert `vgrid` or `arrays_shape` to tuple and calculate the other grid splitter."""
@@ -423,7 +426,7 @@ class ArraySchema(SelfLoggerMixin, BaseArraysSchema):
"""

fill_value: Union[Numeric, type(np.nan), None] = None # type: ignore[valid-type]
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None

def __attrs_post_init__(self) -> None:
"""Validate schema."""
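With attributes now defaulting to None, both schema classes rely on the shared post-init to normalize the value before validation. A small sketch of that normalization (simplified dataclass stand-ins, not the real attrs-based schemas):

# Simplified stand-in for the change above: `attributes` defaults to None and is coerced
# to an empty tuple in post-init before validation. Not Deker's actual schema classes.
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SketchAttributeSchema:
    name: str
    dtype: type
    primary: bool = False


@dataclass
class SketchArraySchema:
    dtype: type
    attributes: Optional[Tuple[SketchAttributeSchema, ...]] = None

    def __post_init__(self) -> None:
        if self.attributes is None:  # the branch added in __common_arrays_attributes_post_init__
            self.attributes = tuple()
        if not isinstance(self.attributes, (tuple, list)):
            raise ValueError("Attributes shall be a list or tuple of AttributeSchema")


schema = SketchArraySchema(dtype=float)  # attributes omitted entirely
assert schema.attributes == tuple()      # as_dict-style serialization no longer needs a None check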
25 changes: 21 additions & 4 deletions deker/tools/array.py
@@ -43,10 +43,10 @@ def convert_human_memory_to_bytes(memory_limit: Union[int, str]) -> int:
     :param memory_limit: memory limit provided by the user
     """
     bytes_ = 1024
-    mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3}
+    mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3, "t": bytes_**4}
     error = (
         f"invalid memory_limit value: {memory_limit}; expected `int` or `str` in format [number][unit] "
-        f'where unit is one of ["k", "K", "m", "M", "g", "G"], e.g. "8G" or "512m"'
+        f'where unit is one of ["k", "K", "m", "M", "g", "G", "t", "T"], e.g. "8G" or "512m"'
     )
     if not isinstance(memory_limit, (int, str)):
         raise DekerValidationError(error)
@@ -81,15 +81,32 @@ def check_memory(shape: tuple, dtype: type, mem_limit_from_settings: int) -> None:
     array_size_bytes = np.dtype(dtype).itemsize * array_values
     array_size_human = convert_size_to_human(array_size_bytes)
 
+    total_machine_mem = virtual_memory().total + swap_memory().total
+    total_human_mem = convert_size_to_human(total_machine_mem)
     current_limit = virtual_memory().available + swap_memory().free
     limit = min(mem_limit_from_settings, current_limit)
     limit_human = convert_size_to_human(limit)
+    config_human_limit = convert_size_to_human(mem_limit_from_settings)
+
+    deker_limit = f"{config_human_limit} ({mem_limit_from_settings} bytes). "
+    limit_message = (
+        "Deker Client memory usage is limited to the total memory available on this machine: "
+        + deker_limit
+    )
+    advise = ""
+    if total_machine_mem > mem_limit_from_settings:
+        limit_message = (
+            f"Total memory available on this machine is {total_human_mem} ({total_machine_mem} bytes). "
+            f"Deker Client memory usage is manually limited to " + deker_limit
+        )
+        advise = " Also, you may try to increase the value of Deker Client memory limit."
 
     if array_size_bytes > limit:
         raise DekerMemoryError(
             f"Can not allocate {array_size_human} for array/subset with shape {shape} and dtype {dtype}. "
-            f"Current Deker limit per array/subset is {limit_human}. Value in config: {mem_limit_from_settings} bytes. "
-            f"Reduce shape or dtype of your array/subset or increase Deker RAM limit."
+            f"{limit_message}"
+            f"Current available free memory per array/subset is {limit_human} ({limit} bytes). "
+            f"Reduce your schema or the `shape` of your subset or revise other processes memory usage.{advise}"
         )


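Two pieces of behaviour from this file in one standalone sketch: the human-readable unit conversion now accepting a terabyte suffix, and the effective limit reported by DekerMemoryError, i.e. the smaller of the configured limit and the currently free memory. These are illustrative helpers with made-up names, not Deker's convert_human_memory_to_bytes/check_memory:

# Illustrative helpers mirroring the logic above (made-up names, not Deker's API).
# Assumes psutil is installed; the attribute accesses match those in the diff.
from psutil import swap_memory, virtual_memory


def human_to_bytes(value: str) -> int:
    """Convert "8G"-style strings to bytes; "t"/"T" (terabytes) is the newly added unit."""
    step = 1024
    mapping = {"k": step, "m": step**2, "g": step**3, "t": step**4}
    return int(value[:-1]) * mapping[value[-1].lower()]


def effective_limit(configured_limit_bytes: int) -> int:
    """The limit actually enforced: the smaller of the configured value and free memory."""
    currently_free = virtual_memory().available + swap_memory().free
    return min(configured_limit_bytes, currently_free)


# Example: human_to_bytes("2T") == 2 * 1024**4. With an 8G configured limit on a busy
# machine that has only ~3 GiB free, effective_limit(human_to_bytes("8G")) reports the
# smaller free-memory figure, which is the value the improved DekerMemoryError now shows.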
4 changes: 2 additions & 2 deletions tests/parameters/schemas_params.py
@@ -520,7 +520,7 @@ def _generate_types(
             datetime.now(),
             Scale(0.1, 0.2),
         ]:
-            if exception_types and type(item) in exception_types:
+            if exception_types and type(item) in exception_types or item is None:
                 continue
             result.append({**base_dict.copy(), key: item})
         if key == "scale":
@@ -545,7 +545,7 @@ def WRONG_params_dataclass_raises(cls) -> List[Any]:
             *cls._generate_types(
                 base_dict={"dtype": dtype, "dimensions": dimensions},
                 key="attributes",
-                exception_types=[tuple, list],
+                exception_types=[tuple, list, NoneType],
             ),
         ]

