From 7457d8a224abb94f75c60884d4b825d23ea0b915 Mon Sep 17 00:00:00 2001 From: Vladimir Orany Date: Tue, 25 Jun 2024 15:52:14 +0200 Subject: [PATCH] Retain order of items returned by getAll for DynamoDB v2 (#247) * Retain order of items returned by getAll for DynamoDB v2 * Retain order of items returned by getAll for DynamoDB v2 * added test, clean up * fixed sorting for async service --- .../dynamodb/DefaultAsyncDynamoDbService.java | 21 +++++++++++--- .../dynamodb/DefaultDynamoDbService.java | 28 +++++++++++++++---- ...AdvancedQueryOnDeclarativeServiceTest.java | 8 ++++++ 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java index e6c877cb4..c45a47e6e 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java @@ -19,7 +19,11 @@ import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondaryPartitionKey; import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondarySortKey; -import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.*; +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.Builders; +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedQuery; +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedScan; +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedUpdate; +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.UpdateBuilder; import com.agorapulse.micronaut.amazon.awssdk.dynamodb.events.DynamoDbEvent; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.core.annotation.AnnotationValue; @@ -50,10 +54,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -286,9 +293,15 @@ private DetachedQuery simplePartitionAndSort(Object partitionKey, Object sort private Publisher getAll(AttributeValue hashKey, Publisher rangeKeys) { TableSchema tableSchema = table.tableSchema(); - return Flux.from(rangeKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> - ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(hashKey).sortValue(k).build()).build() - ).toList()))).flatMap(r -> Flux.from(r.resultsForTable(table)).map(this::postLoad)); + Map order = new ConcurrentHashMap<>(); + AtomicInteger counter = new AtomicInteger(); + Comparator comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primarySortKey().get()), 0)); + + return Flux.from(rangeKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> { + order.put(k, counter.getAndIncrement()); + return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(hashKey).sortValue(k).build()).build(); + } + ).toList()))).flatMap(r -> Flux.from(r.resultsForTable(table)).map(this::postLoad)).sort(comparator); } private Map getProjectionTypes() { diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java index d87af0314..b5c178769 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java @@ -49,7 +49,16 @@ import io.micronaut.core.annotation.Nullable; import java.lang.annotation.Annotation; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -280,11 +289,18 @@ private DetachedQuery simplePartitionAndSort(Object partitionKey, Object sort private Publisher getAll(AttributeValue hashKey, Publisher rangeKeys) { TableSchema tableSchema = table.tableSchema(); - return Flux.from(rangeKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> { - b.readBatches(batchRangeKeys.stream().map(k -> - ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(hashKey).sortValue(k).build()).build() - ).collect(Collectors.toList())); - })).flatMap(r -> Flux.fromIterable(r.resultsForTable(table)).map(this::postLoad)); + Map order = new ConcurrentHashMap<>(); + AtomicInteger counter = new AtomicInteger(); + + return Flux.from(rangeKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> { + order.put(k, counter.getAndIncrement()); + return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(hashKey).sortValue(k).build()).build(); + }) + .collect(Collectors.toList())))).flatMap(r -> { + Comparator comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primarySortKey().get()), 0)); + List it = r.resultsForTable(table).stream().sorted(comparator).collect(Collectors.toList()); + return Flux.fromIterable(it).map(this::postLoad); + }); } private Map getProjectionTypes() { diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AdvancedQueryOnDeclarativeServiceTest.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AdvancedQueryOnDeclarativeServiceTest.java index e127c3768..f58929cd9 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AdvancedQueryOnDeclarativeServiceTest.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AdvancedQueryOnDeclarativeServiceTest.java @@ -25,6 +25,8 @@ import java.time.temporal.ChronoUnit; import java.util.Date; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; @@ -77,6 +79,12 @@ public void testJavaService() { assertEquals(4, s.deleteAllByRangeBeginsWith("1", "f")); assertEquals(0, s.findAllByRangeBeginsWith("1", "f").size()); + + List ids = IntStream.range(10_000, 11_000).mapToObj(String::valueOf).collect(Collectors.toList()); + + s.saveAll(ids.stream().map(id -> createEntity("10000", id, "foo", 1, Date.from(REFERENCE_DATE))).collect(Collectors.toList())); + + assertEquals(ids, s.getAll("10000", ids).stream().map(DynamoDBEntity::getId).collect(Collectors.toList())); } private DynamoDBEntity createLastEvaluatedKey(String parentId, String id) {