Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB Enhanced Client Versioned Record start at 0 #5565

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon DyanmoDB Enhanced Client",
"contributor": "kiesler",
"type": "feature",
"description": "DynamoDB Enhanced Client Versioned Record can start at 0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ public int hashCode() {
return result;
}

@Override
public String toString() {
return "Expression{" +
"expression='" + expression + '\'' +
", expressionValues=" + expressionValues +
", expressionNames=" + expressionNames +
'}';
}

/**
* A builder for {@link Expression}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTag;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableMetadata;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.utils.Pair;

/**
* This extension implements optimistic locking on record writes by means of a 'record version number' that is used
Expand All @@ -60,8 +61,20 @@ public final class VersionedRecordExtension implements DynamoDbEnhancedClientExt
private static final Function<String, String> VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER = key -> ":old_" + key + "_value";
private static final String CUSTOM_METADATA_KEY = "VersionedRecordExtension:VersionAttribute";
private static final VersionAttribute VERSION_ATTRIBUTE = new VersionAttribute();

private VersionedRecordExtension() {
private static final AttributeValue DEFAULT_VALUE = AttributeValue.fromNul(Boolean.TRUE);

private final int startingValue;
private final int increment;

/**
* Creates a new {@link VersionedRecordExtension} using the supplied starting and incrementing value.
*
* @param startingValue the value used to compare if a record is the initial version of a record.
* @param increment the amount to increment the version by with each subsequent update.
*/
private VersionedRecordExtension(int startingValue, int increment) {
this.startingValue = startingValue;
this.increment = increment;
}

public static Builder builder() {
Expand Down Expand Up @@ -101,39 +114,13 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
return WriteModification.builder().build();
}

Map<String, AttributeValue> itemToTransform = new HashMap<>(context.items());
Pair<AttributeValue, Expression> updates = getRecordUpdates(versionAttributeKey.get(), context.items());

String attributeKeyRef = keyRef(versionAttributeKey.get());
AttributeValue newVersionValue;
Expression condition;
Optional<AttributeValue> existingVersionValue =
Optional.ofNullable(itemToTransform.get(versionAttributeKey.get()));

if (!existingVersionValue.isPresent() || isNullAttributeValue(existingVersionValue.get())) {
// First version of the record
newVersionValue = AttributeValue.builder().n("1").build();
condition = Expression.builder()
.expression(String.format("attribute_not_exists(%s)", attributeKeyRef))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.build();
} else {
// Existing record, increment version
if (existingVersionValue.get().n() == null) {
// In this case a non-null version attribute is present, but it's not an N
throw new IllegalArgumentException("Version attribute appears to be the wrong type. N is required.");
}

int existingVersion = Integer.parseInt(existingVersionValue.get().n());
String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER.apply(versionAttributeKey.get());
newVersionValue = AttributeValue.builder().n(Integer.toString(existingVersion + 1)).build();
condition = Expression.builder()
.expression(String.format("%s = %s", attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue.get()))
.build();
}
// Unpack values from Pair
AttributeValue newVersionValue = updates.left();
Expression condition = updates.right();

Map<String, AttributeValue> itemToTransform = new HashMap<>(context.items());
itemToTransform.put(versionAttributeKey.get(), newVersionValue);

return WriteModification.builder()
Expand All @@ -142,13 +129,104 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
.build();
}

private Pair<AttributeValue, Expression> getRecordUpdates(String versionAttributeKey,
Map<String, AttributeValue> itemToTransform) {
// Default to NUL if not present to reduce additional checks further along
AttributeValue existingVersionValue = itemToTransform.getOrDefault(versionAttributeKey, DEFAULT_VALUE);

if (isInitialVersion(existingVersionValue)) {
// First version of the record ensure it does not exist
return createInitialRecord(versionAttributeKey);
}
// Existing record, increment version
return updateExistingRecord(versionAttributeKey, existingVersionValue);
}

