From 3429d0b99b5e773c63d9059be20bec85c0087b99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Thu, 21 Nov 2024 15:07:32 +0100 Subject: [PATCH] chore: use kcat container function --- modules/kafka/kafka_test.go | 54 +++++++------------------------------ modules/kafka/kcat_test.go | 24 ++++++++++++----- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index e6312dbb5e..f58a58ba5d 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -115,38 +115,21 @@ func TestKafka_networkConnectivity(t *testing.T) { // } require.NoError(t, err, "failed to start kafka container") - kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "confluentinc/cp-kcat:7.4.1", - Networks: []string{ - Network.Name, - }, - Entrypoint: []string{ - "sh", - }, - Cmd: []string{ - "-c", - "tail -f /dev/null", - }, - }, - Started: true, - }) - // } - + kcat, err := runKcatContainer(ctx, Network.Name, "/tmp/msgs.txt") require.NoError(t, err, "failed to start kcat") // 4. Copy message to kcat - err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) + err = kcat.SaveFile(ctx, "Message produced by kcat") require.NoError(t, err) brokers, err := kafkaContainer.Brokers(context.TODO()) require.NoError(t, err, "failed to get brokers") // err = createTopics(brokers, []string{topic_in, topic_out}) - _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in}) + err = kcat.CreateTopic(ctx, address, topic_in) require.NoError(t, err, "create topic topic_in") - _, stdout, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_out}) + err = kcat.CreateTopic(ctx, address, topic_out) require.NoError(t, err, "create topic topic_out") // perform assertions @@ -170,7 +153,7 @@ func TestKafka_networkConnectivity(t *testing.T) { require.NoError(t, err, "send message") // Internal read - _, stdout, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"}) + _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"}) require.NoError(t, err) out, err := io.ReadAll(stdout) @@ -241,45 +224,26 @@ func TestKafka_withListener(t *testing.T) { // 3. Start KCat container // withListenerKcat { - kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "confluentinc/cp-kcat:7.4.1", - Networks: []string{ - rpNetwork.Name, - }, - Entrypoint: []string{ - "sh", - }, - Cmd: []string{ - "-c", - "tail -f /dev/null", - }, - }, - Started: true, - }) + kcat, err := runKcatContainer(ctx, rpNetwork.Name, "/tmp/msgs.txt") // } require.NoError(t, err) // 4. Copy message to kcat - err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) + err = kcat.SaveFile(ctx, "Message produced by kcat") require.NoError(t, err) // 5. Produce message to Kafka // withListenerExec { - _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "kafka:9092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"}) + err = kcat.ProduceMessageFromFile(ctx, "kafka:9092", "msgs") // } require.NoError(t, err) // 6. Consume message from Kafka - _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "kafka:9092", "-C", "-t", "msgs", "-c", "1"}) - require.NoError(t, err) - // 7. Read Message from stdout - out, err := io.ReadAll(stdout) + out, err := kcat.ConsumeMessage(ctx, "kafka:9092", "msgs") require.NoError(t, err) - require.Contains(t, string(out), "Message produced by kcat") t.Cleanup(func() { diff --git a/modules/kafka/kcat_test.go b/modules/kafka/kcat_test.go index 5c1d56e83e..08f3afd3b5 100644 --- a/modules/kafka/kcat_test.go +++ b/modules/kafka/kcat_test.go @@ -9,12 +9,12 @@ import ( ) type KcatContainer struct { - Container testcontainers.Container - FilePath string + testcontainers.Container + FilePath string } -func createKCat(ctx context.Context, network, filepath string) (KcatContainer, error) { - kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ +func runKcatContainer(ctx context.Context, network, filepath string) (*KcatContainer, error) { + ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "confluentinc/cp-kcat:7.4.1", Networks: []string{ @@ -31,11 +31,15 @@ func createKCat(ctx context.Context, network, filepath string) (KcatContainer, e Started: true, }) + var c *KcatContainer + if ctr != nil { + c = &KcatContainer{Container: ctr, FilePath: filepath} + } if err != nil { - return KcatContainer{}, fmt.Errorf("create generic container: %w", err) + return c, fmt.Errorf("generic container: %w", err) } - return KcatContainer{Container: kcat, FilePath: filepath}, nil + return c, nil } func (kcat *KcatContainer) SaveFile(ctx context.Context, data string) error { @@ -49,10 +53,16 @@ func (kcat *KcatContainer) ProduceMessageFromFile(ctx context.Context, broker, t return err } +func (kcat *KcatContainer) CreateTopic(ctx context.Context, broker, topic string) error { + cmd := []string{"kcat", "-b", broker, "-C", "-t", topic} + _, _, err := kcat.Container.Exec(ctx, cmd) + + return err +} + func (kcat *KcatContainer) ConsumeMessage(ctx context.Context, broker, topic string) (string, error) { cmd := []string{"kcat", "-b", broker, "-C", "-t", topic, "-c1"} _, stdout, err := kcat.Container.Exec(ctx, cmd) - if err != nil { return "", err }