From 017657c98c4afadac760cec36323c4ea564e4464 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sun, 7 Nov 2021 08:52:32 +0000 Subject: [PATCH 1/5] fix: Apply queueing for each event type - This will reduce the contention of overlapping scheduler event handlers within a single manager process. --- src/ai/backend/common/events.py | 48 ++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index 5e95534e..6af26430 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -41,6 +41,7 @@ KernelId, SessionId, LogSeverity, + Sentinel, ) __all__ = ( @@ -621,6 +622,10 @@ class EventDispatcher(aobject): subscriber_loop_task: asyncio.Task consumer_taskset: weakref.WeakSet[asyncio.Task] subscriber_taskset: weakref.WeakSet[asyncio.Task] + consumer_queues: defaultdict[str, asyncio.Queue[ + tuple[str, AgentId, tuple] | Sentinel, + ]] + consumer_handlers: dict[str, asyncio.Task] _log_events: bool _consumer_name: str @@ -641,12 +646,23 @@ def __init__( self._consumer_name = secrets.token_urlsafe(16) async def __ainit__(self) -> None: - self.consumer_loop_task = asyncio.create_task(self._consume_loop()) - self.subscriber_loop_task = asyncio.create_task(self._subscribe_loop()) + self.consumer_loop_task = asyncio.create_task( + self._consume_loop(), + name="evdispatcher.consumer_loop", + ) + self.subscriber_loop_task = asyncio.create_task( + self._subscribe_loop(), + name="evdispatcher.subscriber_loop", + ) self.consumer_taskset = weakref.WeakSet() self.subscriber_taskset = weakref.WeakSet() + self.consumer_queues = defaultdict(asyncio.Queue) + self.consumer_handlers = {} async def close(self) -> None: + for q in self.consumer_queues.values(): + q.put_nowait(Sentinel.TOKEN) + await asyncio.sleep(0) cancelled_tasks = [] for task in self.consumer_taskset: if not task.done(): @@ -736,6 +752,11 @@ async def dispatch_consumers( self.handle("CONSUMER", consumer, source, args), )) await asyncio.sleep(0) + results = await asyncio.gather(*self.consumer_taskset, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + log.error("unexpected error while processing ev:{}", event_name, exc_info=result) + self.consumer_taskset.clear() async def dispatch_subscribers( self, @@ -751,6 +772,13 @@ async def dispatch_subscribers( )) await asyncio.sleep(0) + async def _consume_handle(self, event_name) -> None: + while True: + params = await self.consumer_queues[event_name].get() + if params is Sentinel.TOKEN: + break + await self.dispatch_consumers(*params) + async def _consume_loop(self) -> None: async with aclosing(redis.read_stream_by_group( self.redis_client, @@ -760,11 +788,19 @@ async def _consume_loop(self) -> None: )) as agen: async for msg_id, msg_data in agen: try: - await self.dispatch_consumers( - msg_data[b'name'].decode(), - msg_data[b'source'].decode(), - msgpack.unpackb(msg_data[b'args']), + event_name = msg_data[b'name'].decode() + self.consumer_queues[event_name].put_nowait( + ( + event_name, + msg_data[b'source'].decode(), + msgpack.unpackb(msg_data[b'args']), + ), ) + if event_name not in self.consumer_handlers: + self.consumer_handlers[event_name] = asyncio.create_task( + self._consume_handle(event_name), + name=f"evdispatcher.consume_handler.{event_name}" + ) except asyncio.CancelledError: raise except Exception: From 21c4d5311444863d3e1671841feecf796cc5154b Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sun, 7 Nov 2021 08:55:17 +0000 Subject: [PATCH 2/5] docs: Add news fragment --- changes/93.fix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/93.fix diff --git a/changes/93.fix b/changes/93.fix new file mode 100644 index 00000000..a27c7127 --- /dev/null +++ b/changes/93.fix @@ -0,0 +1 @@ +Apply queueing of individual event types to reduce contention of overlapping event handlers in each manager process From a9fb952f335bd70ba5169b9906d57a57857792ed Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sun, 7 Nov 2021 09:01:51 +0000 Subject: [PATCH 3/5] fix: Use func-local task set in consumer dispatcher --- src/ai/backend/common/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index 6af26430..3b85fe7c 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -747,16 +747,16 @@ async def dispatch_consumers( ) -> None: if self._log_events: log.debug('DISPATCH_CONSUMERS(ev:{}, ag:{})', event_name, source) + consumer_tasks = [] for consumer in self.consumers[event_name].copy(): - self.consumer_taskset.add(asyncio.create_task( + consumer_tasks.add(asyncio.create_task( self.handle("CONSUMER", consumer, source, args), )) await asyncio.sleep(0) - results = await asyncio.gather(*self.consumer_taskset, return_exceptions=True) + results = await asyncio.gather(*consumer_tasks, return_exceptions=True) for result in results: if isinstance(result, Exception): log.error("unexpected error while processing ev:{}", event_name, exc_info=result) - self.consumer_taskset.clear() async def dispatch_subscribers( self, From 792c6d877976acb9fce6e3704290e41c22344e9d Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Sun, 7 Nov 2021 09:17:35 +0000 Subject: [PATCH 4/5] fix: oops --- src/ai/backend/common/events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index 3b85fe7c..3c7c66cd 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -749,10 +749,9 @@ async def dispatch_consumers( log.debug('DISPATCH_CONSUMERS(ev:{}, ag:{})', event_name, source) consumer_tasks = [] for consumer in self.consumers[event_name].copy(): - consumer_tasks.add(asyncio.create_task( + consumer_tasks.append(asyncio.create_task( self.handle("CONSUMER", consumer, source, args), )) - await asyncio.sleep(0) results = await asyncio.gather(*consumer_tasks, return_exceptions=True) for result in results: if isinstance(result, Exception): From 48ab35c64731dc5416d660db3adbbd9440d160ea Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 8 Nov 2021 02:35:57 +0000 Subject: [PATCH 5/5] fix: Properly synchronize queue shutdown --- src/ai/backend/common/events.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index 3c7c66cd..0b2b2bce 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -660,9 +660,6 @@ async def __ainit__(self) -> None: self.consumer_handlers = {} async def close(self) -> None: - for q in self.consumer_queues.values(): - q.put_nowait(Sentinel.TOKEN) - await asyncio.sleep(0) cancelled_tasks = [] for task in self.consumer_taskset: if not task.done(): @@ -677,6 +674,11 @@ async def close(self) -> None: cancelled_tasks.append(self.consumer_loop_task) cancelled_tasks.append(self.subscriber_loop_task) await asyncio.gather(*cancelled_tasks, return_exceptions=True) + join_tasks = [] + for q in self.consumer_queues.values(): + q.put_nowait(Sentinel.TOKEN) + join_tasks.append(q.join()) + await asyncio.gather(*join_tasks) await self.redis_client.close() def consume( @@ -774,9 +776,12 @@ async def dispatch_subscribers( async def _consume_handle(self, event_name) -> None: while True: params = await self.consumer_queues[event_name].get() - if params is Sentinel.TOKEN: - break - await self.dispatch_consumers(*params) + try: + if params is Sentinel.TOKEN: + break + await self.dispatch_consumers(*params) + finally: + self.consumer_queues[event_name].task_done() async def _consume_loop(self) -> None: async with aclosing(redis.read_stream_by_group(