Skip to content

Commit

Permalink
feat: additional advertised listners/broker addresses for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ghthor committed Dec 15, 2024
1 parent 35bf0cd commit 8cffdbf
Show file tree
Hide file tree
Showing 4 changed files with 564 additions and 23 deletions.
40 changes: 40 additions & 0 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ The Kafka container will be started using a custom shell script:
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript
<!--/codeinclude-->

That will set the advertised listeners with these values:

<!--codeinclude-->
[Advertised Listeners](../../modules/kafka/kafka.go) inside_block:advertisedListeners
<!--/codeinclude-->

KafkaContainer provides methods to read the broker addresses for different
connectivity environments.

#### Environment variables

The environment variables that are already set by default are:
Expand All @@ -82,3 +91,34 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin
<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
#### BrokersByHostDockerInternal
The `BrokersByHostDockerInternal(ctx)` method returns the Kafka brokers as a
string slice, containing the hostname `host.docker.internal` and a random port
defined by Kafka's public port (`19092/tcp`).

This method is useful when you need to run additional containers that need to
connect to Kafka.

<!--codeinclude-->
[Get Kafka brokers by host.docker.internal](../../modules/kafka/examples_test.go) inside_block:getBrokersByHostDockerInternal
<!--/codeinclude-->

#### BrokersByContainerName

The `BrokersByContainerName(ctx)` method returns the Kafka brokers as a string
slice, addressed by the container's name(`Ex: charming_dijkstra:19093`). This
method is useful when you need to run additional containers that need to connect
to Kafka.
To use this broker address you should run all the containers inside a docker
network.
<!--codeinclude-->
[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka
<!--/codeinclude-->
<!--codeinclude-->
[Get Kafka brokers by container name](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kcat
<!--/codeinclude-->
303 changes: 303 additions & 0 deletions modules/kafka/examples_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package kafka_test

import (
"bytes"
"context"
"fmt"
"io"
"log"
"strings"

"github.com/IBM/sarama"
"github.com/docker/docker/api/types/container"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/kafka"
"github.com/testcontainers/testcontainers-go/network"
"github.com/testcontainers/testcontainers-go/wait"
)

func ExampleRun() {
Expand Down Expand Up @@ -41,3 +48,299 @@ func ExampleRun() {
// test-cluster
// true
}

func ExampleKafkaContainer_BrokersByHostDockerInternal() {
ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx,
"confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
)
if err != nil {
log.Fatalf("failed to start container: %s", err)
}

// Clean up the container after
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
log.Fatalf("failed to terminate container: %s", err)
}
}()

{
state, err := kafkaContainer.State(ctx)
if err != nil {
log.Fatalf("failed to get container state: %s", err) // nolint:gocritic

Check failure on line 73 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)

Check failure on line 73 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)
}

const topic = "example-topic"

// Produce a message from the host that will be read by a consumer in another docker container
{
brokers, err := kafkaContainer.Brokers(ctx)

Check failure on line 84 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

ineffectual assignment to err (ineffassign)

Check failure on line 84 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

ineffectual assignment to err (ineffassign)

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatal(err)
}

if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("example_message_value"),
}); err != nil {
log.Fatal(err)
}

}

// getBrokersByHostDockerInternal {
brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx)
if err != nil {
log.Fatal(err)
}

// Run another container that can connect to the kafka container via hostname "host.docker.internal"
kcat, err := testcontainers.GenericContainer(
ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kafkacat",
Entrypoint: []string{"kafkacat"},
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
WaitingFor: wait.ForExit(),

// Add host.docker.internal to the consumer container so it can contact the kafka borkers
HostConfigModifier: func(hc *container.HostConfig) {
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
},
},
Started: true,
},
)
if err != nil {
log.Fatalf("kafkacat error: %v", err)
}

lr, err := kcat.Logs(ctx)
if err != nil {
log.Fatalf("kafkacat logs error: %v", err)
}

logs, err := io.ReadAll(lr)
if err != nil {
log.Fatalf("kafkacat logs read error: %v", err)
}

fmt.Println("read message:", string(bytes.TrimSpace(logs)))
// }

// Output:
// test-cluster
// true
// read message: example_message_value
}

