Skip to content

Commit

Permalink
fix(kafka): wait_for_logs in kafka container to reduce lib requirement (
Browse files Browse the repository at this point in the history
#377)

Use `wait_for_logs` to wait for startup instead of waiting for
successful connection via kafka-python. Also removes the dependency on
kafka-python.

Closes #351

---------

Co-authored-by: Gudjon Ragnar Brynjarsson <[email protected]>
  • Loading branch information
gudjonragnar and Gudjon Ragnar Brynjarsson authored Mar 24, 2024
1 parent e962189 commit 909107b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
15 changes: 3 additions & 12 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from io import BytesIO
from textwrap import dedent

from kafka import KafkaConsumer
from kafka.errors import KafkaError, NoBrokersAvailable, UnrecognizedBrokerVersion
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.core.waiting_utils import wait_for_logs


class KafkaContainer(DockerContainer):
Expand Down Expand Up @@ -47,13 +45,6 @@ def get_bootstrap_server(self) -> str:
port = self.get_exposed_port(self.port)
return f"{host}:{port}"

@wait_container_is_ready(UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError)
def _connect(self) -> None:
bootstrap_server = self.get_bootstrap_server()
consumer = KafkaConsumer(group_id="test", bootstrap_servers=[bootstrap_server])
if not consumer.bootstrap_connected():
raise KafkaError("Unable to connect with kafka container!")

def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
Expand All @@ -78,13 +69,13 @@ def tc_start(self) -> None:
)
self.create_file(data, KafkaContainer.TC_START_SCRIPT)

def start(self) -> "KafkaContainer":
def start(self, timeout=30) -> "KafkaContainer":
script = KafkaContainer.TC_START_SCRIPT
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
self._connect()
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
Expand Down
20 changes: 12 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ influxdb = { version = "*", optional = true }
influxdb-client = { version = "*", optional = true }
kubernetes = { version = "*", optional = true }
pyyaml = { version = "*", optional = true }
kafka-python = { version = "*", optional = true }
python-keycloak = { version = "*", optional = true }
boto3 = { version = "*", optional = true }
minio = { version = "*", optional = true }
Expand All @@ -93,7 +92,7 @@ elasticsearch = []
google = ["google-cloud-pubsub"]
influxdb = ["influxdb", "influxdb-client"]
k3s = ["kubernetes", "pyyaml"]
kafka = ["kafka-python"]
kafka = []
keycloak = ["python-keycloak"]
localstack = ["boto3"]
minio = ["minio"]
Expand Down Expand Up @@ -121,7 +120,7 @@ anyio = "^4.3.0"
psycopg2-binary = "*"
pg8000 = "*"
sqlalchemy = "*"

kafka-python = "^2.0.2"

[[tool.poetry.source]]
name = "PyPI"
Expand Down

0 comments on commit 909107b

Please sign in to comment.