diff --git a/CHANGELOG.md b/CHANGELOG.md index 653bbf2f9..addab1c0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ ### Bugfix * make the s3 connector actually use the `max_retries` parameter +* fixed a bug which led to a `FatalOutputError` on handling `CriticalInputError` in pipeline ## v9.0.3 ### Breaking diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index ab51c616a..9b16f964e 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -58,7 +58,7 @@ def _inner(self: "Pipeline") -> Any: self.logger.error(str(error)) self.stop() except CriticalInputError as error: - if raw_input := error.raw_input and self._output: # pylint: disable=protected-access + if (raw_input := error.raw_input) and self._output: # pylint: disable=protected-access for _, output in self._output.items(): # pylint: disable=protected-access if output.default: output.store_failed(str(self), raw_input, {}) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 22526d4c3..2e7061228 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -291,6 +291,9 @@ def raise_critical(timeout): str(CriticalInputError(self.pipeline._input, "mock input error", input_event)) ) assert self.pipeline._output["dummy"].store_failed.call_count == 1, "one error is stored" + self.pipeline._output["dummy"].store_failed.assert_called_with( + str(self.pipeline), input_event, {} + ) assert self.pipeline._output["dummy"].store.call_count == 0, "no event is stored" @mock.patch("logging.Logger.warning")