Skip to content

Commit

Permalink
chore: use kcat container function
Browse files Browse the repository at this point in the history
  • Loading branch information
mdelapenya committed Nov 21, 2024
1 parent fdbabd2 commit 3429d0b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 52 deletions.
54 changes: 9 additions & 45 deletions modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,38 +115,21 @@ func TestKafka_networkConnectivity(t *testing.T) {
// }
require.NoError(t, err, "failed to start kafka container")

kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kcat:7.4.1",
Networks: []string{
Network.Name,
},
Entrypoint: []string{
"sh",
},
Cmd: []string{
"-c",
"tail -f /dev/null",
},
},
Started: true,
})
// }

kcat, err := runKcatContainer(ctx, Network.Name, "/tmp/msgs.txt")
require.NoError(t, err, "failed to start kcat")

// 4. Copy message to kcat
err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700)
err = kcat.SaveFile(ctx, "Message produced by kcat")
require.NoError(t, err)

brokers, err := kafkaContainer.Brokers(context.TODO())
require.NoError(t, err, "failed to get brokers")

// err = createTopics(brokers, []string{topic_in, topic_out})
_, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in})
err = kcat.CreateTopic(ctx, address, topic_in)
require.NoError(t, err, "create topic topic_in")

_, stdout, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_out})
err = kcat.CreateTopic(ctx, address, topic_out)
require.NoError(t, err, "create topic topic_out")

// perform assertions
Expand All @@ -170,7 +153,7 @@ func TestKafka_networkConnectivity(t *testing.T) {
require.NoError(t, err, "send message")

// Internal read
_, stdout, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"})
_, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"})
require.NoError(t, err)

out, err := io.ReadAll(stdout)
Expand Down Expand Up @@ -241,45 +224,26 @@ func TestKafka_withListener(t *testing.T) {

// 3. Start KCat container
// withListenerKcat {
kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kcat:7.4.1",
Networks: []string{
rpNetwork.Name,
},
Entrypoint: []string{
"sh",
},
Cmd: []string{
"-c",
"tail -f /dev/null",
},
},
Started: true,
})
kcat, err := runKcatContainer(ctx, rpNetwork.Name, "/tmp/msgs.txt")
// }

require.NoError(t, err)

// 4. Copy message to kcat
err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700)
err = kcat.SaveFile(ctx, "Message produced by kcat")
require.NoError(t, err)

// 5. Produce message to Kafka
// withListenerExec {
_, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "kafka:9092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"})
err = kcat.ProduceMessageFromFile(ctx, "kafka:9092", "msgs")
// }

require.NoError(t, err)

// 6. Consume message from Kafka
_, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "kafka:9092", "-C", "-t", "msgs", "-c", "1"})
require.NoError(t, err)

// 7. Read Message from stdout
out, err := io.ReadAll(stdout)
out, err := kcat.ConsumeMessage(ctx, "kafka:9092", "msgs")
require.NoError(t, err)

require.Contains(t, string(out), "Message produced by kcat")

t.Cleanup(func() {
Expand Down
24 changes: 17 additions & 7 deletions modules/kafka/kcat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

type KcatContainer struct {
Container testcontainers.Container
FilePath string
testcontainers.Container
FilePath string
}

func createKCat(ctx context.Context, network, filepath string) (KcatContainer, error) {
kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
func runKcatContainer(ctx context.Context, network, filepath string) (*KcatContainer, error) {
ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kcat:7.4.1",
Networks: []string{
Expand All @@ -31,11 +31,15 @@ func createKCat(ctx context.Context, network, filepath string) (KcatContainer, e
Started: true,
})

var c *KcatContainer
if ctr != nil {
c = &KcatContainer{Container: ctr, FilePath: filepath}
}
if err != nil {
return KcatContainer{}, fmt.Errorf("create generic container: %w", err)
return c, fmt.Errorf("generic container: %w", err)
}

return KcatContainer{Container: kcat, FilePath: filepath}, nil
return c, nil
}

func (kcat *KcatContainer) SaveFile(ctx context.Context, data string) error {
Expand All @@ -49,10 +53,16 @@ func (kcat *KcatContainer) ProduceMessageFromFile(ctx context.Context, broker, t
return err
}

func (kcat *KcatContainer) CreateTopic(ctx context.Context, broker, topic string) error {
cmd := []string{"kcat", "-b", broker, "-C", "-t", topic}
_, _, err := kcat.Container.Exec(ctx, cmd)

return err
}

func (kcat *KcatContainer) ConsumeMessage(ctx context.Context, broker, topic string) (string, error) {
cmd := []string{"kcat", "-b", broker, "-C", "-t", topic, "-c1"}
_, stdout, err := kcat.Container.Exec(ctx, cmd)

if err != nil {
return "", err
}
Expand Down

0 comments on commit 3429d0b

Please sign in to comment.