Skip to content

Commit

Permalink
VS-1487: fix: Threaded BatchGetItem on 3.8+ (#32)
Browse files Browse the repository at this point in the history
* Switch batch_get to use concurrent.futures.ThreadPoolExecutor

* Add create_integration_test_table script

* 1.16.2
  • Loading branch information
xaviergmail authored Sep 2, 2022
1 parent 6d33afc commit 36c43d1
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
### 1.16.2

- Switches `BatchGetItem`'s threadpool to `concurrent.futures.ThreadPoolExecutor`
to work in AWS Lambda environments running Python 3.8+
- Adds `scripts/create_integration_test_table.py`, which creates a DynamoDB table
for running integration tests

### 1.16.1

- Fix xoto3.lam.finalize for Python > 3.7
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ and no range key.
`XOTO3_INTEGRATION_TEST_NO_RANGE_KEY_INDEX_HASH_KEY`: the name of an
attribute which is the partition key of a GSI with no range key.

If you don't already have a table suitable for testing, you can create one with the following script:
`pipenv run python ./scripts/create_integration_test_table.py`

## Development

### Writing tests
Expand Down
88 changes: 88 additions & 0 deletions scripts/create_integration_test_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
import argparse
import getpass
import logging
from time import sleep

import boto3

# Configure the root logger at INFO so this script's progress messages are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Low-level DynamoDB client shared by every function in this script.
# It is created at import time.
# NOTE(review): presumably region/credentials come from the ambient AWS
# configuration — confirm before running against a shared account.
ddb = boto3.client("dynamodb")

# Attribute name used as the table's partition (HASH) key.
PK_NAME = "id"
# Used both as the name of the global secondary index and as the name of the
# attribute that is that index's HASH key.
GSI_NAME = "gsi_hash"


def _table_exists(table_name: str) -> bool:
    """Report whether DynamoDB can describe a table named ``table_name``.

    Returns False only when ``describe_table`` raises
    ``ResourceNotFoundException``; any other client error propagates to the
    caller.
    """
    try:
        ddb.describe_table(TableName=table_name)
    except ddb.exceptions.ResourceNotFoundException:
        found = False
    else:
        found = True
    return found


def main(
    table_name: str, recreate: bool = False, *, poll_seconds: float = 3, max_polls: int = 200
):
    """Create the integration-test table and print the env vars that point at it.

    If the table already exists it is kept (and the function returns early)
    unless ``recreate`` is truthy, in which case it is deleted and created
    again.

    Args:
        table_name: Name of the DynamoDB table to create.
        recreate: When truthy, delete an existing table of the same name first.
        poll_seconds: Delay between ``describe_table`` polls while waiting.
        max_polls: Maximum number of polls per wait before giving up.

    Raises:
        TimeoutError: If deletion or creation does not finish within
            ``max_polls`` polls. Previously the script would poll forever.
    """
    if _table_exists(table_name):
        logger.info("Table %s already exists", table_name)
        if not recreate:
            logger.warning("--recreate not passed. Keeping existing table, nothing left to do.")
            return

        logger.info("Parameter --recreate passed, deleting table and re-creating...")
        ddb.delete_table(TableName=table_name)
        _await_table_deleted(table_name, poll_seconds, max_polls)

    logger.info("Creating table %s", table_name)
    ddb.create_table(
        TableName=table_name,
        AttributeDefinitions=[
            {"AttributeName": PK_NAME, "AttributeType": "S"},
            {"AttributeName": GSI_NAME, "AttributeType": "S"},
        ],
        KeySchema=[{"AttributeName": PK_NAME, "KeyType": "HASH"}],
        GlobalSecondaryIndexes=[
            {
                # The index is named after its own hash-key attribute.
                "IndexName": GSI_NAME,
                "KeySchema": [{"AttributeName": GSI_NAME, "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }
        ],
        BillingMode="PAY_PER_REQUEST",
    )
    _await_table_active(table_name, poll_seconds, max_polls)

    logger.info("Success!\n\n")
    print("Paste the following lines into your terminal or add to your shell .rc file:\n")
    print(f"export XOTO3_INTEGRATION_TEST_DYNAMODB_ID_TABLE_NAME='{table_name}'")
    print(f"export XOTO3_INTEGRATION_TEST_NO_RANGE_KEY_INDEX_HASH_KEY='{GSI_NAME}'")


def _await_table_deleted(table_name: str, poll_seconds: float, max_polls: int) -> None:
    """Poll until ``describe_table`` reports the table gone, or raise TimeoutError."""
    for _ in range(max_polls):
        try:
            status = ddb.describe_table(TableName=table_name)["Table"]["TableStatus"]
        except ddb.exceptions.ResourceNotFoundException:
            # The table no longer being describable is the success signal.
            logger.info("Table successfully deleted.")
            return
        logger.info("Current table status: %s", status)
        sleep(poll_seconds)
    raise TimeoutError(f"Table {table_name} was not deleted after {max_polls} status checks")


def _await_table_active(table_name: str, poll_seconds: float, max_polls: int) -> None:
    """Poll until the table's status is ACTIVE, or raise TimeoutError."""
    for _ in range(max_polls):
        status = ddb.describe_table(TableName=table_name)["Table"]["TableStatus"]
        if status == "ACTIVE":
            logger.info("Finished table creation")
            return
        logger.info("Awaiting table creation. Current status: '%s'", status)
        sleep(poll_seconds)
    raise TimeoutError(f"Table {table_name} was not ACTIVE after {max_polls} status checks")


if __name__ == "__main__":
    # Command-line entry point: derive the table name from --table-suffix and
    # hand off to main().
    parser = argparse.ArgumentParser(
        description="Create the DynamoDB table used by the xoto3 integration tests."
    )
    parser.add_argument(
        "--table-suffix",
        default=getpass.getuser(),
        help="suffix appended to 'xoto3-integration-' to form the table name "
        "(default: the current username)",
    )
    # store_true yields a real bool (False when the flag is absent) rather than
    # the None that store_const produced, matching main()'s recreate=False default.
    parser.add_argument(
        "--recreate",
        action="store_true",
        help="delete and re-create the table if it already exists",
    )

    args = parser.parse_args()

    _table_name = f"xoto3-integration-{args.table_suffix}"
    main(_table_name, args.recreate)
2 changes: 1 addition & 1 deletion xoto3/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""xoto3"""
__version__ = "1.16.1"
__version__ = "1.16.2"
__author__ = "Peter Gaultney"
__author_email__ = "[email protected]"
12 changes: 7 additions & 5 deletions xoto3/dynamodb/batch_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import os
import timeit
import typing as ty
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger
from multiprocessing.dummy import Pool as ThreadPool
from typing import Iterable, List, Set, Tuple

from xoto3.backoff import backoff
Expand All @@ -20,7 +20,9 @@
_BATCH_GET_CHUNKSIZE = int(os.environ.get("BATCH_GET_CHUNKSIZE", 1))
_THREADPOOL_SIZE = int(os.environ.get("BATCH_GET_THREADPOOL_SIZE", 50))
__DEFAULT_THREADPOOL: Lazy[ty.Any] = Lazy(
lambda: ThreadPool(_THREADPOOL_SIZE) if _THREADPOOL_SIZE else None
lambda: ThreadPoolExecutor(max_workers=_THREADPOOL_SIZE, thread_name_prefix=__name__)
if _THREADPOOL_SIZE
else None
)

_DYNAMODB_RESOURCE = tll_from_session(lambda sess: sess.resource("dynamodb"))
Expand Down Expand Up @@ -160,8 +162,8 @@ def partial_get_single_batch(key_values_batch: Set[Tuple[KeyAttributeType, ...]]
)

# threaded implementation
for batch in thread_pool.imap(
partial_get_single_batch, batches_of_100_iter, _BATCH_GET_CHUNKSIZE
for batch in thread_pool.map(
partial_get_single_batch, batches_of_100_iter, chunksize=_BATCH_GET_CHUNKSIZE
):
for key_value_tuple, item in batch:
total_count += 1
Expand Down Expand Up @@ -228,7 +230,7 @@ def _get_single_batch(
# log performance
ms_elapsed = (timeit.default_timer() - start) * 1000
logger.debug(
f"_get_single_batch on %s returned %d/%d items after %d ms; %.1f/s",
"_get_single_batch on %s returned %d/%d items after %d ms; %.1f/s",
table_name,
len(responses),
len(table_request["Keys"]),
Expand Down

0 comments on commit 36c43d1

Please sign in to comment.