diff --git a/CHANGELOG.md b/CHANGELOG.md
index 821f32776..08ee3fd52 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@
 
 ### Features
 
+* add option `parallel_bulk` to the Opensearch Output Connector to toggle the parallel bulk implementation (default is `True`)
+
 ### Improvements
 
 ### Bugfix
diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py
index ccdaa366f..47992c0ca 100644
--- a/logprep/connector/elasticsearch/output.py
+++ b/logprep/connector/elasticsearch/output.py
@@ -311,6 +311,8 @@ def _write_backlog(self):
             max_retries=self._config.max_retries,
             chunk_size=len(self._message_backlog),
         )
+        if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
+            self.input_connector.batch_finished_callback()
         self._message_backlog.clear()
 
     def _bulk(self, client, actions, *args, **kwargs):
@@ -324,8 +326,6 @@ def _bulk(self, client, actions, *args, **kwargs):
             self._handle_bulk_index_error(error)
         except search.exceptions.TransportError as error:
             self._handle_transport_error(error)
-        if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
-            self.input_connector.batch_finished_callback()
 
     def _handle_serialization_error(self, error: search.SerializationError):
         """Handle serialization error for elasticsearch bulk indexing.
diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py
index 5ef371f0c..f0d0bd0a9 100644
--- a/logprep/connector/opensearch/output.py
+++ b/logprep/connector/opensearch/output.py
@@ -72,6 +72,9 @@ class OpensearchOutput(ElasticsearchOutput):
     class Config(ElasticsearchOutput.Config):
         """Config for OpensearchOutput."""
 
+        parallel_bulk: bool = field(default=True, validator=validators.instance_of(bool))
+        """Configure whether all events in the backlog should be sent in parallel via multiple
+        threads to Opensearch. (Default: :code:`True`)"""
         thread_count: int = field(
             default=4, validator=[validators.instance_of(int), validators.gt(1)]
         )
@@ -112,18 +115,10 @@ def describe(self) -> str:
 
     def _bulk(self, client, actions, *args, **kwargs):
         try:
-            for success, item in helpers.parallel_bulk(
-                client,
-                actions=actions,
-                chunk_size=self._config.chunk_size,
-                queue_size=self._config.queue_size,
-                raise_on_error=True,
-                raise_on_exception=True,
-            ):
-                if not success:
-                    result = item[list(item.keys())[0]]
-                    if "error" in result:
-                        raise result.get("error")
+            if self._config.parallel_bulk:
+                self._parallel_bulk(client, actions, *args, **kwargs)
+                return
+            helpers.bulk(client, actions, *args, **kwargs)
         except search.SerializationError as error:
             self._handle_serialization_error(error)
         except search.ConnectionError as error:
@@ -132,5 +127,17 @@ def _bulk(self, client, actions, *args, **kwargs):
             self._handle_bulk_index_error(error)
         except search.exceptions.TransportError as error:
             self._handle_transport_error(error)
-        if self.input_connector:
-            self.input_connector.batch_finished_callback()
+
+    def _parallel_bulk(self, client, actions, *args, **kwargs):
+        for success, item in helpers.parallel_bulk(
+            client,
+            actions=actions,
+            chunk_size=self._config.chunk_size,
+            queue_size=self._config.queue_size,
+            raise_on_error=True,
+            raise_on_exception=True,
+        ):
+            if not success:
+                result = item[list(item.keys())[0]]
+                if "error" in result:
+                    raise result.get("error")
diff --git a/quickstart/exampledata/config/pipeline.yml b/quickstart/exampledata/config/pipeline.yml
index 3890d2bdc..9a47d1a83 100644
--- a/quickstart/exampledata/config/pipeline.yml
+++ b/quickstart/exampledata/config/pipeline.yml
@@ -111,13 +111,11 @@ output:
       - 127.0.0.1:9200
     default_index: processed
     error_index: errors
-    message_backlog_size: 16000
+    message_backlog_size: 10000
     timeout: 10000
     flush_timeout: 60
     max_retries: 3
-    thread_count: 16
-    queue_size: 32
-    chunk_size: 500
+    parallel_bulk: false
     user: admin
     secret: admin
   kafka:
diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py
index e9c94b907..dce0cb178 100644
--- a/tests/unit/connector/test_opensearch_output.py
+++ b/tests/unit/connector/test_opensearch_output.py
@@ -33,6 +33,7 @@ class NotJsonSerializableMock:
 in_ci = os.environ.get("GITHUB_ACTIONS") == "true"
 
 helpers.parallel_bulk = mock.MagicMock()
+helpers.bulk = mock.MagicMock()
 
 
 class TestOpenSearchOutput(BaseOutputTestCase):
@@ -348,6 +349,7 @@ def test_setup_raises_fatal_output_error_if_opensearch_error_is_raised(self):
         self.object.setup()
 
     def test_setup_registers_flush_timout_tasks(self):
+        # this test fails if opensearch is running on localhost
        job_count = len(Component._scheduler.jobs)
        with pytest.raises(FatalOutputError):
            self.object.setup()
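Usage note (not part of the patch): the net effect of this change is that `OpensearchOutput._bulk` branches on the new `parallel_bulk` flag, routing the backlog through `helpers.parallel_bulk` when `True` and through serial `helpers.bulk` when `False`. A minimal sketch of an output section using the new option, assuming the quickstart's connector layout and the `opensearch_output` connector type; all values other than `parallel_bulk` are illustrative:

```yaml
# Illustrative output config (keys mirror quickstart/exampledata/config/pipeline.yml).
output:
  opensearch:
    type: opensearch_output
    hosts:
      - 127.0.0.1:9200
    default_index: processed
    error_index: errors
    message_backlog_size: 10000
    timeout: 10000
    flush_timeout: 60
    max_retries: 3
    # New option from this patch: true (the default) sends the backlog via
    # helpers.parallel_bulk; false falls back to serial helpers.bulk.
    parallel_bulk: true
    # Passed to helpers.parallel_bulk when parallel_bulk is true;
    # example values only.
    queue_size: 4
    chunk_size: 500
    user: admin
    secret: admin
```

Note that the quickstart config in this patch opts out (`parallel_bulk: false`) and drops its threading knobs, while the connector default remains `True`, so existing configurations keep the parallel behavior.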