Commit

Multithreading integration in servers (#344)
francis-clairicia authored Sep 7, 2024
1 parent 074de2d commit 06b60ec
Showing 10 changed files with 339 additions and 2 deletions.
Empty file.
98 changes: 98 additions & 0 deletions docs/source/_include/examples/howto/multithreading/run_from_thread.py
@@ -0,0 +1,98 @@
from __future__ import annotations

import time
from collections.abc import AsyncGenerator
from contextlib import AsyncExitStack

from easynetwork.lowlevel.api_async.backend.abc import IEvent
from easynetwork.servers.async_tcp import AsyncTCPNetworkServer
from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class Request: ...


class Response: ...


class BaseRunFromSomeThreadRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
        from concurrent.futures import ThreadPoolExecutor

        from easynetwork.lowlevel.futures import AsyncExecutor

        # 4 worker threads for the demo
        self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
        await exit_stack.enter_async_context(self.executor)

        # Create a portal to execute code from external threads in the scheduler loop
        self.portal = server.backend().create_threads_portal()
        await exit_stack.enter_async_context(self.portal)


class RunCoroutineFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        response = await self.executor.run(self._data_processing, request)

        await client.send_packet(response)

    def _data_processing(self, request: Request) -> Response:
        # Get back in scheduler loop for 1 second
        backend = self.executor.backend()
        self.portal.run_coroutine(backend.sleep, 1)

        return Response()


class RunSyncFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        event = client.backend().create_event()

        self.executor.wrapped.submit(self._blocking_wait, event)
        await event.wait()

        await client.send_packet(Response())

    def _blocking_wait(self, event: IEvent) -> None:
        time.sleep(1)

        # Thread-safe flag set
        self.portal.run_sync(event.set)


class SpawnTaskFromSomeThreadRequestHandler(BaseRunFromSomeThreadRequestHandler):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        await self.executor.run(self._blocking_wait)

        await client.send_packet(Response())

    def _blocking_wait(self) -> None:
        sleep = self.executor.backend().sleep

        async def long_running_task(index: int) -> str:
            await sleep(1)
            print(f"Task {index} running...")
            await sleep(index)
            return f"Task {index} return value"

        # Spawn several tasks
        from concurrent.futures import as_completed

        futures = [self.portal.run_coroutine_soon(long_running_task, i) for i in range(1, 5)]
        for future in as_completed(futures):
            print(future.result())
100 changes: 100 additions & 0 deletions docs/source/_include/examples/howto/multithreading/run_in_thread.py
@@ -0,0 +1,100 @@
from __future__ import annotations

import asyncio
import time
from collections.abc import AsyncGenerator
from contextlib import AsyncExitStack

import trio

from easynetwork.servers.async_tcp import AsyncTCPNetworkServer
from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class Request: ...


class Response: ...


class RunInSomeThreadRequestHandlerAsyncIO(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        response = await asyncio.to_thread(self._data_processing, request)

        await client.send_packet(response)

    def _data_processing(self, request: Request) -> Response:
        # Simulate long computing
        time.sleep(1)

        return Response()


class RunInSomeThreadRequestHandlerTrio(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        response = await trio.to_thread.run_sync(self._data_processing, request)

        await client.send_packet(response)

    def _data_processing(self, request: Request) -> Response:
        # Simulate long computing
        time.sleep(1)

        return Response()


class RunInSomeThreadRequestHandlerWithClientBackend(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        response = await client.backend().run_in_thread(self._data_processing, request)

        await client.send_packet(response)

    def _data_processing(self, request: Request) -> Response:
        # Simulate long computing
        time.sleep(1)

        return Response()


class RunInSomeThreadRequestHandlerWithExecutor(AsyncStreamRequestHandler[Request, Response]):
    async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
        from concurrent.futures import ThreadPoolExecutor

        from easynetwork.lowlevel.futures import AsyncExecutor

        # 4 worker threads for the demo
        self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())

        # Shut down executor at server stop
        await exit_stack.enter_async_context(self.executor)

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        response = await self.executor.run(self._data_processing, request)

        await client.send_packet(response)

    def _data_processing(self, request: Request) -> Response:
        # Simulate long computing
        time.sleep(1)

        return Response()
2 changes: 2 additions & 0 deletions docs/source/api/lowlevel/async/backend.rst
@@ -67,6 +67,8 @@ All asynchronous objects relying on an :class:`AsyncBackend` object have a ``bac

* Data transport adapters ( :meth:`.AsyncBaseTransport.backend` ).

* Concurrent Executors ( :meth:`.AsyncExecutor.backend` ).

Obtain An Object By Yourself
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

1 change: 1 addition & 0 deletions docs/source/howto/advanced/index.rst
@@ -10,3 +10,4 @@ Advanced Guide
   serializer_combinations
   serializer_composition
   standalone_servers
   multithreaded_servers
119 changes: 119 additions & 0 deletions docs/source/howto/advanced/multithreaded_servers.rst
@@ -0,0 +1,119 @@
**********************************************
How-to — Multithreading Integration In Servers
**********************************************

.. include:: ../../_include/sync-async-variants.rst

.. contents:: Table of Contents
   :local:

------

Run Blocking Functions In A Worker Thread
=========================================

You can run IO-bound functions in another OS thread and :keyword:`await` the result:

.. tabs::

   .. group-tab:: Using ``asyncio``

      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
         :pyobject: RunInSomeThreadRequestHandlerAsyncIO
         :start-after: RunInSomeThreadRequestHandlerAsyncIO
         :dedent:
         :linenos:
         :emphasize-lines: 7

      .. seealso:: The :func:`asyncio.to_thread` coroutine.

   .. group-tab:: Using ``trio``

      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
         :pyobject: RunInSomeThreadRequestHandlerTrio
         :start-after: RunInSomeThreadRequestHandlerTrio
         :dedent:
         :linenos:
         :emphasize-lines: 7

      .. seealso:: The :func:`trio.to_thread.run_sync` coroutine.

   .. group-tab:: Using the ``AsyncBackend`` API

      .. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
         :pyobject: RunInSomeThreadRequestHandlerWithClientBackend
         :start-after: RunInSomeThreadRequestHandlerWithClientBackend
         :dedent:
         :linenos:
         :emphasize-lines: 7

      .. seealso:: The :meth:`.AsyncBackend.run_in_thread` coroutine.


Use A Custom Thread Pool
------------------------

Instead of using the scheduler's global thread pool, you can (and should) have your own thread pool:

.. literalinclude:: ../../_include/examples/howto/multithreading/run_in_thread.py
   :pyobject: RunInSomeThreadRequestHandlerWithExecutor
   :start-after: RunInSomeThreadRequestHandlerWithExecutor
   :dedent:
   :linenos:
   :emphasize-lines: 7,18

.. seealso:: The :class:`.AsyncExecutor` class.
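
If you need the pool to go away before the server fully stops, the executor can also be shut down explicitly. Here is a minimal sketch, assuming ``self.executor`` was created as in the example above:

.. code-block:: python

   # Wait for the running jobs to finish; queued jobs that have not started yet are cancelled.
   await self.executor.shutdown(cancel_futures=True)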

Allow Access To The Scheduler Loop From Within A Thread
=======================================================

Your :term:`asynchronous framework` provides several ways to get back from a worker thread into the scheduler loop.
The simplest one, however, is the provided :class:`.ThreadsPortal` interface:

.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
   :pyobject: BaseRunFromSomeThreadRequestHandler
   :start-after: BaseRunFromSomeThreadRequestHandler
   :dedent:
   :linenos:
   :emphasize-lines: 11-12

Calling asynchronous code from a worker thread
----------------------------------------------

If you need to call a coroutine function from a worker thread, use :meth:`~.ThreadsPortal.run_coroutine`:

.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
   :pyobject: RunCoroutineFromSomeThreadRequestHandler
   :start-after: RunCoroutineFromSomeThreadRequestHandler
   :dedent:
   :linenos:
   :emphasize-lines: 14


Calling synchronous code from a worker thread
----------------------------------------------

Occasionally you may need to call synchronous code in the event loop thread from a worker thread.
Common cases include setting asynchronous events or sending data to a stream. Because these methods aren't thread-safe,
you must arrange for them to be called inside the event loop thread, using :meth:`~.ThreadsPortal.run_sync`:

.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
   :pyobject: RunSyncFromSomeThreadRequestHandler
   :start-after: RunSyncFromSomeThreadRequestHandler
   :dedent:
   :linenos:
   :emphasize-lines: 18

Spawning tasks from worker threads
----------------------------------

When you need to spawn a task to be run in the background, you can do so using :meth:`~.ThreadsPortal.run_coroutine_soon`:

.. literalinclude:: ../../_include/examples/howto/multithreading/run_from_thread.py
   :pyobject: SpawnTaskFromSomeThreadRequestHandler
   :start-after: SpawnTaskFromSomeThreadRequestHandler
   :dedent:
   :linenos:
   :emphasize-lines: 23

To cancel a task spawned this way, cancel the returned :class:`~concurrent.futures.Future`.
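
A minimal sketch, reusing the ``futures`` list built in the example above:

.. code-block:: python

   # Request cancellation of every task that has not completed yet.
   for future in futures:
       future.cancel()
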
@@ -121,7 +121,7 @@ def schedule_task() -> None:
            waiter = self.__register_waiter(self.__call_soon_waiters, loop)
            _ = self.__task_group.create_task(coroutine(waiter), name=TaskUtils.compute_task_name_from_func(coro_func))

-       self.run_sync_soon(schedule_task)
+       self.run_sync_soon(schedule_task).result()
        return future

    def run_sync_soon(self, func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> concurrent.futures.Future[_T]:
@@ -125,7 +125,7 @@ async def coroutine() -> None:
        def schedule_task() -> None:
            self.__task_group.start_soon(coroutine, name=TaskUtils.compute_task_name_from_func(coro_func))

-       self.run_sync_soon(schedule_task)
+       self.run_sync_soon(schedule_task).result()
        return future

    def run_sync_soon(self, func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> concurrent.futures.Future[_T]:
7 changes: 7 additions & 0 deletions src/easynetwork/lowlevel/futures.py
@@ -202,6 +202,13 @@ async def shutdown(self, *, cancel_futures: bool = False) -> None:
        shutdown_callback = functools.partial(self.__executor.shutdown, wait=True, cancel_futures=cancel_futures)
        await self.__backend.run_in_thread(shutdown_callback)

    def backend(self) -> AsyncBackend:
        """
        Returns:
            The backend implementation linked to this object.
        """
        return self.__backend

    def _setup_func(self, func: Callable[_P, _T]) -> Callable[_P, _T]:
        if self.__handle_contexts:
            ctx = contextvars.copy_context()
10 changes: 10 additions & 0 deletions tests/unit_test/test_async/test_lowlevel_api/test_futures.py
@@ -262,6 +262,16 @@ async def test____shutdown____shutdown_executor____cancel_futures(
            partial_eq(mock_stdlib_executor.shutdown, wait=True, cancel_futures=cancel_futures)
        )

    async def test____get_backend____returns_inner_backend(
        self,
        executor: AsyncExecutor[concurrent.futures.Executor],
        mock_backend: MagicMock,
    ) -> None:
        # Arrange

        # Act & Assert
        assert executor.backend() is mock_backend

    async def test____context_manager____shutdown_executor_at_end(
        self,
        executor: AsyncExecutor[concurrent.futures.Executor],
