Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uint support + varray lock upgrade #30

Merged
merged 6 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deker/ABC/base_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def create_collection_from_meta(
schema_class = SchemaTypeEnum[collection_data.get("type")].value

try:
dtype = DTypeEnum[data["dtype"].lstrip("numpy.")].value
dtype = DTypeEnum[data["dtype"].split("numpy.")[-1]].value
fill_value = (
dtype(data["fill_value"]) if data["fill_value"] is not None else data["fill_value"]
)
Expand Down
15 changes: 10 additions & 5 deletions deker/ABC/base_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from deker.errors import DekerInvalidSchemaError, DekerValidationError
from deker.tools.schema import get_default_fill_value
from deker.types.private.enums import DTypeEnum
from deker.types.private.typings import Numeric
from deker.types.private.typings import Numeric, NumericDtypes


@dataclass(repr=True)
Expand Down Expand Up @@ -90,7 +90,7 @@ class BaseArraysSchema:
dtype: Type[Numeric]
fill_value: Union[Numeric, type(np.nan), None] # type: ignore[valid-type]
dimensions: Union[List[BaseDimensionSchema], Tuple[BaseDimensionSchema, ...]]
attributes: Union[List[BaseAttributeSchema], Tuple[BaseAttributeSchema, ...]]
attributes: Union[List[BaseAttributeSchema], Tuple[BaseAttributeSchema, ...], None]

@property
def primary_attributes(self) -> Optional[Tuple[BaseAttributeSchema, ...]]:
Expand Down Expand Up @@ -121,6 +121,9 @@ def __attrs_post_init__(self) -> None:
if len({d.name for d in self.dimensions}) < len(self.dimensions):
raise DekerValidationError("Dimensions shall have unique names")

if self.dtype not in NumericDtypes:
raise DekerValidationError(f"Invalid dtype {self.dtype}")

try:
if self.dtype == int:
self.dtype = np.int64
Expand Down Expand Up @@ -163,6 +166,10 @@ def named_shape(self) -> Tuple[Tuple[str, int], ...]:
@property
def as_dict(self) -> dict:
"""Serialize as dict."""
error = f'Schema "{self.__class__.__name__}" is invalid/corrupted: '

if self.dtype not in NumericDtypes:
raise DekerInvalidSchemaError(error + f"wrong dtype {self.dtype}")
try:
dtype = DTypeEnum.get_name(DTypeEnum(self.dtype))
fill_value = None if np.isnan(self.fill_value) else str(self.fill_value) # type: ignore[arg-type]
Expand All @@ -174,6 +181,4 @@ def as_dict(self) -> dict:
"fill_value": fill_value,
}
except (KeyError, ValueError) as e:
raise DekerInvalidSchemaError(
f'Schema "{self.__class__.__name__}" is invalid/corrupted: {e}'
)
raise DekerInvalidSchemaError(error + str(e))
12 changes: 8 additions & 4 deletions deker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,20 @@ def __init__(
try:
set_logging_level(loglevel.upper())
self.__get_plugins()
mem_limit = convert_human_memory_to_bytes(memory_limit)
total_available_mem = virtual_memory().total + swap_memory().total
memory_limit = convert_human_memory_to_bytes(memory_limit)
if memory_limit >= total_available_mem or memory_limit <= 0:
matveyvarg marked this conversation as resolved.
Show resolved Hide resolved
mem_limit = total_available_mem
else:
mem_limit = memory_limit

self.__config = DekerConfig( # type: ignore[call-arg]
uri=uri,
workers=workers if workers is not None else cpu_count() + 4,
write_lock_timeout=write_lock_timeout,
write_lock_check_interval=write_lock_check_interval,
loglevel=loglevel.upper(),
memory_limit=(
virtual_memory().total + swap_memory().total if mem_limit <= 0 else mem_limit
),
memory_limit=mem_limit,
)
self.__uri: Uri = Uri.create(self.__config.uri)
self.__is_closed: bool = True
Expand Down
19 changes: 17 additions & 2 deletions deker/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,13 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:


class WriteVarrayLock(BaseLock):
"""Write lock for VArrays."""
"""Write lock for VArrays.

VArray shall not be locked itself when writing data.
Only inner Arrays shall be locked.
If the subsets being updated do not intersect, that is fine; otherwise the first one
that manages to obtain all the Array locks will survive.
"""

ALLOWED_TYPES = ["VSubset"]

Expand Down Expand Up @@ -310,18 +316,27 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True)
super().release()

def acquire(self, path: Optional[Path]) -> None:
"""VArray shall not lock itself.

:param path: path to the file to be locked
"""
pass

@staticmethod
def _inner_method_logic(
lock: "WriteVarrayLock", args: Sequence, kwargs: Dict, func: Callable
) -> Any:
"""Logic of acquiring lock and getting result.

When writing in VArray

:param lock: The lock that will be acquired
:param func: decorated function
:param args: arguments of decorated function
:param kwargs: keyword arguments of decorated function
"""
# If we want to skip logic of lock (e.g when we use server adapters)
# If we want to skip logic of lock (e.g. when we use server adapters)
if lock.skip_lock:
return lock.get_result(func, args, kwargs)
return super()._inner_method_logic(lock, args, kwargs, func)
Expand Down
7 changes: 5 additions & 2 deletions deker/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ def __common_arrays_attributes_post_init__(self: BaseArraysSchema) -> None: # n

:param self: ArraySchema or VArraySchema instance
"""
if self.attributes is None:
self.attributes = tuple()

if not isinstance(self.attributes, (tuple, list)):
raise DekerValidationError("Attributes shall be a list or tuple of AttributeSchema")
if any(not isinstance(a, AttributeSchema) for a in self.attributes):
Expand Down Expand Up @@ -333,7 +336,7 @@ class VArraySchema(SelfLoggerMixin, BaseArraysSchema):
vgrid: Optional[Union[List[int], Tuple[int, ...]]] = None
arrays_shape: Optional[Union[List[int], Tuple[int, ...]]] = None
fill_value: Union[Numeric, type(np.nan), None] = None # type: ignore[valid-type]
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None

def __attrs_post_init__(self) -> None:
"""Validate schema, convert `vgrid` or `arrays_shape` to tuple and calculate the other grid splitter."""
Expand Down Expand Up @@ -423,7 +426,7 @@ class ArraySchema(SelfLoggerMixin, BaseArraysSchema):
"""

fill_value: Union[Numeric, type(np.nan), None] = None # type: ignore[valid-type]
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...]] = tuple()
attributes: Union[List[AttributeSchema], Tuple[AttributeSchema, ...], None] = None

def __attrs_post_init__(self) -> None:
"""Validate schema."""
Expand Down
25 changes: 21 additions & 4 deletions deker/tools/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ def convert_human_memory_to_bytes(memory_limit: Union[int, str]) -> int:
:param memory_limit: memory limit provided by the user
"""
bytes_ = 1024
mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3}
mapping: Dict[str, int] = {"k": bytes_, "m": bytes_**2, "g": bytes_**3, "t": bytes_**4}
error = (
f"invalid memory_limit value: {memory_limit}; expected `int` or `str` in format [number][unit] "
f'where unit is one of ["k", "K", "m", "M", "g", "G"], e.g. "8G" or "512m"'
f'where unit is one of ["k", "K", "m", "M", "g", "G", "t", "T"], e.g. "8G" or "512m"'
)
if not isinstance(memory_limit, (int, str)):
raise DekerValidationError(error)
Expand Down Expand Up @@ -81,15 +81,32 @@ def check_memory(shape: tuple, dtype: type, mem_limit_from_settings: int) -> Non
array_size_bytes = np.dtype(dtype).itemsize * array_values
array_size_human = convert_size_to_human(array_size_bytes)

total_machine_mem = virtual_memory().total + swap_memory().total
total_human_mem = convert_size_to_human(total_machine_mem)
current_limit = virtual_memory().available + swap_memory().free
limit = min(mem_limit_from_settings, current_limit)
limit_human = convert_size_to_human(limit)
config_human_limit = convert_size_to_human(mem_limit_from_settings)

deker_limit = f"{config_human_limit} ({mem_limit_from_settings} bytes). "
limit_message = (
"Deker Client memory usage is limited to the total memory available on this machine: "
+ deker_limit
)
advise = ""
if total_machine_mem > mem_limit_from_settings:
limit_message = (
f"Total memory available on this machine is {total_human_mem} ({total_machine_mem} bytes). "
f"Deker Client memory usage is manually limited to " + deker_limit
)
advise = " Also, you may try to increase the value of Deker Client memory limit."

if array_size_bytes > limit:
raise DekerMemoryError(
f"Can not allocate {array_size_human} for array/subset with shape {shape} and dtype {dtype}. "
f"Current Deker limit per array/subset is {limit_human}. Value in config: {mem_limit_from_settings}"
f"Reduce shape or dtype of your array/subset or increase Deker RAM limit."
f"{limit_message}"
f"Current available free memory per array/subset is {limit_human} ({limit} bytes). "
f"Reduce your schema or the `shape` of your subset or revise other processes memory usage.{advise}"
)


Expand Down
2 changes: 1 addition & 1 deletion deker/tools/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def create_attributes_schema(attributes_schemas: List[dict]) -> Tuple["Attribute
attributes = []
try:
for params in attributes_schemas:
dtype = DTypeEnum[params["dtype"].lstrip("numpy.")].value
dtype = DTypeEnum[params["dtype"].split("numpy.")[-1]].value
attr_schema = AttributeSchema(**{**params, "dtype": dtype})
attributes.append(attr_schema)
return tuple(attributes)
Expand Down
11 changes: 11 additions & 0 deletions deker/types/private/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ class DTypeEnum(Enum):
int16 = np.int16
int32 = np.int32
int64 = np.int64
int_ = np.int64
longlong = np.longlong
uint = np.uint
uint8 = np.uint8
ubyte = np.ubyte
uint16 = np.uint16
uint32 = np.uint32
uintc = np.uintc
uint64 = np.uint64
uintp = np.uintp
ushort = np.ushort
ulonglong = np.ulonglong
float16 = np.float16
float32 = np.float32
float64 = np.float64
Expand Down
22 changes: 22 additions & 0 deletions deker/types/private/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,18 @@
np.int16,
np.int32,
np.int64,
np.uint,
np.uint8,
np.ubyte,
np.uint16,
np.uint32,
np.uintc,
np.uint64,
np.uintp,
np.ushort,
np.ulonglong,
np.float16,
np.float32,
np.float64,
np.float128,
np.longfloat,
Expand All @@ -76,7 +87,18 @@
np.int16,
np.int32,
np.int64,
np.uint,
np.uint8,
np.ubyte,
np.uint16,
np.uint32,
np.uintc,
np.uint64,
np.uintp,
np.ushort,
np.ulonglong,
np.float16,
np.float32,
np.float64,
np.float128,
np.longfloat,
Expand Down
53 changes: 34 additions & 19 deletions deker/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from collections import OrderedDict, namedtuple
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union
from urllib.parse import ParseResult, _NetlocResultMixinStr, parse_qs, quote, urlparse

from deker_tools.path import is_path_valid
Expand All @@ -39,22 +39,22 @@ class Uri(ParseResultWithServers, _NetlocResultMixinStr):
__annotations__ = OrderedDict(
scheme={"separator": None, "divider": None},
netloc={"separator": "://", "divider": None},
path={"separator": "/", "divider": None},
params={"separator": ";", "divider": None},
servers={"separator": "@", "divider": ","},
path={"separator": "/", "divider": "/"},
params={"separator": ";", "divider": ","},
query={"separator": "?", "divider": "&"},
fragment={"separator": "#", "divider": None},
servers=Optional[List[str]],
)
query: Dict[str, List[str]]
servers: List[str]

@property
def raw_url(self) -> str:
"""Get url from raw uri without query string, arguments and fragments."""
url = self.scheme + "://" # type: ignore[attr-defined]
url = self.scheme + self.__annotations__["netloc"]["separator"] # type: ignore[attr-defined]
if self.netloc: # type: ignore[attr-defined]
url += self.netloc # type: ignore[attr-defined]
url += quote(str(self.path), safe=":/") # type: ignore[attr-defined]
url += quote(
str(self.path), safe=self.__annotations__["netloc"]["separator"][:-1] # type: ignore[attr-defined]
)
return url

@classmethod
Expand All @@ -64,18 +64,33 @@ def __get_servers_and_netloc(cls, netloc: str, scheme: str) -> Tuple[str, Option
:param netloc: Netloc object
:param scheme: http or https
"""
# If scheme is not http or https, it couldn't work in cluster mode
if "," not in netloc or scheme not in ["http", "https"]:
# If scheme is not http or https, it cannot work in cluster mode
if (
scheme not in ["http", "https"]
or cls.__annotations__["servers"]["divider"] not in netloc
):
# So servers will be None
return netloc, None

