Skip to content

Commit

Permalink
Merge pull request #10 from xoeye/feature/allow-optimistic-writes-and…
Browse files Browse the repository at this point in the history
…-lazy-loaded-reads-in-transactions

versioned_transact_write_items now supports lazy loading of items read
  • Loading branch information
Peter Gaultney authored Jan 22, 2021
2 parents 3d226c5 + abdd28f commit ba4ef19
Show file tree
Hide file tree
Showing 16 changed files with 371 additions and 102 deletions.
17 changes: 17 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
## 1.10.0

- `versioned_transact_write_items` now allows the transaction builder
itself to lazily load items from any table via `get`/`require`,
instead of requiring all transacted items to be explicitly declared
before the builder is invoked.
- Experimental support for optimistic creations and deletions as
well - if an item being written has not been prefetched, it will be
assumed to not exist for the purposes of the condition check in the
Put or Delete within the transaction. If that optimism proves
invalid, the transaction will be retried after a refetch of the
actual item. The limitation is that we need to be able to somehow
derive a key for your item, so either you need to have prefetched a
different item from the same table, or your environment must allow
access to the DescribeTable action for the DynamoDB table, so that
we can directly check its key schema.

## 1.9.0

- New `versioned_transact_write_items` wrapper that presents a general
Expand Down
15 changes: 6 additions & 9 deletions tests/xoto3/dynamodb/write_versioned/transact/build_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

from xoto3.dynamodb.exceptions import ItemNotFoundException
from xoto3.dynamodb.write_versioned import delete, get, put, require
from xoto3.dynamodb.write_versioned.errors import (
ItemUnknownToTransactionError,
TableUnknownToTransactionError,
)
from xoto3.dynamodb.write_versioned.errors import ItemNotYetFetchedError, TableSchemaUnknownError
from xoto3.dynamodb.write_versioned.keys import hashable_key, key_from_item
from xoto3.dynamodb.write_versioned.prepare import items_and_keys_to_clean_table_data
from xoto3.dynamodb.write_versioned.types import VersionedTransaction as VT
Expand Down Expand Up @@ -37,10 +34,10 @@ def test_get_and_require():
with pytest.raises(ItemNotFoundException):
require(vt, "table1", dict(id="no"))

with pytest.raises(TableUnknownToTransactionError):
with pytest.raises(ItemNotYetFetchedError):
get(vt, "table2", dict(id="1"),) == dict(id="1", val="a")

with pytest.raises(ItemUnknownToTransactionError):
with pytest.raises(ItemNotYetFetchedError):
get(vt, "table1", dict(id="3"))


Expand All @@ -54,11 +51,11 @@ def test_puts_and_deletes():
)
)

with pytest.raises(TableUnknownToTransactionError):
with pytest.raises(TableSchemaUnknownError):
put(vt, "table3", dict(id="seven", val="whatever"))

with pytest.raises(ItemUnknownToTransactionError):
put(vt, "table1", dict(id="b", val="hey"))
put(vt, "table1", dict(id="b", val="hey"))
# we know the table schema for this one, so we optimistically allow the put

out_vt = put(vt, "table1", dict(id="a", val=3))
assert get(out_vt, "table1", dict(id="a")) == dict(id="a", val=3)
Expand Down
15 changes: 14 additions & 1 deletion tests/xoto3/dynamodb/write_versioned/transact/ddb_api_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import boto3
import pytest
from botocore.exceptions import ClientError

from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable
from xoto3.dynamodb.write_versioned.ddb_api import is_cancelled_and_retryable, known_key_schema
from xoto3.dynamodb.write_versioned.errors import TableSchemaUnknownError


def test_is_cancelled_and_retryable():
Expand Down Expand Up @@ -28,3 +31,13 @@ def test_is_cancelled_and_retryable():
"transact_write_items",
)
)


def test_key_schema_unfetchable():
try:
table = boto3.resource("dynamodb").Table("thistabledoesnotexist")

