Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka): Fix internal docker connection #2894

Open
wants to merge 59 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
320e54d
fixed kafka internal docker connection
catinapoke Apr 19, 2024
9b6537d
added new option for kafka listeners
catinapoke May 10, 2024
15f4ca3
Merge remote-tracking branch 'original/main'
catinapoke May 10, 2024
c7552f4
fixed kafka docs with new options
catinapoke May 13, 2024
8904938
Update docs/modules/kafka.md
catinapoke May 14, 2024
7b38c28
Apply suggestions from code review for docs
catinapoke May 14, 2024
98793d7
added new test, fixed docs from code review
catinapoke May 16, 2024
62a68b5
Merge branch 'main' of https://github.com/catinapoke/testcontainers-go
catinapoke May 16, 2024
a7c4a9f
Merge remote-tracking branch 'Original/main'
catinapoke May 16, 2024
bfa9191
renamed docker folder to testdata and fixed test name
catinapoke May 16, 2024
f1b0d48
Merge remote-tracking branch 'Original/main'
catinapoke May 23, 2024
128d31a
added test for input listeners validation and refactor from code review
catinapoke May 23, 2024
ead068a
added unit test for trimValidateListeners
catinapoke May 28, 2024
eb71110
Merge remote-tracking branch 'Original/main'
catinapoke May 28, 2024
a4d91e2
Merge branch 'main' into main
mdelapenya Jun 7, 2024
dd8fcec
Merge remote-tracking branch 'upstream/main'
catinapoke Aug 25, 2024
5c147d4
go mod update
catinapoke Aug 25, 2024
b31e1af
todo for PR
catinapoke Aug 25, 2024
9f23a4d
updated copyStarterScript for upstream
catinapoke Aug 25, 2024
e63a604
fix: typo
mdelapenya Sep 2, 2024
f9cb155
fix: use Run method
mdelapenya Sep 2, 2024
8394529
chore: use PLAINTEXT and BROKER
mdelapenya Sep 2, 2024
31950f4
fix: handle error in tests
mdelapenya Sep 2, 2024
3aa2cbf
chore: make lint
mdelapenya Sep 2, 2024
d61004f
chore: use non deprecated APIs
mdelapenya Sep 2, 2024
df53491
chore: handle errors in testss
mdelapenya Sep 2, 2024
bf02852
fix: close sarama client
mdelapenya Sep 2, 2024
152ef0c
fix: validation in test
mdelapenya Sep 2, 2024
efa0f7d
chore: refactor test
mdelapenya Sep 2, 2024
d669bfb
chore: add test using kcat
mdelapenya Sep 2, 2024
4c44b97
Merge branch 'main' into catinapoke/main
mdelapenya Sep 9, 2024
ba701e1
Merge pull request #1 from mdelapenya/catinapoke/main
catinapoke Sep 17, 2024
b51be12
fixed basic kafka test but network one is broken
catinapoke Sep 17, 2024
c8c47f4
not working test with kcat
catinapoke Sep 23, 2024
cc60b86
Merge branch 'main' into catinapoke/main
mdelapenya Nov 21, 2024
84cd919
fix: use PLAINTEXT in test
mdelapenya Nov 21, 2024
8a65a7b
chor: rename container variable
mdelapenya Nov 21, 2024
fdbabd2
chore: refactor unit tests for the new test function pattern
mdelapenya Nov 21, 2024
3429d0b
chore: use kcat container function
mdelapenya Nov 21, 2024
01ba4e9
chore: use require and new CleanupContainer functions
mdelapenya Nov 21, 2024
63ee6cf
chore: use test function pattern even more
mdelapenya Nov 21, 2024
8660157
chore: exclude lint error
mdelapenya Nov 21, 2024
3c1e739
chore: remove unused code
mdelapenya Nov 21, 2024
4f152f2
chore: rename to Listener
mdelapenya Nov 25, 2024
76cf56d
chore: trim listeners in the option
mdelapenya Nov 25, 2024
ec67726
chore: use empty struct to not consume memory
mdelapenya Nov 25, 2024
da0deb8
chore: format errors
mdelapenya Nov 25, 2024
f2aaa60
chore: simplify advertised slice
mdelapenya Nov 25, 2024
f82cfc9
chore: simplify envs for listeners
mdelapenya Nov 25, 2024
2b00774
chore: make WithListeners self-contained
mdelapenya Nov 25, 2024
50f9bcc
fix: proper advertised slice
mdelapenya Nov 25, 2024
7772f4f
chore: simplify test helpers
mdelapenya Nov 25, 2024
45f6660
chore: use require
mdelapenya Nov 25, 2024
aa341f8
chore: extract inline helper to a function
mdelapenya Nov 25, 2024
278ec35
chore: refactor options to support writing to the container request
mdelapenya Nov 25, 2024
fc81847
docs: refinement
mdelapenya Nov 25, 2024
4ab2546
docs: document withClusterID function
mdelapenya Nov 25, 2024
9adbd97
chore: remove unused
mdelapenya Nov 25, 2024
67c3480
WIP
mdelapenya Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ The environment variables that are already set by default are:

{% include "../features/common_functional_options.md" %}


#### ClusterId

You can set up cluster id by using `WithClusterID` option.

```
KafkaContainer, err = kafka.Run(ctx,
"confluentinc/confluent-local:7.6.1",
kafka.WithClusterID("test-cluster"))
```

