Skip to content

Commit

Permalink
fix(modules.kafka): Use broker container IP instead of host IP for ad…
Browse files Browse the repository at this point in the history
…vertised broker listener (#1989)

* Fix advertised broker ip

port 9092 is not exposed to the host, so we can not connect to it via host ip, but need to use the broker container ip

* Add fn to fetch the hostname

* chore: use the new Inspect method instead

* chore: add test

---------

Co-authored-by: Manuel de la Peña <[email protected]>
Co-authored-by: Eddú Meléndez <[email protected]>
  • Loading branch information
3 people authored May 8, 2024
1 parent 5fa6548 commit de893e1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
9 changes: 8 additions & 1 deletion modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,19 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return err
}

inspect, err := c.Inspect(ctx)
if err != nil {
return err
}

hostname := inspect.Config.Hostname

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return err
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), host)
scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)

return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755)
},
Expand Down
32 changes: 32 additions & 0 deletions modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka_test

import (
"context"
"io"
"strings"
"testing"

Expand All @@ -28,6 +29,8 @@ func TestKafka(t *testing.T) {
}
})

assertAdvertisedListeners(t, kafkaContainer)

if !strings.EqualFold(kafkaContainer.ClusterID, "kraftCluster") {
t.Fatalf("expected clusterID to be %s, got %s", "kraftCluster", kafkaContainer.ClusterID)
}
Expand Down Expand Up @@ -93,3 +96,32 @@ func TestKafka_invalidVersion(t *testing.T) {
t.Fatal(err)
}
}

// assertAdvertisedListeners checks that the advertised listeners are set correctly:
// - The BROKER:// protocol is using the hostname of the Kafka container
func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) {
inspect, err := container.Inspect(context.Background())
if err != nil {
t.Fatal(err)
}

hostname := inspect.Config.Hostname

code, r, err := container.Exec(context.Background(), []string{"cat", "/usr/sbin/testcontainers_start.sh"})
if err != nil {
t.Fatal(err)
}

if code != 0 {
t.Fatalf("expected exit code to be 0, got %d", code)
}

bs, err := io.ReadAll(r)
if err != nil {
t.Fatal(err)
}

if !strings.Contains(string(bs), "BROKER://"+hostname+":9092") {
t.Fatalf("expected advertised listeners to contain %s, got %s", "BROKER://"+hostname+":9092", string(bs))
}
}

0 comments on commit de893e1

Please sign in to comment.