From 371be70a463193440847f9db300005bd38f31c81 Mon Sep 17 00:00:00 2001
From: Alex Petenchea
Date: Fri, 15 Sep 2023 16:56:28 +0300
Subject: [PATCH] [DE-664] Deprecating Batch API (#279)

* Using ThreadPoolExecutor for batch API
* Adapted tests
* Adapted formatter
* Changing changelog and driver version
* Updating docs
---
 CHANGELOG.md | 24 ++++++++++-
 arango/database.py | 45 ++++++++++++++++---
 arango/executor.py | 103 ++++++++++----------------------------------
 arango/formatter.py | 2 +
 arango/job.py | 9 ++--
 arango/request.py | 2 +-
 docs/batch.rst | 29 +++++++++++--
 tests/helpers.py | 6 +--
 tests/test_batch.py | 46 +++-----------------
 9 files changed, 126 insertions(+), 140 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3405986f..1a785610 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,33 @@
 main
 -----
+* Refactoring `BatchDatabase` API
+
+  - The batch API is deprecated since ArangoDB 3.8.0 and will be removed in a future version.
+  - The `BatchDatabase` is still available, but it now uses a `ThreadPoolExecutor` internally.
+  - For backwards compatibility, the `BatchDatabase` uses only one worker thread, essentially
+    sending the requests sequentially. Feel free to set the `max_workers` parameter to a higher
+    value if you want to use multiple threads, but be aware that the requests will be sent in
+    parallel, which may cause problems if you are using transactions.
+  - To discourage the use of this API, we now issue a warning when the `BatchDatabase` is used.
+
+  Note that `{"foo": "bar"}` may be inserted after `{"foo": "baz"}` in the following example:
+  ```python
+  with db.begin_batch_execution(max_workers=2) as batch_db:
+      job1 = batch_db.collection("students").insert({"foo": "bar"})
+      job2 = batch_db.collection("students").insert({"foo": "baz"})
+  ```
+
+7.6.2
+-----
+
+* Fix: `build_filter_conditions` utils method
+
 7.6.1
 -----
 
 * [DE-542] Added `shards()` method to `Collection` by @apetenchea in https://github.com/ArangoDB-Community/python-arango/pull/274
-* [DE-584] Refactor deprecated `/_api/simple` methods by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/268
+* [DE-584] Refactor deprecated `/_api/simple` methods by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/275
 * Added `raise_on_document_error` parameter to `Collection.update_many()` by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/273
 * Added `computed_values` parameter to `Collection.configure()` by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/268
 * Various bug fixes

diff --git a/arango/database.py b/arango/database.py
index bb249c28..b8d74419 100644
--- a/arango/database.py
+++ b/arango/database.py
@@ -9,6 +9,7 @@
 from datetime import datetime
 from numbers import Number
 from typing import Any, List, Optional, Sequence, Union
+from warnings import warn
 
 from arango.api import ApiGroup
 from arango.aql import AQL
@@ -2549,18 +2550,38 @@ def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
         """
         return AsyncDatabase(self._conn, return_result)
 
-    def begin_batch_execution(self, return_result: bool = True) -> "BatchDatabase":
+    def begin_batch_execution(
+        self,
+        return_result: bool = True,
+        max_workers: Optional[int] = 1,
+    ) -> "BatchDatabase":
         """Begin batch execution.
 
+        .. warning::
+
+            The batch request API is deprecated since ArangoDB 3.8.0.
+            This functionality should no longer be used.
+            To send multiple documents at once to an ArangoDB instance,
+            please use any of the :class:`arango.collection.Collection` methods
+            that accept a list of documents as input.
+            See :func:`~arango.collection.Collection.insert_many`,
+            :func:`~arango.collection.Collection.update_many`,
+            :func:`~arango.collection.Collection.replace_many`,
+            :func:`~arango.collection.Collection.delete_many`.
+
         :param return_result: If set to True, API executions return instances of
             :class:`arango.job.BatchJob` that are populated with results on commit.
             If set to False, API executions return None and no results are tracked
             client-side.
         :type return_result: bool
+        :param max_workers: Maximum number of workers to use for submitting
+            requests asynchronously. If None, the default value is the minimum
+            between `os.cpu_count()` and the number of requests.
+        :type max_workers: Optional[int]
         :return: Database API wrapper object specifically for batch execution.
         :rtype: arango.database.BatchDatabase
         """
-        return BatchDatabase(self._conn, return_result)
+        return BatchDatabase(self._conn, return_result, max_workers)
 
     def begin_transaction(
         self,
@@ -2648,7 +2669,11 @@ def __repr__(self) -> str:
 class BatchDatabase(Database):
     """Database API wrapper tailored specifically for batch execution.
 
-    See :func:`arango.database.StandardDatabase.begin_batch_execution`.
+    .. note::
+
+        This class is not intended to be instantiated directly.
+        See :func:`arango.database.StandardDatabase.begin_batch_execution`.
 
     :param connection: HTTP connection.
     :param return_result: If set to True, API executions return instances of
         :class:`arango.job.BatchJob` that are populated with results on commit.
         If set to False, API executions return None and no results are tracked
         client-side.
     :type return_result: bool
+    :param max_workers: Use a thread pool with at most `max_workers` threads.
+    :type max_workers: Optional[int]
     """
 
-    def __init__(self, connection: Connection, return_result: bool) -> None:
+    def __init__(
+        self, connection: Connection, return_result: bool, max_workers: Optional[int]
+    ) -> None:
         self._executor: BatchApiExecutor
         super().__init__(
-            connection=connection, executor=BatchApiExecutor(connection, return_result)
+            connection=connection,
+            executor=BatchApiExecutor(connection, return_result, max_workers),
+        )
+        warn(
+            "The batch request API is deprecated since ArangoDB version 3.8.0.",
+            FutureWarning,
+            stacklevel=3,
         )
 
     def __repr__(self) -> str:
diff --git a/arango/executor.py b/arango/executor.py
index ce349a74..47ac4a19 100644
--- a/arango/executor.py
+++ b/arango/executor.py
@@ -8,14 +8,13 @@
 ]
 
 from collections import OrderedDict
+from concurrent.futures import ThreadPoolExecutor
+from os import cpu_count
 from typing import Any, Callable, Optional, Sequence, Tuple, TypeVar, Union
-from urllib.parse import urlencode
-from uuid import uuid4
 
 from arango.connection import Connection
 from arango.exceptions import (
     AsyncExecuteError,
-    BatchExecuteError,
     BatchStateError,
     OverloadControlExecutorError,
     TransactionAbortError,
@@ -27,7 +26,6 @@
 from arango.request import Request
 from arango.response import Response
 from arango.typings import Fields, Json
-from arango.utils import suppress_warning
 
 ApiExecutor = Union[
     "DefaultApiExecutor",
@@ -126,35 +124,29 @@ class BatchApiExecutor:
     If set to False, API executions return None and no results are
     tracked client-side.
     :type return_result: bool
+    :param max_workers: Use a thread pool with at most `max_workers` threads.
+        If None, the number of CPUs is used.
+        For backwards compatibility, the default value is 1, so requests
+        are effectively sent sequentially.
+    :type max_workers: Optional[int]
     """
 
-    def __init__(self, connection: Connection, return_result: bool) -> None:
+    def __init__(
+        self,
+        connection: Connection,
+        return_result: bool,
+        max_workers: Optional[int] = 1,
+    ) -> None:
         self._conn = connection
         self._return_result: bool = return_result
         self._queue: OrderedDict[str, Tuple[Request, BatchJob[Any]]] = OrderedDict()
         self._committed: bool = False
+        self._max_workers: int = max_workers or cpu_count()  # type: ignore
 
     @property
     def context(self) -> str:
         return "batch"
 
-    def _stringify_request(self, request: Request) -> str:
-        path = request.endpoint
-
-        if request.params is not None:
-            path += f"?{urlencode(request.params)}"
-        buffer = [f"{request.method} {path} HTTP/1.1"]
-
-        if request.headers is not None:
-            for key, value in sorted(request.headers.items()):
-                buffer.append(f"{key}: {value}")
-
-        if request.data is not None:
-            serialized = self._conn.serialize(request.data)
-            buffer.append("\r\n" + serialized)
-
-        return "\r\n".join(buffer)
-
     @property
     def jobs(self) -> Optional[Sequence[BatchJob[Any]]]:
         """Return the queued batch jobs.
@@ -190,7 +182,7 @@ def execute(
         return job if self._return_result else None
 
     def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
-        """Execute the queued requests in a single batch API request.
+        """Submit the queued requests to a thread pool and wait for completion.
 
         If **return_result** parameter was set to True during initialization,
         :class:`arango.job.BatchJob` instances are populated with results.
 
         :return: BatchJob instances, or None if **return_result** parameter was set to
             False during initialization.
         :rtype: [arango.job.BatchJob] | None
         :raise arango.exceptions.BatchStateError: If batch state is invalid
-            (e.g. batch was already committed or size of response from server
-            did not match the expected).
-        :raise arango.exceptions.BatchExecuteError: If commit fails.
+            (e.g. batch was already committed).
""" if self._committed: raise BatchStateError("batch already committed") @@ -211,65 +201,18 @@ def commit(self) -> Optional[Sequence[BatchJob[Any]]]: if len(self._queue) == 0: return self.jobs - # Boundary used for multipart request - boundary = uuid4().hex + with ThreadPoolExecutor( + max_workers=min(self._max_workers, len(self._queue)) + ) as executor: + for req, job in self._queue.values(): + job._future = executor.submit(self._conn.send_request, req) - # Build the batch request payload - buffer = [] - for req, job in self._queue.values(): - buffer.append(f"--{boundary}") - buffer.append("Content-Type: application/x-arango-batchpart") - buffer.append(f"Content-Id: {job.id}") - buffer.append("\r\n" + self._stringify_request(req)) - buffer.append(f"--{boundary}--") - - request = Request( - method="post", - endpoint="/_api/batch", - headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}, - data="\r\n".join(buffer), - ) - with suppress_warning("requests.packages.urllib3.connectionpool"): - resp = self._conn.send_request(request) - - if not resp.is_success: - raise BatchExecuteError(resp, request) + for _, job in self._queue.values(): + job._status = "done" if not self._return_result: return None - url_prefix = resp.url.strip("/_api/batch") - raw_responses = resp.raw_body.split(f"--{boundary}")[1:-1] - - if len(self._queue) != len(raw_responses): - raise BatchStateError( - "expecting {} parts in batch response but got {}".format( - len(self._queue), len(raw_responses) - ) - ) - for raw_resp in raw_responses: - # Parse and breakdown the batch response body - resp_parts = raw_resp.strip().split("\r\n") - raw_content_id = resp_parts[1] - raw_body = resp_parts[-1] - raw_status = resp_parts[3] - job_id = raw_content_id.split(" ")[1] - _, status_code, status_text = raw_status.split(" ", 2) - - # Update the corresponding batch job - queued_req, queued_job = self._queue[job_id] - - queued_job._status = "done" - resp = Response( - method=queued_req.method, - url=url_prefix + queued_req.endpoint, - headers={}, - status_code=int(status_code), - status_text=status_text, - raw_body=raw_body, - ) - queued_job._response = self._conn.prep_response(resp) - return self.jobs diff --git a/arango/formatter.py b/arango/formatter.py index ea558720..a74cd3ea 100644 --- a/arango/formatter.py +++ b/arango/formatter.py @@ -157,6 +157,8 @@ def format_database(body: Json) -> Json: result["replication_factor"] = body["replicationFactor"] if "writeConcern" in body: result["write_concern"] = body["writeConcern"] + if "replicationVersion" in body: + result["replication_version"] = body["replicationVersion"] return verify_format(body, result) diff --git a/arango/job.py b/arango/job.py index d2ce1c2d..d5065d04 100644 --- a/arango/job.py +++ b/arango/job.py @@ -1,5 +1,6 @@ __all__ = ["AsyncJob", "BatchJob"] +from concurrent.futures import Future from typing import Callable, Generic, Optional, TypeVar from uuid import uuid4 @@ -160,13 +161,13 @@ class BatchJob(Generic[T]): :type response_handler: callable """ - __slots__ = ["_id", "_status", "_response", "_response_handler"] + __slots__ = ["_id", "_status", "_response_handler", "_future"] def __init__(self, response_handler: Callable[[Response], T]) -> None: self._id = uuid4().hex self._status = "pending" - self._response: Optional[Response] = None self._response_handler = response_handler + self._future: Optional[Future[Response]] = None def __repr__(self) -> str: return f"" @@ -200,7 +201,7 @@ def result(self) -> T: :raise arango.exceptions.BatchJobResultError: 
             available (i.e. batch is not committed yet).
         """
-        if self._status == "pending" or self._response is None:
+        if self._status == "pending" or self._future is None or not self._future.done():
             raise BatchJobResultError("result not available yet")
 
-        return self._response_handler(self._response)
+        return self._response_handler(self._future.result())
diff --git a/arango/request.py b/arango/request.py
index 6afc6046..a47cc236 100644
--- a/arango/request.py
+++ b/arango/request.py
@@ -12,7 +12,7 @@ def normalize_headers(
     if driver_flags is not None:
         for flag in driver_flags:
             flags = flags + flag + ";"
-    driver_version = "7.6.1"
+    driver_version = "7.6.2"
     driver_header = "python-arango/" + driver_version + " (" + flags + ")"
     normalized_headers: Headers = {
         "charset": "utf-8",
diff --git a/docs/batch.rst b/docs/batch.rst
index a4e3a047..6aa3ed75 100644
--- a/docs/batch.rst
+++ b/docs/batch.rst
@@ -1,9 +1,30 @@
 Batch API Execution
 -------------------
-
-In **batch API executions**, requests to ArangoDB server are stored in client-side
-in-memory queue, and committed together in a single HTTP call. After the commit,
-results can be retrieved later from :ref:`BatchJob` objects.
+.. warning::
+
+    The batch request API is deprecated since ArangoDB 3.8.0.
+    We discourage its use, as it will be removed in a future release.
+    It is already slow and tends to produce sporadic errors when
+    used with recent versions of ArangoDB.
+
+    The driver's batch functionality has been refactored to use a
+    `ThreadPoolExecutor` instead of the batch API. For backwards compatibility,
+    `max_workers` is set to 1 by default, but can be increased to speed up
+    batch operations. Essentially, the batch API can now be used to send
+    multiple requests in parallel, but not to send multiple requests in a
+    single HTTP call. Note that sending multiple requests in parallel may
+    cause conflicts on the server side (for example, requests that modify
+    the same document).
+
+    To send multiple documents at once to an ArangoDB instance,
+    please use any of the :class:`arango.collection.Collection` methods
+    that accept a list of documents as input, such as:
+
+    * :func:`~arango.collection.Collection.insert_many`
+    * :func:`~arango.collection.Collection.update_many`
+    * :func:`~arango.collection.Collection.replace_many`
+    * :func:`~arango.collection.Collection.delete_many`
+
+After the commit, results can be retrieved later from :ref:`BatchJob` objects.
 
 **Example:**
 
diff --git a/tests/helpers.py b/tests/helpers.py
index 91c0ece3..ef25a786 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -6,7 +6,7 @@
 import pytest
 
 from arango.cursor import Cursor
-from arango.exceptions import AsyncExecuteError, BatchExecuteError, TransactionInitError
+from arango.exceptions import AsyncExecuteError, TransactionInitError
 
 
 def generate_db_name():
@@ -180,6 +180,4 @@ def assert_raises(*exc):
     :param exc: Expected exception(s).
     :type exc: type
     """
-    return pytest.raises(
-        exc + (AsyncExecuteError, BatchExecuteError, TransactionInitError)
-    )
+    return pytest.raises(exc + (AsyncExecuteError, TransactionInitError))
diff --git a/tests/test_batch.py b/tests/test_batch.py
index 990adb98..a2267502 100644
--- a/tests/test_batch.py
+++ b/tests/test_batch.py
@@ -1,15 +1,7 @@
-import json
-
-import mock
 import pytest
 
 from arango.database import BatchDatabase
-from arango.exceptions import (
-    BatchExecuteError,
-    BatchJobResultError,
-    BatchStateError,
-    DocumentInsertError,
-)
+from arango.exceptions import BatchJobResultError, BatchStateError, DocumentInsertError
 from arango.job import BatchJob
 from tests.helpers import clean_doc, extract
 
@@ -74,10 +66,11 @@ def test_batch_execute_with_result(db, col, docs):
 
     # Test successful results
     assert job1.result()["_key"] == docs[0]["_key"]
-    assert job2.result()["_key"] == docs[1]["_key"]
 
     # Test insert error result
+    # job2 and job3 are concurrent, either one can fail
     with pytest.raises(DocumentInsertError) as err:
+        job2.result()
         job3.result()
     assert err.value.error_code == 1210
 
@@ -122,13 +115,9 @@ def test_batch_action_after_commit(db, col):
 def test_batch_execute_error(bad_db, col, docs):
     batch_db = bad_db.begin_batch_execution(return_result=True)
     job = batch_db.collection(col.name).insert_many(docs)
-
-    # Test batch execute with bad database
-    with pytest.raises(BatchExecuteError) as err:
-        batch_db.commit()
-    assert err.value.error_code in {11, 1228}
+    batch_db.commit()
     assert len(col) == 0
-    assert job.status() == "pending"
+    assert job.status() == "done"
 
 
 def test_batch_job_result_not_ready(db, col, docs):
@@ -144,28 +133,3 @@ def test_batch_job_result_not_ready(db, col, docs):
     assert batch_db.commit() == [job]
     assert len(job.result()) == len(docs)
     assert extract("_key", col.all()) == extract("_key", docs)
-
-
-def test_batch_bad_state(db, col, docs):
-    batch_db = db.begin_batch_execution()
-    batch_col = batch_db.collection(col.name)
-    batch_col.insert(docs[0])
-    batch_col.insert(docs[1])
-    batch_col.insert(docs[2])
-
-    # Monkey patch the connection object
-    mock_resp = mock.MagicMock()
-    mock_resp.is_success = True
-    mock_resp.raw_body = ""
-    mock_send_request = mock.MagicMock()
-    mock_send_request.return_value = mock_resp
-    mock_connection = mock.MagicMock()
-    mock_connection.send_request = mock_send_request
-    mock_connection.serialize = json.dumps
-    mock_connection.deserialize = json.loads
-    batch_db._executor._conn = mock_connection
-
-    # Test commit with invalid batch state
-    with pytest.raises(BatchStateError) as err:
-        batch_db.commit()
-    assert "expecting 3 parts in batch response but got 0" in str(err.value)
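For reference, here is a minimal usage sketch of the refactored batch API this patch describes, alongside the recommended bulk-method replacement. The host, credentials, and the `test`/`students` names are hypothetical placeholders; adjust them to your deployment.

```python
from arango import ArangoClient

# Hypothetical deployment details; adjust host, credentials, and names.
client = ArangoClient(hosts="http://localhost:8529")
db = client.db("test", username="root", password="passwd")

# Deprecated path: BatchDatabase now emits a FutureWarning and submits the
# queued requests to a ThreadPoolExecutor when the context manager commits.
# With max_workers > 1, jobs may complete in any order.
with db.begin_batch_execution(return_result=True, max_workers=2) as batch_db:
    students = batch_db.collection("students")
    job1 = students.insert({"name": "Abby"})
    job2 = students.insert({"name": "John"})

# After the commit, each BatchJob carries its own result.
assert job1.status() == "done"
print(job1.result()["_key"], job2.result()["_key"])

# Recommended replacement: a single bulk call instead of the batch API.
db.collection("students").insert_many([{"name": "Mary"}, {"name": "Suzy"}])
```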