From d75b19f09762e19a5adf7e43a3e42e1fb9340214 Mon Sep 17 00:00:00 2001 From: urihoenig Date: Sun, 21 Mar 2021 15:29:47 +0200 Subject: [PATCH] Remove nanoseconds when reading from CSV & writing to parquet (#188) * Remove nanoseconds when reading from CSV & writing to parquet * Removed bad timestamp test * WriteToParquet: Remove nanosec from timestamp index columns * Fix indentation --- storey/sources.py | 2 +- storey/writers.py | 6 ++++ tests/test-with-timestamp-bad.csv | 3 -- tests/test-with-timestamp-nanosecs.csv | 3 ++ tests/test_flow.py | 41 +++++++++++++++++++++----- 5 files changed, 43 insertions(+), 12 deletions(-) delete mode 100644 tests/test-with-timestamp-bad.csv create mode 100644 tests/test-with-timestamp-nanosecs.csv diff --git a/storey/sources.py b/storey/sources.py index ae6ef442..70bde992 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -521,7 +521,7 @@ def _parse_field(self, field, index): def _datetime_from_timestamp(self, timestamp): if self._timestamp_format: - return datetime.strptime(timestamp, self._timestamp_format) + return pandas.to_datetime(timestamp, format=self._timestamp_format).floor('u').to_pydatetime() else: return datetime.fromisoformat(timestamp) diff --git a/storey/writers.py b/storey/writers.py index fba6981c..2d539ded 100644 --- a/storey/writers.py +++ b/storey/writers.py @@ -373,6 +373,12 @@ async def _emit(self, batch, batch_key, batch_time): if dir_path: self._file_system.makedirs(dir_path, exist_ok=True) file_path = self._path if self._single_file_mode else f'{dir_path}{uuid.uuid4()}.parquet' + # Remove nanosecs from timestamp columns & index + for name, _ in df.items(): + if pd.core.dtypes.common.is_datetime64_dtype(df[name]): + df[name] = df[name].astype('datetime64[us]') + if pd.core.dtypes.common.is_datetime64_dtype(df.index): + df.index = df.index.floor('u') with self._file_system.open(file_path, 'wb') as file: df.to_parquet(path=file, index=bool(self._index_cols)) diff --git a/tests/test-with-timestamp-bad.csv b/tests/test-with-timestamp-bad.csv deleted file mode 100644 index 20ad871a..00000000 --- a/tests/test-with-timestamp-bad.csv +++ /dev/null @@ -1,3 +0,0 @@ -k,t -m1,15/02/2020 02:03:04.12345678 -m2,16/02/2020 02:03:04.12345678 diff --git a/tests/test-with-timestamp-nanosecs.csv b/tests/test-with-timestamp-nanosecs.csv new file mode 100644 index 00000000..368d72eb --- /dev/null +++ b/tests/test-with-timestamp-nanosecs.csv @@ -0,0 +1,3 @@ +k,t +m1,15/02/2020 02:03:04.123456789 +m2,16/02/2020 02:03:04.123456789 diff --git a/tests/test_flow.py b/tests/test_flow.py index f03c94dd..09bf1a8a 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -235,6 +235,24 @@ async def async_dataframe_source(): def test_async_dataframe_source(): asyncio.run(async_test_async_source()) +def test_write_parquet_timestamp_nanosecs(tmpdir): + out_dir = f'{tmpdir}/test_write_parquet_timestamp_nanosecs/{uuid.uuid4().hex}/' + columns=['string', 'timestamp1', 'timestamp2'] + df = pd.DataFrame([['hello', pd.Timestamp('2020-01-26 14:52:37.12325679'), pd.Timestamp('2020-01-26 12:41:37.123456789')], ['world', pd.Timestamp('2018-05-11 13:52:37.333421789'), pd.Timestamp('2020-01-14 14:52:37.987654321')]], columns=columns) + df.set_index(keys=['timestamp1'], inplace=True) + controller = build_flow([ + DataframeSource(df), + WriteToParquet(out_dir, columns=['string', 'timestamp2'], partition_cols=[], index_cols='timestamp1') + ]).run() + controller.await_termination() + + controller = build_flow([ + ReadParquet(out_dir), Reduce([], append_and_return), + ]).run() + + termination_result = controller.await_termination() + expected = [{'string': 'hello', 'timestamp1': pd.Timestamp('2020-01-26 14:52:37.123256'), 'timestamp2': pd.Timestamp('2020-01-26 12:41:37.123456')}, {'string': 'world', 'timestamp1': pd.Timestamp('2018-05-11 13:52:37.333421'), 'timestamp2': pd.Timestamp('2020-01-14 14:52:37.987654')}] + assert termination_result == expected def test_read_parquet(): controller = build_flow([ @@ -1866,17 +1884,24 @@ def test_csv_reader_parquet_write_microsecs(tmpdir): assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}" +def test_csv_reader_parquet_write_nanosecs(tmpdir): + out_file = f'{tmpdir}/test_csv_reader_parquet_write_nanosecs_{uuid.uuid4().hex}/' + columns = ['k', 't'] -def test_csv_reader_bad_timestamp(tmpdir): + time_format = '%d/%m/%Y %H:%M:%S.%f' controller = build_flow([ - ReadCSV('tests/test-with-timestamp-bad.csv', header=True, key_field='k', - time_field='t', timestamp_format='%d/%m/%Y %H:%M:%S.%f') + ReadCSV('tests/test-with-timestamp-nanosecs.csv', header=True, key_field='k', + time_field='t', timestamp_format=time_format), + WriteToParquet(out_file, columns=columns, max_events=2) ]).run() - try: - controller.await_termination() - assert False - except TypeError: - pass + + expected = pd.DataFrame([['m1', datetime.strptime("15/02/2020 02:03:04.123456", time_format)], + ['m2', datetime.strptime("16/02/2020 02:03:04.123456", time_format)]], + columns=columns) + controller.await_termination() + read_back_df = pd.read_parquet(out_file, columns=columns) + + assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}" def test_error_in_concurrent_by_key_task():