diff --git a/nakuru/application.py b/nakuru/application.py index ae41fef..378f7d0 100644 --- a/nakuru/application.py +++ b/nakuru/application.py @@ -62,9 +62,7 @@ async def ws_event(self): async with aiohttp.ClientSession() as session: async with session.ws_connect(f"{self.baseurl}", headers=self.headers) as ws_connection: logger.info("Protocol: connected") - while True: - if self.closed: - break + while not self.closed: try: received_data = await ws_connection.receive_json() except TypeError: @@ -92,7 +90,7 @@ async def ws_event(self): )) async def event_runner(self): - while True: + while not self.closed: try: event_context: NamedTuple[InternalEvent] = await asyncio.wait_for(self.queue.get(), 3) except asyncio.TimeoutError: @@ -193,8 +191,8 @@ async def executor(self, async def _run(self): loop = asyncio.get_event_loop() self.queue = asyncio.Queue(loop=loop) if sys.version_info.minor < 10 else asyncio.Queue() - loop.create_task(self.ws_event()) - loop.create_task(self.event_runner()) + wsLoop = loop.create_task(self.ws_event()) + eventLoop = loop.create_task(self.event_runner()) await self.queue.put(InternalEvent( name=self.getEventCurrentName("AppInitEvent"), @@ -209,6 +207,7 @@ async def _run(self): except KeyboardInterrupt: logger.info("catched Ctrl-C, exiting..") + self.close() except Exception as e: traceback.print_exc() finally: @@ -217,11 +216,14 @@ async def _run(self): for end_callable in self.lifecycle['end']: await self.run_func(end_callable, self) + + # await tasks exit + await wsLoop + await eventLoop def run(self): loop = asyncio.get_event_loop() loop.run_until_complete(self._run()) - loop.run_forever() def receiver(self, event_name,