diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index d2f951b043..5c1b1a3f40 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -71,6 +71,44 @@ The environment variables that are already set by default are: {% include "../features/common_functional_options.md" %} + +#### ClusterId + +You can set the cluster ID by using the `WithClusterID` option. + +``` +KafkaContainer, err = kafka.Run(ctx, + "confluentinc/confluent-local:7.6.1", + kafka.WithClusterID("test-cluster")) +``` + +#### Listeners + +If you need to add new listeners, you can use `WithListener(listeners ...Listener)`. +This option controls the following environment variables for the Kafka container: +- `KAFKA_LISTENERS` +- `KAFKA_REST_BOOTSTRAP_SERVERS` +- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` +- `KAFKA_INTER_BROKER_LISTENER_NAME` +- `KAFKA_ADVERTISED_LISTENERS` + +Example: + + +[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:kafkaWithListener + + +In the above code, we created a network for our container and attached Kafka to it, so they can communicate. Then we marked port 9092 for our internal usage. + +The first listener in the slice will be written to the env variable `KAFKA_INTER_BROKER_LISTENER_NAME`. + +Every listener's name will be converted to upper case. Every name and port should be unique and will be checked in a validation step. 
+ +If you are not using this option or the listeners list is empty, there will be 2 default listeners with the following addresses and ports: + +External - Host():MappedPort() +Internal - Host():9092 + ### Container Methods The Kafka container exposes the following methods: diff --git a/modules/kafka/consumer_test.go b/modules/kafka/consumer_test.go index 9df926e72d..4d95ba9ee5 100644 --- a/modules/kafka/consumer_test.go +++ b/modules/kafka/consumer_test.go @@ -54,3 +54,61 @@ func (k *TestKafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, cl } } } + +// Consumer represents a Sarama consumer group consumer +type TestConsumer struct { + t *testing.T + ready chan bool + messages []*sarama.ConsumerMessage +} + +func NewTestConsumer(t *testing.T) TestConsumer { + t.Helper() + + return TestConsumer{ + t: t, + ready: make(chan bool), + } +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (c *TestConsumer) Setup(sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + close(c.ready) + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (consumer *TestConsumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +// Once the Messages() channel is closed, the Handler must finish its processing +// loop and exit. +func (c *TestConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + // NOTE: + // Do not move the code below to a goroutine. 
+ // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29 + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + c.t.Log("message channel was closed") + return nil + } + c.t.Logf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) + session.MarkMessage(message, "") + + // Store the message to be consumed later + c.messages = append(c.messages, message) + + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + case <-session.Context().Done(): + return nil + } + } +} diff --git a/modules/kafka/go.mod b/modules/kafka/go.mod index da1366a692..0416389461 100644 --- a/modules/kafka/go.mod +++ b/modules/kafka/go.mod @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka go 1.22 require ( - github.com/IBM/sarama v1.42.1 + github.com/IBM/sarama v1.43.3 github.com/docker/go-connections v0.5.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.34.0 @@ -23,7 +23,7 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -41,7 +41,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/kr/text v0.2.0 // 
indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -53,7 +53,7 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect @@ -69,8 +69,7 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.28.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect diff --git a/modules/kafka/go.sum b/modules/kafka/go.sum index dfa6758b3f..5be28a065d 100644 --- a/modules/kafka/go.sum +++ b/modules/kafka/go.sum @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= -github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod 
h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -32,8 +32,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= -github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -85,8 +85,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.9 
h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -111,8 +111,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -190,14 +190,14 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod 
h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/modules/kafka/kafka.go b/modules/kafka/kafka.go index 73e392e1d2..1308598097 100644 --- a/modules/kafka/kafka.go +++ b/modules/kafka/kafka.go @@ -22,7 +22,7 @@ const ( // starterScript { starterScriptContent = `#!/bin/bash source /etc/confluent/docker/bash-config -export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092 +export KAFKA_ADVERTISED_LISTENERS=%s echo Starting Kafka KRaft mode sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure @@ -38,6 +38,12 @@ type KafkaContainer struct { ClusterID string } +type Listener struct { + Name string + Host string + Port string +} + // Deprecated: use Run instead // RunContainer creates an instance of the Kafka container type func RunContainer(ctx 
context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) { @@ -70,24 +76,6 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom Entrypoint: []string{"sh"}, // this CMD will wait for the starter script to be copied into the container and then execute it Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript}, - LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ - { - PostStarts: []testcontainers.ContainerHook{ - // Use a single hook to copy the starter script and wait for - // the Kafka server to be ready. This prevents the wait running - // if the starter script fails to copy. - func(ctx context.Context, c testcontainers.Container) error { - // 1. copy the starter script into the container - if err := copyStarterScript(ctx, c); err != nil { - return fmt.Errorf("copy starter script: %w", err) - } - - // 2. wait for the Kafka server to be ready - return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c) - }, - }, - }, - }, } genericContainerReq := testcontainers.GenericContainerRequest{ @@ -95,12 +83,37 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom Started: true, } + settings := defaultOptions(&genericContainerReq) for _, opt := range opts { + if apply, ok := opt.(Option); ok { + if err := apply(&settings); err != nil { + return nil, err + } + } if err := opt.Customize(&genericContainerReq); err != nil { return nil, err } } + genericContainerReq.ContainerRequest.LifecycleHooks = []testcontainers.ContainerLifecycleHooks{ + { + PostStarts: []testcontainers.ContainerHook{ + // Use a single hook to copy the starter script and wait for + // the Kafka server to be ready. This prevents the wait running + // if the starter script fails to copy. + func(ctx context.Context, c testcontainers.Container) error { + // 1. 
copy the starter script into the container + if err := copyStarterScript(ctx, c, &settings); err != nil { + return fmt.Errorf("copy starter script: %w", err) + } + + // 2. wait for the Kafka server to be ready + return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c) + }, + }, + }, + } + err := validateKRaftVersion(genericContainerReq.Image) if err != nil { return nil, err @@ -122,31 +135,34 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom } // copyStarterScript copies the starter script into the container. -func copyStarterScript(ctx context.Context, c testcontainers.Container) error { +func copyStarterScript(ctx context.Context, c testcontainers.Container, settings *options) error { if err := wait.ForListeningPort(publicPort). SkipInternalCheck(). WaitUntilReady(ctx, c); err != nil { return fmt.Errorf("wait for exposed port: %w", err) } - host, err := c.Host(ctx) - if err != nil { - return fmt.Errorf("host: %w", err) + if len(settings.Listeners) == 0 { + defaultInternal, err := brokerListener(ctx, c) + if err != nil { + return fmt.Errorf("default internal listener: %w", err) + } + settings.Listeners = append(settings.Listeners, defaultInternal) } - inspect, err := c.Inspect(ctx) + defaultExternal, err := plainTextListener(ctx, c) if err != nil { - return fmt.Errorf("inspect: %w", err) + return fmt.Errorf("default external listener: %w", err) } - hostname := inspect.Config.Hostname + settings.Listeners = append(settings.Listeners, defaultExternal) - port, err := c.MappedPort(ctx, publicPort) - if err != nil { - return fmt.Errorf("mapped port: %w", err) + advertised := make([]string, len(settings.Listeners)) + for i, item := range settings.Listeners { + advertised[i] = item.Name + "://" + item.Host + ":" + item.Port } - scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) + scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ",")) if err 
:= c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil { return fmt.Errorf("copy to container: %w", err) @@ -155,14 +171,6 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error { return nil } -func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { - return func(req *testcontainers.GenericContainerRequest) error { - req.Env["CLUSTER_ID"] = clusterID - - return nil - } -} - // Brokers retrieves the broker connection strings from Kafka with only one entry, // defined by the exposed public port. func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) { diff --git a/modules/kafka/kafka_helpers_test.go b/modules/kafka/kafka_helpers_test.go index 6ef7deb60f..897968f0bf 100644 --- a/modules/kafka/kafka_helpers_test.go +++ b/modules/kafka/kafka_helpers_test.go @@ -9,101 +9,144 @@ import ( ) func TestConfigureQuorumVoters(t *testing.T) { - tests := []struct { - name string - req *testcontainers.GenericContainerRequest - expectedVoters string - }{ - { - name: "voters on localhost", - req: &testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Env: map[string]string{}, - }, + testConfigureControllerQuorumVotersFn := func(t *testing.T, req *testcontainers.GenericContainerRequest, expectedVoters string) { + t.Helper() + + configureControllerQuorumVoters(req) + require.Equalf(t, expectedVoters, req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"], "expected KAFKA_CONTROLLER_QUORUM_VOTERS to be %s, got %s", expectedVoters, req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"]) + } + + t.Run("voters/localhost", func(t *testing.T) { + testConfigureControllerQuorumVotersFn(t, &testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Env: map[string]string{}, }, - expectedVoters: "1@localhost:9094", - }, - { - name: "voters on first network alias of the first network", - req: &testcontainers.GenericContainerRequest{ - ContainerRequest: 
testcontainers.ContainerRequest{ - Env: map[string]string{}, - Networks: []string{"foo", "bar", "baaz"}, - NetworkAliases: map[string][]string{ - "foo": {"foo0", "foo1", "foo2", "foo3"}, - "bar": {"bar0", "bar1", "bar2", "bar3"}, - "baaz": {"baaz0", "baaz1", "baaz2", "baaz3"}, - }, + }, "1@localhost:9094") + }) + + t.Run("voters/first-network-alias/first-network", func(t *testing.T) { + testConfigureControllerQuorumVotersFn(t, &testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Env: map[string]string{}, + Networks: []string{"foo", "bar", "baaz"}, + NetworkAliases: map[string][]string{ + "foo": {"foo0", "foo1", "foo2", "foo3"}, + "bar": {"bar0", "bar1", "bar2", "bar3"}, + "baaz": {"baaz0", "baaz1", "baaz2", "baaz3"}, }, }, - expectedVoters: "1@foo0:9094", - }, - { - name: "voters on localhost if alias but no networks", - req: &testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - NetworkAliases: map[string][]string{ - "foo": {"foo0", "foo1", "foo2", "foo3"}, - "bar": {"bar0", "bar1", "bar2", "bar3"}, - "baaz": {"baaz0", "baaz1", "baaz2", "baaz3"}, - }, + }, "1@foo0:9094") + }) + + t.Run("voters/localhost/alias-no-networks", func(t *testing.T) { + testConfigureControllerQuorumVotersFn(t, &testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + NetworkAliases: map[string][]string{ + "foo": {"foo0", "foo1", "foo2", "foo3"}, + "bar": {"bar0", "bar1", "bar2", "bar3"}, + "baaz": {"baaz0", "baaz1", "baaz2", "baaz3"}, }, }, - expectedVoters: "1@localhost:9094", - }, - } + }, "1@localhost:9094") + }) +} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - configureControllerQuorumVoters(test.req) +func TestValidateKRaftVersion(t *testing.T) { + t.Run("official/valid-version", func(t *testing.T) { + err := validateKRaftVersion("confluentinc/confluent-local:7.5.0") + require.NoError(t, err) + }) - require.Equalf(t, test.expectedVoters, 
test.req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"], "expected KAFKA_CONTROLLER_QUORUM_VOTERS to be %s, got %s", test.expectedVoters, test.req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"]) - }) - } + t.Run("official/valid-limit-version", func(t *testing.T) { + err := validateKRaftVersion("confluentinc/confluent-local:7.4.0") + require.NoError(t, err) + }) + + t.Run("official/invalid-low-version", func(t *testing.T) { + err := validateKRaftVersion("confluentinc/confluent-local:7.3.99") + require.Error(t, err) + }) + + t.Run("official/invalid-too-low-version", func(t *testing.T) { + err := validateKRaftVersion("confluentinc/confluent-local:5.0.0") + require.Error(t, err) + }) + + t.Run("unofficial/does-not-validate-KRaft-version", func(t *testing.T) { + err := validateKRaftVersion("my-kafka:1.0.0") + require.NoError(t, err) + }) } -func TestValidateKRaftVersion(t *testing.T) { - tests := []struct { - name string - image string - wantErr bool - }{ - { - name: "Official: valid version", - image: "confluentinc/confluent-local:7.5.0", - wantErr: false, - }, - { - name: "Official: valid, limit version", - image: "confluentinc/confluent-local:7.4.0", - wantErr: false, - }, - { - name: "Official: invalid, low version", - image: "confluentinc/confluent-local:7.3.99", - wantErr: true, - }, - { - name: "Official: invalid, too low version", - image: "confluentinc/confluent-local:5.0.0", - wantErr: true, - }, - { - name: "Unofficial does not validate KRaft version", - image: "my-kafka:1.0.0", - wantErr: false, - }, - } +func TestValidateListeners(t *testing.T) { + t.Run("fail/reserved-listener/port-9093", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "PLAINTEXT", + Host: "kafka", + Port: "9093", + }) + require.Error(t, err) + }) - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - err := validateKRaftVersion(test.image) + t.Run("fail/reserved-listener/port-9094", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "PLAINTEXT", + Host: 
"kafka", + Port: "9094", + }) + require.Error(t, err) + }) - if test.wantErr { - require.Errorf(t, err, "expected error, got nil") - } else { - require.NoErrorf(t, err, "expected no error, got %s", err) - } + t.Run("fail/reserved-listener/name-controller", func(t *testing.T) { + err := validateListeners(Listener{ + Name: " cOnTrOller ", + Host: "kafka", + Port: "9092", }) - } + require.Error(t, err) + }) + + t.Run("fail/reserved-listener/name-plaintext", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "plaintext", + Host: "kafka", + Port: "9092", + }) + require.Error(t, err) + }) + + t.Run("fail/port-duplication", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "test", + Host: "kafka", + Port: "9092", + }, Listener{ + Name: "test2", + Host: "kafka", + Port: "9092", + }) + require.Error(t, err) + }) + + t.Run("fail/name-duplication", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "test", + Host: "kafka", + Port: "9092", + }, Listener{ + Name: "test", + Host: "kafka", + Port: "9095", + }) + require.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + err := validateListeners(Listener{ + Name: "test", + Host: "kafka", + Port: "9092", + }) + require.NoError(t, err) + }) } diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index af858f849f..a3e4b054ff 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -2,17 +2,22 @@ package kafka_test import ( "context" + "errors" + "io" "strings" + "sync" "testing" + "time" "github.com/IBM/sarama" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/testcontainers/testcontainers-go/network" ) -func TestKafka(t *testing.T) { +func TestKafka_Basic(t *testing.T) { topic := "some-topic" ctx := context.Background() @@ -74,6 +79,273 @@ func TestKafka_invalidVersion(t *testing.T) { require.Error(t, err) } +func 
TestKafka_networkConnectivity(t *testing.T) { + ctx := context.Background() + var err error + + const ( + // config + topic_in = "topic_in" + topic_out = "topic_out" + + address = "kafka:9092" + + // test data + key = "wow" + text_msg = "test-input-external" + ) + + Network, err := network.New(ctx) + require.NoError(t, err) + + // kafkaWithListener { + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.6.1", + kafka.WithClusterID("test-cluster"), + network.WithNetwork([]string{"kafka"}, Network), + kafka.WithListener(kafka.Listener{ + Name: "BROKER", + Host: "kafka", + Port: "9092", + }), + ) + // } + testcontainers.CleanupContainer(t, kafkaContainer) + require.NoError(t, err) + + kcat, err := runKcatContainer(ctx, Network.Name, "/tmp/msgs.txt") + testcontainers.CleanupContainer(t, kcat) + require.NoError(t, err) + + // 4. Copy message to kcat + err = kcat.SaveFile(ctx, "Message produced by kcat") + require.NoError(t, err) + + brokers, err := kafkaContainer.Brokers(context.TODO()) + require.NoError(t, err) + + // err = createTopics(brokers, []string{topic_in, topic_out}) + err = kcat.CreateTopic(ctx, address, topic_in) + require.NoError(t, err) + + err = kcat.CreateTopic(ctx, address, topic_out) + require.NoError(t, err) + + // perform assertions + + // set config to true because successfully delivered messages will be returned on the Successes channel + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Consumer.MaxWaitTime = 2 * time.Second + + producer, err := sarama.NewSyncProducer(brokers, config) + require.NoError(t, err) + + // Act + + // External write + _, _, err = producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic_in, + Key: sarama.StringEncoder(key), + Value: sarama.StringEncoder(text_msg), + }) + require.NoError(t, err) + + // Internal read + _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"}) + require.NoError(t, err) + + out, err := 
io.ReadAll(stdout) + require.NoError(t, err) + require.Contains(t, string(out), text_msg) + + // Internal write + tempfile := "/tmp/msgs.txt" + + err = kcat.CopyToContainer(ctx, []byte(out), tempfile, 700) + require.NoError(t, err) + + _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-t", topic_out, "-P", "-l", tempfile}) + require.NoError(t, err) + + // External read + consumer := NewTestConsumer(t) + + client, err := sarama.NewConsumerGroup(brokers, "groupName", config) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + + sCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + go func() { + defer wg.Done() + for { + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := client.Consume(sCtx, []string{topic_out}, &consumer); err != nil { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + require.NoError(t, err) + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + consumer.ready = make(chan bool) + } + }() + + <-consumer.ready // Await till the consumer has been set up + t.Log("Sarama consumer up and running!...") + + cancel() + wg.Wait() + err = client.Close() + require.NoError(t, err) + + msgs := consumer.messages + require.Len(t, msgs, 1) + + require.Contains(t, string(msgs[0].Value), text_msg) + require.Contains(t, string(msgs[0].Key), key) +} + +func TestKafka_withListener(t *testing.T) { + ctx := context.Background() + + // 1. Create network + rpNetwork, err := network.New(ctx) + require.NoError(t, err) + + t.Cleanup(func() { + err := rpNetwork.Remove(ctx) + require.NoError(t, err) + }) + + // 2. 
Start Kafka ctr + // withListenerRP { + ctr, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.6.1", + network.WithNetwork([]string{"kafka"}, rpNetwork), + kafka.WithListener(kafka.Listener{ + Name: "BROKER", + Host: "kafka", + Port: "9092", + }), + ) + // } + testcontainers.CleanupContainer(t, ctr) + require.NoError(t, err) + + // 3. Start KCat container + // withListenerKcat { + kcat, err := runKcatContainer(ctx, rpNetwork.Name, "/tmp/msgs.txt") + // } + testcontainers.CleanupContainer(t, kcat) + require.NoError(t, err) + + // 4. Copy message to kcat + err = kcat.SaveFile(ctx, "Message produced by kcat") + require.NoError(t, err) + + // 5. Produce message to Kafka + // withListenerExec { + err = kcat.ProduceMessageFromFile(ctx, "kafka:9092", "msgs") + // } + + require.NoError(t, err) + + // 6. Consume message from Kafka + // 7. Read Message from stdout + out, err := kcat.ConsumeMessage(ctx, "kafka:9092", "msgs") + require.NoError(t, err) + require.Contains(t, string(out), "Message produced by kcat") +} + +func TestKafka_restProxyService(t *testing.T) { + // TODO: test kafka rest proxy service +} + +func TestKafka_listenersValidation(t *testing.T) { + t.Run("reserved-listener/port-9093", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: "BROKER", + Host: "kafka", + Port: "9093", + }) + }) + + t.Run("reserved-listener/port-9094", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: "BROKER", + Host: "kafka", + Port: "9094", + }) + }) + + t.Run("reserved-listener/controller-duplicated", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: " cOnTrOller ", + Host: "kafka", + Port: "9092", + }) + }) + + t.Run("reserved-listener/plaintext-duplicated", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: "plaintext", + Host: "kafka", + Port: "9092", + }) + }) + + t.Run("duplicated-ports", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: "test", + Host: "kafka", + Port: "9092", + }, + kafka.Listener{ + Name: 
"test2", + Host: "kafka", + Port: "9092", + }, + ) + }) + + t.Run("duplicated-names", func(t *testing.T) { + runWithError(t, kafka.Listener{ + Name: "test", + Host: "kafka", + Port: "9092", + }, + kafka.Listener{ + Name: "test", + Host: "kafka", + Port: "9095", + }, + ) + }) +} + +// runWithError runs the Kafka container with the provided listeners and expects an error +func runWithError(t *testing.T, listeners ...kafka.Listener) { + t.Helper() + + c, err := kafka.Run(context.Background(), + "confluentinc/confluent-local:7.6.1", + kafka.WithClusterID("test-cluster"), + kafka.WithListener(listeners...), + ) + require.Error(t, err) + require.Nil(t, c) +} + // assertAdvertisedListeners checks that the advertised listeners are set correctly: // - The BROKER:// protocol is using the hostname of the Kafka container func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) { diff --git a/modules/kafka/kcat_test.go b/modules/kafka/kcat_test.go new file mode 100644 index 0000000000..08f3afd3b5 --- /dev/null +++ b/modules/kafka/kcat_test.go @@ -0,0 +1,73 @@ +package kafka_test + +import ( + "context" + "fmt" + "io" + + "github.com/testcontainers/testcontainers-go" +) + +type KcatContainer struct { + testcontainers.Container + FilePath string +} + +func runKcatContainer(ctx context.Context, network, filepath string) (*KcatContainer, error) { + ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kcat:7.4.1", + Networks: []string{ + network, + }, + Entrypoint: []string{ + "sh", + }, + Cmd: []string{ + "-c", + "tail -f /dev/null", + }, + }, + Started: true, + }) + + var c *KcatContainer + if ctr != nil { + c = &KcatContainer{Container: ctr, FilePath: filepath} + } + if err != nil { + return c, fmt.Errorf("generic container: %w", err) + } + + return c, nil +} + +func (kcat *KcatContainer) SaveFile(ctx context.Context, data string) error { + 
return kcat.Container.CopyToContainer(ctx, []byte(data), kcat.FilePath, 0o700) +} + +func (kcat *KcatContainer) ProduceMessageFromFile(ctx context.Context, broker, topic string) error { + cmd := []string{"kcat", "-b", broker, "-t", topic, "-P", "-l", kcat.FilePath} + _, _, err := kcat.Container.Exec(ctx, cmd) + + return err +} + +func (kcat *KcatContainer) CreateTopic(ctx context.Context, broker, topic string) error { + cmd := []string{"kcat", "-b", broker, "-C", "-t", topic} + _, _, err := kcat.Container.Exec(ctx, cmd) + + return err +} + +func (kcat *KcatContainer) ConsumeMessage(ctx context.Context, broker, topic string) (string, error) { + cmd := []string{"kcat", "-b", broker, "-C", "-t", topic, "-c1"} + _, stdout, err := kcat.Container.Exec(ctx, cmd) + if err != nil { + return "", err + } + + out, err := io.ReadAll(stdout) + + return string(out), err +} diff --git a/modules/kafka/options.go b/modules/kafka/options.go new file mode 100644 index 0000000000..558bbe3cc9 --- /dev/null +++ b/modules/kafka/options.go @@ -0,0 +1,165 @@ +package kafka + +import ( + "context" + "fmt" + "strings" + + "github.com/testcontainers/testcontainers-go" +) + +type options struct { + // Listeners is a list of custom listeners that can be provided to access the + // containers from within docker networks + Listeners []Listener + // req is the container request that will be used to create the container. + // It's needed to apply the listeners to the container. + req *testcontainers.GenericContainerRequest +} + +func defaultOptions(req *testcontainers.GenericContainerRequest) options { + return options{ + Listeners: make([]Listener, 0), + req: req, + } +} + +// Compiler check to ensure that Option implements the testcontainers.ContainerCustomizer interface. +var _ testcontainers.ContainerCustomizer = (*Option)(nil) + +// Option is an option for the Kafka container. +type Option func(*options) error + +// Customize is a NOOP. 
It's defined to satisfy the testcontainers.ContainerCustomizer interface. +func (o Option) Customize(*testcontainers.GenericContainerRequest) error { + // NOOP to satisfy interface. + return nil +} + +// WithClusterID sets the cluster ID for the Kafka container. +func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { + return func(req *testcontainers.GenericContainerRequest) error { + req.Env["CLUSTER_ID"] = clusterID + return nil + } +} + +// WithListener adds custom listeners to the Kafka containers. Each listener +// will be aliased to all networks, so they can be accessed from within docker +// networks. At least one network must be attached to the container, if not an +// error will be thrown when starting the container. +// This option sanitizes the listener names and ports, so they are in the +// correct format: name is uppercase and trimmed, and port is trimmed. +func WithListener(listeners ...Listener) Option { + return func(o *options) error { + if err := validateListeners(listeners...); err != nil { + return fmt.Errorf("validate listeners: %w", err) + } + + applyListenersToEnv(o.req, listeners...) + + o.Listeners = append(o.Listeners, listeners...) 
+ + return nil + } +} + +func applyListenersToEnv(req *testcontainers.GenericContainerRequest, listeners ...Listener) { + if len(listeners) == 0 { + return + } + + req.Env["KAFKA_LISTENERS"] = "CONTROLLER://0.0.0.0:9094, PLAINTEXT://0.0.0.0:9093" + req.Env["KAFKA_REST_BOOTSTRAP_SERVERS"] = "CONTROLLER://0.0.0.0:9094, PLAINTEXT://0.0.0.0:9093" + req.Env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = "CONTROLLER:PLAINTEXT, PLAINTEXT:PLAINTEXT" + + // expect first listener has common network between kafka instances + req.Env["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name + + // expect small number of listeners, so joins is okay + for _, item := range listeners { + req.Env["KAFKA_LISTENERS"] = strings.Join( + []string{ + req.Env["KAFKA_LISTENERS"], + item.Name + "://0.0.0.0:" + item.Port, + }, + ",", + ) + + req.Env["KAFKA_REST_BOOTSTRAP_SERVERS"] = req.Env["KAFKA_LISTENERS"] + + req.Env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join( + []string{ + req.Env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"], + item.Name + ":" + "PLAINTEXT", + }, + ",", + ) + } +} + +func validateListeners(listeners ...Listener) error { + // Trim + for i := 0; i < len(listeners); i++ { + listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " ")) + listeners[i].Host = strings.Trim(listeners[i].Host, " ") + listeners[i].Port = strings.Trim(listeners[i].Port, " ") + } + + // Validate + ports := make(map[string]struct{}, len(listeners)+2) + names := make(map[string]struct{}, len(listeners)+2) + + // check for default listeners + ports["9094"] = struct{}{} + ports["9093"] = struct{}{} + + // check for default listeners + names["CONTROLLER"] = struct{}{} + names["PLAINTEXT"] = struct{}{} + + for _, item := range listeners { + if _, exists := names[item.Name]; exists { + return fmt.Errorf("duplicate of listener name: %s", item.Name) + } + names[item.Name] = struct{}{} + + if _, exists := ports[item.Port]; exists { + return fmt.Errorf("duplicate of listener port: %s", item.Port) 
+ } + ports[item.Port] = struct{}{} + } + + return nil +} + +func plainTextListener(ctx context.Context, c testcontainers.Container) (Listener, error) { + host, err := c.Host(ctx) + if err != nil { + return Listener{}, err + } + + port, err := c.MappedPort(ctx, publicPort) + if err != nil { + return Listener{}, err + } + + return Listener{ + Name: "PLAINTEXT", + Host: host, + Port: port.Port(), + }, nil +} + +func brokerListener(ctx context.Context, c testcontainers.Container) (Listener, error) { + inspect, err := c.Inspect(ctx) + if err != nil { + return Listener{}, fmt.Errorf("inspect: %w", err) + } + + return Listener{ + Name: "BROKER", + Host: inspect.Config.Hostname, + Port: "9092", + }, nil +} diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 112c83f882..c295357dae 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -525,7 +525,8 @@ func TestRedpandaListener_NoNetwork(t *testing.T) { func TestRedpandaBootstrapConfig(t *testing.T) { ctx := context.Background() - container, err := redpanda.RunContainer(ctx, + container, err := redpanda.Run(ctx, + "docker.redpanda.com/redpandadata/redpanda:v23.3.3", redpanda.WithEnableWasmTransform(), // These configs would require a restart if applied to a live Redpanda instance redpanda.WithBootstrapConfig("data_transforms_per_core_memory_reservation", 33554432),