private boolean isInitialVersion(AttributeValue existingVersionValue) {
return isNullAttributeValue(existingVersionValue)
|| getExistingVersion(existingVersionValue) == this.startingValue;
}

private Pair<AttributeValue, Expression> createInitialRecord(String versionAttributeKey) {
AttributeValue newVersionValue = incrementVersion(this.startingValue);

String attributeKeyRef = keyRef(versionAttributeKey);

Expression condition = Expression.builder()
// Check that the version does not exist before setting the initial value.
.expression(String.format("attribute_not_exists(%s)", attributeKeyRef))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey))
.build();

return Pair.of(newVersionValue, condition);
}

private Pair<AttributeValue, Expression> updateExistingRecord(String versionAttributeKey,
AttributeValue existingVersionValue) {
int existingVersion = getExistingVersion(existingVersionValue);
AttributeValue newVersionValue = incrementVersion(existingVersion);

String attributeKeyRef = keyRef(versionAttributeKey);
String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER.apply(versionAttributeKey);

Expression condition = Expression.builder()
// Check that the version matches the existing value before setting the updated value.
.expression(String.format("%s = %s", attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue))
.build();

return Pair.of(newVersionValue, condition);
}

private int getExistingVersion(AttributeValue existingVersionValue) {
if (existingVersionValue.n() == null) {
// In this case a non-null version attribute is present, but it's not an N
throw new IllegalArgumentException("Version attribute appears to be the wrong type. N is required.");
}

return Integer.parseInt(existingVersionValue.n());
}

private AttributeValue incrementVersion(int version) {
return AttributeValue.fromN(Integer.toString(version + this.increment));
}

@NotThreadSafe
public static final class Builder {
private int startingValue = 0;
private int increment = 1;

private Builder() {
}

/**
* Sets the startingValue used to compare if a record is the initial version of a record.
* Default value - {@code 0}.
*
* @param startingValue
* @return the builder instance
*/
public Builder startAt(int startingValue) {
this.startingValue = startingValue;
return this;
}

/**
* Sets the amount to increment the version by with each subsequent update.
* Default value - {@code 1}.
*
* @param increment
* @return the builder instance
*/
public Builder incrementBy(int increment) {
this.increment = increment;
return this;
}

public VersionedRecordExtension build() {
return new VersionedRecordExtension();
return new VersionedRecordExtension(this.startingValue, this.increment);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,33 @@ public void beforeWrite_initialVersionDueToExplicitNull_transformedItemIsCorrect
assertThat(result.transformedItem(), is(fakeItemWithInitialVersion));
}

@Test
public void beforeWrite_initialVersionDueToExplicitZero_expressionAndTransformedItemIsCorrect() {
FakeItem fakeItem = createUniqueFakeItem();

Map<String, AttributeValue> inputMap =
new HashMap<>(FakeItem.getTableSchema().itemToMap(fakeItem, true));
inputMap.put("version", AttributeValue.builder().n("0").build());

Map<String, AttributeValue> fakeItemWithInitialVersion =
new HashMap<>(FakeItem.getTableSchema().itemToMap(fakeItem, true));
fakeItemWithInitialVersion.put("version", AttributeValue.builder().n("1").build());

WriteModification result =
versionedRecordExtension.beforeWrite(DefaultDynamoDbExtensionContext
.builder()
.items(inputMap)
.tableMetadata(FakeItem.getTableMetadata())
.operationContext(PRIMARY_CONTEXT).build());

assertThat(result.transformedItem(), is(fakeItemWithInitialVersion));
assertThat(result.additionalConditionalExpression(),
is(Expression.builder()
.expression("attribute_not_exists(#AMZN_MAPPED_version)")
.expressionNames(singletonMap("#AMZN_MAPPED_version", "version"))
.build()));
}

@Test
public void beforeWrite_existingVersion_expressionIsCorrect() {
FakeItem fakeItem = createUniqueFakeItem();
Expand Down