Skip to content

Commit

Permalink
Merge pull request #26 from openweathermap/dev
Browse files Browse the repository at this point in the history
Cluster version
  • Loading branch information
matveyvarg authored Nov 20, 2023
2 parents 4ef7d90 + 9c012db commit 5cb3cd1
Show file tree
Hide file tree
Showing 17 changed files with 829 additions and 470 deletions.
35 changes: 10 additions & 25 deletions deker/ABC/base_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
from deker.dimensions import Dimension, TimeDimension
from deker.errors import DekerMetaDataError, DekerValidationError
from deker.log import SelfLoggerMixin
from deker.schemas import ArraySchema, TimeDimensionSchema, VArraySchema
from deker.schemas import ArraySchema, VArraySchema
from deker.subset import Subset, VSubset
from deker.tools.array import check_memory, get_id
from deker.tools.schema import create_dimensions
from deker.types.private.classes import ArrayMeta, Serializer
from deker.types.private.typings import FancySlice, Numeric, Slice
from deker.validators import process_attributes
from deker.validators import process_attributes, is_valid_uuid, validate_custom_attributes_update


if TYPE_CHECKING:
Expand Down Expand Up @@ -251,9 +251,7 @@ def _validate(
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
) -> None:
if id_ is not None and (
not isinstance(id_, str) or len(id_.split("-")) != 5 # noqa[PLR2004]
):
if id_ is not None and not is_valid_uuid(id_):
raise DekerValidationError(
f"{self.__class__.__name__} id shall be a non-empty uuid.uuid5 string or None"
)
Expand Down Expand Up @@ -382,26 +380,13 @@ def update_custom_attributes(self, attributes: dict) -> None:
:param attributes: attributes for updating
"""
if not attributes:
raise DekerValidationError("No attributes passed for update")
for s in self.schema.dimensions:
if (
isinstance(s, TimeDimensionSchema)
and isinstance(s.start_value, str)
and s.start_value.startswith("$")
):
if s.start_value[1:] in self.primary_attributes:
continue
if s.start_value[1:] not in attributes:
for d in self.dimensions:
if d.name == s.name:
attributes[s.start_value[1:]] = d.start_value # type: ignore[attr-defined]
else:
for attr in self.schema.attributes:
if not attr.primary and attr.name not in attributes:
attributes[attr.name] = self.custom_attributes[attr.name]

process_attributes(self.schema, self.primary_attributes, attributes)
attributes = validate_custom_attributes_update(
self.schema,
self.dimensions,
self.primary_attributes,
self.custom_attributes,
attributes,
)
self._adapter.update_meta_custom_attributes(self, attributes)
self.custom_attributes = attributes
self.logger.info(f"{self!s} custom attributes updated: {attributes}")
Expand Down
28 changes: 20 additions & 8 deletions deker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,20 @@ def _open(self) -> None:
"No installed adapters are found: run `pip install deker_local_adapters`"
)

if self.__uri.scheme not in self.__plugins:
if self.__uri.scheme not in self.__plugins: # type: ignore[attr-defined]
raise DekerClientError(
f"Invalid uri: {self.__uri.scheme} is not supported; {self.__uri}"
f"Invalid uri: {self.__uri.scheme} is not supported; {self.__uri}" # type: ignore[attr-defined]
)

if self.is_closed:
self.__is_closed = False

try:
factory = self.__plugins[self.__uri.scheme]
factory = self.__plugins[self.__uri.scheme] # type: ignore[attr-defined]
except AttributeError:
raise DekerClientError(
f"Invalid source: installed package does not provide AdaptersFactory "
f"for managing uri scheme {self.__uri.scheme}"
f"for managing uri scheme {self.__uri.scheme}" # type: ignore[attr-defined]
)

self.__ctx = CTX(
Expand Down Expand Up @@ -267,7 +267,10 @@ def meta_version(self) -> str:
@property
def root_path(self) -> Path:
"""Get root path to the current storage."""
return Path(self.__adapter.uri.path) / self.__config.collections_directory
return (
Path(self.__adapter.uri.path) # type: ignore[attr-defined]
/ self.__config.collections_directory
)

@property
def is_closed(self) -> bool:
Expand Down Expand Up @@ -400,8 +403,10 @@ def get_collection(
self.logger.info(f"Collection {name} not found")
return None

def collection_from_dict(self, collection_data: dict) -> Collection:
"""Create a new ``Collection`` in the database from collection metadata dictionary.
def _validate_collection(self, collection_data: dict) -> Collection:
"""Validate ``Collection`` object and return it without creation.
Not recommended to use except for validation.
:param collection_data: Dictionary with collection metadata
"""
Expand Down Expand Up @@ -432,9 +437,16 @@ def collection_from_dict(self, collection_data: dict) -> Collection:

elif k not in collection_data[key]:
collection_data[key][k] = default_fields[key][k]
collection = self.__adapter.create_collection_from_meta( # type: ignore[return-value]
return self.__adapter.create_collection_from_meta( # type: ignore[return-value]
collection_data, self.__factory
)

def collection_from_dict(self, collection_data: dict) -> Collection:
"""Create a new ``Collection`` in the database from collection metadata dictionary.
:param collection_data: Dictionary with collection metadata
"""
collection = self._validate_collection(collection_data)
self.__adapter.create(collection)
self.logger.debug(f"Collection {collection.name} created from dict")
return collection # type: ignore[return-value]
Expand Down
10 changes: 7 additions & 3 deletions deker/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ def arrays(self) -> ArrayManager:

@not_deleted
def create(
self, primary_attributes: Optional[dict] = None, custom_attributes: Optional[dict] = None
self,
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
id_: Optional[str] = None,
) -> Union[Array, VArray]:
"""Create ``Array`` or ``VArray`` according to collection main schema.
Expand All @@ -272,12 +275,13 @@ def create(
Otherwise, only ``Arrays`` will be created.
:param primary_attributes: ``Array`` or ``VArray`` primary attribute
:param custom_attributes: ``VArray`` or ``VArray`` custom attributes
:param custom_attributes: ``Array`` or ``VArray`` custom attributes
:param id_: ``Array`` or ``VArray`` unique UUID string
"""
schema = self.array_schema
shape = schema.arrays_shape if hasattr(schema, "arrays_shape") else schema.shape
check_memory(shape, schema.dtype, self.__adapter.ctx.config.memory_limit)
array = self.__manager.create(primary_attributes, custom_attributes)
array = self.__manager.create(primary_attributes, custom_attributes, id_)
self.logger.debug(
f"{array.__class__.__name__} id={array.id} {primary_attributes=}, {custom_attributes=} created"
)
Expand Down
33 changes: 22 additions & 11 deletions deker/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,20 @@ def _create( # type: ignore
schema: "BaseArraysSchema",
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
id_: Optional[str] = None,
) -> Union[Array, VArray]:
"""Create Array or VArray.
:param primary_attributes: array primary attribute
:param custom_attributes: array custom attributes
:param schema: schema decides which array will be created
:param id_: (V)Array uuid string
"""
arr_params = {
"collection": self.__collection,
"primary_attributes": primary_attributes,
"custom_attributes": custom_attributes,
"id_": id_,
}

if isinstance(schema, VArraySchema):
Expand All @@ -135,15 +138,19 @@ def _create( # type: ignore
return array

def create(
self, primary_attributes: Optional[dict] = None, custom_attributes: Optional[dict] = None
self,
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
id_: Optional[str] = None,
) -> Union[Array, VArray]:
"""Create array or varray.
:param primary_attributes: Primary attributes
:param custom_attributes: Custom attributes
:param primary_attributes: primary attributes
:param custom_attributes: custom attributes
:param id_: unique UUID string
"""
schema = self.__collection.varray_schema or self.__collection.array_schema
return self._create(schema, primary_attributes, custom_attributes)
return self._create(schema, primary_attributes, custom_attributes, id_)


class VArrayManager(SelfLoggerMixin, DataManager):
Expand Down Expand Up @@ -177,14 +184,16 @@ def create(
self,
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
id_: Optional[str] = None,
) -> VArray:
"""Create varray in collection.
:param primary_attributes: Primary attributes the varray
:param custom_attributes: Custom attributes the varray
:param primary_attributes: VArray primary attributes
:param custom_attributes: VArray custom attributes
:param id_: VArray unique UUID string
"""
return self._create( # type: ignore[return-value]
self.__collection.varray_schema, primary_attributes, custom_attributes # type: ignore[arg-type]
self.__collection.varray_schema, primary_attributes, custom_attributes, id_ # type: ignore[arg-type]
)

def __iter__(self) -> Generator[VArray, None, None]:
Expand Down Expand Up @@ -223,14 +232,16 @@ def create(
self,
primary_attributes: Optional[dict] = None,
custom_attributes: Optional[dict] = None,
id_: Optional[str] = None,
) -> Array:
"""Create varray in collection.
"""Create array in collection.
:param primary_attributes: Primary attributes the varray
:param custom_attributes: Custom attributes the varray
:param primary_attributes: Array primary attributes
:param custom_attributes: Array custom attributes
:param id_: Array unique UUID string
"""
return self._create( # type: ignore[return-value]
self.__collection.array_schema, primary_attributes, custom_attributes
self.__collection.array_schema, primary_attributes, custom_attributes, id_
)

def __iter__(self) -> Generator[Array, None, None]:
Expand Down
22 changes: 20 additions & 2 deletions deker/tools/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from psutil import swap_memory, virtual_memory

from deker.errors import DekerMemoryError, DekerValidationError
from deker.types.private.enums import ArrayType


def calculate_total_cells_in_array(seq: Union[Tuple[int, ...], List[int]]) -> int:
Expand Down Expand Up @@ -53,6 +54,11 @@ def convert_human_memory_to_bytes(memory_limit: Union[int, str]) -> int:
if isinstance(memory_limit, int):
return memory_limit

try:
return int(memory_limit)
except ValueError:
pass

limit, div = memory_limit[:-1], memory_limit.lower()[-1]
try:
int_limit: int = int(limit)
Expand Down Expand Up @@ -87,6 +93,18 @@ def check_memory(shape: tuple, dtype: type, mem_limit_from_settings: int) -> Non
)


def generate_uid(array_type: ArrayType) -> str:
"""Generate uuid5 for given array_type.
:param array_type: Either array or varray
"""
if not isinstance(array_type, ArrayType):
raise TypeError("Invalid argument type. Array type is required")

namespace = uuid.NAMESPACE_X500 if array_type == ArrayType.array else uuid.NAMESPACE_OID
return str(uuid.uuid5(namespace, array_type.value + get_utc().isoformat()))


def get_id(array: Any) -> str:
"""Generate unique id by object type and datetime.
Expand All @@ -108,14 +126,14 @@ def array_id(arr: Array) -> str: # noqa[ARG001]
:param arr: Array type
"""
return str(uuid.uuid5(uuid.NAMESPACE_X500, "array" + get_utc().isoformat()))
return generate_uid(ArrayType.array)

@generate_id.register(VArray)
def varray_id(arr: VArray) -> str: # noqa[ARG001]
"""Generate id for VArray.
:param arr: VArray type
"""
return str(uuid.uuid5(uuid.NAMESPACE_OID, "varray" + get_utc().isoformat()))
return generate_uid(ArrayType.varray)

return generate_id(array)
11 changes: 8 additions & 3 deletions deker/types/private/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ def get_name(object: "DTypeEnum") -> str:
return f"numpy.{object.name}"


class DimensionType(Enum):
class DimensionType(str, Enum):
"""Enum of dimensions' types."""

generic = "generic"
time = "time"


class LocksExtensions(Enum):
class LocksExtensions(str, Enum):
"""Extensions for lock files."""

array_lock = ".arrlock"
Expand All @@ -77,10 +77,15 @@ class LocksExtensions(Enum):
varray_lock = ".varraylock"


class LocksTypes(Enum):
class LocksTypes(str, Enum):
"""Locks enum."""

array_lock = "array creation lock"
array_read_lock = "array read lock"
collection_lock = "collection creation lock"
varray_lock = "varray write lock"


class ArrayType(str, Enum):
array = "array"
varray = "varray"
Loading

0 comments on commit 5cb3cd1

Please sign in to comment.