Skip to content

Commit

Permalink
implement integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli authored Dec 27, 2024
1 parent 62c9128 commit a5d3ea3
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 79 deletions.
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@
<configuration>
<java>
<googleJavaFormat>
<version>1.25.0</version>
<version>1.25.2</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.amazonaws.util.BinaryUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.learning.aws.spring.common.AbstractIntegrationTest;
import com.learning.aws.spring.model.IpAddressDTO;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

class ApplicationIntegrationTest extends AbstractIntegrationTest {

Expand All @@ -18,21 +23,38 @@ void contextLoads() throws InterruptedException, JsonProcessingException {

assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue();

Message<List<?>> message = this.messageHolder.get();
Message<List<Record>> message = this.messageHolder.get();
assertThat(message.getHeaders())
.containsKeys(AwsHeaders.CHECKPOINTER, AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM)
.doesNotContainKeys(AwsHeaders.STREAM, AwsHeaders.PARTITION_KEY);
.containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM)
.doesNotContainKeys(
AwsHeaders.RECEIVED_PARTITION_KEY,
AwsHeaders.RECEIVED_SEQUENCE_NUMBER,
AwsHeaders.STREAM,
AwsHeaders.PARTITION_KEY,
AwsHeaders.CHECKPOINTER);

List<?> payload = message.getPayload();
assertThat(payload).hasSize(10);
List<Record> payloadList = message.getPayload();

Object item = payload.get(0);
assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1);

assertThat(item).isInstanceOf(GenericMessage.class);
Record item = payloadList.getFirst();
assertThat(item).isNotNull();

Message<?> messageFromBatch = (Message<?>) item;
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item);

assertThat(messageFromBatch.getPayload()).isEqualTo("Message0");
assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent");
String sequenceNumber = kinesisClientRecord.sequenceNumber();
assertThat(sequenceNumber).isNotBlank();

Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp();
assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class);

String partitionKey = kinesisClientRecord.partitionKey();
assertThat(partitionKey).isNotBlank();

String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data()));
String payload = dataAsString.substring(dataAsString.indexOf("[{"));
List<IpAddressDTO> ipAddressDTOS =
objectMapper.readValue(payload, new TypeReference<>() {});
assertThat(ipAddressDTOS).isNotEmpty().hasSize(254);
}
}
Original file line number Diff line number Diff line change
@@ -1,59 +1,14 @@
package com.learning.aws.spring;

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;

import com.learning.aws.spring.common.ConsumerConfig;
import com.learning.aws.spring.common.ContainerConfig;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

@TestConfiguration(proxyBeanMethods = false)
public class TestKinesisProducerApplication {

@Bean
KinesisAsyncClient amazonKinesis(LocalStackContainer localStackContainer) {
return KinesisAsyncClient.builder()
.endpointOverride(localStackContainer.getEndpointOverride(KINESIS))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
localStackContainer.getAccessKey(),
localStackContainer.getSecretKey())))
.build();
}

@Bean
public AtomicReference<Message<List<Message<?>>>> messageHolder() {
return new AtomicReference<>();
}

@Bean
public CountDownLatch messageBarrier() {
return new CountDownLatch(1);
}

@Bean
public Consumer<Message<List<Message<?>>>> eventConsumerBatchProcessingWithHeaders() {
return eventMessages -> {
messageHolder().set(eventMessages);
messageBarrier().countDown();
};
}

public static void main(String[] args) {
SpringApplication.from(KinesisProducerApplication::main)
.with(ContainerConfig.class)
.with(ContainerConfig.class, ConsumerConfig.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.MockMvc;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@ActiveProfiles({PROFILE_TEST})
@SpringBootTest(
webEnvironment = RANDOM_PORT,
properties = {
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1",
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch",
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual",
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true",
"spring.cloud.stream.kinesis.binder.headers = event.eventType",
"spring.cloud.stream.kinesis.binder.autoAddShards = true"
},
classes = ContainerConfig.class)
classes = {ContainerConfig.class, ConsumerConfig.class})
@AutoConfigureMockMvc
public abstract class AbstractIntegrationTest {

Expand All @@ -38,5 +31,5 @@ public abstract class AbstractIntegrationTest {

@Autowired protected CountDownLatch messageBarrier;

@Autowired protected AtomicReference<Message<List<?>>> messageHolder;
@Autowired protected AtomicReference<Message<List<Record>>> messageHolder;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.learning.aws.spring.common;

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@TestConfiguration
public class ConsumerConfig {

@Bean
DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) {
return DynamoDbAsyncClient.builder()
.endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
localStackContainer.getAccessKey(),
localStackContainer.getSecretKey())))
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofSeconds(10))
.retryPolicy(RetryPolicy.builder().numRetries(3).build())
.build())
.build();
}

@Bean
public AtomicReference<Message<List<Record>>> messageHolder() {
return new AtomicReference<>();
}

@Bean
public CountDownLatch messageBarrier() {
return new CountDownLatch(1);
}

@Bean
public Consumer<Message<List<Record>>> consumeEvent() {
return eventMessages -> {
messageHolder().set(eventMessages);
messageBarrier().countDown();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class ContainerConfig {
@Bean
LocalStackContainer localStackContainer() {
return new LocalStackContainer(
DockerImageName.parse("localstack/localstack").withTag("4.0.3"))
.withServices(KINESIS);
DockerImageName.parse("localstack/localstack").withTag("4.0.3"));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
cloud.aws.region.use-default-aws-region-chain=false
cloud.aws.credentials.use-default-aws-credentials-chain=false
cloud.aws.stack.auto=false
cloud.aws.region.auto=false
cloud.aws.region.static=us-east-1

application.endpoint-uri=http://localhost:4566
application.region=us-east-1

# events inbound
spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream
spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1
spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true
#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5
spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch

spring.cloud.function.definition=consumeEvent;producerSupplier;

#Kinesis-dynamodb-checkpoint
spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata
spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned
spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5
spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5

spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry
spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned
spring.cloud.stream.kinesis.binder.locks.readCapacity=5
spring.cloud.stream.kinesis.binder.locks.writeCapacity=5

0 comments on commit a5d3ea3

Please sign in to comment.