diff --git a/storey/sources.py b/storey/sources.py index b153dc92..7b9acd66 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -18,6 +18,7 @@ import queue import threading import time +import traceback import uuid import warnings import weakref @@ -343,6 +344,12 @@ async def _run_loop(self): await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True) self._termination_future.set_result(termination_result) except BaseException as ex: + if self.logger: + message = "An error was raised" + raised_by = getattr(ex, "_raised_by_storey_step", None) + if raised_by: + message += f" by step {type(raised_by)}" + self.logger.error(f"{message}: {traceback.format_exc()}") if event is not _termination_obj and event._awaitable_result: event._awaitable_result._set_error(ex) self._ex = ex @@ -362,7 +369,7 @@ async def _run_loop(self): await maybe_coroutine except Exception as ex: if self.context: - self.context.logger.error(f"Error trying to close {closeable}: {ex}") + self.logger.error(f"Error trying to close {closeable}: {ex}") def _loop_thread_main(self): asyncio.run(self._run_loop()) @@ -638,6 +645,12 @@ async def _run_loop(self): await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True) return termination_result except BaseException as ex: + if self.logger: + message = "An error was raised" + raised_by = getattr(ex, "_raised_by_storey_step", None) + if raised_by: + message += f" by step {type(raised_by)}" + self.logger.error(f"{message}: {traceback.format_exc()}") self._ex = ex if event is not _termination_obj and event._awaitable_result: awaitable = event._awaitable_result._set_error(ex) @@ -655,7 +668,7 @@ async def _run_loop(self): await maybe_coroutine except Exception as ex: if self.context: - self.context.logger.error(f"Error trying to close {closeable}: {ex}") + self.logger.error(f"Error trying to close {closeable}: {ex}") def _raise_on_error(self): if self._ex: @@ -801,7 +814,7 @@ async def _run_loop(self): none_keys = [key for key, value in zip(key_fields, keys) if pd.isna(value)] if none_keys: if self.context: - self.context.logger.error( + self.logger.error( f"Encountered null values in the following key fields:" f" {', '.join(none_keys)}, in line: {body}." ) @@ -1116,7 +1129,7 @@ async def _run_loop(self): key = [] for key_field in self._key_field: if key_field not in body or pandas.isna(body[key_field]): - self.context.logger.error( + self.logger.error( f"For {body} value there is no {self._key_field} " f"field (key_field)" ) break @@ -1124,9 +1137,7 @@ async def _run_loop(self): else: key = body.get(self._key_field, None) if key is None: - self.context.logger.error( - f"For {body} value there is no {self._key_field} field (key_field)" - ) + self.logger.error(f"For {body} value there is no {self._key_field} field (key_field)") if self._id_field: event_id = body[self._id_field] else: diff --git a/storey/targets.py b/storey/targets.py index f391c992..7585b382 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -473,8 +473,9 @@ def _event_to_batch_entry(self, event): return writer_entry async def _terminate(self): - asyncio.get_running_loop().run_in_executor(None, lambda: self._data_buffer.put(_termination_obj)) - await self._blocking_io_loop_future + if self._blocking_io_loop_future: + asyncio.get_running_loop().run_in_executor(None, lambda: self._data_buffer.put(_termination_obj)) + await self._blocking_io_loop_future async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None): if not self._blocking_io_loop_future: diff --git a/tests/test_flow.py b/tests/test_flow.py index 1e851786..7151a9a7 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -2044,6 +2044,17 @@ def test_write_csv_error(tmpdir): asyncio.run(async_test_write_csv_error(tmpdir)) +# ML-5299 +def test_write_csv_with_zero_records(tmpdir): + file_path = f"{tmpdir}/test_write_csv_with_zero_records.csv" + controller = build_flow([SyncEmitSource(), CSVTarget(file_path, columns=["n", "n*10"], header=True)]).run() + + controller.terminate() + controller.await_termination() + + assert not os.path.isfile(file_path) + + def test_write_csv_with_dict(tmpdir): file_path = f"{tmpdir}/test_write_csv_with_dict.csv" controller = build_flow([SyncEmitSource(), CSVTarget(file_path, columns=["n", "n*10"], header=True)]).run()