ML-280: Support time range in readers (#200)
* ML-280: adding filter (before and after) to ReadParquet

* Support list key_field in Source. (#197)

Co-authored-by: Gal Topper <[email protected]>

* Cache (#184)

* refactor

* working integ

* lint

* fix typo

* test fixes

* lint

* temp

* integ

* parametrize tests

* rename

* handle exceptions inside flush worker

* mid

* fix reuse

* flushing

* use changed items list instead of time lookup

* check if running loop exists

* add test

* fix test

* add flush interval enum

* update test and doc

* code review

* make flush interval an optional[int] and init_flush_task only from async code

* update doc

Co-authored-by: Dina Nimrodi <[email protected]>

* iterate over a copy of changed keys (#198)

* iterate over a copy of changed keys

* don't add key to persist job if it's already pending

* fix several bugs

Co-authored-by: Dina Nimrodi <[email protected]>

* ML-389: Fix expected webapi error in case of concurrent modification of aggregations.  (#199)

* Don't use now in tests.

* ML-389: Fix expected webapi error in case of concurrent modification of aggregations.

Co-authored-by: Gal Topper <[email protected]>

* some cleanup

* more cleanup

* fix

* PR comments and adding read_parquet with filtering

* some cleanup

* filter by column inside the method

* pr comment

* bug fix

* bug fix

* bug fix

* one more bug fix

* pr comments

* fix

* pr comments

* minor fix

Co-authored-by: Gal Topper <[email protected]>
Co-authored-by: Gal Topper <[email protected]>
Co-authored-by: Dina Nimrodi <[email protected]>
Co-authored-by: Dina Nimrodi <[email protected]>
5 people authored Apr 22, 2021
1 parent 9fdf96c commit 05b6db8
Showing 3 changed files with 291 additions and 5 deletions.
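
The feature in a nutshell: ReadParquet gains start_filter, end_filter and filter_column parameters and prunes time-partitioned Parquet data accordingly. A minimal usage sketch, modeled on the integration tests below; the path 'mydir/' and the column name 'my_time' are placeholders, not taken from this commit:

import pandas as pd
from storey import build_flow, ReadParquet, Reduce

def append_and_return(lst, x):
    lst.append(x)
    return lst

# Read only rows whose 'my_time' falls in the requested range; when the dataset
# is partitioned by year/month/day/..., partition-level filters are added as well.
controller = build_flow([
    ReadParquet('mydir/',
                start_filter=pd.Timestamp('2019-01-01 00:00:00'),
                end_filter=pd.Timestamp('2019-12-31 23:59:59'),
                filter_column='my_time'),
    Reduce([], append_and_return),
]).run()
filtered_events = controller.await_termination()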
180 changes: 179 additions & 1 deletion integration/test_filesystems_integration.py
@@ -1,11 +1,15 @@
import asyncio
import sys
import random as rand

from .integration_test_utils import setup_teardown_test, _generate_table_name, V3ioHeaders, V3ioError
-from storey import build_flow, ReadCSV, WriteToCSV, Source, Reduce, Map, FlatMap, AsyncSource, WriteToParquet
+from storey import build_flow, ReadCSV, WriteToCSV, Source, Reduce, Map, FlatMap, AsyncSource, WriteToParquet, ReadParquet, DataframeSource
import pandas as pd
import aiohttp
import pytest
import v3io
import uuid
import datetime


@pytest.fixture()
@@ -246,3 +250,177 @@ def test_write_to_parquet_to_v3io_with_indices(setup_teardown_test):

    read_back_df = pd.read_parquet(out_file, columns=columns)
    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"


def append_and_return(lst, x):
    lst.append(x)
    return lst


def test_filter_before_after_non_partitioned(setup_teardown_test):
    columns = ['my_string', 'my_time']

    df = pd.DataFrame([['good', pd.Timestamp('2018-05-07 13:52:37')],
                       ['hello', pd.Timestamp('2019-01-26 14:52:37')],
                       ['world', pd.Timestamp('2020-05-11 13:52:37')]],
                      columns=columns)
    df.set_index('my_string')

    out_file = f'v3io:///{setup_teardown_test}/'
    controller = build_flow([
        DataframeSource(df),
        WriteToParquet(out_file, columns=columns, partition_cols=[]),
    ]).run()
    controller.await_termination()

    before = pd.Timestamp('2019-12-01 00:00:00')
    after = pd.Timestamp('2019-01-01 23:59:59.999999')

    controller = build_flow([
        ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
        Reduce([], append_and_return)
    ]).run()
    read_back_result = controller.await_termination()
    expected = [{'my_string': 'hello', 'my_time': pd.Timestamp('2019-01-26 14:52:37')}]

    assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"


def test_filter_before_after_partitioned_random(setup_teardown_test):
    low_limit = pd.Timestamp('2018-01-01')
    high_limit = pd.Timestamp('2020-12-31 23:59:59.999999')

    delta = high_limit - low_limit

    seed_value = rand.randrange(sys.maxsize)
    print('Seed value:', seed_value)

    rand.seed(seed_value)

    random_second = rand.randrange(int(delta.total_seconds()))
    middle_limit = low_limit + datetime.timedelta(seconds=random_second)

    print("middle_limit is " + str(middle_limit))

    number_below_middle_limit = rand.randrange(0, 10)

    def create_rand_data(num_elements, low_limit, high_limit, char):
        import datetime
        delta = high_limit - low_limit

        data = []
        for i in range(0, num_elements):
            element = {}
            element['string'] = char + str(i)
            random_second = rand.randrange(int(delta.total_seconds()))
            element['datetime'] = low_limit + datetime.timedelta(seconds=random_second)
            data.append(element)
        return data

    list1 = create_rand_data(number_below_middle_limit, low_limit, middle_limit, 'lower')
    list2 = create_rand_data(10 - number_below_middle_limit, middle_limit, high_limit, 'higher')
    combined_list = list1 + list2

    df = pd.DataFrame(combined_list)
    print("data frame is " + str(df))

    all_partition_columns = ['$year', '$month', '$day', '$hour', '$minute', '$second']
    num_part_columns = rand.randrange(1, 6)
    partition_columns = all_partition_columns[:num_part_columns]
    print("partitioned by " + str(partition_columns))

    out_file = f'v3io:///{setup_teardown_test}/'
    controller = build_flow([
        DataframeSource(df, time_field='datetime'),
        WriteToParquet(out_file, columns=['string', 'datetime'], partition_cols=partition_columns),
    ]).run()

    controller.await_termination()

    controller = build_flow([
        ReadParquet(out_file, end_filter=high_limit, start_filter=middle_limit, filter_column='datetime'),
        Reduce([], append_and_return)
    ]).run()
    read_back_result = controller.await_termination()
    print("expecting " + str(10 - number_below_middle_limit) + " to be above middle limit")
    assert (len(read_back_result)) == 10 - number_below_middle_limit

    controller = build_flow([
        ReadParquet(out_file, end_filter=middle_limit, start_filter=low_limit, filter_column='datetime'),
        Reduce([], append_and_return)
    ]).run()
    read_back_result = controller.await_termination()
    print("expecting " + str(number_below_middle_limit) + " to be below middle limit")
    assert (len(read_back_result)) == number_below_middle_limit


def test_filter_before_after_partitioned_inner_other_partition(setup_teardown_test):
    columns = ['my_string', 'my_time', 'my_city']

    df = pd.DataFrame([['hello', pd.Timestamp('2020-12-31 14:05:00'), 'tel aviv'],
                       ['world', pd.Timestamp('2018-12-30 09:00:00'), 'haifa'],
                       ['sun', pd.Timestamp('2019-12-29 09:00:00'), 'tel aviv'],
                       ['is', pd.Timestamp('2019-06-30 15:00:45'), 'hod hasharon'],
                       ['shining', pd.Timestamp('2020-02-28 13:00:56'), 'hod hasharon']],
                      columns=columns)
    df.set_index('my_string')

    out_file = f'v3io:///{setup_teardown_test}/'
    controller = build_flow([
        DataframeSource(df, time_field='my_time'),
        WriteToParquet(out_file, columns=columns, partition_cols=['$year', '$month', '$day', '$hour', 'my_city']),
    ]).run()
    controller.await_termination()

    before = pd.Timestamp('2020-12-31 14:00:00')
    after = pd.Timestamp('2019-07-01 00:00:00')

    controller = build_flow([
        ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
        Reduce([], append_and_return)
    ]).run()
    read_back_result = controller.await_termination()

    expected = [{'my_string': 'sun', 'my_time': pd.Timestamp('2019-12-29 09:00:00'), 'my_city': 'tel aviv',
                 'year': 2019, 'month': 12, 'day': 29, 'hour': 9},
                {'my_string': 'shining', 'my_time': pd.Timestamp('2020-02-28 13:00:56'), 'my_city': 'hod hasharon',
                 'year': 2020, 'month': 2, 'day': 28, 'hour': 13}]

    assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"


def test_filter_before_after_partitioned_outer_other_partition(setup_teardown_test):
    columns = ['my_string', 'my_time', 'my_city']

    df = pd.DataFrame([['hello', pd.Timestamp('2020-12-31 15:05:00'), 'tel aviv'],
                       ['world', pd.Timestamp('2020-12-30 09:00:00'), 'haifa'],
                       ['sun', pd.Timestamp('2020-12-29 09:00:00'), 'tel aviv'],
                       ['is', pd.Timestamp('2020-12-30 15:00:45'), 'hod hasharon'],
                       ['shining', pd.Timestamp('2020-12-31 13:00:56'), 'hod hasharon']],
                      columns=columns)
    df.set_index('my_string')

    out_file = f'v3io:///{setup_teardown_test}/'
    controller = build_flow([
        DataframeSource(df, time_field='my_time'),
        WriteToParquet(out_file, columns=columns, partition_cols=['my_city', '$year', '$month', '$day', '$hour']),
    ]).run()
    controller.await_termination()

    before = pd.Timestamp('2020-12-31 14:00:00')
    after = pd.Timestamp('2020-12-30 08:53:00')

    controller = build_flow([
        ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
        Reduce([], append_and_return)
    ]).run()
    read_back_result = controller.await_termination()
    expected = [{'my_string': 'world', 'my_time': pd.Timestamp('2020-12-30 09:00:00'), 'my_city': 'haifa', 'year': 2020,
                 'month': 12, 'day': 30, 'hour': 9},
                {'my_string': 'is', 'my_time': pd.Timestamp('2020-12-30 15:00:45'), 'my_city': 'hod hasharon', 'year': 2020,
                 'month': 12, 'day': 30, 'hour': 15},
                {'my_string': 'shining', 'my_time': pd.Timestamp('2020-12-31 13:00:56'), 'my_city': 'hod hasharon', 'year': 2020,
                 'month': 12, 'day': 31, 'hour': 13}]

    assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"

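The expected results in these tests list year, month, day and hour alongside the data columns because WriteToParquet's '$year'-style partition columns produce a hive-partitioned directory layout, and pandas/pyarrow return partition levels as ordinary columns on read. A small standalone sketch of that behavior, assuming a local path and toy values that are not part of this commit:

import pandas as pd

df = pd.DataFrame({'my_string': ['sun', 'shining'],
                   'my_time': [pd.Timestamp('2019-12-29 09:00:00'), pd.Timestamp('2020-02-28 13:00:56')],
                   'year': [2019, 2020], 'month': [12, 2]})
# Writing with partition_cols creates year=.../month=... directories.
df.to_parquet('some_local_dataset', partition_cols=['year', 'month'])

# Filters on partition columns prune whole directories; the partition values
# come back as columns, as in the expected dicts above.
read_back = pd.read_parquet('some_local_dataset', filters=[[('year', '=', 2019), ('month', '=', 12)]])
print(read_back)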
40 changes: 36 additions & 4 deletions storey/sources.py
@@ -7,12 +7,13 @@
import warnings
from datetime import datetime
from typing import List, Optional, Union, Callable, Coroutine, Iterable
import pyarrow.parquet as pq

import pandas

from .dtypes import _termination_obj, Event
from .flow import Flow, Complete
-from .utils import url_to_file_system
+from .utils import url_to_file_system, find_filters


class AwaitableResult:
@@ -673,11 +674,42 @@ class ReadParquet(DataframeSource):
"""Reads Parquet files as input source for a flow.
:parameter paths: paths to Parquet files
:parameter columns : list, default=None. If not None, only these columns will be read from the file.
:parameter start_filter: datetime. If not None, the results will be filtered by partitions and 'filter_column' >= start_filter.
Default is None
:parameter end_filter: datetime. If not None, the results will be filtered by partitions 'filter_column' < end_filter.
Default is None
:parameter filter_column: Optional. if not None, the results will be filtered by this column and before and/or after
"""
def __init__(self, paths: Union[str, Iterable[str]], columns=None, start_filter: Optional[datetime] = None,
end_filter: Optional[datetime] = None, filter_column: Optional[str] = None, **kwargs):
if end_filter or start_filter:
start_filter = datetime.min if start_filter is None else start_filter
end_filter = datetime.max if end_filter is None else end_filter
if filter_column is None:
raise TypeError('Filter column is required when passing start/end filters')

def __init__(self, paths: Union[str, Iterable[str]], columns=None, **kwargs):
if isinstance(paths, str):
paths = [paths]
dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
storage_options=kwargs.get('storage_options')), paths)
storage_options = kwargs.get('storage_options')

def read_filtered_parquet(path, start_filter, end_filter, storage_options, columns):
fs, file_path = url_to_file_system(path, storage_options)
dataset = pq.ParquetDataset(path, filesystem=fs)
if dataset.partitions:
partitions = dataset.partitions.partition_names
time_attributes = ['year', 'month', 'day', 'hour', 'minute', 'second']
partitions_time_attributes = [j for j in time_attributes if j in partitions]
else:
partitions_time_attributes = []
filters = []
find_filters(partitions_time_attributes, start_filter, end_filter, filters, filter_column)
return pandas.read_parquet(path, columns=columns, filters=filters,
storage_options=storage_options)

if start_filter or end_filter:
dfs = map(lambda path: read_filtered_parquet(path, start_filter, end_filter, storage_options, columns), paths)

else:
dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
storage_options=storage_options), paths)
super().__init__(dfs, **kwargs)
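
ReadParquet decides which partition-level filters it can build by inspecting the dataset's partition names, as read_filtered_parquet does above. A standalone sketch of that inspection using the same legacy pyarrow ParquetDataset API; the local path is a placeholder:

import pyarrow.parquet as pq

# Which hive partition levels does the dataset have, and which of them are
# time attributes that ReadParquet can prune on?
dataset = pq.ParquetDataset('some_partitioned_dataset')
partition_names = dataset.partitions.partition_names if dataset.partitions else []
time_attributes = ['year', 'month', 'day', 'hour', 'minute', 'second']
print([name for name in partition_names if name in time_attributes])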
76 changes: 76 additions & 0 deletions storey/utils.py
@@ -178,3 +178,79 @@ def stringify_key(key_list):
        return key_list[0]
    else:
        return key_list


def _create_filter_tuple(dtime, attr, sign, list_tuples):
    if attr:
        value = getattr(dtime, attr, None)
        tuple1 = (attr, sign, value)
        list_tuples.append(tuple1)


def _find_filter_helper(list_partitions, dtime, sign, first_sign, first_uncommon, filters, filter_column=None):
    single_filter = []
    if len(list_partitions) == 0 or first_uncommon is None:
        return
    last_partition = list_partitions[-1]
    if len(list_partitions) == 1 or last_partition == first_uncommon:
        return
    list_partitions_without_last_element = list_partitions[:-1]
    for partition in list_partitions_without_last_element:
        _create_filter_tuple(dtime, partition, "=", single_filter)
    if first_sign:
        # only on the first iteration do we need ">="/"<=" instead of ">"/"<"
        _create_filter_tuple(dtime, last_partition, first_sign, single_filter)
        tuple_last_range = (filter_column, sign, dtime)
        single_filter.append(tuple_last_range)
    else:
        _create_filter_tuple(dtime, last_partition, sign, single_filter)
    _find_filter_helper(list_partitions_without_last_element, dtime, sign, None, first_uncommon, filters)
    filters.append(single_filter)


def _get_filters_for_filter_column(start, end, filter_column, side_range):
    lower_limit_tuple = (filter_column, ">=", start)
    upper_limit_tuple = (filter_column, "<=", end)
    side_range.append(lower_limit_tuple)
    side_range.append(upper_limit_tuple)


def find_filters(partitions_time_attributes, start, end, filters, filter_column):
    # This method builds filters to be used by
    # https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
    common_partitions = []
    first_uncommon = None
    # Find the common attributes. For example, for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, partitioned by
    # year, month, day, hour: common_partitions=[year, month], first_uncommon=day.
    for part in partitions_time_attributes:
        value_start = getattr(start, part, None)
        value_end = getattr(end, part, None)
        if value_end == value_start:
            common_partitions.append(part)
        else:
            first_uncommon = part
            break

    # for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this appends to filters
    # [(year=2018, month=2, day>=1, filter_column>1.2.2018 08:53:15)]
    _find_filter_helper(partitions_time_attributes, start, ">", ">=", first_uncommon, filters, filter_column)

    middle_range_filter = []
    for partition in common_partitions:
        _create_filter_tuple(start, partition, "=", middle_range_filter)

    if len(filters) == 0:
        # creating only the middle range
        _create_filter_tuple(start, first_uncommon, ">=", middle_range_filter)
        _create_filter_tuple(end, first_uncommon, "<=", middle_range_filter)
        _get_filters_for_filter_column(start, end, filter_column, middle_range_filter)
    else:
        _create_filter_tuple(start, first_uncommon, ">", middle_range_filter)
        _create_filter_tuple(end, first_uncommon, "<", middle_range_filter)
    # for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this appends to filters
    # [(year=2018, month=2, 1<day<5)]
    filters.append(middle_range_filter)

    # for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this appends to filters
    # [(year=2018, month=2, day<=5, filter_column<5.2.2018 16:24:31)]
    _find_filter_helper(partitions_time_attributes, end, "<", "<=", first_uncommon, filters, filter_column)

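For concreteness, here is roughly what find_filters produces for the dates used in the comments above, assuming a dataset partitioned by year/month/day/hour and filter_column='my_time'. The result is a pyarrow-style filter list in disjunctive normal form: an outer list of OR'ed groups, each group an AND of (column, op, value) tuples. This is an illustrative trace, not captured output:

import datetime

from storey.utils import find_filters

start = datetime.datetime(2018, 2, 1, 8, 53, 15)
end = datetime.datetime(2018, 2, 5, 16, 24, 31)

filters = []
find_filters(['year', 'month', 'day', 'hour'], start, end, filters, 'my_time')

# Expected shape of filters (three OR'ed groups):
# [[('year', '=', 2018), ('month', '=', 2), ('day', '=', 1), ('hour', '>=', 8), ('my_time', '>', start)],
#  [('year', '=', 2018), ('month', '=', 2), ('day', '>', 1), ('day', '<', 5)],
#  [('year', '=', 2018), ('month', '=', 2), ('day', '=', 5), ('hour', '<=', 16), ('my_time', '<', end)]]
print(filters)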