[DE-664] Deprecating Batch API #279

Merged · 5 commits · Sep 15, 2023

Changes from all commits
24 changes: 23 additions & 1 deletion CHANGELOG.md
@@ -1,11 +1,33 @@
main
-----

* Refactoring `BatchDatabase` API

- The batch API is deprecated since ArangoDB 3.8.0 and will be removed in a future version.
- The `BatchDatabase` is still available, but it now uses a `ThreadPoolExecutor` internally.
- For backwards compatibility, the `BatchDatabase` uses only one worker thread, essentially
sending the requests sequentially. Feel free to set the `max_workers` parameter to a higher
value if you want to use multiple threads, but be aware that the requests will be sent in
parallel, which may cause problems if you are using transactions.
- To discourage the use of this API, we now issue a warning when the `BatchDatabase` is used.

Note that `{"foo": "bar"}` may be inserted after `{"foo": "baz"}` in the following example:
```python
with db.begin_batch_execution(max_workers=2) as batch_db:
    collection = batch_db.collection("students")  # any collection
    job1 = collection.insert({"foo": "bar"})
    job2 = collection.insert({"foo": "baz"})
```
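
The recommended replacement is the `*_many` family of collection methods, which sends all documents to the server in a single call. A minimal sketch, assuming a local server and an illustrative `students` collection:
```python
from arango import ArangoClient

client = ArangoClient(hosts="http://localhost:8529")
db = client.db("test", username="root", password="passwd")

# One request for the whole list; no batch API involved.
results = db.collection("students").insert_many(
    [{"foo": "bar"}, {"foo": "baz"}]
)
```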

7.6.2
-----

* Fix: build_filter_conditions utils method

7.6.1
-----

* [DE-542] Added `shards()` method to `Collection` by @apetenchea in https://github.com/ArangoDB-Community/python-arango/pull/274
* [DE-584] Refactor deprecated `/_api/simple` methods by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/268
* [DE-584] Refactor deprecated `/_api/simple` methods by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/275
* Added `raise_on_document_error` parameter to `Collection.update_many()` by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/273
* Added `computed_values` parameter to `Collection.configure()` by @aMahanna in https://github.com/ArangoDB-Community/python-arango/pull/268
* Various bug fixes
45 changes: 40 additions & 5 deletions arango/database.py
@@ -9,6 +9,7 @@
from datetime import datetime
from numbers import Number
from typing import Any, List, Optional, Sequence, Union
from warnings import warn

from arango.api import ApiGroup
from arango.aql import AQL
@@ -2488,18 +2489,38 @@ def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
"""
return AsyncDatabase(self._conn, return_result)

def begin_batch_execution(self, return_result: bool = True) -> "BatchDatabase":
def begin_batch_execution(
self,
return_result: bool = True,
max_workers: Optional[int] = 1,
) -> "BatchDatabase":
"""Begin batch execution.

.. warning::

The batch request API is deprecated since ArangoDB 3.8.0.
This functionality should no longer be used.
To send multiple documents at once to an ArangoDB instance,
please use any of :class:`arango.collection.Collection` methods
that accept a list of documents as input.
See :func:`~arango.collection.Collection.insert_many`,
:func:`~arango.collection.Collection.update_many`,
:func:`~arango.collection.Collection.replace_many`,
:func:`~arango.collection.Collection.delete_many`.

:param return_result: If set to True, API executions return instances
of :class:`arango.job.BatchJob` that are populated with results on
commit. If set to False, API executions return None and no results
are tracked client-side.
:type return_result: bool
:param max_workers: Maximum number of workers to use for submitting
requests asynchronously. If None, the default value is the minimum
between `os.cpu_count()` and the number of requests.
:type max_workers: Optional[int]
:return: Database API wrapper object specifically for batch execution.
:rtype: arango.database.BatchDatabase
"""
return BatchDatabase(self._conn, return_result)
return BatchDatabase(self._conn, return_result, max_workers)

def begin_transaction(
self,
@@ -2587,20 +2608,34 @@ def __repr__(self) -> str:
class BatchDatabase(Database):
"""Database API wrapper tailored specifically for batch execution.

See :func:`arango.database.StandardDatabase.begin_batch_execution`.
.. note::

This class is not intended to be instantiated directly.
See
:func:`arango.database.StandardDatabase.begin_batch_execution`.

:param connection: HTTP connection.
:param return_result: If set to True, API executions return instances of
:class:`arango.job.BatchJob` that are populated with results on commit.
If set to False, API executions return None and no results are tracked
client-side.
:type return_result: bool
:param max_workers: Use a thread pool of at most `max_workers`.
:type max_workers: Optional[int]
"""

def __init__(self, connection: Connection, return_result: bool) -> None:
def __init__(
self, connection: Connection, return_result: bool, max_workers: Optional[int]
) -> None:
self._executor: BatchApiExecutor
super().__init__(
connection=connection, executor=BatchApiExecutor(connection, return_result)
connection=connection,
executor=BatchApiExecutor(connection, return_result, max_workers),
)
warn(
"The batch request API is deprecated since ArangoDB version 3.8.0.",
FutureWarning,
stacklevel=3,
)

def __repr__(self) -> str:
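Taken together, the new signature and the `warn(...)` call mean that constructing a `BatchDatabase` now emits a `FutureWarning`. A minimal usage sketch, assuming a local server and an illustrative `students` collection; the warning filter is optional:
```python
import warnings

from arango import ArangoClient

client = ArangoClient(hosts="http://localhost:8529")
db = client.db("test", username="root", password="passwd")

# Constructing a BatchDatabase now emits a FutureWarning; it can be
# silenced explicitly if the legacy API is still needed.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", FutureWarning)
    batch_db = db.begin_batch_execution(return_result=True, max_workers=4)

job = batch_db.collection("students").insert({"foo": "bar"})
batch_db.commit()  # dispatches all queued requests on the thread pool
print(job.result())
```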
103 changes: 23 additions & 80 deletions arango/executor.py
@@ -8,14 +8,13 @@
]

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from os import cpu_count
from typing import Any, Callable, Optional, Sequence, Tuple, TypeVar, Union
from urllib.parse import urlencode
from uuid import uuid4

from arango.connection import Connection
from arango.exceptions import (
AsyncExecuteError,
BatchExecuteError,
BatchStateError,
OverloadControlExecutorError,
TransactionAbortError,
@@ -27,7 +26,6 @@
from arango.request import Request
from arango.response import Response
from arango.typings import Fields, Json
from arango.utils import suppress_warning

ApiExecutor = Union[
"DefaultApiExecutor",
@@ -126,35 +124,29 @@ class BatchApiExecutor:
If set to False, API executions return None and no results are tracked
client-side.
:type return_result: bool
:param max_workers: Use a thread pool of at most `max_workers`. If set
to None, the number of CPUs is used instead. For backwards compatibility
the parameter defaults to 1, effectively behaving like single-threaded
execution.
:type max_workers: Optional[int]
"""

def __init__(self, connection: Connection, return_result: bool) -> None:
def __init__(
self,
connection: Connection,
return_result: bool,
max_workers: Optional[int] = 1,
) -> None:
self._conn = connection
self._return_result: bool = return_result
self._queue: OrderedDict[str, Tuple[Request, BatchJob[Any]]] = OrderedDict()
self._committed: bool = False
self._max_workers: int = max_workers or cpu_count() # type: ignore

@property
def context(self) -> str:
return "batch"

def _stringify_request(self, request: Request) -> str:
path = request.endpoint

if request.params is not None:
path += f"?{urlencode(request.params)}"
buffer = [f"{request.method} {path} HTTP/1.1"]

if request.headers is not None:
for key, value in sorted(request.headers.items()):
buffer.append(f"{key}: {value}")

if request.data is not None:
serialized = self._conn.serialize(request.data)
buffer.append("\r\n" + serialized)

return "\r\n".join(buffer)

@property
def jobs(self) -> Optional[Sequence[BatchJob[Any]]]:
"""Return the queued batch jobs.
@@ -190,7 +182,7 @@ def execute(
return job if self._return_result else None

def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
"""Execute the queued requests in a single batch API request.
"""Execute the queued requests in a batch of requests.

If **return_result** parameter was set to True during initialization,
:class:`arango.job.BatchJob` instances are populated with results.
@@ -199,9 +191,7 @@
False during initialization.
:rtype: [arango.job.BatchJob] | None
:raise arango.exceptions.BatchStateError: If batch state is invalid
(e.g. batch was already committed or size of response from server
did not match the expected).
:raise arango.exceptions.BatchExecuteError: If commit fails.
(e.g. batch was already committed).
"""
if self._committed:
raise BatchStateError("batch already committed")
@@ -211,65 +201,18 @@
if len(self._queue) == 0:
return self.jobs

# Boundary used for multipart request
boundary = uuid4().hex
with ThreadPoolExecutor(
max_workers=min(self._max_workers, len(self._queue))
) as executor:
for req, job in self._queue.values():
job._future = executor.submit(self._conn.send_request, req)

# Build the batch request payload
buffer = []
for req, job in self._queue.values():
buffer.append(f"--{boundary}")
buffer.append("Content-Type: application/x-arango-batchpart")
buffer.append(f"Content-Id: {job.id}")
buffer.append("\r\n" + self._stringify_request(req))
buffer.append(f"--{boundary}--")

request = Request(
method="post",
endpoint="/_api/batch",
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
data="\r\n".join(buffer),
)
with suppress_warning("requests.packages.urllib3.connectionpool"):
resp = self._conn.send_request(request)

if not resp.is_success:
raise BatchExecuteError(resp, request)
for _, job in self._queue.values():
job._status = "done"

if not self._return_result:
return None

url_prefix = resp.url.strip("/_api/batch")
raw_responses = resp.raw_body.split(f"--{boundary}")[1:-1]

if len(self._queue) != len(raw_responses):
raise BatchStateError(
"expecting {} parts in batch response but got {}".format(
len(self._queue), len(raw_responses)
)
)
for raw_resp in raw_responses:
# Parse and breakdown the batch response body
resp_parts = raw_resp.strip().split("\r\n")
raw_content_id = resp_parts[1]
raw_body = resp_parts[-1]
raw_status = resp_parts[3]
job_id = raw_content_id.split(" ")[1]
_, status_code, status_text = raw_status.split(" ", 2)

# Update the corresponding batch job
queued_req, queued_job = self._queue[job_id]

queued_job._status = "done"
resp = Response(
method=queued_req.method,
url=url_prefix + queued_req.endpoint,
headers={},
status_code=int(status_code),
status_text=status_text,
raw_body=raw_body,
)
queued_job._response = self._conn.prep_response(resp)

return self.jobs


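Under the hood, the refactored `commit()` simply fans the queued requests out over a thread pool instead of encoding a multipart body. A standalone sketch of that pattern, not the library code itself, assuming string requests and a stand-in `send` callable:
```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


def send_all(
    requests: List[str],
    send: Callable[[str], str],
    max_workers: int = 1,
) -> List[str]:
    """Submit each queued request to a thread pool; collect results in order."""
    if not requests:
        return []
    # Mirrors BatchApiExecutor: never spawn more threads than requests.
    with ThreadPoolExecutor(max_workers=min(max_workers, len(requests))) as pool:
        futures: List["Future[str]"] = [pool.submit(send, req) for req in requests]
    # Leaving the `with` block waits for completion, so every future is done.
    return [future.result() for future in futures]


# max_workers=1 (the backwards-compatible default) degenerates to
# sequential execution.
print(send_all(["get /a", "get /b"], str.upper, max_workers=2))
```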
2 changes: 2 additions & 0 deletions arango/formatter.py
@@ -157,6 +157,8 @@ def format_database(body: Json) -> Json:
result["replication_factor"] = body["replicationFactor"]
if "writeConcern" in body:
result["write_concern"] = body["writeConcern"]
if "replicationVersion" in body:
result["replication_version"] = body["replicationVersion"]

return verify_format(body, result)

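The added lines follow the function's existing convention: copy each camelCase server attribute into a snake_case key only when it is present. A tiny sketch of that convention, not the full `format_database()`:
```python
# Sketch of the key-mapping convention; format_database() applies the same
# conditional copy for every attribute it knows about.
def map_keys(body: dict) -> dict:
    result = {}
    if "writeConcern" in body:
        result["write_concern"] = body["writeConcern"]
    if "replicationVersion" in body:
        result["replication_version"] = body["replicationVersion"]
    return result


print(map_keys({"replicationVersion": "2"}))  # {'replication_version': '2'}
```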
9 changes: 5 additions & 4 deletions arango/job.py
@@ -1,5 +1,6 @@
__all__ = ["AsyncJob", "BatchJob"]

from concurrent.futures import Future
from typing import Callable, Generic, Optional, TypeVar
from uuid import uuid4

@@ -160,13 +161,13 @@ class BatchJob(Generic[T]):
:type response_handler: callable
"""

__slots__ = ["_id", "_status", "_response", "_response_handler"]
__slots__ = ["_id", "_status", "_response_handler", "_future"]

def __init__(self, response_handler: Callable[[Response], T]) -> None:
self._id = uuid4().hex
self._status = "pending"
self._response: Optional[Response] = None
self._response_handler = response_handler
self._future: Optional[Future[Response]] = None

def __repr__(self) -> str:
return f"<BatchJob {self._id}>"
@@ -200,7 +201,7 @@ def result(self) -> T:
:raise arango.exceptions.BatchJobResultError: If job result is not
available (i.e. batch is not committed yet).
"""
if self._status == "pending" or self._response is None:
if self._status == "pending" or self._future is None or not self._future.done():
raise BatchJobResultError("result not available yet")

return self._response_handler(self._response)
return self._response_handler(self._future.result())
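A `BatchJob` thus resolves through the `concurrent.futures.Future` that `commit()` populates, and `result()` keeps raising until that future is done. A self-contained sketch of the same gating, using a hypothetical class rather than the library's:
```python
from concurrent.futures import Future


class JobSketch:
    """Mimics BatchJob's gate: no result until the future completes."""

    def __init__(self) -> None:
        self._future: "Future[str]" = Future()

    def result(self) -> str:
        if not self._future.done():
            raise RuntimeError("result not available yet")
        return self._future.result()


job = JobSketch()
try:
    job.result()
except RuntimeError as exc:
    print(exc)  # result not available yet

job._future.set_result("HTTP 202")
print(job.result())  # HTTP 202
```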
2 changes: 1 addition & 1 deletion arango/request.py
@@ -12,7 +12,7 @@ def normalize_headers(
if driver_flags is not None:
for flag in driver_flags:
flags = flags + flag + ";"
driver_version = "7.6.1"
driver_version = "7.6.2"
driver_header = "python-arango/" + driver_version + " (" + flags + ")"
normalized_headers: Headers = {
"charset": "utf-8",
29 changes: 25 additions & 4 deletions docs/batch.rst
@@ -1,9 +1,30 @@
Batch API Execution
-------------------

In **batch API executions**, requests to ArangoDB server are stored in client-side
in-memory queue, and committed together in a single HTTP call. After the commit,
results can be retrieved later from :ref:`BatchJob` objects.
.. warning::

The batch request API is deprecated since ArangoDB 3.8.0.
We discourage its use, as it will be removed in a future release.
It is already slow and prone to producing erratic errors when used
with recent versions of ArangoDB.

The driver functionality has been refactored to no longer use the batch API,
but a `ThreadPoolExecutor` instead. For backwards compatibility,
`max_workers` is set to 1 by default, but can be increased to speed up
batch operations. Essentially, the batch API can now be used to send
multiple requests in parallel, but not to send multiple requests in a
single HTTP call. Note that sending multiple requests in parallel may
cause conflicts on the server side (for example, requests that modify the same document).

To send multiple documents at once to an ArangoDB instance,
please use any of :class:`arango.collection.Collection` methods
that accept a list of documents as input, such as:

* :func:`~arango.collection.Collection.insert_many`
* :func:`~arango.collection.Collection.update_many`
* :func:`~arango.collection.Collection.replace_many`
* :func:`~arango.collection.Collection.delete_many`

After the commit, results can be retrieved later from :ref:`BatchJob` objects.

**Example:**

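A minimal sketch of the refactored flow, assuming a local server and an illustrative `students` collection:
```python
from arango import ArangoClient

client = ArangoClient(hosts="http://localhost:8529")
db = client.db("test", username="root", password="passwd")

# Jobs are queued client-side; leaving the context manager commits them,
# dispatching the requests on the internal thread pool.
with db.begin_batch_execution(return_result=True) as batch_db:
    students = batch_db.collection("students")
    job1 = students.insert({"_key": "Abby"})
    job2 = students.insert({"_key": "John"})

print(job1.result())  # e.g. {'_id': 'students/Abby', ...}
print(job2.result())
```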
6 changes: 2 additions & 4 deletions tests/helpers.py
@@ -6,7 +6,7 @@
import pytest

from arango.cursor import Cursor
from arango.exceptions import AsyncExecuteError, BatchExecuteError, TransactionInitError
from arango.exceptions import AsyncExecuteError, TransactionInitError


def generate_db_name():
@@ -180,6 +182,4 @@ def assert_raises(*exc):
:param exc: Expected exception(s).
:type: exc
"""
return pytest.raises(
exc + (AsyncExecuteError, BatchExecuteError, TransactionInitError)
)
return pytest.raises(exc + (AsyncExecuteError, TransactionInitError))
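
With `BatchExecuteError` gone, `assert_raises` now folds in only the async and transaction wrapper errors. A hypothetical usage in a test, with an illustrative collection name:
```python
from arango.exceptions import DocumentInsertError


def test_insert_into_missing_collection(db):
    # assert_raises also tolerates the executor-level wrapper exceptions.
    with assert_raises(DocumentInsertError):
        db.collection("no_such_collection").insert({"foo": "bar"})
```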