Skip to content

Commit

Permalink
Fix CSVTarget termination after having written nothing (#475)
Browse files Browse the repository at this point in the history
* Add error log to get original traceback

* Log stack trace

* Fix termination of unused `CSVTarget`

* Add regression test

* `self.context.logger` -> `self.logger`
  • Loading branch information
gtopper authored Dec 17, 2023
1 parent fac7787 commit f3f11e8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
25 changes: 18 additions & 7 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import queue
import threading
import time
import traceback
import uuid
import warnings
import weakref
Expand Down Expand Up @@ -343,6 +344,12 @@ async def _run_loop(self):
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
self._termination_future.set_result(termination_result)
except BaseException as ex:
if self.logger:
message = "An error was raised"
raised_by = getattr(ex, "_raised_by_storey_step", None)
if raised_by:
message += f" by step {type(raised_by)}"
self.logger.error(f"{message}: {traceback.format_exc()}")
if event is not _termination_obj and event._awaitable_result:
event._awaitable_result._set_error(ex)
self._ex = ex
Expand All @@ -362,7 +369,7 @@ async def _run_loop(self):
await maybe_coroutine
except Exception as ex:
if self.context:
self.context.logger.error(f"Error trying to close {closeable}: {ex}")
self.logger.error(f"Error trying to close {closeable}: {ex}")

def _loop_thread_main(self):
asyncio.run(self._run_loop())
Expand Down Expand Up @@ -638,6 +645,12 @@ async def _run_loop(self):
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
return termination_result
except BaseException as ex:
if self.logger:
message = "An error was raised"
raised_by = getattr(ex, "_raised_by_storey_step", None)
if raised_by:
message += f" by step {type(raised_by)}"
self.logger.error(f"{message}: {traceback.format_exc()}")
self._ex = ex
if event is not _termination_obj and event._awaitable_result:
awaitable = event._awaitable_result._set_error(ex)
Expand All @@ -655,7 +668,7 @@ async def _run_loop(self):
await maybe_coroutine
except Exception as ex:
if self.context:
self.context.logger.error(f"Error trying to close {closeable}: {ex}")
self.logger.error(f"Error trying to close {closeable}: {ex}")

def _raise_on_error(self):
if self._ex:
Expand Down Expand Up @@ -801,7 +814,7 @@ async def _run_loop(self):
none_keys = [key for key, value in zip(key_fields, keys) if pd.isna(value)]
if none_keys:
if self.context:
self.context.logger.error(
self.logger.error(
f"Encountered null values in the following key fields:"
f" {', '.join(none_keys)}, in line: {body}."
)
Expand Down Expand Up @@ -1116,17 +1129,15 @@ async def _run_loop(self):
key = []
for key_field in self._key_field:
if key_field not in body or pandas.isna(body[key_field]):
self.context.logger.error(
self.logger.error(
f"For {body} value there is no {self._key_field} " f"field (key_field)"
)
break
key.append(body[key_field])
else:
key = body.get(self._key_field, None)
if key is None:
self.context.logger.error(
f"For {body} value there is no {self._key_field} field (key_field)"
)
self.logger.error(f"For {body} value there is no {self._key_field} field (key_field)")
if self._id_field:
event_id = body[self._id_field]
else:
Expand Down
5 changes: 3 additions & 2 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,9 @@ def _event_to_batch_entry(self, event):
return writer_entry

async def _terminate(self):
asyncio.get_running_loop().run_in_executor(None, lambda: self._data_buffer.put(_termination_obj))
await self._blocking_io_loop_future
if self._blocking_io_loop_future:
asyncio.get_running_loop().run_in_executor(None, lambda: self._data_buffer.put(_termination_obj))
await self._blocking_io_loop_future

async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
if not self._blocking_io_loop_future:
Expand Down
11 changes: 11 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,17 @@ def test_write_csv_error(tmpdir):
asyncio.run(async_test_write_csv_error(tmpdir))


# ML-5299
def test_write_csv_with_zero_records(tmpdir):
file_path = f"{tmpdir}/test_write_csv_with_zero_records.csv"
controller = build_flow([SyncEmitSource(), CSVTarget(file_path, columns=["n", "n*10"], header=True)]).run()

controller.terminate()
controller.await_termination()

assert not os.path.isfile(file_path)


def test_write_csv_with_dict(tmpdir):
file_path = f"{tmpdir}/test_write_csv_with_dict.csv"
controller = build_flow([SyncEmitSource(), CSVTarget(file_path, columns=["n", "n*10"], header=True)]).run()
Expand Down

0 comments on commit f3f11e8

Please sign in to comment.