From 50467590ae0c5746cada5ab9add65b335dd495f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Tue, 19 Dec 2023 16:46:02 +0100 Subject: [PATCH] fix opensearch output connector (#498) * use generator parallel_bulk * update changelog and prepare release * add MSGSPECSerializer * handle serialisation error as in jsonserializer --- .../publish-latest-dev-release-to-pypi.yml | 2 +- .vscode/settings.json | 2 +- CHANGELOG.md | 14 ++++- logprep/connector/elasticsearch/output.py | 4 +- logprep/connector/opensearch/output.py | 63 ++++++++++++++----- quickstart/docker-compose.yml | 6 +- quickstart/exampledata/config/pipeline.yml | 21 ++++++- .../config/prometheus/prometheus.yml | 2 +- .../unit/connector/test_opensearch_output.py | 40 ++++++++++-- 9 files changed, 122 insertions(+), 32 deletions(-) diff --git a/.github/workflows/publish-latest-dev-release-to-pypi.yml b/.github/workflows/publish-latest-dev-release-to-pypi.yml index c80893953..260a9f33f 100644 --- a/.github/workflows/publish-latest-dev-release-to-pypi.yml +++ b/.github/workflows/publish-latest-dev-release-to-pypi.yml @@ -17,7 +17,7 @@ jobs: - name: Initialize Python uses: actions/setup-python@v1 with: - python-version: 3.10 + python-version: "3.10" - name: Install dependencies run: | diff --git a/.vscode/settings.json b/.vscode/settings.json index 295cdc3c6..6a0cca048 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,7 +9,7 @@ 120 ], "editor.codeActionsOnSave": { - "source.organizeImports": true + "source.organizeImports": "explicit" }, "autoDocstring.docstringFormat": "numpy", "files.exclude": { diff --git a/CHANGELOG.md b/CHANGELOG.md index 81746c4d9..821f32776 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,24 @@ ### Features -* make thread_count configurable for parallel_bulk in opensearch output connector +### Improvements + +### Bugfix + + +## v9.0.3 +### Breaking + +### Features + +* make `thread_count`, `queue_size` and `chunk_size` configurable for `parallel_bulk` in opensearch output connector ### Improvements ### Bugfix +* fix `parallel_bulk` implementation not delivering messages to opensearch + ## v9.0.2 ### Bugfix diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py index b7806e67b..ccdaa366f 100644 --- a/logprep/connector/elasticsearch/output.py +++ b/logprep/connector/elasticsearch/output.py @@ -313,9 +313,9 @@ def _write_backlog(self): ) self._message_backlog.clear() - def _bulk(self, *args, **kwargs): + def _bulk(self, client, actions, *args, **kwargs): try: - helpers.bulk(*args, **kwargs) + helpers.bulk(client, actions, *args, **kwargs) except search.SerializationError as error: self._handle_serialization_error(error) except search.ConnectionError as error: diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index e0b2f7509..5ef371f0c 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -36,14 +36,35 @@ import opensearchpy as search from attrs import define, field, validators from opensearchpy import helpers +from opensearchpy.serializer import JSONSerializer from logprep.abc.output import Output from logprep.connector.elasticsearch.output import ElasticsearchOutput -from logprep.metrics.metrics import Metric logging.getLogger("opensearch").setLevel(logging.WARNING) +class MSGPECSerializer(JSONSerializer): + """A MSGPEC serializer""" + + def __init__(self, output_connector: Output, *args, 
**kwargs): + super().__init__(*args, **kwargs) + self._encoder = output_connector._encoder + self._decoder = output_connector._decoder + + def dumps(self, data): + # don't serialize strings + if isinstance(data, str): + return data + try: + return self._encoder.encode(data).decode("utf-8") + except (ValueError, TypeError) as e: + raise search.exceptions.SerializationError(data, e) + + def loads(self, data): + return self._decoder.decode(data) + + class OpensearchOutput(ElasticsearchOutput): """An OpenSearch output connector.""" @@ -56,6 +77,16 @@ class Config(ElasticsearchOutput.Config): ) """Number of threads to use for bulk requests.""" + queue_size: int = field( + default=4, validator=[validators.instance_of(int), validators.gt(1)] + ) + """Number of queue size to use for bulk requests.""" + + chunk_size: int = field( + default=500, validator=[validators.instance_of(int), validators.gt(1)] + ) + """Chunk size to use for bulk requests.""" + @cached_property def _search_context(self): return search.OpenSearch( @@ -64,6 +95,7 @@ def _search_context(self): http_auth=self.http_auth, ssl_context=self.ssl_context, timeout=self._config.timeout, + serializer=MSGPECSerializer(self), ) def describe(self) -> str: @@ -78,23 +110,20 @@ def describe(self) -> str: base_description = Output.describe(self) return f"{base_description} - Opensearch Output: {self._config.hosts}" - @Metric.measure_time() - def _write_backlog(self): - if not self._message_backlog: - return - - self._bulk( - self._search_context, - self._message_backlog, - max_retries=self._config.max_retries, - chunk_size=len(self._message_backlog) / self._config.thread_count, - thread_count=self._config.thread_count, - ) - self._message_backlog.clear() - - def _bulk(self, *args, **kwargs): + def _bulk(self, client, actions, *args, **kwargs): try: - helpers.parallel_bulk(*args, **kwargs) + for success, item in helpers.parallel_bulk( + client, + actions=actions, + chunk_size=self._config.chunk_size, + queue_size=self._config.queue_size, + raise_on_error=True, + raise_on_exception=True, + ): + if not success: + result = item[list(item.keys())[0]] + if "error" in result: + raise result.get("error") except search.SerializationError as error: self._handle_serialization_error(error) except search.ConnectionError as error: diff --git a/quickstart/docker-compose.yml b/quickstart/docker-compose.yml index d8cac93b2..d3d716014 100644 --- a/quickstart/docker-compose.yml +++ b/quickstart/docker-compose.yml @@ -60,7 +60,7 @@ services: - ALLOW_PLAINTEXT_LISTENER=yes volumes: - /var/run/docker.sock:/var/run/docker.sock - command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 3 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh" + command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh" healthcheck: test: [ @@ -92,6 +92,7 @@ services: depends_on: - kafka - opensearch + volumes: - ../quickstart/:/home/logprep/quickstart/ entrypoint: @@ -166,3 +167,6 @@ services: - ../quickstart/exampledata/config/postgresql:/docker-entrypoint-initdb.d volumes: data: + +networks: + opensearch-net: diff --git a/quickstart/exampledata/config/pipeline.yml b/quickstart/exampledata/config/pipeline.yml index 4ff74e875..3890d2bdc 100644 --- a/quickstart/exampledata/config/pipeline.yml +++ 
b/quickstart/exampledata/config/pipeline.yml @@ -90,8 +90,19 @@ input: kafka_config: bootstrap.servers: 127.0.0.1:9092 group.id: cgroup3 - enable.auto.commit: "false" + enable.auto.commit: "true" + auto.commit.interval.ms: "10000" enable.auto.offset.store: "false" + queued.min.messages: "100000" + queued.max.messages.kbytes: "65536" + statistics.interval.ms: "60000" + preprocessing: + version_info_target_field: Logprep_version_info + log_arrival_time_target_field: event.ingested + hmac: + target: + key: "thisisasecureandrandomkey" + output_field: Full_event output: opensearch: @@ -100,10 +111,13 @@ output: - 127.0.0.1:9200 default_index: processed error_index: errors - message_backlog_size: 1000 + message_backlog_size: 16000 timeout: 10000 flush_timeout: 60 max_retries: 3 + thread_count: 16 + queue_size: 32 + chunk_size: 500 user: admin secret: admin kafka: @@ -113,4 +127,5 @@ output: error_topic: errors flush_timeout: 300 kafka_config: - bootstrap.servers: 127.0.0.1:9092 \ No newline at end of file + bootstrap.servers: 127.0.0.1:9092 + statistics.interval.ms: "60000" diff --git a/quickstart/exampledata/config/prometheus/prometheus.yml b/quickstart/exampledata/config/prometheus/prometheus.yml index f3642da75..8fdd847e4 100644 --- a/quickstart/exampledata/config/prometheus/prometheus.yml +++ b/quickstart/exampledata/config/prometheus/prometheus.yml @@ -29,7 +29,7 @@ scrape_configs: - targets: ["localhost:9090"] - job_name: "logprep" static_configs: - - targets: ["localhost:8000"] + - targets: ["localhost:8000", "localhost:8001"] - job_name: "kafka" metrics_path: "/metrics" static_configs: diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py index 67f49cc84..e9c94b907 100644 --- a/tests/unit/connector/test_opensearch_output.py +++ b/tests/unit/connector/test_opensearch_output.py @@ -5,7 +5,10 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=too-many-arguments import json +import os import re +import time +import uuid from datetime import datetime from math import isclose from unittest import mock @@ -17,6 +20,8 @@ from logprep.abc.component import Component from logprep.abc.output import FatalOutputError +from logprep.connector.opensearch.output import OpensearchOutput +from logprep.factory import Factory from logprep.util.time import TimeParser from tests.unit.connector.base import BaseOutputTestCase @@ -25,13 +30,15 @@ class NotJsonSerializableMock: pass -helpers.bulk = mock.MagicMock() +in_ci = os.environ.get("GITHUB_ACTIONS") == "true" + +helpers.parallel_bulk = mock.MagicMock() class TestOpenSearchOutput(BaseOutputTestCase): CONFIG = { "type": "opensearch_output", - "hosts": ["host:123"], + "hosts": ["localhost:9200"], "default_index": "default_index", "error_index": "error_index", "message_backlog_size": 1, @@ -41,7 +48,7 @@ class TestOpenSearchOutput(BaseOutputTestCase): def test_describe_returns_output(self): assert ( self.object.describe() - == "OpensearchOutput (Test Instance Name) - Opensearch Output: ['host:123']" + == "OpensearchOutput (Test Instance Name) - Opensearch Output: ['localhost:9200']" ) def test_store_sends_to_default_index(self): @@ -193,8 +200,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk } ] self.object._handle_bulk_index_error(mock_bulk_index_error) - call_args = fake_bulk.call_args[0][1] - error_document = call_args[0] + error_document = fake_bulk.call_args.kwargs.get("actions").pop() assert "reason" in error_document assert "@timestamp" in 
error_document assert "_index" in error_document @@ -360,3 +366,27 @@ def test_message_backlog_is_cleared_after_it_was_written(self): self.object._config.message_backlog_size = 1 self.object.store({"event": "test_event"}) assert len(self.object._message_backlog) == 0 + + @pytest.mark.skip(reason="This test is only for local debugging") + def test_opensearch_parallel_bulk(self): + config = { + "type": "opensearch_output", + "hosts": ["localhost:9200"], + "default_index": "default_index", + "error_index": "error_index", + "message_backlog_size": 1, + "timeout": 5000, + } + output: OpensearchOutput = Factory.create({"opensearch_output": config}, mock.MagicMock()) + uuid_str = str(uuid.uuid4()) + result = output._search_context.search( + index="defaultindex", body={"query": {"match": {"foo": uuid_str}}} + ) + len_before = len(result["hits"]["hits"]) + output._message_backlog = [{"foo": uuid_str, "_index": "defaultindex"}] + output._write_backlog() + time.sleep(1) + result = output._search_context.search( + index="defaultindex", body={"query": {"match": {"foo": uuid_str}}} + ) + assert len(result["hits"]["hits"]) > len_before
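
Background on the bugfix above: opensearch-py's helpers.parallel_bulk() returns a lazy generator, so an implementation that calls it without iterating the result never actually sends any bulk requests, which is why messages were not delivered. The patched _bulk() consumes the generator and inspects each item. The following is a minimal standalone sketch of that pattern, not code from the patch; the client host, index name, and sample actions are illustrative assumptions.

from opensearchpy import OpenSearch, helpers

# Assumed local test instance; the connector builds its client from its own config.
client = OpenSearch(hosts=["localhost:9200"])
actions = ({"_index": "processed", "message": f"event {i}"} for i in range(1000))

# Broken pattern: parallel_bulk() is lazy, so nothing is indexed if the
# returned generator is never iterated.
# helpers.parallel_bulk(client, actions, thread_count=4)

# Fixed pattern, mirroring the patched _bulk(): iterate the generator so the
# worker pool actually flushes its chunks, and surface per-item failures.
for success, item in helpers.parallel_bulk(
    client, actions, thread_count=4, queue_size=4, chunk_size=500
):
    if not success:
        raise RuntimeError(item)

In the connector itself, chunk_size and queue_size are taken from the newly configurable output options introduced by this patch rather than from hard-coded literals.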