diff --git a/aws-kinesis-project/producer/pom.xml b/aws-kinesis-project/producer/pom.xml index b092219a..62409039 100644 --- a/aws-kinesis-project/producer/pom.xml +++ b/aws-kinesis-project/producer/pom.xml @@ -223,7 +223,7 @@ - 1.25.0 + 1.25.2 diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java index 7cd9a2f1..1ec74378 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java @@ -2,14 +2,19 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.amazonaws.util.BinaryUtils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.learning.aws.spring.common.AbstractIntegrationTest; +import com.learning.aws.spring.model.IpAddressDTO; +import java.time.Instant; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; class ApplicationIntegrationTest extends AbstractIntegrationTest { @@ -18,21 +23,38 @@ void contextLoads() throws InterruptedException, JsonProcessingException { assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue(); - Message> message = this.messageHolder.get(); + Message> message = this.messageHolder.get(); assertThat(message.getHeaders()) - .containsKeys(AwsHeaders.CHECKPOINTER, AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) - .doesNotContainKeys(AwsHeaders.STREAM, AwsHeaders.PARTITION_KEY); + .containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) + .doesNotContainKeys( + AwsHeaders.RECEIVED_PARTITION_KEY, + AwsHeaders.RECEIVED_SEQUENCE_NUMBER, + AwsHeaders.STREAM, + AwsHeaders.PARTITION_KEY, + AwsHeaders.CHECKPOINTER); - List payload = message.getPayload(); - assertThat(payload).hasSize(10); + List payloadList = message.getPayload(); - Object item = payload.get(0); + assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1); - assertThat(item).isInstanceOf(GenericMessage.class); + Record item = payloadList.getFirst(); + assertThat(item).isNotNull(); - Message messageFromBatch = (Message) item; + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item); - assertThat(messageFromBatch.getPayload()).isEqualTo("Message0"); - assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent"); + String sequenceNumber = kinesisClientRecord.sequenceNumber(); + assertThat(sequenceNumber).isNotBlank(); + + Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp(); + assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class); + + String partitionKey = kinesisClientRecord.partitionKey(); + assertThat(partitionKey).isNotBlank(); + + String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data())); + String payload = dataAsString.substring(dataAsString.indexOf("[{")); + List ipAddressDTOS = + objectMapper.readValue(payload, new TypeReference<>() {}); + assertThat(ipAddressDTOS).isNotEmpty().hasSize(254); } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java index 15989783..1b9f48d4 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java @@ -1,59 +1,14 @@ package com.learning.aws.spring; -import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; - +import com.learning.aws.spring.common.ConsumerConfig; import com.learning.aws.spring.common.ContainerConfig; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import org.springframework.boot.SpringApplication; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.messaging.Message; -import org.testcontainers.containers.localstack.LocalStackContainer; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -@TestConfiguration(proxyBeanMethods = false) public class TestKinesisProducerApplication { - @Bean - KinesisAsyncClient amazonKinesis(LocalStackContainer localStackContainer) { - return KinesisAsyncClient.builder() - .endpointOverride(localStackContainer.getEndpointOverride(KINESIS)) - .region(Region.of(localStackContainer.getRegion())) - .credentialsProvider( - StaticCredentialsProvider.create( - AwsBasicCredentials.create( - localStackContainer.getAccessKey(), - localStackContainer.getSecretKey()))) - .build(); - } - - @Bean - public AtomicReference>>> messageHolder() { - return new AtomicReference<>(); - } - - @Bean - public CountDownLatch messageBarrier() { - return new CountDownLatch(1); - } - - @Bean - public Consumer>>> eventConsumerBatchProcessingWithHeaders() { - return eventMessages -> { - messageHolder().set(eventMessages); - messageBarrier().countDown(); - }; - } - public static void main(String[] args) { SpringApplication.from(KinesisProducerApplication::main) - .with(ContainerConfig.class) + .with(ContainerConfig.class, ConsumerConfig.class) .run(args); } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 1f098754..9a56e260 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -14,19 +14,12 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.web.servlet.MockMvc; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; @ActiveProfiles({PROFILE_TEST}) @SpringBootTest( webEnvironment = RANDOM_PORT, - properties = { - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1", - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch", - "spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual", - "spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true", - "spring.cloud.stream.kinesis.binder.headers = event.eventType", - "spring.cloud.stream.kinesis.binder.autoAddShards = true" - }, - classes = ContainerConfig.class) + classes = {ContainerConfig.class, ConsumerConfig.class}) @AutoConfigureMockMvc public abstract class AbstractIntegrationTest { @@ -38,5 +31,5 @@ public abstract class AbstractIntegrationTest { @Autowired protected CountDownLatch messageBarrier; - @Autowired protected AtomicReference>> messageHolder; + @Autowired protected AtomicReference>> messageHolder; } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java new file mode 100644 index 00000000..ae96ea24 --- /dev/null +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java @@ -0,0 +1,60 @@ +package com.learning.aws.spring.common; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.testcontainers.containers.localstack.LocalStackContainer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +@TestConfiguration +public class ConsumerConfig { + + @Bean + DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) { + return DynamoDbAsyncClient.builder() + .endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB)) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + localStackContainer.getAccessKey(), + localStackContainer.getSecretKey()))) + .overrideConfiguration( + ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofSeconds(10)) + .retryPolicy(RetryPolicy.builder().numRetries(3).build()) + .build()) + .build(); + } + + @Bean + public AtomicReference>> messageHolder() { + return new AtomicReference<>(); + } + + @Bean + public CountDownLatch messageBarrier() { + return new CountDownLatch(1); + } + + @Bean + public Consumer>> consumeEvent() { + return eventMessages -> { + messageHolder().set(eventMessages); + messageBarrier().countDown(); + }; + } +} diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java index 01f1e6b3..1e9274fc 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java @@ -25,8 +25,7 @@ public class ContainerConfig { @Bean LocalStackContainer localStackContainer() { return new LocalStackContainer( - DockerImageName.parse("localstack/localstack").withTag("4.0.3")) - .withServices(KINESIS); + DockerImageName.parse("localstack/localstack").withTag("4.0.3")); } @Bean diff --git a/aws-kinesis-project/producer/src/test/resources/application-test.properties b/aws-kinesis-project/producer/src/test/resources/application-test.properties index 949d8220..593816b1 100644 --- a/aws-kinesis-project/producer/src/test/resources/application-test.properties +++ b/aws-kinesis-project/producer/src/test/resources/application-test.properties @@ -1,8 +1,23 @@ -cloud.aws.region.use-default-aws-region-chain=false -cloud.aws.credentials.use-default-aws-credentials-chain=false -cloud.aws.stack.auto=false -cloud.aws.region.auto=false -cloud.aws.region.static=us-east-1 - -application.endpoint-uri=http://localhost:4566 -application.region=us-east-1 + +# events inbound +spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream +spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1 +spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true +#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5 +spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch + +spring.cloud.function.definition=consumeEvent;producerSupplier; + +#Kinesis-dynamodb-checkpoint +spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata +spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned +spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5 +spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5 + +spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry +spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned +spring.cloud.stream.kinesis.binder.locks.readCapacity=5 +spring.cloud.stream.kinesis.binder.locks.writeCapacity=5