From de893e150002b4ee36bd57fdef1aba33bb0acac2 Mon Sep 17 00:00:00 2001 From: khartld <144441108+khartld@users.noreply.github.com> Date: Wed, 8 May 2024 14:50:22 +0200 Subject: [PATCH] fix(modules.kafka): Use broker container IP instead of host IP for advertised broker listener (#1989) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix advertised broker ip port 9092 is not exposed to the host, so we can not connect to it via host ip, but need to use the broker container ip * Add fn to fetch the hostname * chore: use the new Inspect method instead * chore: add test --------- Co-authored-by: Manuel de la Peña Co-authored-by: Eddú Meléndez --- modules/kafka/kafka.go | 9 ++++++++- modules/kafka/kafka_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/modules/kafka/kafka.go b/modules/kafka/kafka.go index f5d49e9db9..34758e8824 100644 --- a/modules/kafka/kafka.go +++ b/modules/kafka/kafka.go @@ -72,12 +72,19 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return err } + inspect, err := c.Inspect(ctx) + if err != nil { + return err + } + + hostname := inspect.Config.Hostname + port, err := c.MappedPort(ctx, publicPort) if err != nil { return err } - scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), host) + scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755) }, diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index 662bb5d0a8..16b8b76355 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -2,6 +2,7 @@ package kafka_test import ( "context" + "io" "strings" "testing" @@ -28,6 +29,8 @@ func TestKafka(t *testing.T) { } }) + assertAdvertisedListeners(t, kafkaContainer) + if !strings.EqualFold(kafkaContainer.ClusterID, "kraftCluster") { t.Fatalf("expected clusterID to be %s, got %s", "kraftCluster", kafkaContainer.ClusterID) } @@ -93,3 +96,32 @@ func TestKafka_invalidVersion(t *testing.T) { t.Fatal(err) } } + +// assertAdvertisedListeners checks that the advertised listeners are set correctly: +// - The BROKER:// protocol is using the hostname of the Kafka container +func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) { + inspect, err := container.Inspect(context.Background()) + if err != nil { + t.Fatal(err) + } + + hostname := inspect.Config.Hostname + + code, r, err := container.Exec(context.Background(), []string{"cat", "/usr/sbin/testcontainers_start.sh"}) + if err != nil { + t.Fatal(err) + } + + if code != 0 { + t.Fatalf("expected exit code to be 0, got %d", code) + } + + bs, err := io.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + if !strings.Contains(string(bs), "BROKER://"+hostname+":9092") { + t.Fatalf("expected advertised listeners to contain %s, got %s", "BROKER://"+hostname+":9092", string(bs)) + } +}