From b86ae0fc3016c17f2574a67aabcb468191d34b7c Mon Sep 17 00:00:00 2001 From: Peter Gaultney Date: Mon, 25 Jan 2021 15:00:12 -0600 Subject: [PATCH] fix transaction failure with incorrect condition expression for unchanged item --- CHANGES.md | 6 + .../write_versioned/transact/ddb_api_test.py | 50 +++++++- .../write_versioned/transact/run_test.py | 18 +++ xoto3/__about__.py | 2 +- xoto3/dynamodb/write_versioned/ddb_api.py | 111 ++++++++++-------- xoto3/dynamodb/write_versioned/run.py | 1 + 6 files changed, 138 insertions(+), 50 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6ea24e6..42031e4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +### 1.10.1 + +- Fixed bug in `versioned_transact_write_items` where un-effected + items would have an incorrect ConditionExpression generated, causing + the transaction to fail. + ## 1.10.0 - `versioned_transact_write_items` now allows the transaction builder diff --git a/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py b/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py index 0e7f5cd..df2ec6a 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py @@ -2,8 +2,14 @@ import pytest from botocore.exceptions import ClientError -from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable, known_key_schema -from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError +from xoto3.dynamodb.write_versioned import delete +from xoto3.dynamodb.write_versioned.ddb_api import ( + built_transaction_to_transact_write_items_args, + is_cancelled_and_retryable, + known_key_schema, +) +from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError, VersionedTransaction +from xoto3.dynamodb.write_versioned.prepare import items_and_keys_to_clean_table_data def test_is_cancelled_and_retryable(): @@ -41,3 +47,43 @@ def test_key_schema_unfetchable(): known_key_schema(table) except: # noqa pass # test cannot run at all without access to DynamoDB + + +def test_built_transaction_includes_unmodified(): + + tx = VersionedTransaction( + tables=dict( + Common=items_and_keys_to_clean_table_data( + ("id",), [dict(id="unmodified")], [dict(id="delete", val=4)] + ) + ) + ) + tx = delete(tx, "Common", dict(id="delete")) + + args = built_transaction_to_transact_write_items_args(tx, "adatetimestring") + + assert { + "TransactItems": [ + { + "Delete": { + "TableName": "Common", + "Key": {"id": {"S": "delete"}}, + "ExpressionAttributeNames": { + "#itemVersion": "item_version", + "#idThatExists": "id", + }, + "ExpressionAttributeValues": {":curItemVersion": {"N": "0"}}, + "ConditionExpression": "#itemVersion = :curItemVersion OR ( attribute_not_exists(#itemVersion) AND attribute_exists(#idThatExists) )", + } + }, + { + "ConditionCheck": { + "TableName": "Common", + "Key": {"id": {"S": "unmodified"}}, + "ExpressionAttributeNames": {"#itemVersion": "item_version"}, + "ExpressionAttributeValues": {":curItemVersion": {"N": "0"}}, + "ConditionExpression": "#itemVersion = :curItemVersion OR attribute_not_exists(#itemVersion)", + } + }, + ] + } == args diff --git a/tests/xoto3/dynamodb/write_versioned/transact/run_test.py b/tests/xoto3/dynamodb/write_versioned/transact/run_test.py index 52a02cc..b16080b 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/run_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/run_test.py @@ -254,3 +254,21 @@ def opt_delete(tx: VersionedTransaction) -> VersionedTransaction: # once for the optimistic attempt, which will fail, and a second # time for the one that succeeds once it knows what the actual # value is. + + +def test_assert_unchanged(integration_test_id_table_put, integration_test_id_table): + test_id_to_put = "versioned-transact-put-from-unchanged" + test_id_to_assert = "versioned-transact-assert-unchanged" + + integration_test_id_table_put(dict(id=test_id_to_assert, val=9)) + + def put_after_get(tx): + a = require(tx, integration_test_id_table.name, dict(id=test_id_to_assert)) + return put(tx, integration_test_id_table.name, dict(id=test_id_to_put, val=a["val"])) + + res = versioned_transact_write_items( + put_after_get, + {integration_test_id_table.name: [dict(id=test_id_to_put), dict(id=test_id_to_assert)]}, + ) + + assert 9 == require(res, integration_test_id_table.name, dict(id=test_id_to_put))["val"] diff --git a/xoto3/__about__.py b/xoto3/__about__.py index 8d165f4..43a53d0 100644 --- a/xoto3/__about__.py +++ b/xoto3/__about__.py @@ -1,4 +1,4 @@ """xoto3""" -__version__ = "1.10.0" +__version__ = "1.10.1" __author__ = "Peter Gaultney" __author_email__ = "pgaultney@xoi.io" diff --git a/xoto3/dynamodb/write_versioned/ddb_api.py b/xoto3/dynamodb/write_versioned/ddb_api.py index f756b42..d50c5e0 100644 --- a/xoto3/dynamodb/write_versioned/ddb_api.py +++ b/xoto3/dynamodb/write_versioned/ddb_api.py @@ -17,6 +17,7 @@ from .keys import hashable_key_to_key from .types import ( BatchGetItem, + HashableItemKey, ItemKeysByTableName, ItemsByTableName, TableNameOrResource, @@ -77,7 +78,6 @@ def __call__(self, RequestItems: Mapping[str, dict], **__kwargs) -> BatchGetResp def _ddb_batch_get_item( batch_get_item: Boto3BatchGetItem, item_keys_by_table_name: ItemKeysByTableName, ) -> ItemsByTableName: - # todo handle loop unprocessed_keys = { table_name: dict(Keys=item_keys, ConsistentRead=True) for table_name, item_keys in item_keys_by_table_name.items() @@ -105,7 +105,7 @@ def boto3_transact_multiple_but_optimize_single(TransactItems: List[dict], **kwa ddb_client.delete_item(**{**item_args, **kwargs}) return # we don't (yet) support single writee optimization for things other than Put or Delete - ddb_client.transact_write_items(TransactItems=TransactItems, **kwargs), + ddb_client.transact_write_items(TransactItems=TransactItems, **kwargs) return boto3_transact_multiple_but_optimize_single @@ -152,65 +152,82 @@ def built_transaction_to_transact_write_items_args( def get_existing_item(hashable_key) -> dict: return items.get(hashable_key) or dict() - modified_item_keys = set() + keys_of_items_to_be_modified = set() - for item_hashable_key, effect in effects.items(): - modified_item_keys.add(item_hashable_key) - expected_version = get_existing_item(item_hashable_key).get(item_version_attribute, 0) - item_key = hashable_key_to_key(key_attributes, item_hashable_key) - item = items.get(item_hashable_key) - expression_ensuring_unchangedness = _serialize_versioned_expr( + def put_or_delete_item(item_hashable_key: HashableItemKey, effect: Optional[Item]) -> dict: + keys_of_items_to_be_modified.add(item_hashable_key) + item = get_existing_item(item_hashable_key) + expected_version = item.get(item_version_attribute, 0) + expression_ensuring_unmodifiedness = _serialize_versioned_expr( versioned_item_expression( expected_version, item_version_key=item_version_attribute, id_that_exists=key_attributes[0] if item else "", ) ) - effect = effects.get(item_hashable_key) - if not effect: + if effect is None: # item is nil, indicating requested delete - transact_items.append( - dict( - Delete=dict( - TableName=table_name, - Key=serialize_item(dict(item_key)), - **expression_ensuring_unchangedness, - ) + return dict( + Delete=dict( + TableName=table_name, + Key=serialize_item( + dict(hashable_key_to_key(key_attributes, item_hashable_key)) + ), + **expression_ensuring_unmodifiedness, ) ) - else: - transact_items.append( - dict( - Put=dict( - TableName=table_name, - Item=serialize_item( - dict( - effect, - **{ - last_written_attribute: last_written_at_str, - item_version_attribute: expected_version + 1, - }, - ) - ), - **expression_ensuring_unchangedness, + + # put + return dict( + Put=dict( + TableName=table_name, + Item=serialize_item( + dict( + effect, + **{ + last_written_attribute: last_written_at_str, + item_version_attribute: expected_version + 1, + }, ) - ) + ), + **expression_ensuring_unmodifiedness, ) + ) - for item_hashable_key, item in items.items(): - if item_hashable_key not in modified_item_keys: - # assert that the required transaction item was not changed. - transact_items.append( - dict( - ConditionCheck=dict( - TableName=table_name, - Key=serialize_item( - dict(hashable_key_to_key(key_attributes, item_hashable_key)) - ), - **expression_ensuring_unchangedness, - ) - ) + transact_items.extend( + [ + put_or_delete_item(item_hashable_key, effect) + for item_hashable_key, effect in effects.items() + ] + ) + + def item_remains_unmodified( + item_hashable_key: HashableItemKey, item: Optional[Item] + ) -> dict: + expression_ensuring_unmodifiedness = _serialize_versioned_expr( + versioned_item_expression( + get_existing_item(item_hashable_key).get(item_version_attribute, 0), + item_version_key=item_version_attribute, + id_that_exists=key_attributes[0] if item else "", + ) + ) + return dict( + ConditionCheck=dict( + TableName=table_name, + Key=serialize_item( + dict(hashable_key_to_key(key_attributes, item_hashable_key)) + ), + **expression_ensuring_unmodifiedness, ) + ) + + transact_items.extend( + [ + item_remains_unmodified(item_hashable_key, item) + for item_hashable_key, item in items.items() + if item_hashable_key not in keys_of_items_to_be_modified + ] + ) args["TransactItems"] = transact_items return args diff --git a/xoto3/dynamodb/write_versioned/run.py b/xoto3/dynamodb/write_versioned/run.py index 1b6c35c..99c187d 100644 --- a/xoto3/dynamodb/write_versioned/run.py +++ b/xoto3/dynamodb/write_versioned/run.py @@ -128,6 +128,7 @@ def versioned_transact_write_items( # some fetches in the general case. if not is_retryable(ce): raise + logger.warning(f"Retrying attempt {i} that failed with {ce.response}") item_keys_by_table_name = all_items_for_next_attempt(built_transaction)