Skip to content

Commit

Permalink
fix(kafka): add a flag to limit to first hostname for use with networ…
Browse files Browse the repository at this point in the history
…ks (#638)

fix #637
  • Loading branch information
alexanderankin authored Jul 4, 2024
1 parent 41fbdd0 commit 0ce4fec
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
21 changes: 20 additions & 1 deletion modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import tarfile
import time
from dataclasses import dataclass, field
from io import BytesIO
from os import environ
from textwrap import dedent

from typing_extensions import Self
Expand All @@ -14,7 +16,21 @@
__all__ = [
"KafkaContainer",
"RedpandaContainer",
"kafka_config",
]
LIMIT_BROKER_ENV_VAR = "TC_KAFKA_LIMIT_BROKER_TO_FIRST_HOST"


@dataclass
class _KafkaConfig:
limit_broker_to_first_host: bool = field(default_factory=lambda: environ.get(LIMIT_BROKER_ENV_VAR) == "true")
"""
This option is useful for a setup with a network,
see testcontainers/testcontainers-python#637 for more details
"""


kafka_config = _KafkaConfig()


class KafkaContainer(DockerContainer):
Expand Down Expand Up @@ -136,7 +152,10 @@ def get_bootstrap_server(self) -> str:
def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
if kafka_config.limit_broker_to_first_host:
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092"
else:
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
data = (
dedent(
f"""
Expand Down
23 changes: 21 additions & 2 deletions modules/kafka/tests/test_kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
import pytest
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer, TopicPartition

from testcontainers.kafka import KafkaContainer
from testcontainers.core.network import Network
from testcontainers.kafka import KafkaContainer, kafka_config


def test_kafka_producer_consumer():
Expand All @@ -20,6 +22,23 @@ def test_kafka_producer_consumer_custom_port():
produce_and_consume_kafka_message(container)


def test_kafka_on_networks(monkeypatch: pytest.MonkeyPatch):
"""
this test case comes from testcontainers/testcontainers-python#637
"""
monkeypatch.setattr(kafka_config, "limit_broker_to_first_host", True)

with Network() as network:
kafka_ctr = KafkaContainer()
kafka_ctr.with_network(network)
kafka_ctr.with_network_aliases("kafka")

with kafka_ctr:
print("started") # Will not reach here and timeout
admin_client = KafkaAdminClient(bootstrap_servers=[kafka_ctr.get_bootstrap_server()])
print(admin_client.describe_cluster())


def produce_and_consume_kafka_message(container):
topic = "test-topic"
bootstrap_server = container.get_bootstrap_server()
Expand Down

0 comments on commit 0ce4fec

Please sign in to comment.