Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:openweathermap/deker into fix/ordere…
Browse files Browse the repository at this point in the history
…d-dict
  • Loading branch information
matveyvarg committed Apr 15, 2024
2 parents 3aacf3d + b992e58 commit 8aec238
Show file tree
Hide file tree
Showing 21 changed files with 961 additions and 501 deletions.
43 changes: 11 additions & 32 deletions deker/ABC/base_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
from deker.schemas import ArraySchema, VArraySchema
from deker.subset import Subset, VSubset
from deker.tools.array import check_memory, get_id
from deker.tools.attributes import deserialize_attribute_value, serialize_attribute_value
from deker.tools.schema import create_dimensions
from deker.types.private.classes import ArrayMeta, Serializer
from deker.types.private.typings import FancySlice, Numeric, Slice
from deker.validators import process_attributes, is_valid_uuid, validate_custom_attributes_update
from deker.validators import is_valid_uuid, process_attributes, validate_custom_attributes_update


if TYPE_CHECKING:
Expand Down Expand Up @@ -392,26 +393,14 @@ def update_custom_attributes(self, attributes: dict) -> None:
self.logger.info(f"{self!s} custom attributes updated: {attributes}")

def _create_meta(self) -> str:
"""Serialize array into metadata string."""
"""Serialize array into metadata JSON string."""
primary_attrs, custom_attrs = deepcopy(self.primary_attributes), deepcopy(
self.custom_attributes
)
for attrs in (primary_attrs, custom_attrs):
for key, value in attrs.items():
if isinstance(value, datetime):
attrs[key] = value.isoformat()
elif isinstance(value, np.ndarray):
attrs[key] = value.tolist()
elif isinstance(value, (list, tuple)):
elements = []
for element in value:
if isinstance(element, np.integer):
elements.append(int(element))
else:
elements.append(element)
attrs[key] = tuple(elements)
else:
attrs[key] = value
attrs[key] = serialize_attribute_value(value)

return json.dumps(
{
"id": self.id,
Expand Down Expand Up @@ -485,23 +474,13 @@ def _create_from_meta(
result_attributes = custom_attributes

value = attributes_from_meta[attr_schema.name]

if attr_schema.dtype == datetime:
result_attributes[attr_schema.name] = get_utc(value)
elif attr_schema.dtype == tuple:
if (
attr_schema.primary or (not attr_schema.primary and value is not None)
) and not isinstance(value, list):
raise DekerMetaDataError(
f"Collection '{collection.name}' metadata is corrupted: "
f"attribute '{attr_schema.name}' has invalid type '{type(value)}';"
f"'{attr_schema.dtype}' expected"
)

if attr_schema.primary or (not attr_schema.primary and value is not None):
result_attributes[attr_schema.name] = tuple(value)
else:
if value is None and not attr_schema.primary:
result_attributes[attr_schema.name] = value
continue

result_attributes[attr_schema.name] = deserialize_attribute_value(
value, attr_schema.dtype, False
)

arr_params = {
"collection": collection,
Expand Down
9 changes: 4 additions & 5 deletions deker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
# nopycln: file
# isort: skip_file

from deker_local_adapters.storage_adapters.hdf5.hdf5_options import (
HDF5Options,
HDF5CompressionOpts,
)

from deker.arrays import Array, VArray
from deker.client import Client
from deker.collection import Collection
Expand All @@ -36,6 +31,10 @@
)
from deker.subset import Subset, VSubset
from deker.types.public.classes import Scale
from deker_local_adapters.storage_adapters.hdf5.hdf5_options import (
HDF5Options,
HDF5CompressionOpts,
)

__all__ = (
# deker.adapters.hdf5
Expand Down
4 changes: 2 additions & 2 deletions deker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union

from deker_tools.data import convert_size_to_human
from deker_tools.path import is_path_valid
from deker_tools.log import set_logger
from deker_tools.path import is_path_valid
from psutil import swap_memory, virtual_memory
from tqdm import tqdm

Expand All @@ -43,7 +43,7 @@
)
from deker.integrity import IntegrityChecker
from deker.locks import META_DIVIDER
from deker.log import SelfLoggerMixin, set_logging_level, format_string
from deker.log import SelfLoggerMixin, format_string, set_logging_level
from deker.schemas import ArraySchema, VArraySchema
from deker.tools import convert_human_memory_to_bytes
from deker.types import ArrayLockMeta, CollectionLockMeta, LocksExtensions, LocksTypes, StorageSize
Expand Down
34 changes: 33 additions & 1 deletion deker/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Optional


# TODO: MOVED
Expand Down Expand Up @@ -109,7 +110,38 @@ class DekerSubsetError(DekerArrayError):


class DekerVSubsetError(DekerSubsetError):
    r"""If something goes wrong while VSubset managing.

    VSubset's inner Subsets are processed in an Executor, so this exception
    actually is an `exceptions messages group`.

    If one or more threads finished with any exception, the name, message and
    reasonable traceback of every such exception shall be collected in a list
    and passed to this class along with some message::

        futures = [executor.submit(func, arg) for arg in iterable]
        exceptions = []
        for future in futures:
            try:
                future.result()
            except Exception as e:
                exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))

    :param message: common message describing the failure
    :param exceptions: string representations of the exceptions raised in threads;
        may be omitted when there is nothing to report besides the message
    """

    def __init__(self, message: str, exceptions: Optional[List[str]] = None):
        self.message = message
        # Optional so that single-argument construction keeps working: plain
        # `DekerVSubsetError(msg)` call sites and copy/pickle, which rebuild
        # the exception from `args` (only the message is stored there).
        self.exceptions = [] if exceptions is None else exceptions
        super().__init__(message)

    def __str__(self) -> str:
        if not self.exceptions:
            return self.message
        joined = "\n".join(f"{num}) {e}" for num, e in enumerate(self.exceptions, start=1))
        return f"{self.message}; exceptions:\n\n{joined} "


# TODO: MOVED
Expand Down
13 changes: 7 additions & 6 deletions deker/integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from deker.tools import get_main_path, get_symlink_path
from deker.types.private.enums import LocksExtensions


if TYPE_CHECKING:
from deker.client import Client

Expand Down Expand Up @@ -187,15 +188,15 @@ def _check_varrays_or_arrays(
except DekerBaseApplicationError as e:
if self.stop_on_error:
raise DekerIntegrityError(str(e))
self.errors[
f"Collection {collection.name} arrays integrity errors:"
].append(str(e))
self.errors[f"Collection {collection.name} arrays integrity errors:"].append(
str(e)
)
except DekerMetaDataError as e:
if self.stop_on_error:
raise e
self.errors[
f"Collection {collection.name} (V)Arrays initialization errors:"
].append(str(e))
self.errors[f"Collection {collection.name} (V)Arrays initialization errors:"].append(
str(e)
)

def check(self, collection: Collection) -> None:
"""Check if Arrays or VArrays and their locks in Collection are valid.
Expand Down
2 changes: 2 additions & 0 deletions deker/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class WriteVarrayLock(BaseLock):
which managed to obtain all Array locks, will survive.
"""

ALLOWED_TYPES = ["VSubset"]

# Locks that have been acquired by varray
locks: List[Tuple[Flock, Path]] = []
skip_lock: bool = False # shows that we must skip this lock (e.g server adapters for subset)
Expand Down
38 changes: 21 additions & 17 deletions deker/subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import builtins
import traceback

from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Union

Expand All @@ -24,7 +25,7 @@
from numpy import ndarray

from deker.ABC.base_subset import BaseSubset
from deker.errors import DekerArrayError, DekerLockError, DekerVSubsetError
from deker.errors import DekerArrayError, DekerVSubsetError
from deker.locks import WriteVarrayLock
from deker.schemas import TimeDimensionSchema
from deker.tools import not_deleted
Expand Down Expand Up @@ -621,20 +622,23 @@ def _update(array_data: ArrayPositionedData) -> None:
self.__array.dtype, self.__array.shape, data, self.__bounds
)

results = self.__adapter.executor.map(
_update,
[
ArrayPositionedData(vpos, array_bounds, data[data_bounds])
for vpos, array_bounds, data_bounds in self.__arrays
],
)
try:
list(results)
except Exception as e:
if isinstance(e, DekerLockError):
raise e
else:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} IS NOW CORRUPTED due to the exception above"
).with_traceback(e.__traceback__)
positions = [
ArrayPositionedData(vpos, array_bounds, data[data_bounds])
for vpos, array_bounds, data_bounds in self.__arrays
]
futures = [self.__adapter.executor.submit(_update, position) for position in positions]

exceptions = []
for future in futures:
try:
future.result()
except Exception as e:
exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))

if exceptions:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} MAY BE NOW CORRUPTED due to the exceptions occurred in threads",
exceptions,
)

self.logger.info(f"{self!s} data updated OK")
114 changes: 114 additions & 0 deletions deker/tools/attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# deker - multidimensional arrays storage engine
# Copyright (C) 2023 OpenWeather
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import re

from datetime import datetime
from typing import Any, Tuple, Type, Union

import numpy as np

from deker_tools.time import get_utc


def serialize_attribute_value(
    val: Any,
) -> Union[Tuple[str, int, float, tuple], str, int, float, tuple]:
    """Convert an attribute value into a JSON-friendly representation.

    Conversions performed:

    - ``datetime`` -> ISO 8601 string;
    - ``numpy.ndarray`` -> (nested) list via ``tolist()``;
    - ``complex`` and numpy complex scalars -> string, e.g. ``"(1+2j)"``;
    - numpy boolean, integer and floating scalars -> ``bool``, ``int``, ``float``;
    - ``list`` / ``tuple`` -> tuple with every element serialized recursively.

    Any other value is returned unchanged.

    :param val: attribute value to serialize
    """
    if isinstance(val, datetime):
        return val.isoformat()
    if isinstance(val, np.ndarray):
        return val.tolist()  # type: ignore[attr-defined]
    if isinstance(val, complex):
        return str(val)
    if isinstance(val, np.bool_):
        # numpy booleans are not accepted by `json.dumps`, builtin bools are
        return bool(val)
    if isinstance(val, np.integer):
        return int(val)
    if isinstance(val, np.floating):
        return float(val)
    if isinstance(val, np.complexfloating):
        return str(complex(val))
    if isinstance(val, (list, tuple)):
        return serialize_attribute_nested_tuples(val)  # type: ignore[arg-type]

    return val


def serialize_attribute_nested_tuples(value: Union[tuple, list]) -> Tuple[Any, ...]:
    """Serialize a (possibly nested) sequence attribute into a tuple.

    Inner lists and tuples are processed recursively; every other element
    is passed through ``serialize_attribute_value``.

    :param value: tuple or list instance
    """
    return tuple(
        serialize_attribute_nested_tuples(item)
        if isinstance(item, (list, tuple))
        else serialize_attribute_value(item)
        for item in value
    )


# Parenthesised string form of a complex number, e.g. "(1+2j)" or "(1.5e+20-3j)".
# Compiled once at import time instead of on every call.
_COMPLEX_NUMBER_REGEX = re.compile(
    r"^(\()([+-]?)\d+(?:\.\d+)?(e?)([+-]?)(\d+)?"
    r"([+-]?)\d+(?:\.\d+)?(e?)([+-]?)(\d+)?j(\))$"
)


def deserialize_attribute_value(val: Any, dtype: Type, from_tuple: bool) -> Any:
    """Deserialize attribute value.

    The value is first converted with ``dtype`` (``datetime`` values are passed
    to ``get_utc`` instead). Tuples are then deserialized element by element.
    Strings that come from inside a tuple are additionally checked for being
    a serialized complex number.

    :param val: attribute value
    :param dtype: attribute dtype from schema or type of tuple element
    :param from_tuple: flag for tuple inner elements
    """
    if dtype == datetime:
        val = get_utc(val)
    else:
        val = dtype(val)

    if dtype == tuple:
        # `val` is guaranteed to be a tuple here: it has just been built by `tuple(val)`
        return deserialize_attribute_nested_tuples(val)  # type: ignore[arg-type]

    # A string element of a tuple may be a serialized complex number: the
    # element type is taken from the value itself, so we can only guess it
    # by a regular expression.
    # NOTE(review): `str(1j)` is "1j" (no parentheses) and does not match the
    # pattern, so pure-imaginary values come back as strings - confirm intended.
    if dtype == str and from_tuple and _COMPLEX_NUMBER_REGEX.match(val):  # type: ignore[arg-type]
        try:
            return complex(val)  # type: ignore[arg-type]
        except ValueError:
            # if conversion fails we return the string as is
            return val

    return val


def deserialize_attribute_nested_tuples(value: Tuple[Any, ...]) -> Tuple[Any, ...]:
    """Deserialize attribute nested tuples and their elements.

    Inner tuples and lists are processed recursively; ``None`` elements are
    kept as they are; every other element is deserialized with its own type
    as dtype.

    :param value: attribute tuple value
    """
    deserialized = []
    for el in value:
        if el is None:
            # `type(None)(None)` raises TypeError, so a null element
            # must not be passed through the dtype conversion
            item = None
        elif isinstance(el, (tuple, list)):
            item = deserialize_attribute_nested_tuples(el)  # type: ignore[arg-type]
        else:
            item = deserialize_attribute_value(el, type(el), True)
        deserialized.append(item)
    return tuple(deserialized)
Loading

0 comments on commit 8aec238

Please sign in to comment.