with pytest.raises(TableSchemaUnknownError):
known_key_schema(table)
except: # noqa
pass # test cannot run at all without access to DynamoDB
14 changes: 14 additions & 0 deletions tests/xoto3/dynamodb/write_versioned/transact/modify_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest

from xoto3.dynamodb.write_versioned import VersionedTransaction
from xoto3.dynamodb.write_versioned.modify import TableSchemaUnknownError, delete


def test_cant_delete_non_prefetched_item_without_specifying_key():

tx = delete(VersionedTransaction(dict()), "table1", dict(id="whatever"))

tx = delete(tx, "table1", dict(id="yo", value=3, full_item=True))

with pytest.raises(TableSchemaUnknownError):
delete(tx, "table2", dict(id=4, value=7, other_value=9))
10 changes: 9 additions & 1 deletion tests/xoto3/dynamodb/write_versioned/transact/prepare_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from xoto3.dynamodb.write_versioned.prepare import parse_batch_get_request
from xoto3.dynamodb.write_versioned.prepare import add_item_to_base_request, parse_batch_get_request


def test_disallow_non_matching_keys():
Expand All @@ -13,3 +13,11 @@ def test_deduplicate_keys():

res = parse_batch_get_request(dict(tbl1=req))
assert res == dict(tbl1=[dict(id=1), dict(id=2)])


def test_add_item():
tname_onto_item_keys = add_item_to_base_request(
dict(table1=[dict(id=1)]), ("table2", dict(id=3)),
)

assert tname_onto_item_keys == dict(table1=[dict(id=1)], table2=[dict(id=3)],)
74 changes: 73 additions & 1 deletion tests/xoto3/dynamodb/write_versioned/transact/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,82 @@ def test_delete(tx: VersionedTransaction) -> VersionedTransaction:


def test_no_op_builder():
ran = False

def builder(tx):
assert False
nonlocal ran
ran = True
# transaction does actually run even with no items specified,
# since lazy loading is now permitted.
return tx

versioned_transact_write_items(
builder, dict(table1=[], table2=[]),
)

assert ran


def test_lazy_loading_reads_and_writes(
integration_test_id_table, integration_test_id_table_put, integration_test_id_table_cleaner
):
tname = integration_test_id_table.name
test_id_source = "versioned-transact-known-item-read"
test_id_lazy = "versioned-transact-lazy-load-read"
test_dest_id = "versioned-transact-write-using-lazy-loaded-value"

integration_test_id_table_put(dict(id=test_id_source, val=10))
integration_test_id_table_put(dict(id=test_id_lazy, val=9))
integration_test_id_table_cleaner(dict(id=test_dest_id))

def lazy_op(tx: VersionedTransaction) -> VersionedTransaction:
src = require(tx, tname, dict(id=test_id_source))
if src["val"] > 5:
# the if statement here is just an example of why you might want to lazy-load something.
# In our test, this statement always passes because of the fixture data.
lazy = require(tx, tname, dict(id=test_id_lazy))
print(lazy)
dest_item = dict(id=test_dest_id, val=src["val"] + lazy["val"])
print(dest_item)
return put(tx, tname, dest_item)
# this part of the test is just an example of what you might otherwise do.
# it's not actually ever going to run in our test.
return tx

# note that we only specify upfront a key for the single item we know we need to prefetch
result = versioned_transact_write_items(lazy_op, {tname: [dict(id=test_id_source)]},)

assert require(result, tname, dict(id=test_dest_id)) == dict(id=test_dest_id, val=19)


def test_optimistic_delete_nonexistent(integration_test_id_table):
test_id_to_delete = "versioned-transact-opt-delete"

def opt_delete(tx: VersionedTransaction) -> VersionedTransaction:
return delete(tx, integration_test_id_table.name, dict(id=test_id_to_delete))

res = versioned_transact_write_items(opt_delete, dict())

assert None is get(res, integration_test_id_table.name, dict(id=test_id_to_delete))


