From 908d51e9329bad8096dedbcd14ba262e33aaee98 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Tue, 6 Jun 2023 06:42:32 +0000 Subject: [PATCH 1/8] adds integration test --- .../producer/docker/docker-compose.yml | 3 +- .../spring/ApplicationIntegrationTest.java | 59 ++++++++++++++++++- .../common/AbstractIntegrationTest.java | 24 +++++++- .../aws/spring/common/LocalStackConfig.java | 27 ++++++++- 4 files changed, 106 insertions(+), 7 deletions(-) diff --git a/aws-kinesis-project/producer/docker/docker-compose.yml b/aws-kinesis-project/producer/docker/docker-compose.yml index 53df9ce4..5e383add 100644 --- a/aws-kinesis-project/producer/docker/docker-compose.yml +++ b/aws-kinesis-project/producer/docker/docker-compose.yml @@ -6,8 +6,7 @@ services: ports: - "4566:4566" environment: - - EAGER_SERVICE_LOADING=1 - - SERVICES=kinesis, dynamodb + - SERVICES=kinesis - DEFAULT_REGION=us-east-1 - EDGE_PORT=4566 volumes: diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java index 5aea7f5d..5ac7966b 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java @@ -1,10 +1,67 @@ package com.learning.aws.spring; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; import com.learning.aws.spring.common.AbstractIntegrationTest; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; +import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; class ApplicationIntegrationTest extends AbstractIntegrationTest { + private static final String KINESIS_STREAM = "test_stream"; + @Test - void contextLoads() {} + void contextLoads() throws InterruptedException, JsonProcessingException { + PutRecordsRequest.Builder putRecordsRequest = + PutRecordsRequest.builder().streamName(KINESIS_STREAM); + + List putRecordsRequestEntryList = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Message eventMessages = + MessageBuilder.withPayload("Message" + i) + .setHeader("event.eventType", "createEvent") + .build(); + PutRecordsRequestEntry putRecordsRequestEntry = + PutRecordsRequestEntry.builder() + .partitionKey("1") + .data( + SdkBytes.fromByteArray( + objectMapper.writeValueAsBytes(eventMessages))) + .build(); + putRecordsRequestEntryList.add(putRecordsRequestEntry); + } + putRecordsRequest.records(putRecordsRequestEntryList); + + amazonKinesis.putRecords(putRecordsRequest.build()); + + assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue(); + + Message> message = this.messageHolder.get(); + assertThat(message.getHeaders()) + .containsKeys(AwsHeaders.CHECKPOINTER, AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) + .doesNotContainKeys(AwsHeaders.STREAM, AwsHeaders.PARTITION_KEY); + + List payload = message.getPayload(); + assertThat(payload).hasSize(10); + + Object item = payload.get(0); + + assertThat(item).isInstanceOf(GenericMessage.class); + + Message messageFromBatch = (Message) item; + + assertThat(messageFromBatch.getPayload()).isEqualTo("Message0"); + assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent"); + } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 8adcfb6c..25fd98af 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -4,15 +4,31 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.web.servlet.MockMvc; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @ActiveProfiles({PROFILE_TEST}) -@SpringBootTest(webEnvironment = RANDOM_PORT) +@SpringBootTest( + webEnvironment = RANDOM_PORT, + properties = { + "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.destination=test_stream", + "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1", + "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch", + "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual", + "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true", + "spring.cloud.stream.kinesis.binder.headers = event.eventType", + "spring.cloud.stream.kinesis.binder.autoAddShards = true", + "spring.cloud.aws.region.static=eu-west-2" + }) @Import(LocalStackConfig.class) @AutoConfigureMockMvc public abstract class AbstractIntegrationTest { @@ -20,4 +36,10 @@ public abstract class AbstractIntegrationTest { @Autowired protected MockMvc mockMvc; @Autowired protected ObjectMapper objectMapper; + + @Autowired protected KinesisAsyncClient amazonKinesis; + + @Autowired protected CountDownLatch messageBarrier; + + @Autowired protected AtomicReference>> messageHolder; } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java index e75dce36..0cd970b3 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java @@ -2,9 +2,14 @@ import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; +import org.springframework.messaging.Message; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -12,16 +17,14 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -@TestConfiguration +@TestConfiguration(proxyBeanMethods = false) public class LocalStackConfig { static LocalStackContainer localStackContainer; static { - System.setProperty("com.amazonaws.sdk.disableCbor", "true"); localStackContainer = new LocalStackContainer( DockerImageName.parse("localstack/localstack").withTag("2.1.0")) - .withEnv("EAGER_SERVICE_LOADING", "1") .withServices(KINESIS) .withExposedPorts(4566); localStackContainer.start(); @@ -40,4 +43,22 @@ public KinesisAsyncClient amazonKinesis() { localStackContainer.getSecretKey()))) .build(); } + + @Bean + public AtomicReference>>> messageHolder() { + return new AtomicReference<>(); + } + + @Bean + public CountDownLatch messageBarrier() { + return new CountDownLatch(1); + } + + @Bean + public Consumer>>> eventConsumerBatchProcessingWithHeaders() { + return eventMessages -> { + messageHolder().set(eventMessages); + messageBarrier().countDown(); + }; + } } From e0c1dd9993fa6386185da02a018d1d847376355f Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Wed, 7 Jun 2023 16:49:25 +0530 Subject: [PATCH 2/8] fixes script --- .../producer/.localstack/init-aws.sh | 94 ++++++++++++++++++- .../producer/docker/docker-compose.yml | 2 +- 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/aws-kinesis-project/producer/.localstack/init-aws.sh b/aws-kinesis-project/producer/.localstack/init-aws.sh index 96dbbe8b..22b9a078 100755 --- a/aws-kinesis-project/producer/.localstack/init-aws.sh +++ b/aws-kinesis-project/producer/.localstack/init-aws.sh @@ -1,6 +1,91 @@ #!/bin/bash -awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1 +# Variables +USER_NAME="localstack" +ROLE_NAME="localstack-role" +POLICY_NAME="kinesis-policy" +STREAM_NAME="my-test-stream" +SHARD_COUNT=1 + +# Create IAM user +awslocal iam create-user --user-name $USER_NAME + +# Create IAM role +awslocal iam create-role --role-name $ROLE_NAME --assume-role-policy-document '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "kinesis.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +}' + +# Create IAM policy +cat > policy.json < Date: Thu, 8 Jun 2023 08:15:48 +0000 Subject: [PATCH 3/8] Another way to set data --- .../producer/.localstack/init-aws.sh | 110 ------------------ .../producer/docker/docker-compose.yml | 1 - aws-kinesis-project/producer/pom.xml | 2 + .../spring/ApplicationIntegrationTest.java | 29 ----- .../common/AbstractIntegrationTest.java | 4 +- .../aws/spring/common/LocalStackConfig.java | 38 ++---- 6 files changed, 14 insertions(+), 170 deletions(-) delete mode 100755 aws-kinesis-project/producer/.localstack/init-aws.sh diff --git a/aws-kinesis-project/producer/.localstack/init-aws.sh b/aws-kinesis-project/producer/.localstack/init-aws.sh deleted file mode 100755 index 22b9a078..00000000 --- a/aws-kinesis-project/producer/.localstack/init-aws.sh +++ /dev/null @@ -1,110 +0,0 @@ -#!/bin/bash - -# Variables -USER_NAME="localstack" -ROLE_NAME="localstack-role" -POLICY_NAME="kinesis-policy" -STREAM_NAME="my-test-stream" -SHARD_COUNT=1 - -# Create IAM user -awslocal iam create-user --user-name $USER_NAME - -# Create IAM role -awslocal iam create-role --role-name $ROLE_NAME --assume-role-policy-document '{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "Service": "kinesis.amazonaws.com" - }, - "Action": "sts:AssumeRole" - } - ] -}' - -# Create IAM policy -cat > policy.json <2.1.0 2.12.0 + 1.18.3 + ${project.build.directory}/test-results 2.37.0 8.2.1 diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java index 5ac7966b..7cd9a2f1 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java @@ -4,46 +4,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.learning.aws.spring.common.AbstractIntegrationTest; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; class ApplicationIntegrationTest extends AbstractIntegrationTest { - private static final String KINESIS_STREAM = "test_stream"; - @Test void contextLoads() throws InterruptedException, JsonProcessingException { - PutRecordsRequest.Builder putRecordsRequest = - PutRecordsRequest.builder().streamName(KINESIS_STREAM); - - List putRecordsRequestEntryList = new ArrayList<>(); - - for (int i = 0; i < 10; i++) { - Message eventMessages = - MessageBuilder.withPayload("Message" + i) - .setHeader("event.eventType", "createEvent") - .build(); - PutRecordsRequestEntry putRecordsRequestEntry = - PutRecordsRequestEntry.builder() - .partitionKey("1") - .data( - SdkBytes.fromByteArray( - objectMapper.writeValueAsBytes(eventMessages))) - .build(); - putRecordsRequestEntryList.add(putRecordsRequestEntry); - } - putRecordsRequest.records(putRecordsRequestEntryList); - - amazonKinesis.putRecords(putRecordsRequest.build()); assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue(); diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 25fd98af..2811c5cb 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -20,14 +20,12 @@ @SpringBootTest( webEnvironment = RANDOM_PORT, properties = { - "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.destination=test_stream", "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1", "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch", "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual", "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true", "spring.cloud.stream.kinesis.binder.headers = event.eventType", - "spring.cloud.stream.kinesis.binder.autoAddShards = true", - "spring.cloud.aws.region.static=eu-west-2" + "spring.cloud.stream.kinesis.binder.autoAddShards = true" }) @Import(LocalStackConfig.class) @AutoConfigureMockMvc diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java index 0cd970b3..caf8cc2a 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java @@ -1,47 +1,31 @@ package com.learning.aws.spring.common; -import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; - import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.messaging.Message; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @TestConfiguration(proxyBeanMethods = false) public class LocalStackConfig { - static LocalStackContainer localStackContainer; + + static final LocalStackContainer localStackContainer = + new LocalStackContainer( + DockerImageName.parse("localstack/localstack").withTag("2.1.0")); static { - localStackContainer = - new LocalStackContainer( - DockerImageName.parse("localstack/localstack").withTag("2.1.0")) - .withServices(KINESIS) - .withExposedPorts(4566); localStackContainer.start(); - } - - @Bean - @Primary - public KinesisAsyncClient amazonKinesis() { - return KinesisAsyncClient.builder() - .endpointOverride(localStackContainer.getEndpointOverride(KINESIS)) - .region(Region.of(localStackContainer.getRegion())) - .credentialsProvider( - StaticCredentialsProvider.create( - AwsBasicCredentials.create( - localStackContainer.getAccessKey(), - localStackContainer.getSecretKey()))) - .build(); + System.setProperty( + "spring.cloud.aws.endpoint", localStackContainer.getEndpoint().toString()); + System.setProperty( + "spring.cloud.aws.credentials.access-key", localStackContainer.getAccessKey()); + System.setProperty( + "spring.cloud.aws.credentials.secret-key", localStackContainer.getSecretKey()); + System.setProperty("spring.cloud.aws.region.static", localStackContainer.getRegion()); } @Bean From 893b396dcf9902ee536c80a35d92d5972b7960d0 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Tue, 25 Jul 2023 19:37:36 +0530 Subject: [PATCH 4/8] removes unnecessary declaration --- aws-kinesis-project/producer/pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/aws-kinesis-project/producer/pom.xml b/aws-kinesis-project/producer/pom.xml index ab439718..42c6dbbf 100644 --- a/aws-kinesis-project/producer/pom.xml +++ b/aws-kinesis-project/producer/pom.xml @@ -23,8 +23,6 @@ 2.1.0 2.13.0 - 1.18.3 - ${project.build.directory}/test-results 2.38.0 8.3.1 From e1656f4672e9d3a86932ec5f4dc19745bb46c605 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Tue, 12 Mar 2024 08:03:26 +0000 Subject: [PATCH 5/8] fix : compile issue --- aws-kinesis-project/producer/pom.xml | 2 +- .../java/com/learning/aws/spring/common/LocalStackConfig.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/aws-kinesis-project/producer/pom.xml b/aws-kinesis-project/producer/pom.xml index 43fa0070..2cdf45dd 100644 --- a/aws-kinesis-project/producer/pom.xml +++ b/aws-kinesis-project/producer/pom.xml @@ -227,7 +227,7 @@ - 1.18.1 + 1.19.2 diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java index 2063d255..5e44c349 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java @@ -1,5 +1,7 @@ package com.learning.aws.spring.common; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; + import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -20,7 +22,6 @@ public class LocalStackConfig { localStackContainer = new LocalStackContainer( DockerImageName.parse("localstack/localstack").withTag("3.2.0")) - .withEnv("EAGER_SERVICE_LOADING", "1") .withServices(KINESIS) .withExposedPorts(4566); localStackContainer.start(); From e1f2dfb6b9b6706c03bb0d38d1d426ba095dc2c6 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Tue, 12 Mar 2024 08:11:54 +0000 Subject: [PATCH 6/8] adds kinesis asyncClient Bean --- .../aws/spring/common/LocalStackConfig.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java index 5e44c349..c54a2fa3 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/LocalStackConfig.java @@ -8,9 +8,14 @@ import java.util.function.Consumer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.messaging.Message; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @TestConfiguration(proxyBeanMethods = false) public class LocalStackConfig { @@ -25,13 +30,20 @@ public class LocalStackConfig { .withServices(KINESIS) .withExposedPorts(4566); localStackContainer.start(); - System.setProperty( - "spring.cloud.aws.endpoint", localStackContainer.getEndpoint().toString()); - System.setProperty( - "spring.cloud.aws.credentials.access-key", localStackContainer.getAccessKey()); - System.setProperty( - "spring.cloud.aws.credentials.secret-key", localStackContainer.getSecretKey()); - System.setProperty("spring.cloud.aws.region.static", localStackContainer.getRegion()); + } + + @Bean + @Primary + public KinesisAsyncClient amazonKinesis() { + return KinesisAsyncClient.builder() + .endpointOverride(localStackContainer.getEndpointOverride(KINESIS)) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + localStackContainer.getAccessKey(), + localStackContainer.getSecretKey()))) + .build(); } @Bean From 62c9128efc6458dd9b1f124531f819b7953470f6 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Fri, 27 Dec 2024 08:34:44 +0000 Subject: [PATCH 7/8] fix : compilation issue --- .../learning/aws/spring/TestKinesisProducerApplication.java | 6 ++---- .../learning/aws/spring/common/AbstractIntegrationTest.java | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java index 7798e18a..15989783 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java @@ -2,6 +2,7 @@ import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; +import com.learning.aws.spring.common.ContainerConfig; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -10,16 +11,13 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; -import org.springframework.test.context.DynamicPropertyRegistry; import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import com.learning.aws.spring.common.ContainerConfig; -import org.springframework.boot.SpringApplication; +@TestConfiguration(proxyBeanMethods = false) public class TestKinesisProducerApplication { @Bean diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 8bbbec1f..1f098754 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -4,7 +4,6 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import com.fasterxml.jackson.databind.ObjectMapper; -import com.learning.aws.spring.TestKinesisProducerApplication; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; From a5d3ea350db29aac6e04a7678eb4ce6372422e4f Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Fri, 27 Dec 2024 10:40:17 +0000 Subject: [PATCH 8/8] implement integration test --- aws-kinesis-project/producer/pom.xml | 2 +- .../spring/ApplicationIntegrationTest.java | 44 ++++++++++---- .../TestKinesisProducerApplication.java | 49 +-------------- .../common/AbstractIntegrationTest.java | 13 +--- .../aws/spring/common/ConsumerConfig.java | 60 +++++++++++++++++++ .../aws/spring/common/ContainerConfig.java | 3 +- .../resources/application-test.properties | 31 +++++++--- 7 files changed, 123 insertions(+), 79 deletions(-) create mode 100644 aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java diff --git a/aws-kinesis-project/producer/pom.xml b/aws-kinesis-project/producer/pom.xml index b092219a..62409039 100644 --- a/aws-kinesis-project/producer/pom.xml +++ b/aws-kinesis-project/producer/pom.xml @@ -223,7 +223,7 @@ - 1.25.0 + 1.25.2 diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java index 7cd9a2f1..1ec74378 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java @@ -2,14 +2,19 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.amazonaws.util.BinaryUtils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.learning.aws.spring.common.AbstractIntegrationTest; +import com.learning.aws.spring.model.IpAddressDTO; +import java.time.Instant; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; class ApplicationIntegrationTest extends AbstractIntegrationTest { @@ -18,21 +23,38 @@ void contextLoads() throws InterruptedException, JsonProcessingException { assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue(); - Message> message = this.messageHolder.get(); + Message> message = this.messageHolder.get(); assertThat(message.getHeaders()) - .containsKeys(AwsHeaders.CHECKPOINTER, AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) - .doesNotContainKeys(AwsHeaders.STREAM, AwsHeaders.PARTITION_KEY); + .containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) + .doesNotContainKeys( + AwsHeaders.RECEIVED_PARTITION_KEY, + AwsHeaders.RECEIVED_SEQUENCE_NUMBER, + AwsHeaders.STREAM, + AwsHeaders.PARTITION_KEY, + AwsHeaders.CHECKPOINTER); - List payload = message.getPayload(); - assertThat(payload).hasSize(10); + List payloadList = message.getPayload(); - Object item = payload.get(0); + assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1); - assertThat(item).isInstanceOf(GenericMessage.class); + Record item = payloadList.getFirst(); + assertThat(item).isNotNull(); - Message messageFromBatch = (Message) item; + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item); - assertThat(messageFromBatch.getPayload()).isEqualTo("Message0"); - assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent"); + String sequenceNumber = kinesisClientRecord.sequenceNumber(); + assertThat(sequenceNumber).isNotBlank(); + + Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp(); + assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class); + + String partitionKey = kinesisClientRecord.partitionKey(); + assertThat(partitionKey).isNotBlank(); + + String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data())); + String payload = dataAsString.substring(dataAsString.indexOf("[{")); + List ipAddressDTOS = + objectMapper.readValue(payload, new TypeReference<>() {}); + assertThat(ipAddressDTOS).isNotEmpty().hasSize(254); } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java index 15989783..1b9f48d4 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java @@ -1,59 +1,14 @@ package com.learning.aws.spring; -import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; - +import com.learning.aws.spring.common.ConsumerConfig; import com.learning.aws.spring.common.ContainerConfig; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import org.springframework.boot.SpringApplication; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.messaging.Message; -import org.testcontainers.containers.localstack.LocalStackContainer; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -@TestConfiguration(proxyBeanMethods = false) public class TestKinesisProducerApplication { - @Bean - KinesisAsyncClient amazonKinesis(LocalStackContainer localStackContainer) { - return KinesisAsyncClient.builder() - .endpointOverride(localStackContainer.getEndpointOverride(KINESIS)) - .region(Region.of(localStackContainer.getRegion())) - .credentialsProvider( - StaticCredentialsProvider.create( - AwsBasicCredentials.create( - localStackContainer.getAccessKey(), - localStackContainer.getSecretKey()))) - .build(); - } - - @Bean - public AtomicReference>>> messageHolder() { - return new AtomicReference<>(); - } - - @Bean - public CountDownLatch messageBarrier() { - return new CountDownLatch(1); - } - - @Bean - public Consumer>>> eventConsumerBatchProcessingWithHeaders() { - return eventMessages -> { - messageHolder().set(eventMessages); - messageBarrier().countDown(); - }; - } - public static void main(String[] args) { SpringApplication.from(KinesisProducerApplication::main) - .with(ContainerConfig.class) + .with(ContainerConfig.class, ConsumerConfig.class) .run(args); } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 1f098754..9a56e260 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -14,19 +14,12 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.web.servlet.MockMvc; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; @ActiveProfiles({PROFILE_TEST}) @SpringBootTest( webEnvironment = RANDOM_PORT, - properties = { - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1", - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch", - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual", - "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true", - "spring.cloud.stream.kinesis.binder.headers = event.eventType", - "spring.cloud.stream.kinesis.binder.autoAddShards = true" - }, - classes = ContainerConfig.class) + classes = {ContainerConfig.class, ConsumerConfig.class}) @AutoConfigureMockMvc public abstract class AbstractIntegrationTest { @@ -38,5 +31,5 @@ public abstract class AbstractIntegrationTest { @Autowired protected CountDownLatch messageBarrier; - @Autowired protected AtomicReference>> messageHolder; + @Autowired protected AtomicReference>> messageHolder; } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java new file mode 100644 index 00000000..ae96ea24 --- /dev/null +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java @@ -0,0 +1,60 @@ +package com.learning.aws.spring.common; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.testcontainers.containers.localstack.LocalStackContainer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +@TestConfiguration +public class ConsumerConfig { + + @Bean + DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) { + return DynamoDbAsyncClient.builder() + .endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB)) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + localStackContainer.getAccessKey(), + localStackContainer.getSecretKey()))) + .overrideConfiguration( + ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofSeconds(10)) + .retryPolicy(RetryPolicy.builder().numRetries(3).build()) + .build()) + .build(); + } + + @Bean + public AtomicReference>> messageHolder() { + return new AtomicReference<>(); + } + + @Bean + public CountDownLatch messageBarrier() { + return new CountDownLatch(1); + } + + @Bean + public Consumer>> consumeEvent() { + return eventMessages -> { + messageHolder().set(eventMessages); + messageBarrier().countDown(); + }; + } +} diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java index 01f1e6b3..1e9274fc 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java @@ -25,8 +25,7 @@ public class ContainerConfig { @Bean LocalStackContainer localStackContainer() { return new LocalStackContainer( - DockerImageName.parse("localstack/localstack").withTag("4.0.3")) - .withServices(KINESIS); + DockerImageName.parse("localstack/localstack").withTag("4.0.3")); } @Bean diff --git a/aws-kinesis-project/producer/src/test/resources/application-test.properties b/aws-kinesis-project/producer/src/test/resources/application-test.properties index 949d8220..593816b1 100644 --- a/aws-kinesis-project/producer/src/test/resources/application-test.properties +++ b/aws-kinesis-project/producer/src/test/resources/application-test.properties @@ -1,8 +1,23 @@ -cloud.aws.region.use-default-aws-region-chain=false -cloud.aws.credentials.use-default-aws-credentials-chain=false -cloud.aws.stack.auto=false -cloud.aws.region.auto=false -cloud.aws.region.static=us-east-1 - -application.endpoint-uri=http://localhost:4566 -application.region=us-east-1 + +# events inbound +spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream +spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1 +spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true +#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5 +spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch + +spring.cloud.function.definition=consumeEvent;producerSupplier; + +#Kinesis-dynamodb-checkpoint +spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata +spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned +spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5 +spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5 + +spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry +spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned +spring.cloud.stream.kinesis.binder.locks.readCapacity=5 +spring.cloud.stream.kinesis.binder.locks.writeCapacity=5