From 8071ac1de86035c200656c6b941933b06bd262e5 Mon Sep 17 00:00:00 2001 From: Peter Gaultney Date: Thu, 21 Jan 2021 09:41:25 -0600 Subject: [PATCH 1/2] versioned_transact_write_items now supports lazy loading of items read, and optimistic creation/deletion of items without a prefetch --- CHANGES.md | 17 +++++ .../write_versioned/transact/build_test.py | 15 ++-- .../write_versioned/transact/ddb_api_test.py | 15 +++- .../write_versioned/transact/modify_test.py | 14 ++++ .../write_versioned/transact/prepare_test.py | 10 ++- .../write_versioned/transact/run_test.py | 74 ++++++++++++++++++- xoto3/__about__.py | 2 +- xoto3/dynamodb/README.md | 61 ++++++++------- xoto3/dynamodb/utils/table.py | 5 +- xoto3/dynamodb/write_versioned/__init__.py | 6 +- xoto3/dynamodb/write_versioned/ddb_api.py | 51 +++++++++---- xoto3/dynamodb/write_versioned/errors.py | 17 ++++- xoto3/dynamodb/write_versioned/modify.py | 53 +++++++++---- xoto3/dynamodb/write_versioned/prepare.py | 45 ++++++++++- xoto3/dynamodb/write_versioned/read.py | 10 ++- xoto3/dynamodb/write_versioned/run.py | 57 +++++++++++--- 16 files changed, 353 insertions(+), 99 deletions(-) create mode 100644 tests/xoto3/dynamodb/write_versioned/transact/modify_test.py diff --git a/CHANGES.md b/CHANGES.md index 53abd35..6ea24e6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,20 @@ +## 1.10.0 + +- `versioned_transact_write_items` now allows the transaction builder + itself to lazily load items from any table via `get`/`require`, + instead of requiring all transacted items to be explicitly declared + before the builder is invoked. +- Experimental support for optimistic creations and deletions as + well - if an item being written has not been prefetched, it will be + assumed to not exist for the purposes of the condition check in the + Put or Delete within the transaction. If that optimism proves + invalid, the transaction will be retried after a refetch of the + actual item. The limitation is that we need to be able to somehow + derive a key for your item, so either you need to have prefetched a + different item from the same table, or your environment must allow + access to the DescribeTable action for the DynamoDB table, so that + we can directly check its key schema. + ## 1.9.0 - New `versioned_transact_write_items` wrapper that presents a general diff --git a/tests/xoto3/dynamodb/write_versioned/transact/build_test.py b/tests/xoto3/dynamodb/write_versioned/transact/build_test.py index a7c0702..6bba1bf 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/build_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/build_test.py @@ -2,10 +2,7 @@ from xoto3.dynamodb.exceptions import ItemNotFoundException from xoto3.dynamodb.write_versioned import delete, get, put, require -from xoto3.dynamodb.write_versioned.errors import ( - ItemUnknownToTransactionError, - TableUnknownToTransactionError, -) +from xoto3.dynamodb.write_versioned.errors import ItemNotYetFetchedError, TableSchemaUnknownError from xoto3.dynamodb.write_versioned.keys import hashable_key, key_from_item from xoto3.dynamodb.write_versioned.prepare import items_and_keys_to_clean_table_data from xoto3.dynamodb.write_versioned.types import VersionedTransaction as VT @@ -37,10 +34,10 @@ def test_get_and_require(): with pytest.raises(ItemNotFoundException): require(vt, "table1", dict(id="no")) - with pytest.raises(TableUnknownToTransactionError): + with pytest.raises(ItemNotYetFetchedError): get(vt, "table2", dict(id="1"),) == dict(id="1", val="a") - with pytest.raises(ItemUnknownToTransactionError): + with pytest.raises(ItemNotYetFetchedError): get(vt, "table1", dict(id="3")) @@ -54,11 +51,11 @@ def test_puts_and_deletes(): ) ) - with pytest.raises(TableUnknownToTransactionError): + with pytest.raises(TableSchemaUnknownError): put(vt, "table3", dict(id="seven", val="whatever")) - with pytest.raises(ItemUnknownToTransactionError): - put(vt, "table1", dict(id="b", val="hey")) + put(vt, "table1", dict(id="b", val="hey")) + # we know the table schema for this one, so we optimistically allow the put out_vt = put(vt, "table1", dict(id="a", val=3)) assert get(out_vt, "table1", dict(id="a")) == dict(id="a", val=3) diff --git a/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py b/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py index 3135036..0e7f5cd 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py @@ -1,6 +1,9 @@ +import boto3 +import pytest from botocore.exceptions import ClientError -from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable +from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable, known_key_schema +from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError def test_is_cancelled_and_retryable(): @@ -28,3 +31,13 @@ def test_is_cancelled_and_retryable(): "transact_write_items", ) ) + + +def test_key_schema_unfetchable(): + try: + table = boto3.resource("dynamodb").Table("thistabledoesnotexist") + + with pytest.raises(TableSchemaUnknownError): + known_key_schema(table) + except: # noqa + pass # test cannot run at all without access to DynamoDB diff --git a/tests/xoto3/dynamodb/write_versioned/transact/modify_test.py b/tests/xoto3/dynamodb/write_versioned/transact/modify_test.py new file mode 100644 index 0000000..f6827ac --- /dev/null +++ b/tests/xoto3/dynamodb/write_versioned/transact/modify_test.py @@ -0,0 +1,14 @@ +import pytest + +from xoto3.dynamodb.write_versioned import VersionedTransaction +from xoto3.dynamodb.write_versioned.modify import TableSchemaUnknownError, delete + + +def test_cant_delete_non_prefetched_item_without_specifying_key(): + + tx = delete(VersionedTransaction(dict()), "table1", dict(id="whatever")) + + tx = delete(tx, "table1", dict(id="yo", value=3, full_item=True)) + + with pytest.raises(TableSchemaUnknownError): + delete(tx, "table2", dict(id=4, value=7, other_value=9)) diff --git a/tests/xoto3/dynamodb/write_versioned/transact/prepare_test.py b/tests/xoto3/dynamodb/write_versioned/transact/prepare_test.py index 0899d9a..95117d8 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/prepare_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/prepare_test.py @@ -1,6 +1,6 @@ import pytest -from xoto3.dynamodb.write_versioned.prepare import parse_batch_get_request +from xoto3.dynamodb.write_versioned.prepare import add_item_to_base_request, parse_batch_get_request def test_disallow_non_matching_keys(): @@ -13,3 +13,11 @@ def test_deduplicate_keys(): res = parse_batch_get_request(dict(tbl1=req)) assert res == dict(tbl1=[dict(id=1), dict(id=2)]) + + +def test_add_item(): + tname_onto_item_keys = add_item_to_base_request( + dict(table1=[dict(id=1)]), ("table2", dict(id=3)), + ) + + assert tname_onto_item_keys == dict(table1=[dict(id=1)], table2=[dict(id=3)],) diff --git a/tests/xoto3/dynamodb/write_versioned/transact/run_test.py b/tests/xoto3/dynamodb/write_versioned/transact/run_test.py index 6897e8b..52a02cc 100644 --- a/tests/xoto3/dynamodb/write_versioned/transact/run_test.py +++ b/tests/xoto3/dynamodb/write_versioned/transact/run_test.py @@ -175,10 +175,82 @@ def test_delete(tx: VersionedTransaction) -> VersionedTransaction: def test_no_op_builder(): + ran = False + def builder(tx): - assert False + nonlocal ran + ran = True + # transaction does actually run even with no items specified, + # since lazy loading is now permitted. return tx versioned_transact_write_items( builder, dict(table1=[], table2=[]), ) + + assert ran + + +def test_lazy_loading_reads_and_writes( + integration_test_id_table, integration_test_id_table_put, integration_test_id_table_cleaner +): + tname = integration_test_id_table.name + test_id_source = "versioned-transact-known-item-read" + test_id_lazy = "versioned-transact-lazy-load-read" + test_dest_id = "versioned-transact-write-using-lazy-loaded-value" + + integration_test_id_table_put(dict(id=test_id_source, val=10)) + integration_test_id_table_put(dict(id=test_id_lazy, val=9)) + integration_test_id_table_cleaner(dict(id=test_dest_id)) + + def lazy_op(tx: VersionedTransaction) -> VersionedTransaction: + src = require(tx, tname, dict(id=test_id_source)) + if src["val"] > 5: + # the if statement here is just an example of why you might want to lazy-load something. + # In our test, this statement always passes because of the fixture data. + lazy = require(tx, tname, dict(id=test_id_lazy)) + print(lazy) + dest_item = dict(id=test_dest_id, val=src["val"] + lazy["val"]) + print(dest_item) + return put(tx, tname, dest_item) + # this part of the test is just an example of what you might otherwise do. + # it's not actually ever going to run in our test. + return tx + + # note that we only specify upfront a key for the single item we know we need to prefetch + result = versioned_transact_write_items(lazy_op, {tname: [dict(id=test_id_source)]},) + + assert require(result, tname, dict(id=test_dest_id)) == dict(id=test_dest_id, val=19) + + +def test_optimistic_delete_nonexistent(integration_test_id_table): + test_id_to_delete = "versioned-transact-opt-delete" + + def opt_delete(tx: VersionedTransaction) -> VersionedTransaction: + return delete(tx, integration_test_id_table.name, dict(id=test_id_to_delete)) + + res = versioned_transact_write_items(opt_delete, dict()) + + assert None is get(res, integration_test_id_table.name, dict(id=test_id_to_delete)) + + +def test_optimistic_delete_existing(integration_test_id_table_put, integration_test_id_table): + test_id_to_delete = "versioned-transact-opt-delete-existing" + + integration_test_id_table_put(dict(id=test_id_to_delete, val=1984, item_version=4)) + + tx_run_count = 0 + + def opt_delete(tx: VersionedTransaction) -> VersionedTransaction: + nonlocal tx_run_count + tx_run_count += 1 + return delete(tx, integration_test_id_table.name, dict(id=test_id_to_delete)) + + res = versioned_transact_write_items(opt_delete, dict()) + + assert None is get(res, integration_test_id_table.name, dict(id=test_id_to_delete)) + + assert tx_run_count == 2 + # once for the optimistic attempt, which will fail, and a second + # time for the one that succeeds once it knows what the actual + # value is. diff --git a/xoto3/__about__.py b/xoto3/__about__.py index cc856f9..8d165f4 100644 --- a/xoto3/__about__.py +++ b/xoto3/__about__.py @@ -1,4 +1,4 @@ """xoto3""" -__version__ = "1.9.0" +__version__ = "1.10.0" __author__ = "Peter Gaultney" __author_email__ = "pgaultney@xoi.io" diff --git a/xoto3/dynamodb/README.md b/xoto3/dynamodb/README.md index c7ba222..d091f89 100644 --- a/xoto3/dynamodb/README.md +++ b/xoto3/dynamodb/README.md @@ -80,37 +80,36 @@ refetching, and eventually giving up to the utility. ```python import xoto3.dynamodb.write_versioned as wv -def context(*_args): - user_key = dict(id="bob") - group_key = dict(pk="team42") - - def add_user_to_new_or_existing_group(t: wv.VersionedTransaction) -> wv.VersionedTransaction: - user = wv.require(t, "User", user_key) - assert user, "require will raise if the item does not exist" - group = wv.get(t, "Group", group_key) - - if group_key not in user["groups"]: - user["groups"].append(group_key) - t = wv.put(t, "User", user) - - if group: - if user_key not in group["members"]: - group["members"].append(user_key) - else: - group = dict(group_key, members=[user_key]) - - if group != wv.get(t, "Group", group_key): - # if there was a change to the group - t = wv.put(t, "Group", group) - return t - - wv.versioned_transact_write_items( - add_user_to_new_or_existing_group, - { - "User": [user_key], - "Group": [group_key], - }, - ) +user_key = dict(id="bob") +group_key = dict(pk="team42") + +def add_user_to_new_or_existing_group(t: wv.VersionedTransaction) -> wv.VersionedTransaction: + user = wv.require(t, "User", user_key) + assert user, "require will raise ItemNotFoundException if the item does not exist" + group = wv.get(t, "Group", group_key) + + if group_key not in user["groups"]: + user["groups"].append(group_key) + t = wv.put(t, "User", user) + + if group: + if user_key not in group["members"]: + group["members"].append(user_key) + else: + group = dict(group_key, members=[user_key]) + + if group != wv.get(t, "Group", group_key): + # if there was a change to the group + t = wv.put(t, "Group", group) + return t + +wv.versioned_transact_write_items( + add_user_to_new_or_existing_group, + { + "User": [user_key], + "Group": [group_key], + }, +) ``` The above code will ensure that the 'state' of the collection of items diff --git a/xoto3/dynamodb/utils/table.py b/xoto3/dynamodb/utils/table.py index 426c481..58ad510 100644 --- a/xoto3/dynamodb/utils/table.py +++ b/xoto3/dynamodb/utils/table.py @@ -1,9 +1,10 @@ from typing import Tuple -from xoto3.dynamodb.types import TableResource, InputItem, ItemKey + +from xoto3.dynamodb.types import InputItem, ItemKey, TableResource def table_primary_keys(table: TableResource) -> Tuple[str, ...]: - return tuple([key["AttributeName"] for key in table.key_schema]) + return tuple(sorted([key["AttributeName"] for key in table.key_schema])) def extract_key_from_item(table: TableResource, item: InputItem) -> ItemKey: diff --git a/xoto3/dynamodb/write_versioned/__init__.py b/xoto3/dynamodb/write_versioned/__init__.py index 59c046b..e0f2023 100644 --- a/xoto3/dynamodb/write_versioned/__init__.py +++ b/xoto3/dynamodb/write_versioned/__init__.py @@ -1,9 +1,5 @@ """API for transact_write_items""" -from .errors import ( # noqa - ItemUnknownToTransactionError, - TableUnknownToTransactionError, - TransactionAttemptsOverrun, -) +from .errors import TableSchemaUnknownError, TransactionAttemptsOverrun # noqa from .modify import delete, put # noqa from .read import get, require # noqa from .run import versioned_transact_write_items # noqa diff --git a/xoto3/dynamodb/write_versioned/ddb_api.py b/xoto3/dynamodb/write_versioned/ddb_api.py index d512ca6..f756b42 100644 --- a/xoto3/dynamodb/write_versioned/ddb_api.py +++ b/xoto3/dynamodb/write_versioned/ddb_api.py @@ -1,6 +1,7 @@ """Private implementation details for versioned_transact_write_items""" from collections import defaultdict from functools import partial +from logging import getLogger from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, cast import boto3 @@ -10,6 +11,7 @@ from xoto3.dynamodb.types import Item from xoto3.dynamodb.update.versioned import versioned_item_expression from xoto3.dynamodb.utils.serde import serialize_item +from xoto3.dynamodb.utils.table import table_primary_keys from xoto3.errors import client_error_name from .keys import hashable_key_to_key @@ -22,6 +24,8 @@ VersionedTransaction, ) +_log = getLogger(__name__) + _RetryableTransactionCancelledErrorCodes = { "ConditionalCheckFailed", "TransactionConflict", @@ -36,6 +40,19 @@ def table_name(table: TableNameOrResource) -> str: return table +def known_key_schema(table: TableNameOrResource) -> Tuple[str, ...]: + try: + if isinstance(table, str): + table = boto3.resource("dynamodb").Table(table) + assert not isinstance(table, str) + # your environment may or may not have permissions to read the key schema of its table. + # in general, that is a nice permission to allow if possible. + return table_primary_keys(table) + except ClientError: + _log.warning("Key schema could not be fetched") + return tuple() # unknown! + + def _collect_codes(resp: dict) -> Set[str]: cc = {reason["Code"] for reason in resp["Error"].get("CancellationReasons", tuple())} return cc @@ -135,9 +152,13 @@ def built_transaction_to_transact_write_items_args( def get_existing_item(hashable_key) -> dict: return items.get(hashable_key) or dict() - for item_hashable_key, item in items.items(): + modified_item_keys = set() + + for item_hashable_key, effect in effects.items(): + modified_item_keys.add(item_hashable_key) expected_version = get_existing_item(item_hashable_key).get(item_version_attribute, 0) item_key = hashable_key_to_key(key_attributes, item_hashable_key) + item = items.get(item_hashable_key) expression_ensuring_unchangedness = _serialize_versioned_expr( versioned_item_expression( expected_version, @@ -146,18 +167,7 @@ def get_existing_item(hashable_key) -> dict: ) ) effect = effects.get(item_hashable_key) - if item_hashable_key not in effects: - # not modified, so we simply assert that it was not changed. - transact_items.append( - dict( - ConditionCheck=dict( - TableName=table_name, - Key=serialize_item(dict(item_key)), - **expression_ensuring_unchangedness, - ) - ) - ) - elif not effect: + if not effect: # item is nil, indicating requested delete transact_items.append( dict( @@ -187,5 +197,20 @@ def get_existing_item(hashable_key) -> dict: ) ) + for item_hashable_key, item in items.items(): + if item_hashable_key not in modified_item_keys: + # assert that the required transaction item was not changed. + transact_items.append( + dict( + ConditionCheck=dict( + TableName=table_name, + Key=serialize_item( + dict(hashable_key_to_key(key_attributes, item_hashable_key)) + ), + **expression_ensuring_unchangedness, + ) + ) + ) + args["TransactItems"] = transact_items return args diff --git a/xoto3/dynamodb/write_versioned/errors.py b/xoto3/dynamodb/write_versioned/errors.py index 4a435ea..82d0402 100644 --- a/xoto3/dynamodb/write_versioned/errors.py +++ b/xoto3/dynamodb/write_versioned/errors.py @@ -1,11 +1,22 @@ from xoto3.dynamodb.exceptions import DynamoDbException, DynamoDbItemException -class TableUnknownToTransactionError(KeyError): - pass +class TableSchemaUnknownError(KeyError): + """If you get one of these, it means that you've attempted a write and + we don't know how to derive the key from your item because you didn't + give us any examples of the key as part of the option to prefetch + items. + + There's really only one way to fix this - specify an item by key + upfront, so that we can derive the key schema from it. + + In the future, we may attempt to query DynamoDB itself for the key + schema for your table in order to save you this extra work, at the + cost of incurring an unexpected I/O operation partway through. + """ -class ItemUnknownToTransactionError(DynamoDbItemException): +class ItemNotYetFetchedError(DynamoDbItemException): pass diff --git a/xoto3/dynamodb/write_versioned/modify.py b/xoto3/dynamodb/write_versioned/modify.py index 4c4285d..916ca50 100644 --- a/xoto3/dynamodb/write_versioned/modify.py +++ b/xoto3/dynamodb/write_versioned/modify.py @@ -3,14 +3,15 @@ from typing import NamedTuple, Optional, Union from xoto3.dynamodb.constants import DEFAULT_ITEM_NAME -from xoto3.dynamodb.exceptions import get_item_exception_type from xoto3.dynamodb.prewrite import dynamodb_prewrite from xoto3.dynamodb.types import Item, ItemKey from xoto3.utils.tree_map import SimpleTransform +from .ddb_api import known_key_schema from .ddb_api import table_name as _table_name -from .errors import ItemUnknownToTransactionError, TableUnknownToTransactionError +from .errors import TableSchemaUnknownError from .keys import hashable_key, key_from_item +from .prepare import standard_key_attributes_from_key from .types import TableNameOrResource, VersionedTransaction, _TableData @@ -32,11 +33,44 @@ def _write( *, nicename: str = DEFAULT_ITEM_NAME, ) -> VersionedTransaction: - """Shared put/delete implementation - not meant for direct use at this time.""" + """Shared put/delete implementation - not meant for direct use at this time. + + Performs an optimistic put - if the item is not known to the + existing transaction, assumes you mean to create it if and only if + it does not exist. If the item turns out to exist already, your + transaction will be re-run, at which point a put will be interpreted as a + 'witting' choice to overwrite the known item. + """ + table_name = _table_name(table) - if table_name not in transaction.tables: - raise TableUnknownToTransactionError(table_name) - items, effects, key_attributes = transaction.tables[table_name] + + if table_name in transaction.tables: + items, effects, key_attributes = transaction.tables[table_name] + else: + # we have to have key_attributes in order to proceed with any + # effect for the given table. We're going to attempt various + # ways of getting them, starting by asking DynamoDB directly. + key_attributes = known_key_schema(table) + items = dict() + effects = dict() + if not key_attributes: + if isinstance(put_or_delete, Delete): + # hope that the user provided an actual item key + if len(put_or_delete.item_key) > 2: + raise TableSchemaUnknownError( + f"We don't know the key schema for {table_name} because you haven't defined it " + "and it is not guessable from the delete you requested." + "Specify this delete in terms of the key only and this should work fine." + ) + # at this point this is a best guess + key_attributes = standard_key_attributes_from_key(put_or_delete.item_key) + else: + # it's a put - we can't make this work at all + raise TableSchemaUnknownError( + f"We don't have enough information about table {table_name} to properly derive " + "a key from your request to put this item. " + "Prefetching this item by key would solve that problem." + ) item_or_none, item_key = ( (put_or_delete.item, key_from_item(key_attributes, put_or_delete.item)) @@ -45,13 +79,6 @@ def _write( ) hashable_item_key = hashable_key(item_key) - if hashable_item_key not in items: - # Any item to be modified by the transaction must be specified - # at the initiation of the transaction so that it can be - # prefetched. - raise get_item_exception_type(nicename, ItemUnknownToTransactionError)( - "Not specified as part of transaction", key=item_key, table_name=table_name - ) return VersionedTransaction( tables={ diff --git a/xoto3/dynamodb/write_versioned/prepare.py b/xoto3/dynamodb/write_versioned/prepare.py index 18b0bcd..013d114 100644 --- a/xoto3/dynamodb/write_versioned/prepare.py +++ b/xoto3/dynamodb/write_versioned/prepare.py @@ -2,8 +2,8 @@ from xoto3.dynamodb.types import Item, ItemKey, KeyAttributeType -from .keys import hashable_key, key_from_item -from .types import VersionedTransaction, _TableData +from .keys import hashable_key, hashable_key_to_key, key_from_item +from .types import HashableItemKey, VersionedTransaction, _TableData def _deduplicate_and_validate_keys(keys: Collection[ItemKey]) -> Iterable[ItemKey]: @@ -56,9 +56,13 @@ def items_and_keys_to_clean_table_data( ) +def standard_key_attributes_from_key(item_key: ItemKey) -> Tuple[str, ...]: + return tuple(sorted(item_key.keys())) + + def _extract_key_attributes(item_keys: Sequence[ItemKey]) -> Tuple[str, ...]: assert item_keys, "You can't extract key_attributes without at least one item_key." - return tuple(sorted(item_keys[0].keys())) + return standard_key_attributes_from_key(item_keys[0]) D = TypeVar("D", bound=dict) @@ -84,3 +88,38 @@ def prepare_clean_transaction( } ) ) + + +def add_item_to_base_request( + table_name_onto_item_keys: Mapping[str, Collection[ItemKey]], + table_name_and_item_key: Tuple[str, ItemKey], +) -> Mapping[str, Collection[ItemKey]]: + table_name, item_key = table_name_and_item_key + if table_name not in table_name_onto_item_keys: + return {**table_name_onto_item_keys, table_name: [item_key]} + return { + **table_name_onto_item_keys, + table_name: list(table_name_onto_item_keys[table_name]) + [item_key], + } + + +def all_items_for_next_attempt( + failed_transaction: VersionedTransaction, +) -> Dict[str, List[ItemKey]]: + + table_name_onto_hashable_keys: Dict[str, Set[HashableItemKey]] = { + table_name: set() for table_name in failed_transaction.tables + } + for (table_name, table_data,) in failed_transaction.tables.items(): + items, effects, key_attributes = table_data + for hashable_item_key in items.keys(): + table_name_onto_hashable_keys[table_name].add(hashable_item_key) + for hashable_item_key in effects.keys(): + table_name_onto_hashable_keys[table_name].add(hashable_item_key) + + return { + table_name: [ + hashable_key_to_key(key_attributes, hashable_key) for hashable_key in hashable_keys + ] + for table_name, hashable_keys in table_name_onto_hashable_keys.items() + } diff --git a/xoto3/dynamodb/write_versioned/read.py b/xoto3/dynamodb/write_versioned/read.py index 92abee6..f9bcad3 100644 --- a/xoto3/dynamodb/write_versioned/read.py +++ b/xoto3/dynamodb/write_versioned/read.py @@ -8,7 +8,7 @@ from xoto3.dynamodb.types import Item, ItemKey from .ddb_api import table_name as _table_name -from .errors import ItemUnknownToTransactionError, TableUnknownToTransactionError +from .errors import ItemNotYetFetchedError from .keys import hashable_key from .types import TableNameOrResource, VersionedTransaction @@ -36,8 +36,9 @@ def get( directly. Caveat emptor... """ table_name = _table_name(table) + item_exc_type = get_item_exception_type(nicename, ItemNotYetFetchedError) if table_name not in transaction.tables: - raise TableUnknownToTransactionError(table_name) + raise item_exc_type("Table new to transaction", key=item_key, table_name=table_name) items, effects, _ = transaction.tables[table_name] item_hashable_key = hashable_key(item_key) @@ -46,9 +47,10 @@ def get( if item_hashable_key in effects: return xf_result(effects[item_hashable_key]) if item_hashable_key not in items: - raise get_item_exception_type(nicename, ItemUnknownToTransactionError)( - f"{nicename} not present in transaction", key=item_key, table_name=table_name + raise item_exc_type( + f"{nicename} not yet present in transaction", key=item_key, table_name=table_name ) + return xf_result(items[item_hashable_key]) diff --git a/xoto3/dynamodb/write_versioned/run.py b/xoto3/dynamodb/write_versioned/run.py index b1f9cd3..59096bb 100644 --- a/xoto3/dynamodb/write_versioned/run.py +++ b/xoto3/dynamodb/write_versioned/run.py @@ -1,7 +1,7 @@ """The core run loop for a transaction""" from datetime import datetime from logging import getLogger -from typing import Callable, Collection, Iterable, Mapping, Optional +from typing import Callable, Collection, Iterable, Mapping, Optional, Tuple from botocore.exceptions import ClientError @@ -13,8 +13,13 @@ built_transaction_to_transact_write_items_args, is_cancelled_and_retryable, ) -from .errors import TransactionAttemptsOverrun -from .prepare import parse_batch_get_request, prepare_clean_transaction +from .errors import ItemNotYetFetchedError, TransactionAttemptsOverrun +from .prepare import ( + add_item_to_base_request, + all_items_for_next_attempt, + parse_batch_get_request, + prepare_clean_transaction, +) from .retry import timed_retry from .types import BatchGetItem, TransactionBuilder, TransactWriteItems, VersionedTransaction @@ -25,9 +30,38 @@ def _is_empty(transaction: VersionedTransaction) -> bool: return not sum(len(effects) for _, effects, *_ in transaction.tables.values()) -def versioned_transact_write_items( +def _lazy_loading_transaction_builder( transaction_builder: TransactionBuilder, item_keys_by_table_name: Mapping[str, Collection[ItemKey]], + batch_get_item: BatchGetItem, +) -> Tuple[VersionedTransaction, VersionedTransaction]: + """If your transaction attempts to get/require an item that was not + prefetched, we will stop, fetch it, and then retry your + transaction from the beginning. + + Remember to make your transaction a pure function, or at least make + sure all of its side effects are effectively idempotent! + """ + while True: + item_keys_by_table_name = parse_batch_get_request(item_keys_by_table_name) + clean_transaction = prepare_clean_transaction( + item_keys_by_table_name, batch_get_item(item_keys_by_table_name), + ) + try: + built_transaction = transaction_builder(clean_transaction) + # the built transaction will always contain a superset of the keys + # present in the clean transaction. + return clean_transaction, built_transaction + except ItemNotYetFetchedError as item_error: + assert item_error.key + item_keys_by_table_name = add_item_to_base_request( + item_keys_by_table_name, (item_error.table_name, item_error.key), + ) + + +def versioned_transact_write_items( + transaction_builder: TransactionBuilder, + item_keys_by_table_name: Mapping[str, Collection[ItemKey]] = dict(), *, batch_get_item: Optional[BatchGetItem] = None, transact_write_items: Optional[TransactWriteItems] = None, @@ -69,15 +103,9 @@ def versioned_transact_write_items( ) for i, _ in enumerate(attempts_iterator or timed_retry()): - clean_transaction = prepare_clean_transaction( - parse_batch_get_request(item_keys_by_table_name), - batch_get_item(item_keys_by_table_name), + clean_transaction, built_transaction = _lazy_loading_transaction_builder( + transaction_builder, item_keys_by_table_name, batch_get_item, ) - if not clean_transaction.tables: - return clean_transaction - - built_transaction = transaction_builder(clean_transaction) - if built_transaction is clean_transaction or _is_empty(built_transaction): logger.info("No effects were defined, so the existing items will be returned as-is.") return clean_transaction @@ -94,7 +122,12 @@ def versioned_transact_write_items( ) return built_transaction except ClientError as ce: + # TODO in the future, have is_retryable determine which + # specific items need to be refetched. this would save us + # some fetches in the general case. if not is_retryable(ce): raise + item_keys_by_table_name = all_items_for_next_attempt(built_transaction) + raise TransactionAttemptsOverrun(f"Failed after {i + 1} attempts") From abdd28fed8f4061fbd0c87672b8941601cef1c7d Mon Sep 17 00:00:00 2001 From: Peter Gaultney Date: Fri, 22 Jan 2021 10:26:43 -0600 Subject: [PATCH 2/2] improve comments and readme formatting; add failed transaction to TransactionAttemptsOverrun exception --- xoto3/dynamodb/README.md | 40 ++++++++++++------------ xoto3/dynamodb/write_versioned/errors.py | 37 +++++++++++++++------- xoto3/dynamodb/write_versioned/run.py | 4 ++- 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/xoto3/dynamodb/README.md b/xoto3/dynamodb/README.md index d091f89..5b71992 100644 --- a/xoto3/dynamodb/README.md +++ b/xoto3/dynamodb/README.md @@ -84,31 +84,31 @@ user_key = dict(id="bob") group_key = dict(pk="team42") def add_user_to_new_or_existing_group(t: wv.VersionedTransaction) -> wv.VersionedTransaction: - user = wv.require(t, "User", user_key) - assert user, "require will raise ItemNotFoundException if the item does not exist" - group = wv.get(t, "Group", group_key) + user = wv.require(t, "User", user_key) + assert user, "require will raise ItemNotFoundException if the item does not exist" + group = wv.get(t, "Group", group_key) - if group_key not in user["groups"]: - user["groups"].append(group_key) - t = wv.put(t, "User", user) + if group_key not in user["groups"]: + user["groups"].append(group_key) + t = wv.put(t, "User", user) - if group: - if user_key not in group["members"]: - group["members"].append(user_key) - else: - group = dict(group_key, members=[user_key]) + if group: + if user_key not in group["members"]: + group["members"].append(user_key) + else: + group = dict(group_key, members=[user_key]) - if group != wv.get(t, "Group", group_key): - # if there was a change to the group - t = wv.put(t, "Group", group) - return t + if group != wv.get(t, "Group", group_key): + # if there was a change to the group + t = wv.put(t, "Group", group) + return t wv.versioned_transact_write_items( - add_user_to_new_or_existing_group, - { - "User": [user_key], - "Group": [group_key], - }, + add_user_to_new_or_existing_group, + { + "User": [user_key], + "Group": [group_key], + }, ) ``` diff --git a/xoto3/dynamodb/write_versioned/errors.py b/xoto3/dynamodb/write_versioned/errors.py index 82d0402..6f099bf 100644 --- a/xoto3/dynamodb/write_versioned/errors.py +++ b/xoto3/dynamodb/write_versioned/errors.py @@ -1,24 +1,37 @@ from xoto3.dynamodb.exceptions import DynamoDbException, DynamoDbItemException +from .types import VersionedTransaction + class TableSchemaUnknownError(KeyError): """If you get one of these, it means that you've attempted a write and - we don't know how to derive the key from your item because you didn't - give us any examples of the key as part of the option to prefetch - items. - - There's really only one way to fix this - specify an item by key - upfront, so that we can derive the key schema from it. - - In the future, we may attempt to query DynamoDB itself for the key - schema for your table in order to save you this extra work, at the - cost of incurring an unexpected I/O operation partway through. + we don't know how to derive the key from your item because you + didn't give us any examples of the key as part of the option to + prefetch items. + + There's really only one way to be certain to avoid this - specify + an item to be prefetched by its key, so that we can assume the key + schema based on it. + + Before this is raises, we may attempt to query DynamoDB itself for + the key schema for your table in order to save you this extra + work, at the cost of incurring an unexpected I/O operation partway + through. However, depending on your IAM permissions, it may not be + possible for us to accomplish this, in which case you will receive + this runtime error. """ class ItemNotYetFetchedError(DynamoDbItemException): - pass + """Used internally to send control flow and information back to the + transaction runner to prompt a lazy load of an item not already + fetched. + """ class TransactionAttemptsOverrun(DynamoDbException): - pass + """The end failure state in a system where there is high contention for resources.""" + + def __init__(self, msg, failed_transaction: VersionedTransaction): + super().__init__(msg) + self.failed_transaction = failed_transaction diff --git a/xoto3/dynamodb/write_versioned/run.py b/xoto3/dynamodb/write_versioned/run.py index 59096bb..1b6c35c 100644 --- a/xoto3/dynamodb/write_versioned/run.py +++ b/xoto3/dynamodb/write_versioned/run.py @@ -102,6 +102,7 @@ def versioned_transact_write_items( batch_get_item, transact_write_items, ) + built_transaction = None for i, _ in enumerate(attempts_iterator or timed_retry()): clean_transaction, built_transaction = _lazy_loading_transaction_builder( transaction_builder, item_keys_by_table_name, batch_get_item, @@ -130,4 +131,5 @@ def versioned_transact_write_items( item_keys_by_table_name = all_items_for_next_attempt(built_transaction) - raise TransactionAttemptsOverrun(f"Failed after {i + 1} attempts") + assert built_transaction, "No attempt was made to run the transaction" + raise TransactionAttemptsOverrun(f"Failed after {i + 1} attempts", built_transaction)