From 9c950b47eca2b4e93fd2fe52cf80f158e6cf97ad Mon Sep 17 00:00:00 2001 From: George Psarakis Date: Mon, 23 Jan 2017 11:49:38 +0200 Subject: [PATCH] AWS DynamoDB result backend (#3736) * Add result backend for AWS DynamoDB * Dependencies for DynamoDB result backend * Add DynamoDB backend in aliases * Test cases for DynamoDB result backend * Documentation for DynamoDB backend * Configurable endpoint URL for DynamoDB local instance * Enable integration tests for DynamoDB result backend - Run before_install script only for integration environments * Fix invalid type error for primary key in Python3 * Add Python 3.6 in Travis CI build matrix - Instruct Travis CI to include Python 3.6 interpreter in jobs - Optimize Travis CI build matrix * Optimize Travis CI build matrix * Fix endless loop in logger_isa (Python 3.6) * Add test cases for AWS client construction - Add/improve log messages during table initialization - Enable skipped unit tests due to missing dependency boto3 * Use explicit hash seed value for apicheck tox environment - Related Sphinx issue: https://github.com/sphinx-doc/sphinx/issues/2324 --- .travis.yml | 63 ++-- celery/app/backends.py | 3 +- celery/backends/dynamodb.py | 275 ++++++++++++++++++ celery/utils/log.py | 2 +- docs/includes/installation.txt | 3 + .../reference/celery.backends.dynamodb.rst | 11 + docs/internals/reference/index.rst | 1 + docs/userguide/configuration.rst | 69 +++++ requirements/extras/dynamodb.txt | 1 + requirements/test-ci-default.txt | 2 +- requirements/test-integration.txt | 1 + setup.py | 3 +- t/unit/backends/test_dynamodb.py | 243 ++++++++++++++++ tox.ini | 9 +- 14 files changed, 662 insertions(+), 24 deletions(-) create mode 100644 celery/backends/dynamodb.py create mode 100644 docs/internals/reference/celery.backends.dynamodb.rst create mode 100644 requirements/extras/dynamodb.txt create mode 100644 t/unit/backends/test_dynamodb.py diff --git a/.travis.yml b/.travis.yml index 992a3df6157..ef4ef3dd9cb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,31 +2,42 @@ language: python sudo: required cache: false python: - - '3.5' + - '2.7' + - '3.4' + - '3.5' + - '3.6' os: - linux env: global: - PYTHONUNBUFFERED=yes + - PYTHONUNBUFFERED=yes matrix: - - TOXENV=2.7-unit - - TOXENV=2.7-integration-rabbitmq - - TOXENV=2.7-integration-redis - - TOXENV=3.4-unit - - TOXENV=3.4-integration-rabbitmq - - TOXENV=3.4-integration-redis - - TOXENV=3.5-unit - - TOXENV=3.5-integration-rabbitmq - - TOXENV=3.5-integration-redis - - TOXENV=pypy-unit PYPY_VERSION="5.3" - - TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3" - - TOXENV=pypy-integration-redis PYPY_VERSION="5.3" - - TOXENV=flake8 - - TOXENV=flakeplus - - TOXENV=apicheck - - TOXENV=configcheck - - TOXENV=pydocstyle + - MATRIX_TOXENV=unit + - MATRIX_TOXENV=integration-rabbitmq + - MATRIX_TOXENV=integration-redis + - MATRIX_TOXENV=integration-dynamodb +matrix: + include: + - python: '3.5' + env: TOXENV=pypy-unit PYPY_VERSION="5.3" + - python: '3.5' + env: TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3" + - python: '3.5' + env: TOXENV=pypy-integration-redis PYPY_VERSION="5.3" + - python: '3.5' + env: TOXENV=pypy-integration-dynamodb PYPY_VERSION="5.3" + - python: '3.5' + env: TOXENV=flake8 + - python: '3.5' + env: TOXENV=flakeplus + - python: '3.5' + env: TOXENV=apicheck + - python: '3.5' + env: TOXENV=configcheck + - python: '3.5' + env: TOXENV=pydocstyle before_install: + - if [[ -v MATRIX_TOXENV ]]; then export TOXENV=${TRAVIS_PYTHON_VERSION}-${MATRIX_TOXENV}; fi; env - | if [ "$TOXENV" = "pypy" ]; then export PYENV_ROOT="$HOME/.pyenv" @@ -39,6 +50,20 @@ before_install: virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION" source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate" fi + - | + if [[ "$TOXENV" == *dynamodb ]]; then + sudo apt-get install -y default-jre supervisor + mkdir /opt/dynamodb-local + cd /opt/dynamodb-local && curl -L http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz | tar zx + cd - + echo '[program:dynamodb-local]' | sudo tee /etc/supervisor/conf.d/dynamodb-local.conf + echo 'command=java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -inMemory' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf + echo 'directory=/opt/dynamodb-local' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf + sudo service supervisor stop + sudo service supervisor start + sleep 10 + curl localhost:8000 + fi after_success: - .tox/$TRAVIS_PYTHON_VERSION/bin/coverage xml - .tox/$TRAVIS_PYTHON_VERSION/bin/codecov -e TOXENV diff --git a/celery/app/backends.py b/celery/app/backends.py index d5a67527a20..37c8d70628b 100644 --- a/celery/app/backends.py +++ b/celery/app/backends.py @@ -29,7 +29,8 @@ 'riak': 'celery.backends.riak:RiakBackend', 'file': 'celery.backends.filesystem:FilesystemBackend', 'disabled': 'celery.backends.base:DisabledBackend', - 'consul': 'celery.backends.consul:ConsulBackend' + 'consul': 'celery.backends.consul:ConsulBackend', + 'dynamodb': 'celery.backends.dynamodb:DynamoDBBackend', } diff --git a/celery/backends/dynamodb.py b/celery/backends/dynamodb.py new file mode 100644 index 00000000000..748dc7188d3 --- /dev/null +++ b/celery/backends/dynamodb.py @@ -0,0 +1,275 @@ +# -*- coding: utf-8 -*- +"""AWS DynamoDB result store backend.""" +from __future__ import absolute_import, unicode_literals +from collections import namedtuple +from time import time, sleep + +from kombu.utils.url import _parse_url as parse_url +from celery.exceptions import ImproperlyConfigured +from celery.utils.log import get_logger +from celery.five import string +from .base import KeyValueStoreBackend +try: + import boto3 + from botocore.exceptions import ClientError +except ImportError: # pragma: no cover + boto3 = ClientError = None # noqa + +__all__ = ['DynamoDBBackend'] + + +# Helper class that describes a DynamoDB attribute +DynamoDBAttribute = namedtuple('DynamoDBAttribute', ('name', 'data_type')) + +logger = get_logger(__name__) + + +class DynamoDBBackend(KeyValueStoreBackend): + """AWS DynamoDB result backend. + + Raises: + celery.exceptions.ImproperlyConfigured: + if module :pypi:`boto3` is not available. + """ + + #: default DynamoDB table name (`default`) + table_name = 'celery' + + #: Read Provisioned Throughput (`default`) + read_capacity_units = 1 + + #: Write Provisioned Throughput (`default`) + write_capacity_units = 1 + + #: AWS region (`default`) + aws_region = None + + #: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`) + endpoint_url = None + + _key_field = DynamoDBAttribute(name='id', data_type='S') + _value_field = DynamoDBAttribute(name='result', data_type='B') + _timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N') + _available_fields = None + + def __init__(self, url=None, table_name=None, *args, **kwargs): + super(DynamoDBBackend, self).__init__(*args, **kwargs) + + self.url = url + self.table_name = table_name or self.table_name + + if not boto3: + raise ImproperlyConfigured( + 'You need to install the boto3 library to use the ' + 'DynamoDB backend.') + + aws_credentials_given = False + aws_access_key_id = None + aws_secret_access_key = None + + if url is not None: + scheme, region, port, username, password, table, query = \ + parse_url(url) + + aws_access_key_id = username + aws_secret_access_key = password + + access_key_given = aws_access_key_id is not None + secret_key_given = aws_secret_access_key is not None + + if access_key_given != secret_key_given: + raise ImproperlyConfigured( + 'You need to specify both the Access Key ID ' + 'and Secret.') + + aws_credentials_given = access_key_given + + if region == 'localhost': + # We are using the downloadable, local version of DynamoDB + self.endpoint_url = 'http://localhost:{}'.format(port) + self.aws_region = 'us-east-1' + logger.warning( + 'Using local-only DynamoDB endpoint URL: {}'.format( + self.endpoint_url + ) + ) + else: + self.aws_region = region + + self.read_capacity_units = int( + query.get( + 'read', + self.read_capacity_units + ) + ) + self.write_capacity_units = int( + query.get( + 'write', + self.write_capacity_units + ) + ) + self.table_name = table or self.table_name + + self._available_fields = ( + self._key_field, + self._value_field, + self._timestamp_field + ) + + self._client = None + if aws_credentials_given: + self._get_client( + access_key_id=aws_access_key_id, + secret_access_key=aws_secret_access_key + ) + + def _get_client(self, access_key_id=None, secret_access_key=None): + """Get client connection.""" + if self._client is None: + client_parameters = dict( + region_name=self.aws_region + ) + if access_key_id is not None: + client_parameters.update(dict( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key + )) + + if self.endpoint_url is not None: + client_parameters['endpoint_url'] = self.endpoint_url + + self._client = boto3.client( + 'dynamodb', + **client_parameters + ) + self._get_or_create_table() + return self._client + + def _get_table_schema(self): + """Get the boto3 structure describing the DynamoDB table schema.""" + return dict( + AttributeDefinitions=[ + { + 'AttributeName': self._key_field.name, + 'AttributeType': self._key_field.data_type + } + ], + TableName=self.table_name, + KeySchema=[ + { + 'AttributeName': self._key_field.name, + 'KeyType': 'HASH' + } + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': self.read_capacity_units, + 'WriteCapacityUnits': self.write_capacity_units + } + ) + + def _get_or_create_table(self): + """Create table if not exists, otherwise return the description.""" + table_schema = self._get_table_schema() + try: + table_description = self._client.create_table(**table_schema) + logger.info( + 'DynamoDB Table {} did not exist, creating.'.format( + self.table_name + ) + ) + # In case we created the table, wait until it becomes available. + self._wait_for_table_status('ACTIVE') + logger.info( + 'DynamoDB Table {} is now available.'.format( + self.table_name + ) + ) + return table_description + except ClientError as e: + error_code = e.response['Error'].get('Code', 'Unknown') + + # If table exists, do not fail, just return the description. + if error_code == 'ResourceInUseException': + return self._client.describe_table( + TableName=self.table_name + ) + else: + raise e + + def _wait_for_table_status(self, expected='ACTIVE'): + """Poll for the expected table status.""" + achieved_state = False + while not achieved_state: + table_description = self.client.describe_table( + TableName=self.table_name + ) + logger.debug( + 'Waiting for DynamoDB table {} to become {}.'.format( + self.table_name, + expected + ) + ) + current_status = table_description['Table']['TableStatus'] + achieved_state = current_status == expected + sleep(1) + + def _prepare_get_request(self, key): + """Construct the item retrieval request parameters.""" + return dict( + TableName=self.table_name, + Key={ + self._key_field.name: { + self._key_field.data_type: key + } + } + ) + + def _prepare_put_request(self, key, value): + """Construct the item creation request parameters.""" + return dict( + TableName=self.table_name, + Item={ + self._key_field.name: { + self._key_field.data_type: key + }, + self._value_field.name: { + self._value_field.data_type: value + }, + self._timestamp_field.name: { + self._timestamp_field.data_type: str(time()) + } + } + ) + + def _item_to_dict(self, raw_response): + """Convert get_item() response to field-value pairs.""" + if 'Item' not in raw_response: + return {} + return { + field.name: raw_response['Item'][field.name][field.data_type] + for field in self._available_fields + } + + @property + def client(self): + return self._get_client() + + def get(self, key): + key = string(key) + request_parameters = self._prepare_get_request(key) + item_response = self.client.get_item(**request_parameters) + item = self._item_to_dict(item_response) + return item.get(self._value_field.name) + + def set(self, key, value): + key = string(key) + request_parameters = self._prepare_put_request(key, value) + self.client.put_item(**request_parameters) + + def mget(self, keys): + return [self.get(key) for key in keys] + + def delete(self, key): + key = string(key) + request_parameters = self._prepare_get_request(key) + self.client.delete_item(**request_parameters) diff --git a/celery/utils/log.py b/celery/utils/log.py index 2fb15e68666..00df47631c0 100644 --- a/celery/utils/log.py +++ b/celery/utils/log.py @@ -82,7 +82,7 @@ def logger_isa(l, p, max=1000): else: if this in seen: raise RuntimeError( - 'Logger {0!r} parents recursive'.format(l), + 'Logger {0!r} parents recursive'.format(l.name), ) seen.add(this) this = this.parent diff --git a/docs/includes/installation.txt b/docs/includes/installation.txt index d9cfc3eff8c..28081e2aeee 100644 --- a/docs/includes/installation.txt +++ b/docs/includes/installation.txt @@ -88,6 +88,9 @@ Transports and Backends :``celery[riak]``: for using Riak as a result backend. +:``celery[dynamodb]``: + for using AWS DynamoDB as a result backend. + :``celery[zookeeper]``: for using Zookeeper as a message transport. diff --git a/docs/internals/reference/celery.backends.dynamodb.rst b/docs/internals/reference/celery.backends.dynamodb.rst new file mode 100644 index 00000000000..f7f39bcf3d1 --- /dev/null +++ b/docs/internals/reference/celery.backends.dynamodb.rst @@ -0,0 +1,11 @@ +=========================================== + ``celery.backends.dynamodb`` +=========================================== + +.. contents:: + :local: +.. currentmodule:: celery.backends.dynamodb + +.. automodule:: celery.backends.dynamodb + :members: + :undoc-members: diff --git a/docs/internals/reference/index.rst b/docs/internals/reference/index.rst index 335ff900d91..3f35d25a6b5 100644 --- a/docs/internals/reference/index.rst +++ b/docs/internals/reference/index.rst @@ -35,6 +35,7 @@ celery.backends.riak celery.backends.cassandra celery.backends.couchbase + celery.backends.dynamodb celery.backends.filesystem celery.app.trace celery.app.annotations diff --git a/docs/userguide/configuration.rst b/docs/userguide/configuration.rst index d949cdef7cb..9d531346cc3 100644 --- a/docs/userguide/configuration.rst +++ b/docs/userguide/configuration.rst @@ -1129,6 +1129,75 @@ This is a dict supporting the following keys: The protocol to use to connect to the Riak server. This isn't configurable via :setting:`result_backend` +.. _conf-dynamodb-result-backend: + +AWS DynamoDB backend settings +----------------------------- + +.. note:: + + The Dynamodb backend requires the :pypi:`boto3` library. + + To install this package use :command:`pip`: + + .. code-block:: console + + $ pip install celery[dynamodb] + + See :ref:`bundles` for information on combining multiple extension + requirements. + +This backend requires the :setting:`result_backend` +setting to be set to a DynamoDB URL:: + + result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m' + +For example, specifying the AWS region and the table name:: + + result_backend = 'dynamodb://@us-east-1/celery_results + +or retrieving AWS configuration parameters from the environment, using the default table name (``celery``) +and specifying read and write provisioned throughput:: + + result_backend = 'dynamodb://@/?read=5&write=5' + +or using the `downloadable version `_ +of DynamoDB +`locally `_:: + + result_backend = 'dynamodb://@localhost:8000 + +The fields of the URL are defined as follows: + +#. ``aws_access_key_id & aws_secret_access_key`` + + The credentials for accessing AWS API resources. These can also be resolved + by the :pypi:`boto3` library from various sources, as + described `here `_. + +#. ``region`` + + The AWS region, e.g. ``us-east-1`` or ``localhost`` for the `Downloadable Version `_. + See the :pypi:`boto3` library `documentation `_ + for definition options. + +#. ``port`` + + The listening port of the local DynamoDB instance, if you are using the downloadable version. + If you have not specified the ``region`` parameter as ``localhost``, + setting this parameter has **no effect**. + +#. ``table`` + + Table name to use. Default is ``celery``. + See the `DynamoDB Naming Rules `_ + for information on the allowed characters and length. + +#. ``read & write`` + + The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write. + More details can be found in the `Provisioned Throughput documentation `_. + .. _conf-ironcache-result-backend: IronCache backend settings diff --git a/requirements/extras/dynamodb.txt b/requirements/extras/dynamodb.txt new file mode 100644 index 00000000000..939a2aa474a --- /dev/null +++ b/requirements/extras/dynamodb.txt @@ -0,0 +1 @@ +boto3==1.4.3 \ No newline at end of file diff --git a/requirements/test-ci-default.txt b/requirements/test-ci-default.txt index 9dffcbaefc5..7a575eba8e9 100644 --- a/requirements/test-ci-default.txt +++ b/requirements/test-ci-default.txt @@ -16,4 +16,4 @@ -r extras/couchdb.txt -r extras/consul.txt -r extras/cassandra.txt - +-r extras/dynamodb.txt diff --git a/requirements/test-integration.txt b/requirements/test-integration.txt index 64c36a5b0bb..ce643b473bf 100644 --- a/requirements/test-integration.txt +++ b/requirements/test-integration.txt @@ -1,2 +1,3 @@ simplejson -r extras/redis.txt +-r extras/dynamodb.txt diff --git a/setup.py b/setup.py index ef50f431c87..f23574ad005 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,8 @@ def _pyimp(): 'pyro', 'slmq', 'tblib', - 'consul' + 'consul', + 'dynamodb' } # -*- Classifiers -*- diff --git a/t/unit/backends/test_dynamodb.py b/t/unit/backends/test_dynamodb.py new file mode 100644 index 00000000000..b3b33139c95 --- /dev/null +++ b/t/unit/backends/test_dynamodb.py @@ -0,0 +1,243 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from decimal import Decimal + +import pytest +from case import MagicMock, Mock, patch, sentinel, skip + +from celery.backends import dynamodb as module +from celery.backends.dynamodb import DynamoDBBackend +from celery.exceptions import ImproperlyConfigured +from celery.five import string + + +@skip.unless_module('boto3') +class test_DynamoDBBackend: + def setup(self): + self._static_timestamp = Decimal(1483425566.52) # noqa + self.app.conf.result_backend = 'dynamodb://' + + @property + def backend(self): + """:rtype: DynamoDBBackend""" + return self.app.backend + + def test_init_no_boto3(self): + prev, module.boto3 = module.boto3, None + try: + with pytest.raises(ImproperlyConfigured): + DynamoDBBackend(app=self.app) + finally: + module.boto3 = prev + + def test_init_aws_credentials(self): + with pytest.raises(ImproperlyConfigured): + DynamoDBBackend( + app=self.app, + url='dynamodb://a:@' + ) + + def test_get_client_local(self): + table_creation_path = \ + 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table' + with patch('boto3.client') as mock_boto_client, \ + patch(table_creation_path): + backend = DynamoDBBackend( + app=self.app, + url='dynamodb://@localhost:8000' + ) + client = backend._get_client() + assert backend.client is client + mock_boto_client.assert_called_once_with( + 'dynamodb', + endpoint_url='http://localhost:8000', + region_name='us-east-1' + ) + assert backend.endpoint_url == 'http://localhost:8000' + + def test_get_client_credentials(self): + table_creation_path = \ + 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table' + with patch('boto3.client') as mock_boto_client, \ + patch(table_creation_path): + backend = DynamoDBBackend( + app=self.app, + url='dynamodb://key:secret@test' + ) + client = backend._get_client() + assert client is backend.client + mock_boto_client.assert_called_once_with( + 'dynamodb', + aws_access_key_id='key', + aws_secret_access_key='secret', + region_name='test' + ) + assert backend.aws_region == 'test' + + def test_get_or_create_table_not_exists(self): + self.backend._client = MagicMock() + mock_create_table = self.backend._client.create_table = MagicMock() + mock_describe_table = self.backend._client.describe_table = \ + MagicMock() + + mock_describe_table.return_value = { + 'Table': { + 'TableStatus': 'ACTIVE' + } + } + + self.backend._get_or_create_table() + mock_create_table.assert_called_once_with( + **self.backend._get_table_schema() + ) + + def test_get_or_create_table_already_exists(self): + from botocore.exceptions import ClientError + + self.backend._client = MagicMock() + mock_create_table = self.backend._client.create_table = MagicMock() + client_error = ClientError( + { + 'Error': { + 'Code': 'ResourceInUseException', + 'Message': 'Table already exists: {}'.format( + self.backend.table_name + ) + } + }, + 'CreateTable' + ) + mock_create_table.side_effect = client_error + mock_describe_table = self.backend._client.describe_table = \ + MagicMock() + + mock_describe_table.return_value = { + 'Table': { + 'TableStatus': 'ACTIVE' + } + } + + self.backend._get_or_create_table() + mock_describe_table.assert_called_once_with( + TableName=self.backend.table_name + ) + + def test_wait_for_table_status(self): + self.backend._client = MagicMock() + mock_describe_table = self.backend._client.describe_table = \ + MagicMock() + mock_describe_table.side_effect = [ + {'Table': { + 'TableStatus': 'CREATING' + }}, + {'Table': { + 'TableStatus': 'SOME_STATE' + }} + ] + self.backend._wait_for_table_status(expected='SOME_STATE') + assert mock_describe_table.call_count == 2 + + def test_prepare_get_request(self): + expected = { + 'TableName': u'celery', + 'Key': {u'id': {u'S': u'abcdef'}} + } + assert self.backend._prepare_get_request('abcdef') == expected + + def test_prepare_put_request(self): + expected = { + 'TableName': u'celery', + 'Item': { + u'id': {u'S': u'abcdef'}, + u'result': {u'B': u'val'}, + u'timestamp': { + u'N': str(Decimal(self._static_timestamp)) + } + } + } + with patch('celery.backends.dynamodb.time', self._mock_time): + result = self.backend._prepare_put_request('abcdef', 'val') + assert result == expected + + def test_item_to_dict(self): + boto_response = { + 'Item': { + 'id': { + 'S': sentinel.key + }, + 'result': { + 'B': sentinel.value + }, + 'timestamp': { + 'N': Decimal(1) + } + } + } + converted = self.backend._item_to_dict(boto_response) + assert converted == { + 'id': sentinel.key, + 'result': sentinel.value, + 'timestamp': Decimal(1) + } + + def test_get(self): + self.backend._client = Mock(name='_client') + self.backend._client.get_item = MagicMock() + + assert self.backend.get('1f3fab') is None + self.backend.client.get_item.assert_called_once_with( + Key={u'id': {u'S': u'1f3fab'}}, + TableName='celery' + ) + + def _mock_time(self): + return self._static_timestamp + + def test_set(self): + + self.backend._client = MagicMock() + self.backend._client.put_item = MagicMock() + + # should return None + with patch('celery.backends.dynamodb.time', self._mock_time): + assert self.backend.set(sentinel.key, sentinel.value) is None + + assert self.backend._client.put_item.call_count == 1 + _, call_kwargs = self.backend._client.put_item.call_args + expected_kwargs = dict( + Item={ + u'timestamp': {u'N': str(self._static_timestamp)}, + u'id': {u'S': string(sentinel.key)}, + u'result': {u'B': sentinel.value} + }, + TableName='celery' + ) + assert call_kwargs['Item'] == expected_kwargs['Item'] + assert call_kwargs['TableName'] == 'celery' + + def test_delete(self): + self.backend._client = Mock(name='_client') + mocked_delete = self.backend._client.delete = Mock('client.delete') + mocked_delete.return_value = None + # should return None + assert self.backend.delete('1f3fab') is None + self.backend.client.delete_item.assert_called_once_with( + Key={u'id': {u'S': u'1f3fab'}}, + TableName='celery' + ) + + def test_backend_by_url(self, url='dynamodb://'): + from celery.app import backends + from celery.backends.dynamodb import DynamoDBBackend + backend, url_ = backends.by_url(url, self.app.loader) + assert backend is DynamoDBBackend + assert url_ == url + + def test_backend_params_by_url(self): + self.app.conf.result_backend = \ + 'dynamodb://@us-east-1/celery_results?read=10&write=20' + assert self.backend.aws_region == 'us-east-1' + assert self.backend.table_name == 'celery_results' + assert self.backend.read_capacity_units == 10 + assert self.backend.write_capacity_units == 20 + assert self.backend.endpoint_url is None diff --git a/tox.ini b/tox.ini index ad14dba8324..c603db06610 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] envlist = {2.7,pypy,3.4,3.5,3.6}-unit - {2.7,pypy,3.4,3.5,3.6}-integration-{rabbitmq,redis} + {2.7,pypy,3.4,3.5,3.6}-integration-{rabbitmq,redis,dynamodb} flake8 flakeplus @@ -36,6 +36,11 @@ setenv = redis: TEST_BROKER=redis:// redis: TEST_BACKEND=redis:// + dynamodb: TEST_BROKER=redis:// + dynamodb: TEST_BACKEND=dynamodb://@localhost:8000 + dynamodb: AWS_ACCESS_KEY_ID=test_aws_key_id + dynamodb: AWS_SECRET_ACCESS_KEY=test_aws_secret_key + basepython = 2.7: python2.7 3.4: python3.4 @@ -45,6 +50,8 @@ basepython = flake8,flakeplus,apicheck,linkcheck,configcheck,pydocstyle: python2.7 [testenv:apicheck] +setenv = + PYTHONHASHSEED = 100 commands = sphinx-build -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck