diff --git a/setup.py b/setup.py index 9c2b674..2d929a1 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_dependencies(subpackage="requirements"): setup( name='cloud_array', - version='0.0.6', + version='0.0.7', author="Michal Murawski", author_email="mmurawski777@gmail.com", description="Cloud implementation of array for Big Data", diff --git a/src/cloud_array/array.py b/src/cloud_array/array.py index 3b6556a..908babf 100644 --- a/src/cloud_array/array.py +++ b/src/cloud_array/array.py @@ -1,12 +1,11 @@ -from itertools import product -from typing import AnyStr, Dict, List, Sequence, Tuple +from typing import AnyStr, Dict, Sequence, Tuple import numpy as np from cloud_array.backends import Backend, get_backend from cloud_array.exceptions import CloudArrayException -from cloud_array.utils import (chunk2list, collect, compute_index_of_slice, compute_number_of_chunks, - get_index_of_iter_product) +from cloud_array.helpers import (chunk2list, collect, compute_index_of_slice, compute_number_of_chunks, + generate_chunks_slices, get_chunk_slice_by_index, parse_key_to_slices) class Chunk: @@ -112,30 +111,11 @@ def get_metadata(self) -> dict: return result def generate_chunks_slices(self) -> Tuple[slice]: - _ranges = ( - range(0, a, c) - for c, a in zip(self.chunk_shape, self.shape) - ) - p = product(*_ranges) - for i in p: - yield tuple( - slice( - i[j], - min(self.shape[j], i[j]+self.chunk_shape[j]) - ) - for j in range(len(self.shape)) - ) + for _slice in generate_chunks_slices(self.shape, self.chunk_shape): + yield _slice def get_chunk_slice_by_index(self, number: int) -> Tuple[slice]: - p = tuple((0, a, c) for c, a in zip(self.chunk_shape, self.shape)) - val = get_index_of_iter_product(number, p) - return tuple( - slice( - val[j], - min(self.shape[j], val[j]+self.chunk_shape[j]) - ) - for j in range(len(self.shape)) - ) + return get_chunk_slice_by_index(self.shape, self.chunk_shape, number) @staticmethod def count_number_of_chunks(shape: Tuple[int], chunk_shape: Tuple[int]) -> int: @@ -161,59 +141,8 @@ def save(self, array=None) -> None: for chunk in self.chunks(): chunk.save(array[chunk.slice]) - def initial_merge_of_chunks(self, sorted_chunks) -> List[Tuple[np.ndarray, Tuple[slice]]]: - datasets = [] - for x in sorted_chunks: - data = None - for i in x[0]: - chunk_data = self.get_chunk(i)[:, :, :] - if data is None: - data = chunk_data - else: - data = np.concatenate( - (data, chunk_data), - axis=x[1] - ) - datasets.append( - (data, x[2]) - ) - return datasets - - def parse_key_to_slices(self, key: Tuple[slice]): - result = [] - for i in range(len(key)): - val = key[i] - if isinstance(val, int): - if val < 0: - val = self.shape[i] + val - result.append( - slice(val, val+1) - ) - else: - start = val.start or 0 - stop = val.stop or self.shape[i] - if start > self.shape[i] or stop > self.shape[i]: - raise CloudArrayException( - f"Slice {key[i]} does not fit shape: {self.shape}.") - if start >= stop: - raise CloudArrayException( - f"Key invalid slice {key[i]}. Start >= stop.") - if start < 0: - start = self.shape[i] + start - if stop < 0: - stop = self.shape[i] + stop - - result.append( - slice( - start, - stop, - val.step if val.step else 1 - ) - ) - return tuple(result) - def __getitem__(self, key) -> np.ndarray: - new_key = self.parse_key_to_slices(key) + new_key = parse_key_to_slices(self.shape, self.chunk_shape, key) def _get_chunk_data_by_key(key: Sequence[slice]): idx = compute_index_of_slice(key, self.shape, self.chunk_shape) diff --git a/src/cloud_array/utils.py b/src/cloud_array/helpers.py similarity index 57% rename from src/cloud_array/utils.py rename to src/cloud_array/helpers.py index ae180a6..292fcad 100644 --- a/src/cloud_array/utils.py +++ b/src/cloud_array/helpers.py @@ -1,11 +1,14 @@ import operator from copy import copy from functools import reduce +from itertools import product from math import ceil from typing import Callable, List, Sequence, Tuple import numpy as np +from .exceptions import CloudArrayException + def compute_number_of_chunks(shape: Tuple[int], chunk_shape: Tuple[int]) -> int: """ @@ -81,3 +84,65 @@ def chunk2list(chunk: Tuple[slice]) -> List[List[int]]: def list2chunk(_list: List[List[int]]) -> Tuple[slice]: return tuple([slice(*el) for el in _list]) + + +def generate_chunks_slices(shape: Sequence[int], chunk_shape: Sequence[int]) -> Tuple[slice]: + _ranges = ( + range(0, a, c) + for c, a in zip(chunk_shape, shape) + ) + p = product(*_ranges) + for i in p: + yield tuple( + slice( + i[j], + min(shape[j], i[j]+chunk_shape[j]) + ) + for j in range(len(shape)) + ) + + +def get_chunk_slice_by_index(shape: Sequence[int], chunk_shape: Sequence[int], number: int) -> Tuple[slice]: + p = tuple((0, a, c) for c, a in zip(chunk_shape, shape)) + val = get_index_of_iter_product(number, p) + return tuple( + slice( + val[j], + min(shape[j], val[j]+chunk_shape[j]) + ) + for j in range(len(shape)) + ) + + +def parse_key_to_slices(shape: Sequence[int], chunk_shape: Sequence[int], key: Tuple[slice]): + result = [] + for i in range(len(key)): + val = key[i] + if isinstance(val, int): + if val < 0: + val = shape[i] + val + result.append( + slice(val, val+1) + ) + else: + start = val.start or 0 + stop = val.stop or shape[i] + if start > shape[i] or stop > shape[i]: + raise CloudArrayException( + f"Slice {key[i]} does not fit shape: {shape}.") + if start >= stop: + raise CloudArrayException( + f"Key invalid slice {key[i]}. Start >= stop.") + if start < 0: + start = shape[i] + start + if stop < 0: + stop = shape[i] + stop + + result.append( + slice( + start, + stop, + val.step if val.step else 1 + ) + ) + return tuple(result)