From 06b60ec57a5280115fbfba2144aed0e8412791cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francis=20Clairicia-Rose-Claire-Jos=C3=A9phine?=
Date: Sat, 7 Sep 2024 20:06:03 +0200
Subject: [PATCH] Multithreading integration in servers (#344)

---
 .../examples/howto/multithreading/__init__.py |   0
 .../howto/multithreading/run_from_thread.py   |  98 +++++++++++++++
 .../howto/multithreading/run_in_thread.py     | 100 +++++++++++++++
 docs/source/api/lowlevel/async/backend.rst    |   2 +
 docs/source/howto/advanced/index.rst          |   1 +
 .../howto/advanced/multithreaded_servers.rst  | 119 ++++++++++++++++++
 .../api_async/backend/_asyncio/threads.py     |   2 +-
 .../api_async/backend/_trio/threads.py        |   2 +-
 src/easynetwork/lowlevel/futures.py           |   7 ++
 .../test_lowlevel_api/test_futures.py         |  10 ++
 10 files changed, 339 insertions(+), 2 deletions(-)
 create mode 100644 docs/source/_include/examples/howto/multithreading/__init__.py
 create mode 100644 docs/source/_include/examples/howto/multithreading/run_from_thread.py
 create mode 100644 docs/source/_include/examples/howto/multithreading/run_in_thread.py
 create mode 100644 docs/source/howto/advanced/multithreaded_servers.rst

diff --git a/docs/source/_include/examples/howto/multithreading/__init__.py b/docs/source/_include/examples/howto/multithreading/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/docs/source/_include/examples/howto/multithreading/run_from_thread.py b/docs/source/_include/examples/howto/multithreading/run_from_thread.py
new file mode 100644
index 00000000..6338fcf3
--- /dev/null
+++ b/docs/source/_include/examples/howto/multithreading/run_from_thread.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+import time
+from collections.abc import AsyncGenerator
+from contextlib import AsyncExitStack
+
+from easynetwork.lowlevel.api_async.backend.abc import IEvent
+from easynetwork.servers.async_tcp import AsyncTCPNetworkServer
+from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler
+
+
+class Request: ...
+
+
+class Response: ...
+
+
+class BaseRunFromSomeThreadRequestHandler(AsyncStreamRequestHandler[Request, Response]):
+    async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
+        from concurrent.futures import ThreadPoolExecutor
+
+        from easynetwork.lowlevel.futures import AsyncExecutor
+
+        # 4 worker threads for the demo
+        self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
+        await exit_stack.enter_async_context(self.executor)
+
+        # Create a portal to execute code from external threads in the scheduler loop
+        self.portal = server.backend().create_threads_portal()
+        await exit_stack.enter_async_context(self.portal)
+
+
+class RunCoroutineFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        response = await self.executor.run(self._data_processing, request)
+
+        await client.send_packet(response)
+
+    def _data_processing(self, request: Request) -> Response:
+        # Go back to the scheduler loop and sleep for 1 second
+        backend = self.executor.backend()
+        self.portal.run_coroutine(backend.sleep, 1)
+
+        return Response()
+
+
+class RunSyncFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        event = client.backend().create_event()
+
+        self.executor.wrapped.submit(self._blocking_wait, event)
+        await event.wait()
+
+        await client.send_packet(Response())
+
+    def _blocking_wait(self, event: IEvent) -> None:
+        time.sleep(1)
+
+        # Set the event in a thread-safe way
+        self.portal.run_sync(event.set)
+
+
+class SpawnTaskFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        await self.executor.run(self._blocking_wait)
+
+        await client.send_packet(Response())
+
+    def _blocking_wait(self) -> None:
+        sleep = self.executor.backend().sleep
+
+        async def long_running_task(index: int) -> str:
+            await sleep(1)
+            print(f"Task {index} running...")
+            await sleep(index)
+            return f"Task {index} return value"
+
+        # Spawn several tasks
+        from concurrent.futures import as_completed
+
+        futures = [self.portal.run_coroutine_soon(long_running_task, i) for i in range(1, 5)]
+        for future in as_completed(futures):
+            print(future.result())
diff --git a/docs/source/_include/examples/howto/multithreading/run_in_thread.py b/docs/source/_include/examples/howto/multithreading/run_in_thread.py
new file mode 100644
index 00000000..33d4c505
--- /dev/null
+++ b/docs/source/_include/examples/howto/multithreading/run_in_thread.py
@@ -0,0 +1,100 @@
+from __future__ import annotations
+
+import asyncio
+import time
+from collections.abc import AsyncGenerator
+from contextlib import AsyncExitStack
+
+import trio
+
+from easynetwork.servers.async_tcp import AsyncTCPNetworkServer
+from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler
+
+
+class Request: ...
+
+
+class Response: ...
+
+
+class RunInSomeThreadRequestHandlerAsyncIO(AsyncStreamRequestHandler[Request, Response]):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        response = await asyncio.to_thread(self._data_processing, request)
+
+        await client.send_packet(response)
+
+    def _data_processing(self, request: Request) -> Response:
+        # Simulate a long computation
+        time.sleep(1)
+
+        return Response()
+
+
+class RunInSomeThreadRequestHandlerTrio(AsyncStreamRequestHandler[Request, Response]):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        response = await trio.to_thread.run_sync(self._data_processing, request)
+
+        await client.send_packet(response)
+
+    def _data_processing(self, request: Request) -> Response:
+        # Simulate a long computation
+        time.sleep(1)
+
+        return Response()
+
+
+class RunInSomeThreadRequestHandlerWithClientBackend(AsyncStreamRequestHandler[Request, Response]):
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        response = await client.backend().run_in_thread(self._data_processing, request)
+
+        await client.send_packet(response)
+
+    def _data_processing(self, request: Request) -> Response:
+        # Simulate a long computation
+        time.sleep(1)
+
+        return Response()
+
+
+class RunInSomeThreadRequestHandlerWithExecutor(AsyncStreamRequestHandler[Request, Response]):
+    async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
+        from concurrent.futures import ThreadPoolExecutor
+
+        from easynetwork.lowlevel.futures import AsyncExecutor
+
+        # 4 worker threads for the demo
+        self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
+
+        # Shut down executor at server stop
+        await exit_stack.enter_async_context(self.executor)
+
+    async def handle(
+        self,
+        client: AsyncStreamClient[Response],
+    ) -> AsyncGenerator[None, Request]:
+        request: Request = yield
+
+        response = await self.executor.run(self._data_processing, request)
+
+        await client.send_packet(response)
+
+    def _data_processing(self, request: Request) -> Response:
+        # Simulate a long computation
+        time.sleep(1)
+
+        return Response()
diff --git a/docs/source/api/lowlevel/async/backend.rst b/docs/source/api/lowlevel/async/backend.rst
index d0a7fba9..f4b7ff29 100644
--- a/docs/source/api/lowlevel/async/backend.rst
+++ b/docs/source/api/lowlevel/async/backend.rst
@@ -67,6 +67,8 @@ All asynchronous objects relying on an :class:`AsyncBackend` object have a ``bac
 
 * Data transport adapters ( :meth:`.AsyncBaseTransport.backend` ).
 
+* Concurrent Executors ( :meth:`.AsyncExecutor.backend` ).
+
 Obtain An Object By Yourself
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/docs/source/howto/advanced/index.rst b/docs/source/howto/advanced/index.rst
index 3d8e0d6a..6383821b 100644
--- a/docs/source/howto/advanced/index.rst
+++ b/docs/source/howto/advanced/index.rst
@@ -10,3 +10,4 @@ Advanced Guide
    serializer_combinations
    serializer_composition
    standalone_servers
+   multithreaded_servers
diff --git a/docs/source/howto/advanced/multithreaded_servers.rst b/docs/source/howto/advanced/multithreaded_servers.rst
new file mode 100644
index 00000000..75539835
--- /dev/null
+++ b/docs/source/howto/advanced/multithreaded_servers.rst
@@ -0,0 +1,119 @@
+**********************************************
+How-to — Multithreading Integration In Servers
+**********************************************
+
+.. include:: ../../_include/sync-async-variants.rst
+
+.. contents:: Table of Contents
+   :local:
+
+------
+
+Run Blocking Functions In A Worker Thread
+=========================================
+
+You can run IO-bound functions in another OS thread and :keyword:`await` the result:
+
+.. tabs::
+
+   .. group-tab:: Using ``asyncio``
+
+      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
+         :pyobject: RunInSomeThreadRequestHandlerAsyncIO
+         :start-after: RunInSomeThreadRequestHandlerAsyncIO
+         :dedent:
+         :linenos:
+         :emphasize-lines: 7
+
+      .. seealso:: The :func:`asyncio.to_thread` coroutine.
+
+   .. group-tab:: Using ``trio``
+
+      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
+         :pyobject: RunInSomeThreadRequestHandlerTrio
+         :start-after: RunInSomeThreadRequestHandlerTrio
+         :dedent:
+         :linenos:
+         :emphasize-lines: 7
+
+      .. seealso:: The :func:`trio.to_thread.run_sync` coroutine.
+
+   .. group-tab:: Using the ``AsyncBackend`` API
+
+      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
+         :pyobject: RunInSomeThreadRequestHandlerWithClientBackend
+         :start-after: RunInSomeThreadRequestHandlerWithClientBackend
+         :dedent:
+         :linenos:
+         :emphasize-lines: 7
+
+      .. seealso:: The :meth:`.AsyncBackend.run_in_thread` coroutine.
+
+
+Use A Custom Thread Pool
+------------------------
+
+Instead of using the scheduler's global thread pool, you can (and should) have your own thread pool:
+
+.. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
+   :pyobject: RunInSomeThreadRequestHandlerWithExecutor
+   :start-after: RunInSomeThreadRequestHandlerWithExecutor
+   :dedent:
+   :linenos:
+   :emphasize-lines: 7,18
+
+.. seealso:: The :class:`.AsyncExecutor` class.
+
+Allow Access To The Scheduler Loop From Within A Thread
+=======================================================
+
+Your :term:`asynchronous framework` provides many ways to get back from a worker thread to the scheduler loop.
+However, the simplest one is to use the provided :class:`.ThreadsPortal` interface:
+
+.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
+   :pyobject: BaseRunFromSomeThreadRequestHandler
+   :start-after: BaseRunFromSomeThreadRequestHandler
+   :dedent:
+   :linenos:
+   :emphasize-lines: 11-12
+
+Calling asynchronous code from a worker thread
+----------------------------------------------
+
+If you need to call a coroutine function from a worker thread, you can do this:
+
+.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
+   :pyobject: RunCoroutineFromSomeThreadRequestHandler
+   :start-after: RunCoroutineFromSomeThreadRequestHandler
+   :dedent:
+   :linenos:
+   :emphasize-lines: 14
+
+
+Calling synchronous code from a worker thread
+---------------------------------------------
+
+Occasionally you may need to call synchronous code in the event loop thread from a worker thread.
+Common cases include setting asynchronous events or sending data to a stream. Because these methods aren't thread-safe,
+you need to arrange for them to be called inside the event loop thread using :meth:`~.ThreadsPortal.run_sync`:
+
+.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
+   :pyobject: RunSyncFromSomeThreadRequestHandler
+   :start-after: RunSyncFromSomeThreadRequestHandler
+   :dedent:
+   :linenos:
+   :emphasize-lines: 18
+
+Spawning tasks from worker threads
+----------------------------------
+
+When you need to spawn a task to be run in the background, you can do so using :meth:`~.ThreadsPortal.run_coroutine_soon`:
+
+.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
+   :pyobject: SpawnTaskFromSomeThreadRequestHandler
+   :start-after: SpawnTaskFromSomeThreadRequestHandler
+   :dedent:
+   :linenos:
+   :emphasize-lines: 23
+
+You can cancel a task spawned this way by cancelling the returned :class:`~concurrent.futures.Future`.
diff --git a/src/easynetwork/lowlevel/api_async/backend/_asyncio/threads.py b/src/easynetwork/lowlevel/api_async/backend/_asyncio/threads.py
index 571873f5..f4a7b520 100644
--- a/src/easynetwork/lowlevel/api_async/backend/_asyncio/threads.py
+++ b/src/easynetwork/lowlevel/api_async/backend/_asyncio/threads.py
@@ -121,7 +121,7 @@ def schedule_task() -> None:
             waiter = self.__register_waiter(self.__call_soon_waiters, loop)
             _ = self.__task_group.create_task(coroutine(waiter), name=TaskUtils.compute_task_name_from_func(coro_func))
 
-        self.run_sync_soon(schedule_task)
+        self.run_sync_soon(schedule_task).result()
         return future
 
     def run_sync_soon(self, func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> concurrent.futures.Future[_T]:
diff --git a/src/easynetwork/lowlevel/api_async/backend/_trio/threads.py b/src/easynetwork/lowlevel/api_async/backend/_trio/threads.py
index 4246677b..ef884df4 100644
--- a/src/easynetwork/lowlevel/api_async/backend/_trio/threads.py
+++ b/src/easynetwork/lowlevel/api_async/backend/_trio/threads.py
@@ -125,7 +125,7 @@ async def coroutine() -> None:
         def schedule_task() -> None:
             self.__task_group.start_soon(coroutine, name=TaskUtils.compute_task_name_from_func(coro_func))
 
-        self.run_sync_soon(schedule_task)
+        self.run_sync_soon(schedule_task).result()
         return future
 
     def run_sync_soon(self, func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> concurrent.futures.Future[_T]:
diff --git a/src/easynetwork/lowlevel/futures.py b/src/easynetwork/lowlevel/futures.py
index 7fce4b17..854c0a86 100644
--- a/src/easynetwork/lowlevel/futures.py
+++ b/src/easynetwork/lowlevel/futures.py
@@ -202,6 +202,13 @@ async def shutdown(self, *, cancel_futures: bool = False) -> None:
         shutdown_callback = functools.partial(self.__executor.shutdown, wait=True, cancel_futures=cancel_futures)
         await self.__backend.run_in_thread(shutdown_callback)
 
+    def backend(self) -> AsyncBackend:
+        """
+        Returns:
+            The backend implementation linked to this object.
+        """
+        return self.__backend
+
     def _setup_func(self, func: Callable[_P, _T]) -> Callable[_P, _T]:
         if self.__handle_contexts:
             ctx = contextvars.copy_context()
diff --git a/tests/unit_test/test_async/test_lowlevel_api/test_futures.py b/tests/unit_test/test_async/test_lowlevel_api/test_futures.py
index b614e4bc..5ab72ca9 100644
--- a/tests/unit_test/test_async/test_lowlevel_api/test_futures.py
+++ b/tests/unit_test/test_async/test_lowlevel_api/test_futures.py
@@ -262,6 +262,16 @@ async def test____shutdown____shutdown_executor____cancel_futures(
             partial_eq(mock_stdlib_executor.shutdown, wait=True, cancel_futures=cancel_futures)
         )
 
+    async def test____get_backend____returns_inner_backend(
+        self,
+        executor: AsyncExecutor[concurrent.futures.Executor],
+        mock_backend: MagicMock,
+    ) -> None:
+        # Arrange
+
+        # Act & Assert
+        assert executor.backend() is mock_backend
+
     async def test____context_manager____shutdown_executor_at_end(
         self,
         executor: AsyncExecutor[concurrent.futures.Executor],