diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index f58a58ba5d..b1102799cf 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -113,9 +113,11 @@ func TestKafka_networkConnectivity(t *testing.T) { }), ) // } + testcontainers.CleanupContainer(t, kafkaContainer) require.NoError(t, err, "failed to start kafka container") kcat, err := runKcatContainer(ctx, Network.Name, "/tmp/msgs.txt") + testcontainers.CleanupContainer(t, kcat) require.NoError(t, err, "failed to start kcat") // 4. Copy message to kcat @@ -193,10 +195,8 @@ func TestKafka_networkConnectivity(t *testing.T) { t.Fatal("Empty message") } - // Assert - if !strings.Contains(string(consumer.message.Value), text_msg) { - t.Error("got wrong string") - } + require.Contains(t, string(consumer.message.Value), text_msg) + require.Contains(t, string(consumer.message.Key), key) } func TestKafka_withListener(t *testing.T) { @@ -206,6 +206,12 @@ func TestKafka_withListener(t *testing.T) { rpNetwork, err := network.New(ctx) require.NoError(t, err) + t.Cleanup(func() { + if err := rpNetwork.Remove(ctx); err != nil { + t.Fatalf("failed to remove network: %s", err) + } + }) + // 2. Start Kafka ctr // withListenerRP { ctr, err := kafka.Run(ctx, @@ -220,13 +226,14 @@ func TestKafka_withListener(t *testing.T) { }), ) // } + testcontainers.CleanupContainer(t, ctr) require.NoError(t, err) // 3. Start KCat container // withListenerKcat { kcat, err := runKcatContainer(ctx, rpNetwork.Name, "/tmp/msgs.txt") // } - + testcontainers.CleanupContainer(t, kcat) require.NoError(t, err) // 4. Copy message to kcat @@ -245,19 +252,6 @@ func TestKafka_withListener(t *testing.T) { out, err := kcat.ConsumeMessage(ctx, "kafka:9092", "msgs") require.NoError(t, err) require.Contains(t, string(out), "Message produced by kcat") - - t.Cleanup(func() { - if err := kcat.Terminate(ctx); err != nil { - t.Fatalf("failed to terminate kcat container: %s", err) - } - if err := ctr.Terminate(ctx); err != nil { - t.Fatalf("failed to terminate Kafka container: %s", err) - } - - if err := rpNetwork.Remove(ctx); err != nil { - t.Fatalf("failed to remove network: %s", err) - } - }) } func TestKafka_restProxyService(t *testing.T) {