func ExampleKafkaContainer_BrokersByContainerName() {
ctx := context.Background()

// getBrokersByContainerName_Kafka {
net, err := network.New(ctx)
if err != nil {
log.Fatalf("failed to create network: %s", err)
}

kafkaContainer, err := kafka.Run(ctx,
"confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
network.WithNetwork(nil, net), // Run kafka test container in a new docker network
)
if err != nil {
log.Fatalf("failed to start container: %s", err)
}
// }

// Clean up the container after
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
log.Fatalf("failed to terminate container: %s", err)
}
}()

{
state, err := kafkaContainer.State(ctx)
if err != nil {
log.Fatalf("failed to get container state: %s", err) // nolint:gocritic

Check failure on line 179 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)

Check failure on line 179 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)
}

const topic = "example-topic"

// Produce a message from the host that will be read by a consumer in another docker container
{
brokers, err := kafkaContainer.Brokers(ctx)

Check failure on line 190 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

ineffectual assignment to err (ineffassign)

Check failure on line 190 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

ineffectual assignment to err (ineffassign)

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatal(err)
}

if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("example_message_value"),
}); err != nil {
log.Fatal(err)
}
}

// getBrokersByContainerName_Kcat {
brokers, err := kafkaContainer.BrokersByContainerName(ctx)
if err != nil {
log.Fatal(err)
}

// Run another container that can connect to the kafka container via the kafka containers name
kcat, err := testcontainers.GenericContainer(
ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kafkacat",
Entrypoint: []string{"kafkacat"},
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
WaitingFor: wait.ForExit(),
Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
},
Started: true,
},
)
if err != nil {
log.Fatalf("kafkacat error: %v", err)
}

lr, err := kcat.Logs(ctx)
if err != nil {
log.Fatalf("kafkacat logs error: %v", err)
}

logs, err := io.ReadAll(lr)
if err != nil {
log.Fatalf("kafkacat logs read error: %v", err)
}

fmt.Println("read message:", string(bytes.TrimSpace(logs)))
// }

// Output:
// test-cluster
// true
// read message: example_message_value
}

func ExampleKafkaContainer_BrokersByContainerId() {
ctx := context.Background()

net, err := network.New(ctx)
if err != nil {
log.Fatalf("failed to create network: %s", err)
}

kafkaContainer, err := kafka.Run(ctx,
"confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
network.WithNetwork(nil, net), // Run kafka test container in a new docker network
)
if err != nil {
log.Fatalf("failed to start container: %s", err)
}

// Clean up the container after
defer func() {
if err := kafkaContainer.Terminate(ctx); err != nil {
log.Fatalf("failed to terminate container: %s", err)
}
}()

{
state, err := kafkaContainer.State(ctx)
if err != nil {
log.Fatalf("failed to get container state: %s", err) // nolint:gocritic

Check failure on line 278 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)

Check failure on line 278 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

directive `// nolint:gocritic` should be written without leading space as `//nolint:gocritic` (nolintlint)
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)
}

const topic = "example-topic"

// Produce a message from the host that will be read by a consumer in another docker container
{
brokers, err := kafkaContainer.Brokers(ctx)

Check failure on line 289 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.22.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.22.x

ineffectual assignment to err (ineffassign)

Check failure on line 289 in modules/kafka/examples_test.go

View workflow job for this annotation

GitHub Actions / test-modules (1.x, ubuntu-latest, kafka) / modules/kafka/ubuntu-latest/1.x

ineffectual assignment to err (ineffassign)

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatal(err)
}

if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("example_message_value"),
}); err != nil {
log.Fatal(err)
}
}

brokers, err := kafkaContainer.BrokersByContainerId(ctx)
if err != nil {
log.Fatal(err)
}

// Run another container that can connect to the kafka container via the kafka containers ContainerID
kcat, err := testcontainers.GenericContainer(
ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kafkacat",
Entrypoint: []string{"kafkacat"},
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
WaitingFor: wait.ForExit(),
Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
},
Started: true,
},
)
if err != nil {
log.Fatalf("kafkacat error: %v", err)
}

lr, err := kcat.Logs(ctx)
if err != nil {
log.Fatalf("kafkacat logs error: %v", err)
}

logs, err := io.ReadAll(lr)
if err != nil {
log.Fatalf("kafkacat logs read error: %v", err)
}

fmt.Println("read message:", string(bytes.TrimSpace(logs)))

// Output:
// test-cluster
// true
// read message: example_message_value
}
Loading

0 comments on commit 8cffdbf

Please sign in to comment.