Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds integration test #467

Merged
merged 23 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
908d51e
adds integration test
rajadilipkolli Jun 6, 2023
5824c61
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 6, 2023
9dcbf80
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 7, 2023
e0c1dd9
fixes script
rajadilipkolli Jun 7, 2023
77291ee
Another way to set data
rajadilipkolli Jun 8, 2023
840f68f
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 14, 2023
8201406
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 18, 2023
daa3969
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 25, 2023
893b396
removes unnecessary declaration
rajadilipkolli Jul 25, 2023
09415eb
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 26, 2023
25724ec
Merge branch 'main' into add-integration-tests
rajadilipkolli Nov 6, 2023
3519513
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 12, 2024
e1656f4
fix : compile issue
rajadilipkolli Mar 12, 2024
e1f2dfb
adds kinesis asyncClient Bean
rajadilipkolli Mar 12, 2024
4b5a412
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 12, 2024
66c5a35
Merge remote-tracking branch 'origin/main' into add-integration-tests
rajadilipkolli Mar 13, 2024
5d66616
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 13, 2024
59d0013
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 18, 2024
081fcf5
Merge branch 'main' into add-integration-tests
rajadilipkolli Apr 22, 2024
5b1b2aa
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 29, 2024
72a79c9
Merge branch 'main' into add-integration-tests
rajadilipkolli Dec 27, 2024
62c9128
fix : compilation issue
rajadilipkolli Dec 27, 2024
a5d3ea3
implement integration test
rajadilipkolli Dec 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/.localstack/init-aws.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash

awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1

Expand Down
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@
<configuration>
<java>
<googleJavaFormat>
<version>1.25.0</version>
<version>1.25.2</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,60 @@
package com.learning.aws.spring;

import static org.assertj.core.api.Assertions.assertThat;

import com.amazonaws.util.BinaryUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.learning.aws.spring.common.AbstractIntegrationTest;
import com.learning.aws.spring.model.IpAddressDTO;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.Message;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

class ApplicationIntegrationTest extends AbstractIntegrationTest {

@Test
void contextLoads() {}
void contextLoads() throws InterruptedException, JsonProcessingException {

assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue();

Message<List<Record>> message = this.messageHolder.get();
assertThat(message.getHeaders())
.containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM)
.doesNotContainKeys(
AwsHeaders.RECEIVED_PARTITION_KEY,
AwsHeaders.RECEIVED_SEQUENCE_NUMBER,
AwsHeaders.STREAM,
AwsHeaders.PARTITION_KEY,
AwsHeaders.CHECKPOINTER);

List<Record> payloadList = message.getPayload();

assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1);

Record item = payloadList.getFirst();
assertThat(item).isNotNull();

KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item);

String sequenceNumber = kinesisClientRecord.sequenceNumber();
assertThat(sequenceNumber).isNotBlank();

Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp();
assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class);

String partitionKey = kinesisClientRecord.partitionKey();
assertThat(partitionKey).isNotBlank();

String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data()));
String payload = dataAsString.substring(dataAsString.indexOf("[{"));
List<IpAddressDTO> ipAddressDTOS =
objectMapper.readValue(payload, new TypeReference<>() {});
assertThat(ipAddressDTOS).isNotEmpty().hasSize(254);
}
Comment on lines +55 to +59
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Safeguard substring usage and avoid magic numbers

  1. If dataAsString.indexOf("[{") returns -1, an exception occurs. Consider adding a protective check or using a more stable parsing strategy.
  2. The hard-coded 254 size could be extracted as a constant or read from a configuration to document its purpose better.

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.learning.aws.spring;

import com.learning.aws.spring.common.ConsumerConfig;
import com.learning.aws.spring.common.ContainerConfig;
import org.springframework.boot.SpringApplication;

public class TestKinesisProducerApplication {

public static void main(String[] args) {
SpringApplication.from(KinesisProducerApplication::main)
.with(ContainerConfig.class)
.with(ContainerConfig.class, ConsumerConfig.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,32 @@
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.MockMvc;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@ActiveProfiles({PROFILE_TEST})
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
@SpringBootTest(
webEnvironment = RANDOM_PORT,
classes = {ContainerConfig.class, ConsumerConfig.class})
@AutoConfigureMockMvc
public abstract class AbstractIntegrationTest {

@Autowired protected MockMvc mockMvc;

@Autowired protected ObjectMapper objectMapper;

@Autowired protected KinesisAsyncClient amazonKinesis;

@Autowired protected CountDownLatch messageBarrier;

@Autowired protected AtomicReference<Message<List<Record>>> messageHolder;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.learning.aws.spring.common;

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@TestConfiguration
public class ConsumerConfig {

@Bean
DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) {
return DynamoDbAsyncClient.builder()
.endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
localStackContainer.getAccessKey(),
localStackContainer.getSecretKey())))
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofSeconds(10))
.retryPolicy(RetryPolicy.builder().numRetries(3).build())
.build())
.build();
}

@Bean
public AtomicReference<Message<List<Record>>> messageHolder() {
return new AtomicReference<>();
}

@Bean
public CountDownLatch messageBarrier() {
return new CountDownLatch(1);
}

@Bean
public Consumer<Message<List<Record>>> consumeEvent() {
return eventMessages -> {
messageHolder().set(eventMessages);
messageBarrier().countDown();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class ContainerConfig {
@Bean
LocalStackContainer localStackContainer() {
return new LocalStackContainer(
DockerImageName.parse("localstack/localstack").withTag("4.0.3"))
.withServices(KINESIS);
DockerImageName.parse("localstack/localstack").withTag("4.0.3"));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
cloud.aws.region.use-default-aws-region-chain=false
cloud.aws.credentials.use-default-aws-credentials-chain=false
cloud.aws.stack.auto=false
cloud.aws.region.auto=false
cloud.aws.region.static=us-east-1

application.endpoint-uri=http://localhost:4566
application.region=us-east-1

# events inbound
spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream
spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1
spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true
#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5
spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch

spring.cloud.function.definition=consumeEvent;producerSupplier;

#Kinesis-dynamodb-checkpoint
spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata
spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned
spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5
spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5

spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry
spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned
spring.cloud.stream.kinesis.binder.locks.readCapacity=5
spring.cloud.stream.kinesis.binder.locks.writeCapacity=5
Loading