Skip to content

Commit

Permalink
fixed bug where terrain factory spawned a request when identical requ…
Browse files Browse the repository at this point in the history
…est was already out of the queue and being processed
  • Loading branch information
tebben committed Jan 23, 2024
1 parent 47423a2 commit 9e25694
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions ctod/core/factory/terrain_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class TerrainFactory:
def __init__(self, cache_expiry_seconds=10):
self.cache_expiry_seconds = cache_expiry_seconds
self.cache = {}
self.open_requests = {}
self.terrain_requests = {}
self.processing_queue = asyncio.Queue()
self.lock = asyncio.Lock()
Expand All @@ -42,19 +43,19 @@ async def handle_request(self, terrain_request: TerrainRequest) -> asyncio.Futur

# loop over wanted files and add to processing queue if needed
for wanted_file in terrain_request.wanted_files:

# Check if the data is already available in the cache
if wanted_file.key not in self.cache:
if wanted_file.key not in [item[0].key for item in self.processing_queue._queue]:
if wanted_file.key not in [item[0].key for item in self.processing_queue._queue] and wanted_file.key not in self.open_requests:
# Add a new request to the processing queue which handles the download of the cog data
cog_request = CogRequest(wanted_file.tms, wanted_file.cog, wanted_file.z, wanted_file.x, wanted_file.y, wanted_file.cog_processor, wanted_file.cog_reader_pool, wanted_file.resampling_method, wanted_file.generate_normals)
await self.processing_queue.put((cog_request,))



# If the data is already available, set it in the wanted file
else:
wanted_file.set_data(self.cache[wanted_file.key].data, self.cache[wanted_file.key].processed_data, self.cache[wanted_file.key].is_out_of_bounds)
await self._process_queue()

await self._process_queue()
await self._process_terrain_requests()

return await terrain_request.wait()
Expand All @@ -73,17 +74,20 @@ async def start_periodic_check(self, interval:int=5):
async def _process_queue(self):
"""Process the queue with CogRequests"""

while not self.processing_queue.empty():
cog_request, = await self.processing_queue.get()
future = cog_request.download_tile_async()
asyncio.ensure_future(self._process_completed_cog_request(cog_request, future))
async with self.lock:
while not self.processing_queue.empty():
cog_request, = await self.processing_queue.get()
self.open_requests[cog_request.key] = True
future = cog_request.download_tile_async()
asyncio.ensure_future(self._process_completed_cog_request(cog_request, future))

async def _process_completed_cog_request(self, cog_request, future):
await future # Wait for the completion of the future

# Process the completed request
async with self.lock:
if cog_request:
del self.open_requests[cog_request.key]
self.cache[cog_request.key] = cog_request

for _, terrain_request in list(self.terrain_requests.items()):
Expand All @@ -109,6 +113,8 @@ async def _handle_cancelled_request(self, terrain_request: TerrainRequest):
for wanted_file in list(self.processing_queue._queue):
if wanted_file[0].key not in [wanted_file.key for terrain_request in self.terrain_requests.values() for wanted_file in terrain_request.wanted_files]:
self.processing_queue._queue.remove(wanted_file)
if wanted_file[0].key in self.open_requests:
del self.open_requests[wanted_file[0].key]

async def _process_terrain_requests(self):
"""Check and run process on terrain requests when ready for processing"""
Expand Down

0 comments on commit 9e25694

Please sign in to comment.