Skip to content

Commit

Permalink
Merge pull request #11 from xoeye/bugfix/unchanged-item-in-transaction
Browse files Browse the repository at this point in the history
fix transaction failure with incorrect condition expression for unchanged item
  • Loading branch information
Peter Gaultney authored Jan 25, 2021
2 parents ba4ef19 + b86ae0f commit 3af81cf
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 50 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 1.10.1

- Fixed bug in `versioned_transact_write_items` where un-effected
items would have an incorrect ConditionExpression generated, causing
the transaction to fail.

## 1.10.0

- `versioned_transact_write_items` now allows the transaction builder
Expand Down
50 changes: 48 additions & 2 deletions tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
import pytest
from botocore.exceptions import ClientError

from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable, known_key_schema
from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError
from xoto3.dynamodb.write_versioned import delete
from xoto3.dynamodb.write_versioned.ddb_api import (
built_transaction_to_transact_write_items_args,
is_cancelled_and_retryable,
known_key_schema,
)
from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError, VersionedTransaction
from xoto3.dynamodb.write_versioned.prepare import items_and_keys_to_clean_table_data


def test_is_cancelled_and_retryable():
Expand Down Expand Up @@ -41,3 +47,43 @@ def test_key_schema_unfetchable():
known_key_schema(table)
except: # noqa
pass # test cannot run at all without access to DynamoDB


def test_built_transaction_includes_unmodified():

tx = VersionedTransaction(
tables=dict(
Common=items_and_keys_to_clean_table_data(
("id",), [dict(id="unmodified")], [dict(id="delete", val=4)]
)
)
)
tx = delete(tx, "Common", dict(id="delete"))

args = built_transaction_to_transact_write_items_args(tx, "adatetimestring")

assert {
"TransactItems": [
{
"Delete": {
"TableName": "Common",
"Key": {"id": {"S": "delete"}},
"ExpressionAttributeNames": {
"#itemVersion": "item_version",
"#idThatExists": "id",
},
"ExpressionAttributeValues": {":curItemVersion": {"N": "0"}},
"ConditionExpression": "#itemVersion = :curItemVersion OR ( attribute_not_exists(#itemVersion) AND attribute_exists(#idThatExists) )",
}
},
{
"ConditionCheck": {
"TableName": "Common",
"Key": {"id": {"S": "unmodified"}},
"ExpressionAttributeNames": {"#itemVersion": "item_version"},
"ExpressionAttributeValues": {":curItemVersion": {"N": "0"}},
"ConditionExpression": "#itemVersion = :curItemVersion OR attribute_not_exists(#itemVersion)",
}
},
]
} == args
18 changes: 18 additions & 0 deletions tests/xoto3/dynamodb/write_versioned/transact/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,21 @@ def opt_delete(tx: VersionedTransaction) -> VersionedTransaction:
# once for the optimistic attempt, which will fail, and a second
# time for the one that succeeds once it knows what the actual
# value is.


def test_assert_unchanged(integration_test_id_table_put, integration_test_id_table):
test_id_to_put = "versioned-transact-put-from-unchanged"
test_id_to_assert = "versioned-transact-assert-unchanged"

integration_test_id_table_put(dict(id=test_id_to_assert, val=9))

def put_after_get(tx):
a = require(tx, integration_test_id_table.name, dict(id=test_id_to_assert))
return put(tx, integration_test_id_table.name, dict(id=test_id_to_put, val=a["val"]))

res = versioned_transact_write_items(
put_after_get,
{integration_test_id_table.name: [dict(id=test_id_to_put), dict(id=test_id_to_assert)]},
)