def test_optimistic_delete_existing(integration_test_id_table_put, integration_test_id_table):
test_id_to_delete = "versioned-transact-opt-delete-existing"

integration_test_id_table_put(dict(id=test_id_to_delete, val=1984, item_version=4))

tx_run_count = 0

def opt_delete(tx: VersionedTransaction) -> VersionedTransaction:
nonlocal tx_run_count
tx_run_count += 1
return delete(tx, integration_test_id_table.name, dict(id=test_id_to_delete))

res = versioned_transact_write_items(opt_delete, dict())

assert None is get(res, integration_test_id_table.name, dict(id=test_id_to_delete))

assert tx_run_count == 2
# once for the optimistic attempt, which will fail, and a second
# time for the one that succeeds once it knows what the actual
# value is.
2 changes: 1 addition & 1 deletion xoto3/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""xoto3"""
__version__ = "1.9.0"
__version__ = "1.10.0"
__author__ = "Peter Gaultney"
__author_email__ = "[email protected]"
61 changes: 30 additions & 31 deletions xoto3/dynamodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,37 +80,36 @@ refetching, and eventually giving up to the utility.
```python
import xoto3.dynamodb.write_versioned as wv

def context(*_args):
user_key = dict(id="bob")
group_key = dict(pk="team42")

def add_user_to_new_or_existing_group(t: wv.VersionedTransaction) -> wv.VersionedTransaction:
user = wv.require(t, "User", user_key)
assert user, "require will raise if the item does not exist"
group = wv.get(t, "Group", group_key)

if group_key not in user["groups"]:
user["groups"].append(group_key)
t = wv.put(t, "User", user)

if group:
if user_key not in group["members"]:
group["members"].append(user_key)
else:
group = dict(group_key, members=[user_key])

if group != wv.get(t, "Group", group_key):
# if there was a change to the group
t = wv.put(t, "Group", group)
return t

wv.versioned_transact_write_items(
add_user_to_new_or_existing_group,
{
"User": [user_key],
"Group": [group_key],
},
)
user_key = dict(id="bob")
group_key = dict(pk="team42")

def add_user_to_new_or_existing_group(t: wv.VersionedTransaction) -> wv.VersionedTransaction:
user = wv.require(t, "User", user_key)
assert user, "require will raise ItemNotFoundException if the item does not exist"
group = wv.get(t, "Group", group_key)

if group_key not in user["groups"]:
user["groups"].append(group_key)
t = wv.put(t, "User", user)

if group:
if user_key not in group["members"]:
group["members"].append(user_key)
else:
group = dict(group_key, members=[user_key])

if group != wv.get(t, "Group", group_key):
# if there was a change to the group
t = wv.put(t, "Group", group)
return t

wv.versioned_transact_write_items(
add_user_to_new_or_existing_group,
{
"User": [user_key],
"Group": [group_key],
},
)
```

The above code will ensure that the 'state' of the collection of items
Expand Down
5 changes: 3 additions & 2 deletions xoto3/dynamodb/utils/table.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Tuple
from xoto3.dynamodb.types import TableResource, InputItem, ItemKey

from xoto3.dynamodb.types import InputItem, ItemKey, TableResource


def table_primary_keys(table: TableResource) -> Tuple[str, ...]:
return tuple([key["AttributeName"] for key in table.key_schema])
return tuple(sorted([key["AttributeName"] for key in table.key_schema]))


def extract_key_from_item(table: TableResource, item: InputItem) -> ItemKey:
Expand Down
6 changes: 1 addition & 5 deletions xoto3/dynamodb/write_versioned/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
"""API for transact_write_items"""
from .errors import ( # noqa
ItemUnknownToTransactionError,
TableUnknownToTransactionError,
TransactionAttemptsOverrun,
)
from .errors import TableSchemaUnknownError, TransactionAttemptsOverrun # noqa
from .modify import delete, put # noqa
from .read import get, require # noqa
from .run import versioned_transact_write_items # noqa
Expand Down
Loading

0 comments on commit ba4ef19

Please sign in to comment.