diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py
index ce3020485..18100bb5a 100644
--- a/logprep/connector/confluent_kafka/input.py
+++ b/logprep/connector/confluent_kafka/input.py
@@ -273,7 +273,8 @@ def _kafka_config(self) -> dict:
         DEFAULTS.update({"client.id": getfqdn()})
         DEFAULTS.update(
             {
-                "group.instance.id": f"{getfqdn().strip('.')}-Pipeline{self.pipeline_index}-pid{os.getpid()}"
+                "group.instance.id": f"{getfqdn().strip('.')}-"
+                f"Pipeline{self.pipeline_index}-pid{os.getpid()}"
             }
         )
         return DEFAULTS | self._config.kafka_config | injected_config
@@ -378,7 +379,8 @@ def _commit_callback(
         if offset in SPECIAL_OFFSETS:
             offset = 0
         labels = {
-            "description": f"topic: {self._config.topic} - partition: {topic_partition.partition}"
+            "description": f"topic: {self._config.topic} - "
+            f"partition: {topic_partition.partition}"
         }
         self.metrics.committed_offsets.add_with_labels(offset, labels)
@@ -476,7 +478,8 @@ def batch_finished_callback(self) -> None:
         """
         if self._enable_auto_offset_store:
             return
-        # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled
+        # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message
+        # was polled
         if not self._last_valid_record:
             return
         try:
diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py
index 5d9df8f72..dca6b545e 100644
--- a/logprep/connector/confluent_kafka/output.py
+++ b/logprep/connector/confluent_kafka/output.py
@@ -269,7 +269,10 @@ def describe(self) -> str:
 
         """
         base_description = super().describe()
-        return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}"
+        return (
+            f"{base_description} - Kafka Output: "
+            f"{self._config.kafka_config.get('bootstrap.servers')}"
+        )
 
     def store(self, document: dict) -> Optional[bool]:
         """Store a document in the producer topic.