From 9e256947941797bbd46909a96b06262b0a8f11ad Mon Sep 17 00:00:00 2001 From: tebben Date: Tue, 23 Jan 2024 13:46:58 +0100 Subject: [PATCH] fixed bug where terrain factory spawned a request when identical request was already out of the queue and being processed --- ctod/core/factory/terrain_factory.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/ctod/core/factory/terrain_factory.py b/ctod/core/factory/terrain_factory.py index 19dfff5..fad2965 100644 --- a/ctod/core/factory/terrain_factory.py +++ b/ctod/core/factory/terrain_factory.py @@ -19,6 +19,7 @@ class TerrainFactory: def __init__(self, cache_expiry_seconds=10): self.cache_expiry_seconds = cache_expiry_seconds self.cache = {} + self.open_requests = {} self.terrain_requests = {} self.processing_queue = asyncio.Queue() self.lock = asyncio.Lock() @@ -42,19 +43,19 @@ async def handle_request(self, terrain_request: TerrainRequest) -> asyncio.Futur # loop over wanted files and add to processing queue if needed for wanted_file in terrain_request.wanted_files: - # Check if the data is already available in the cache if wanted_file.key not in self.cache: - if wanted_file.key not in [item[0].key for item in self.processing_queue._queue]: + if wanted_file.key not in [item[0].key for item in self.processing_queue._queue] and wanted_file.key not in self.open_requests: # Add a new request to the processing queue which handles the download of the cog data cog_request = CogRequest(wanted_file.tms, wanted_file.cog, wanted_file.z, wanted_file.x, wanted_file.y, wanted_file.cog_processor, wanted_file.cog_reader_pool, wanted_file.resampling_method, wanted_file.generate_normals) await self.processing_queue.put((cog_request,)) - + + # If the data is already available, set it in the wanted file else: wanted_file.set_data(self.cache[wanted_file.key].data, self.cache[wanted_file.key].processed_data, self.cache[wanted_file.key].is_out_of_bounds) - - await self._process_queue() + + await self._process_queue() await self._process_terrain_requests() return await terrain_request.wait() @@ -73,10 +74,12 @@ async def start_periodic_check(self, interval:int=5): async def _process_queue(self): """Process the queue with CogRequests""" - while not self.processing_queue.empty(): - cog_request, = await self.processing_queue.get() - future = cog_request.download_tile_async() - asyncio.ensure_future(self._process_completed_cog_request(cog_request, future)) + async with self.lock: + while not self.processing_queue.empty(): + cog_request, = await self.processing_queue.get() + self.open_requests[cog_request.key] = True + future = cog_request.download_tile_async() + asyncio.ensure_future(self._process_completed_cog_request(cog_request, future)) async def _process_completed_cog_request(self, cog_request, future): await future # Wait for the completion of the future @@ -84,6 +87,7 @@ async def _process_completed_cog_request(self, cog_request, future): # Process the completed request async with self.lock: if cog_request: + del self.open_requests[cog_request.key] self.cache[cog_request.key] = cog_request for _, terrain_request in list(self.terrain_requests.items()): @@ -109,6 +113,8 @@ async def _handle_cancelled_request(self, terrain_request: TerrainRequest): for wanted_file in list(self.processing_queue._queue): if wanted_file[0].key not in [wanted_file.key for terrain_request in self.terrain_requests.values() for wanted_file in terrain_request.wanted_files]: self.processing_queue._queue.remove(wanted_file) + if wanted_file[0].key in self.open_requests: + del self.open_requests[wanted_file[0].key] async def _process_terrain_requests(self): """Check and run process on terrain requests when ready for processing"""