#### Listeners

If you need to connect new listeners, you can use `WithListener(listeners ...Listener)`.
This option controls the following environment variables for the Kafka container:
- `KAFKA_LISTENERS`
- `KAFKA_REST_BOOTSTRAP_SERVERS`
- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP`
- `KAFKA_INTER_BROKER_LISTENER_NAME`
- `KAFKA_ADVERTISED_LISTENERS`

Example:

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:kafkaWithListener
<!--/codeinclude-->

In the above code, we created a network for our container and attached kafka to it, so they can communicate. Then we marked port 9092 for our internal usage.

The first listener in the slice will be written in the env parameter `KAFKA_INTER_BROKER_LISTENER_NAME`

Every listener's name will be converted in upper case. Every name and port should be unique and will be checked in a validation step.

If you are not using this option or the listeners list is empty, there will be 2 default listeners with the following addresses and ports:

External - Host():MappedPort()
Internal - Host():9092

### Container Methods

The Kafka container exposes the following methods:
Expand Down
58 changes: 58 additions & 0 deletions modules/kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,61 @@ func (k *TestKafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, cl
}
}
}

// Consumer represents a Sarama consumer group consumer
type TestConsumer struct {
t *testing.T
ready chan bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: its idiomatic to use a struct{} for this pattern

Suggested change
ready chan bool
ready chan struct{}

messages []*sarama.ConsumerMessage
}

func NewTestConsumer(t *testing.T) TestConsumer {
t.Helper()

return TestConsumer{
t: t,
ready: make(chan bool),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ready: make(chan bool),
ready: make(chan struct{}),

}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *TestConsumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *TestConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *TestConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message, ok := <-claim.Messages():
if !ok {
c.t.Log("message channel was closed")
return nil
}
c.t.Logf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")

// Store the message to be consumed later
c.messages = append(c.messages, message)

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
11 changes: 5 additions & 6 deletions modules/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka
go 1.22

require (
github.com/IBM/sarama v1.42.1
github.com/IBM/sarama v1.43.3
github.com/docker/go-connections v0.5.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.34.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -41,7 +41,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -53,7 +53,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand All @@ -69,8 +69,7 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
24 changes: 12 additions & 12 deletions modules/kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand All @@ -32,8 +32,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down Expand Up @@ -85,8 +85,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -111,8 +111,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -190,14 +190,14 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
84 changes: 46 additions & 38 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// starterScript {
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
export KAFKA_ADVERTISED_LISTENERS=%s
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
Expand All @@ -38,6 +38,12 @@ type KafkaContainer struct {
ClusterID string
}

type Listener struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Add doc comment.

Name string
Host string
Port string
}

// Deprecated: use Run instead
// RunContainer creates an instance of the Kafka container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) {
Expand Down Expand Up @@ -70,37 +76,44 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
Entrypoint: []string{"sh"},
// this CMD will wait for the starter script to be copied into the container and then execute it
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// Use a single hook to copy the starter script and wait for
// the Kafka server to be ready. This prevents the wait running
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

// 2. wait for the Kafka server to be ready
return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c)
},
},
},
},
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

settings := defaultOptions(&genericContainerReq)
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
if err := apply(&settings); err != nil {
return nil, err
}
}
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

genericContainerReq.ContainerRequest.LifecycleHooks = []testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// Use a single hook to copy the starter script and wait for
// the Kafka server to be ready. This prevents the wait running
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c, &settings); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

// 2. wait for the Kafka server to be ready
return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c)
},
},
},
}

err := validateKRaftVersion(genericContainerReq.Image)
if err != nil {
return nil, err
Expand All @@ -122,31 +135,34 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
}

// copyStarterScript copies the starter script into the container.
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
func copyStarterScript(ctx context.Context, c testcontainers.Container, settings *options) error {
if err := wait.ForListeningPort(publicPort).
SkipInternalCheck().
WaitUntilReady(ctx, c); err != nil {
return fmt.Errorf("wait for exposed port: %w", err)
}

host, err := c.Host(ctx)
if err != nil {
return fmt.Errorf("host: %w", err)
if len(settings.Listeners) == 0 {
defaultInternal, err := brokerListener(ctx, c)
if err != nil {
return fmt.Errorf("default internal listener: %w", err)
}
settings.Listeners = append(settings.Listeners, defaultInternal)
}

inspect, err := c.Inspect(ctx)
defaultExternal, err := plainTextListener(ctx, c)
if err != nil {
return fmt.Errorf("inspect: %w", err)
return fmt.Errorf("default external listener: %w", err)
}

hostname := inspect.Config.Hostname
settings.Listeners = append(settings.Listeners, defaultExternal)

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return fmt.Errorf("mapped port: %w", err)
advertised := make([]string, len(settings.Listeners))
for i, item := range settings.Listeners {
advertised[i] = item.Name + "://" + item.Host + ":" + item.Port
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ","))

if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
return fmt.Errorf("copy to container: %w", err)
Expand All @@ -155,14 +171,6 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
return nil
}

func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["CLUSTER_ID"] = clusterID

return nil
}
}

// Brokers retrieves the broker connection strings from Kafka with only one entry,
// defined by the exposed public port.
func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) {
Expand Down
Loading
Loading