Skip to content

Commit

Permalink
refactor: remove routes instance variable from LoaferManager
Browse files Browse the repository at this point in the history
  • Loading branch information
hartungstenio committed Jun 5, 2024
1 parent 877a667 commit 65bc747
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
13 changes: 6 additions & 7 deletions src/loafer/dispatchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ class LoaferDispatcher:
def __init__(
self,
routes: Sequence[Route],
max_concurrency: int | None = None,
queue_size: int | None = None,
workers: int | None = None,
) -> None:
self.routes = routes
self.max_concurrency = max_concurrency if max_concurrency is not None else max(len(routes), 5)
self.queue_size = queue_size if queue_size is not None else len(routes) * 10
self.workers = workers if workers is not None else max(len(routes), 5)

async def dispatch_message(self, message: Any, route: Route) -> bool:
logger.debug("dispatching message to route=%s", route)
Expand Down Expand Up @@ -94,14 +96,11 @@ async def _consume_messages(self, processing_queue: asyncio.Queue) -> None:
processing_queue.task_done()

async def dispatch_providers(self, forever: bool = True) -> None: # noqa: FBT001, FBT002
processing_queue = asyncio.Queue(self.max_concurrency)
processing_queue = asyncio.Queue(self.queue_size)

async with TaskGroup() as tg:
provider_task = tg.create_task(self._fetch_messages(processing_queue, tg, forever))

consumer_tasks = [
tg.create_task(self._consume_messages(processing_queue)) for _ in range(self.max_concurrency)
]
consumer_tasks = [tg.create_task(self._consume_messages(processing_queue)) for _ in range(self.workers)]

async def join():
await provider_task
Expand Down
17 changes: 14 additions & 3 deletions src/loafer/managers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
from __future__ import annotations

import asyncio
import logging
import os
from typing import TYPE_CHECKING, Sequence

from .dispatchers import LoaferDispatcher
from .runners import LoaferRunner

if TYPE_CHECKING:
from .routes import Route

logger = logging.getLogger(__name__)


class LoaferManager:
def __init__(self, routes, runner=None, max_concurrency=None):
def __init__(
self,
routes: Sequence[Route],
runner: LoaferRunner | None = None,
queue_size: int | None = None,
workers: int | None = None,
):
if runner is None:
self.runner = LoaferRunner(on_stop_callback=self.on_loop__stop)
else:
self.runner = runner

self.routes = routes
self.dispatcher = LoaferDispatcher(self.routes, max_concurrency)
self.dispatcher = LoaferDispatcher(routes, queue_size, workers)

def run(self, forever=True, debug=False): # noqa: FBT002
loop = self.runner.loop
Expand Down

0 comments on commit 65bc747

Please sign in to comment.