Skip to content

Commit

Permalink
Remove nanoseconds when reading from CSV & writing to parquet (#188)
Browse files Browse the repository at this point in the history
* Remove nanoseconds when reading from CSV & writing to parquet

* Removed bad timestamp test

* WriteToParquet: Remove nanosec from timestamp index columns

* Fix indentation
  • Loading branch information
urihoenig authored Mar 21, 2021
1 parent bb3651d commit d75b19f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def _parse_field(self, field, index):

def _datetime_from_timestamp(self, timestamp):
if self._timestamp_format:
return datetime.strptime(timestamp, self._timestamp_format)
return pandas.to_datetime(timestamp, format=self._timestamp_format).floor('u').to_pydatetime()
else:
return datetime.fromisoformat(timestamp)

Expand Down
6 changes: 6 additions & 0 deletions storey/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ async def _emit(self, batch, batch_key, batch_time):
if dir_path:
self._file_system.makedirs(dir_path, exist_ok=True)
file_path = self._path if self._single_file_mode else f'{dir_path}{uuid.uuid4()}.parquet'
# Remove nanosecs from timestamp columns & index
for name, _ in df.items():
if pd.core.dtypes.common.is_datetime64_dtype(df[name]):
df[name] = df[name].astype('datetime64[us]')
if pd.core.dtypes.common.is_datetime64_dtype(df.index):
df.index = df.index.floor('u')
with self._file_system.open(file_path, 'wb') as file:
df.to_parquet(path=file, index=bool(self._index_cols))

Expand Down
3 changes: 0 additions & 3 deletions tests/test-with-timestamp-bad.csv

This file was deleted.

3 changes: 3 additions & 0 deletions tests/test-with-timestamp-nanosecs.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
k,t
m1,15/02/2020 02:03:04.123456789
m2,16/02/2020 02:03:04.123456789
41 changes: 33 additions & 8 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,24 @@ async def async_dataframe_source():
def test_async_dataframe_source():
asyncio.run(async_test_async_source())

def test_write_parquet_timestamp_nanosecs(tmpdir):
    # Timestamps carrying nanosecond fractions must be truncated to
    # microseconds when written to parquet — both in regular columns
    # and in the index column — so the read-back values are stable.
    out_dir = f'{tmpdir}/test_write_parquet_timestamp_nanosecs/{uuid.uuid4().hex}/'
    columns = ['string', 'timestamp1', 'timestamp2']
    rows = [
        ['hello', pd.Timestamp('2020-01-26 14:52:37.12325679'), pd.Timestamp('2020-01-26 12:41:37.123456789')],
        ['world', pd.Timestamp('2018-05-11 13:52:37.333421789'), pd.Timestamp('2020-01-14 14:52:37.987654321')],
    ]
    df = pd.DataFrame(rows, columns=columns)
    df.set_index(keys=['timestamp1'], inplace=True)

    # Write the dataframe out, with timestamp1 as the index column.
    writer = build_flow([
        DataframeSource(df),
        WriteToParquet(out_dir, columns=['string', 'timestamp2'], partition_cols=[], index_cols='timestamp1')
    ]).run()
    writer.await_termination()

    # Read it back and collect all events.
    reader = build_flow([
        ReadParquet(out_dir), Reduce([], append_and_return),
    ]).run()
    termination_result = reader.await_termination()

    # Expected values are the originals floored to microsecond precision.
    expected = [
        {'string': 'hello', 'timestamp1': pd.Timestamp('2020-01-26 14:52:37.123256'), 'timestamp2': pd.Timestamp('2020-01-26 12:41:37.123456')},
        {'string': 'world', 'timestamp1': pd.Timestamp('2018-05-11 13:52:37.333421'), 'timestamp2': pd.Timestamp('2020-01-14 14:52:37.987654')},
    ]
    assert termination_result == expected

def test_read_parquet():
controller = build_flow([
Expand Down Expand Up @@ -1866,17 +1884,24 @@ def test_csv_reader_parquet_write_microsecs(tmpdir):

assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"

def test_csv_reader_parquet_write_nanosecs(tmpdir):
    """CSV timestamps with nanosecond fractions are floored to microseconds
    on read, so the parquet round trip matches ``strptime`` values truncated
    at six fractional digits.
    """
    out_file = f'{tmpdir}/test_csv_reader_parquet_write_nanosecs_{uuid.uuid4().hex}/'
    columns = ['k', 't']
    time_format = '%d/%m/%Y %H:%M:%S.%f'

    # Read a CSV whose 't' column carries 9 fractional digits, then write
    # the events out to parquet.
    controller = build_flow([
        ReadCSV('tests/test-with-timestamp-nanosecs.csv', header=True, key_field='k',
                time_field='t', timestamp_format=time_format),
        WriteToParquet(out_file, columns=columns, max_events=2)
    ]).run()

    # Expected timestamps are the CSV values truncated to microseconds.
    expected = pd.DataFrame([['m1', datetime.strptime("15/02/2020 02:03:04.123456", time_format)],
                             ['m2', datetime.strptime("16/02/2020 02:03:04.123456", time_format)]],
                            columns=columns)
    controller.await_termination()
    read_back_df = pd.read_parquet(out_file, columns=columns)

    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"


def test_error_in_concurrent_by_key_task():
Expand Down

0 comments on commit d75b19f

Please sign in to comment.