# Otherwise parse servers
servers = netloc.split(",")
servers = netloc.split(cls.__annotations__["servers"]["divider"])
node_with_possible_auth = servers[0]
if "@" in node_with_possible_auth:
auth, _ = node_with_possible_auth.split("@")
return node_with_possible_auth, [f"{scheme}://{auth}@{host}" for host in servers[1:]]
return node_with_possible_auth, [f"{scheme}://{host}" for host in servers[1:]]
if cls.__annotations__["servers"]["separator"] in node_with_possible_auth:
auth, _ = node_with_possible_auth.split(cls.__annotations__["servers"]["separator"])
return (
node_with_possible_auth,
[
f"{scheme}{cls.__annotations__['netloc']['separator']}"
f"{auth}"
f"{cls.__annotations__['servers']['separator']}"
f"{host}"
for host in servers[1:]
],
)
return (
node_with_possible_auth,
[f"{scheme}{cls.__annotations__['netloc']['separator']}{host}" for host in servers[1:]],
)

@classmethod
def __parse(cls, uri: str) -> Uri:
Expand All @@ -86,7 +101,7 @@ def __parse(cls, uri: str) -> Uri:
result = urlparse(uri)

if ";" in result.path:
path, params = result.path.split(";")
path, params = result.path.split(cls.__annotations__["params"]["separator"])
else:
path, params = result.path, result.params

Expand Down Expand Up @@ -135,16 +150,16 @@ def __truediv__(self, other: Union[str, Path]) -> Uri:

:param other: Path or string to join
"""
sep = "/"
sep = self.__annotations__["path"]["separator"]
other = str(other)
path = sep.join((self.path, other.strip())) # type: ignore[attr-defined]
netloc, servers = self.__get_servers_and_netloc(self.netloc, self.scheme) # type: ignore[attr-defined]
res = Uri( # type: ignore
self.scheme, # type: ignore[attr-defined]
netloc,
path, # type: ignore[arg-type]
path, # type: ignore[attr-defined]
self.params, # type: ignore[attr-defined]
self.query, # type: ignore[arg-type]
self.query, # type: ignore[attr-defined]
self.fragment, # type: ignore[attr-defined]
servers,
)
Expand Down
Loading
Loading