assert 9 == require(res, integration_test_id_table.name, dict(id=test_id_to_put))["val"]
2 changes: 1 addition & 1 deletion xoto3/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""xoto3"""
__version__ = "1.10.0"
__version__ = "1.10.1"
__author__ = "Peter Gaultney"
__author_email__ = "[email protected]"
111 changes: 64 additions & 47 deletions xoto3/dynamodb/write_versioned/ddb_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .keys import hashable_key_to_key
from .types import (
BatchGetItem,
HashableItemKey,
ItemKeysByTableName,
ItemsByTableName,
TableNameOrResource,
Expand Down Expand Up @@ -77,7 +78,6 @@ def __call__(self, RequestItems: Mapping[str, dict], **__kwargs) -> BatchGetResp
def _ddb_batch_get_item(
batch_get_item: Boto3BatchGetItem, item_keys_by_table_name: ItemKeysByTableName,
) -> ItemsByTableName:
# todo handle loop
unprocessed_keys = {
table_name: dict(Keys=item_keys, ConsistentRead=True)
for table_name, item_keys in item_keys_by_table_name.items()
Expand Down Expand Up @@ -105,7 +105,7 @@ def boto3_transact_multiple_but_optimize_single(TransactItems: List[dict], **kwa
ddb_client.delete_item(**{**item_args, **kwargs})
return
# we don't (yet) support single writee optimization for things other than Put or Delete
ddb_client.transact_write_items(TransactItems=TransactItems, **kwargs),
ddb_client.transact_write_items(TransactItems=TransactItems, **kwargs)

return boto3_transact_multiple_but_optimize_single

Expand Down Expand Up @@ -152,65 +152,82 @@ def built_transaction_to_transact_write_items_args(
def get_existing_item(hashable_key) -> dict:
return items.get(hashable_key) or dict()

modified_item_keys = set()
keys_of_items_to_be_modified = set()

for item_hashable_key, effect in effects.items():
modified_item_keys.add(item_hashable_key)
expected_version = get_existing_item(item_hashable_key).get(item_version_attribute, 0)
item_key = hashable_key_to_key(key_attributes, item_hashable_key)
item = items.get(item_hashable_key)
expression_ensuring_unchangedness = _serialize_versioned_expr(
def put_or_delete_item(item_hashable_key: HashableItemKey, effect: Optional[Item]) -> dict:
keys_of_items_to_be_modified.add(item_hashable_key)
item = get_existing_item(item_hashable_key)
expected_version = item.get(item_version_attribute, 0)
expression_ensuring_unmodifiedness = _serialize_versioned_expr(
versioned_item_expression(
expected_version,
item_version_key=item_version_attribute,
id_that_exists=key_attributes[0] if item else "",
)
)
effect = effects.get(item_hashable_key)
if not effect:
if effect is None:
# item is nil, indicating requested delete
transact_items.append(
dict(
Delete=dict(
TableName=table_name,
Key=serialize_item(dict(item_key)),
**expression_ensuring_unchangedness,
)
return dict(
Delete=dict(
TableName=table_name,
Key=serialize_item(
dict(hashable_key_to_key(key_attributes, item_hashable_key))
),
**expression_ensuring_unmodifiedness,
)
)
else:
transact_items.append(
dict(
Put=dict(
TableName=table_name,
Item=serialize_item(
dict(
effect,
**{
last_written_attribute: last_written_at_str,
item_version_attribute: expected_version + 1,
},
)
),
**expression_ensuring_unchangedness,

# put
return dict(
Put=dict(
TableName=table_name,
Item=serialize_item(
dict(
effect,
**{
last_written_attribute: last_written_at_str,
item_version_attribute: expected_version + 1,
},
)
)
),
**expression_ensuring_unmodifiedness,
)
)

for item_hashable_key, item in items.items():
if item_hashable_key not in modified_item_keys:
# assert that the required transaction item was not changed.
transact_items.append(
dict(
ConditionCheck=dict(
TableName=table_name,
Key=serialize_item(
dict(hashable_key_to_key(key_attributes, item_hashable_key))
),
**expression_ensuring_unchangedness,
)
)
transact_items.extend(
[
put_or_delete_item(item_hashable_key, effect)
for item_hashable_key, effect in effects.items()
]
)

def item_remains_unmodified(
item_hashable_key: HashableItemKey, item: Optional[Item]
) -> dict:
expression_ensuring_unmodifiedness = _serialize_versioned_expr(
versioned_item_expression(
get_existing_item(item_hashable_key).get(item_version_attribute, 0),
item_version_key=item_version_attribute,
id_that_exists=key_attributes[0] if item else "",
)
)
return dict(
ConditionCheck=dict(
TableName=table_name,
Key=serialize_item(
dict(hashable_key_to_key(key_attributes, item_hashable_key))
),
**expression_ensuring_unmodifiedness,
)
)

transact_items.extend(
[
item_remains_unmodified(item_hashable_key, item)
for item_hashable_key, item in items.items()
if item_hashable_key not in keys_of_items_to_be_modified
]
)

args["TransactItems"] = transact_items
return args
1 change: 1 addition & 0 deletions xoto3/dynamodb/write_versioned/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def versioned_transact_write_items(
# some fetches in the general case.
if not is_retryable(ce):
raise
logger.warning(f"Retrying attempt {i} that failed with {ce.response}")

item_keys_by_table_name = all_items_for_next_attempt(built_transaction)

Expand Down

0 comments on commit 3af81cf

Please sign in to comment.