From 540242ecc36f13c560a887826d08a7ec2e7ed45c Mon Sep 17 00:00:00 2001 From: Christian Hartung Date: Mon, 3 Jun 2024 15:32:41 -0300 Subject: [PATCH] refactor: remove routes instance variable from LoaferManager --- src/loafer/dispatchers.py | 13 ++++++------- src/loafer/managers.py | 17 ++++++++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/loafer/dispatchers.py b/src/loafer/dispatchers.py index d1491d4..cb4aebe 100644 --- a/src/loafer/dispatchers.py +++ b/src/loafer/dispatchers.py @@ -18,10 +18,12 @@ class LoaferDispatcher: def __init__( self, routes: Sequence[Route], - max_concurrency: int | None = None, + queue_size: int | None = None, + workers: int | None = None, ) -> None: self.routes = routes - self.max_concurrency = max_concurrency if max_concurrency is not None else max(len(routes), 5) + self.queue_size = queue_size if queue_size is not None else len(routes) * 10 + self.workers = workers if workers is not None else max(len(routes), 5) async def dispatch_message(self, message: Any, route: Route) -> bool: logger.debug("dispatching message to route=%s", route) @@ -94,14 +96,11 @@ async def _consume_messages(self, processing_queue: asyncio.Queue) -> None: processing_queue.task_done() async def dispatch_providers(self, forever: bool = True) -> None: # noqa: FBT001, FBT002 - processing_queue = asyncio.Queue(self.max_concurrency) + processing_queue = asyncio.Queue(self.queue_size) async with TaskGroup() as tg: provider_task = tg.create_task(self._fetch_messages(processing_queue, tg, forever)) - - consumer_tasks = [ - tg.create_task(self._consume_messages(processing_queue)) for _ in range(self.max_concurrency) - ] + consumer_tasks = [tg.create_task(self._consume_messages(processing_queue)) for _ in range(self.workers)] async def join(): await provider_task diff --git a/src/loafer/managers.py b/src/loafer/managers.py index 59ff4d1..ffe5d75 100644 --- a/src/loafer/managers.py +++ b/src/loafer/managers.py @@ -1,22 +1,33 @@ +from __future__ import annotations + import asyncio import logging import os +from typing import TYPE_CHECKING, Sequence from .dispatchers import LoaferDispatcher from .runners import LoaferRunner +if TYPE_CHECKING: + from .routes import Route + logger = logging.getLogger(__name__) class LoaferManager: - def __init__(self, routes, runner=None, max_concurrency=None): + def __init__( + self, + routes: Sequence[Route], + runner: LoaferRunner | None = None, + queue_size: int | None = None, + workers: int | None = None, + ): if runner is None: self.runner = LoaferRunner(on_stop_callback=self.on_loop__stop) else: self.runner = runner - self.routes = routes - self.dispatcher = LoaferDispatcher(self.routes, max_concurrency) + self.dispatcher = LoaferDispatcher(routes, queue_size, workers) def run(self, forever=True, debug=False): # noqa: FBT002 loop = self.runner.loop