From 9b1589aed319a065880568f1bbcc464f060e20cf Mon Sep 17 00:00:00 2001 From: tebben Date: Thu, 18 Jan 2024 08:08:15 +0100 Subject: [PATCH] Use a COG reader pool to improve performance slightly --- ctod/core/cog/cog.py | 30 ---------- ctod/core/cog/cog_reader.py | 70 ++++++++++++++++++++++++ ctod/core/cog/cog_reader_pool.py | 82 ++++++++++++++++++++++++++++ ctod/core/cog/cog_request.py | 48 +++++++++++----- ctod/core/factory/terrain_factory.py | 2 +- ctod/core/terrain/terrain_request.py | 8 ++- ctod/handlers/terrain.py | 3 +- ctod/server.py | 18 +++--- 8 files changed, 205 insertions(+), 56 deletions(-) delete mode 100644 ctod/core/cog/cog.py create mode 100644 ctod/core/cog/cog_reader.py create mode 100644 ctod/core/cog/cog_reader_pool.py diff --git a/ctod/core/cog/cog.py b/ctod/core/cog/cog.py deleted file mode 100644 index 7bef820..0000000 --- a/ctod/core/cog/cog.py +++ /dev/null @@ -1,30 +0,0 @@ -from morecantile import TileMatrixSet -from rio_tiler.io import Reader -from rio_tiler.models import ImageData - - -def download_tile(tms: TileMatrixSet, x: int, y: int, z: int, geotiff_path: str, resampling_method="bilinear") -> ImageData: - """Retrieve an image from a Cloud Optimized GeoTIFF based on a tile index. - - Args: - tms (TileMatrixSet): The tile matrix set to use. - x (int): x tile index. - y (int): y tile index. - z (int): z tile index. - geotiff_path (str): Path or URL to the Cloud Optimized GeoTIFF. - resampling_method (str, optional): RasterIO resampling algorithm. Defaults to "bilinear". - - Returns: - ImageData: _description_ - """ - - with Reader(geotiff_path, tms=tms) as src: - image_data = src.tile(tile_z=z, tile_x=x, tile_y=y, resampling_method=resampling_method) - - # Set nodata to 0 if nodata is present in the metadata - nodata_value = src.info().nodata_value if hasattr(src.info(), 'nodata_value') else None - - if nodata_value is not None: - image_data.data[image_data.data == nodata_value] = float(0) - - return image_data \ No newline at end of file diff --git a/ctod/core/cog/cog_reader.py b/ctod/core/cog/cog_reader.py new file mode 100644 index 0000000..cbb708b --- /dev/null +++ b/ctod/core/cog/cog_reader.py @@ -0,0 +1,70 @@ +import time + +from morecantile import TileMatrixSet +from rio_tiler.io import Reader +from rio_tiler.models import ImageData + + +class CogReader: + """A reader for a Cloud Optimized GeoTIFF. This class is used to pool readers to + avoid opening and closing the same file many times. + """ + + def __init__(self, pool, cog: str, tms: TileMatrixSet): + self.pool = pool + self.cog = cog + self.tms = tms + self.last_used = time.time() + self._set_rio_reader() + self._set_nodata_value() + + def close(self): + """Close the reader.""" + + self.rio_reader.close() + + def download_tile(self, x: int, y: int, z: int, resampling_method="bilinear") -> ImageData: + """Retrieve an image from a Cloud Optimized GeoTIFF based on a tile index. + + Args: + tms (TileMatrixSet): The tile matrix set to use. + x (int): x tile index. + y (int): y tile index. + z (int): z tile index. + geotiff_path (str): Path or URL to the Cloud Optimized GeoTIFF. + resampling_method (str, optional): RasterIO resampling algorithm. Defaults to "bilinear". + + Returns: + ImageData: _description_ + """ + + try: + + image_data = self.rio_reader.tile(tile_z=z, tile_x=x, tile_y=y, resampling_method=resampling_method) + + # For now set nodata to 0 if nodata is present in the metadata + # handle this better later + if self.nodata_value is not None: + image_data.data[image_data.data == self.nodata_value] = float(0) + + return image_data + + except Exception: + return None + + def return_reader(self): + """Done with the reader, return it to the pool.""" + + self.last_used = time.time() + self.pool.return_reader(self) + + def _set_rio_reader(self): + """Get the reader for the COG.""" + + self.rio_reader = Reader(self.cog, tms=self.tms) + + def _set_nodata_value(self): + """Set the nodata value for the reader.""" + + reader_info = self.rio_reader.info() + self.nodata_value = reader_info.nodata_value if hasattr(reader_info, 'nodata_value') else None diff --git a/ctod/core/cog/cog_reader_pool.py b/ctod/core/cog/cog_reader_pool.py new file mode 100644 index 0000000..9b54f1c --- /dev/null +++ b/ctod/core/cog/cog_reader_pool.py @@ -0,0 +1,82 @@ +import asyncio +import logging + +from collections import defaultdict +from ctod.core.cog.cog_reader import CogReader +from morecantile import TileMatrixSet + + +class CogReaderPool: + """Pool to spawn and manage readers for Cloud Optimized GeoTIFFs. + ToDo: Cleanup readers after being unused for a while, doesn't seem to impact memory usage much. + """ + + def __init__(self, max_readers=250): + """Create a new pool of readers. + + Args: + max_readers (int, optional): Amount of max readers in memory per cog path. Defaults to 250. + """ + + self.max_readers = max_readers + self.readers = defaultdict(list) + self.lock = asyncio.Lock() + + async def get_reader(self, cog: str, tms: TileMatrixSet) -> CogReader: + """Get a reader from the pool. If no reader is available a new one is created. + + Args: + cog (str): The path/url to the COG + tms (TileMatrixSet): The TileMatrixSet to use for the COG + + Returns: + CogReader: A reader for the COG + """ + + async with self.lock: + if cog not in self.readers or len(self.readers[cog]) == 0: + reader = CogReader(self, cog, tms) + else: + reader = self.readers[cog].pop() + + return reader + + def return_reader(self, reader: CogReader): + """Return a reader to the pool adding it back to the list of readers for the COG + + Args: + reader (CogReader): The COG Reader to return to the pool + """ + + if(len(self.readers[reader.cog]) >= self.max_readers): + reader.close() + return + + self.readers[reader.cog].append(reader) + logging.debug(f"Readers in pool for {reader.cog}: {len(self.readers[reader.cog])}") + + def populate_pool(self, cog: str, tms: TileMatrixSet, count: int): + """Populate the pool with readers for a COG + + Args: + cog (str): The path/url to the COG + tms (TileMatrixSet): The TileMatrixSet to use for the COG + count (int): The number of readers to create + """ + + for _ in range(count): + reader = self._create_reader(cog, tms) + self.return_reader(reader) + + def _create_reader(self, cog: str, tms: TileMatrixSet) -> CogReader: + """Create a new COG Reader for the pool + + Args: + cog (str): Path/url to the COG + tms (TileMatrixSet): TileMatrixSet to use for the COG + + Returns: + reader (CogReader): A new COG Reader + """ + + return CogReader(self, cog, tms) \ No newline at end of file diff --git a/ctod/core/cog/cog_request.py b/ctod/core/cog/cog_request.py index c89ac1d..1643bb6 100644 --- a/ctod/core/cog/cog_request.py +++ b/ctod/core/cog/cog_request.py @@ -2,19 +2,28 @@ import time from ctod.core import utils -from ctod.core.cog.cog import download_tile +from ctod.core.cog.cog_reader import CogReader +from ctod.core.cog.cog_reader_pool import CogReaderPool from ctod.core.cog.processor.cog_processor import CogProcessor from ctod.core.utils import generate_cog_cache_key -from rio_tiler.errors import TileOutsideBounds +from functools import partial +from rio_tiler.models import ImageData +from typing import Any + class CogRequest: - def __init__(self, tms, cog, z, x, y, cog_processor: CogProcessor, resampling_method = "bilinear", generate_normals = False): + """A request for a Cloud Optimized GeoTIFF tile. + COG data is retrieved and processed. + """ + + def __init__(self, tms, cog, z, x, y, cog_processor: CogProcessor, cog_reader_pool: CogReaderPool, resampling_method = "bilinear", generate_normals = False): self.tms = tms self.cog = cog self.z = z self.x = x self.y = y self.cog_processor = cog_processor + self.cog_reader_pool = cog_reader_pool self.resampling_method = resampling_method self.generate_normals = generate_normals self.key = generate_cog_cache_key(cog, z, x, y) @@ -25,7 +34,15 @@ def __init__(self, tms, cog, z, x, y, cog_processor: CogProcessor, resampling_me self.timestamp = time.time() self._future = None - def set_data(self, data, processed_data, is_out_of_bounds): + def set_data(self, data: ImageData, processed_data: Any, is_out_of_bounds: bool): + """Set the data manually + + Args: + data (ImageData): Data from the CogReader + processed_data (Any): Data processed by the CogProcessor + is_out_of_bounds (bool): Whether the tile is out of bounds + """ + self.data = data self.processed_data = processed_data self.is_out_of_bounds = is_out_of_bounds @@ -34,17 +51,20 @@ async def download_tile_async(self): """ Asynchronous version to retrieve an image from a Cloud Optimized GeoTIFF based on a tile index. """ + loop = asyncio.get_event_loop() - future = loop.run_in_executor(None, self._download) + reader = await self.cog_reader_pool.get_reader(self.cog, self.tms) + partial_download = partial(self._download, reader) + future = loop.run_in_executor(None, partial_download) return await asyncio.wrap_future(future) - def _download(self): - try: - dowloaded_data = download_tile(self.tms, self.x, self.y, self.z, self.cog, self.resampling_method) - if dowloaded_data is not None: - self.data = dowloaded_data - self.processed_data = self.cog_processor.process(self) - else: - self.is_out_of_bounds = True - except TileOutsideBounds: + def _download(self, reader: CogReader): + dowloaded_data = reader.download_tile(self.x, self.y, self.z, self.resampling_method) + + if dowloaded_data is not None: + self.data = dowloaded_data + self.processed_data = self.cog_processor.process(self) + else: self.is_out_of_bounds = True + + reader.return_reader() diff --git a/ctod/core/factory/terrain_factory.py b/ctod/core/factory/terrain_factory.py index d300038..19dfff5 100644 --- a/ctod/core/factory/terrain_factory.py +++ b/ctod/core/factory/terrain_factory.py @@ -47,7 +47,7 @@ async def handle_request(self, terrain_request: TerrainRequest) -> asyncio.Futur if wanted_file.key not in self.cache: if wanted_file.key not in [item[0].key for item in self.processing_queue._queue]: # Add a new request to the processing queue which handles the download of the cog data - cog_request = CogRequest(wanted_file.tms, wanted_file.cog, wanted_file.z, wanted_file.x, wanted_file.y, wanted_file.cog_processor, wanted_file.resampling_method, wanted_file.generate_normals) + cog_request = CogRequest(wanted_file.tms, wanted_file.cog, wanted_file.z, wanted_file.x, wanted_file.y, wanted_file.cog_processor, wanted_file.cog_reader_pool, wanted_file.resampling_method, wanted_file.generate_normals) await self.processing_queue.put((cog_request,)) # If the data is already available, set it in the wanted file diff --git a/ctod/core/terrain/terrain_request.py b/ctod/core/terrain/terrain_request.py index 32d2314..4aa270d 100644 --- a/ctod/core/terrain/terrain_request.py +++ b/ctod/core/terrain/terrain_request.py @@ -1,6 +1,7 @@ import asyncio from morecantile import TileMatrixSet +from ctod.core.cog.cog_reader_pool import CogReaderPool from ctod.core.cog.processor.cog_processor import CogProcessor from ctod.core.cog.cog_request import CogRequest from ctod.core.terrain.generator.terrain_generator import TerrainGenerator @@ -12,7 +13,7 @@ class TerrainRequest: """Request for a terrain tile""" - def __init__(self, tms: TileMatrixSet, cog: str, z: int, x: int, y: int, resampling_method: str, cog_processor: CogProcessor, terrain_generator: TerrainGenerator, generate_normals = False): + def __init__(self, tms: TileMatrixSet, cog: str, z: int, x: int, y: int, resampling_method: str, cog_processor: CogProcessor, terrain_generator: TerrainGenerator, cog_reader_pool: CogReaderPool, generate_normals = False): self.tms = tms self.cog = cog self.z = z @@ -22,6 +23,7 @@ def __init__(self, tms: TileMatrixSet, cog: str, z: int, x: int, y: int, resampl self.cog_processor = cog_processor self.terrain_generator = terrain_generator self.generate_normals = generate_normals + self.cog_reader_pool = cog_reader_pool self.wanted_files = [] self._generate_wanted_files() self.key = generate_cog_cache_key(self.cog, self.z, self.x, self.y) @@ -136,9 +138,9 @@ def _generate_wanted_files(self): which are the adjecent tiles and the main tile """ - self.wanted_files.append(CogRequest(self.tms, self.cog, self.z, self.x, self.y, self.cog_processor, self.resampling_method, self.generate_normals)) + self.wanted_files.append(CogRequest(self.tms, self.cog, self.z, self.x, self.y, self.cog_processor, self.cog_reader_pool, self.resampling_method, self.generate_normals)) neighbour_tiles = get_neighbor_tiles(self.tms, self.x, self.y, self.z) for tile in neighbour_tiles: - self.wanted_files.append(CogRequest(self.tms, self.cog, tile.z, tile.x, tile.y, self.cog_processor, self.resampling_method, self.generate_normals)) + self.wanted_files.append(CogRequest(self.tms, self.cog, tile.z, tile.x, tile.y, self.cog_processor, self.cog_reader_pool, self.resampling_method, self.generate_normals)) \ No newline at end of file diff --git a/ctod/handlers/terrain.py b/ctod/handlers/terrain.py index f095ece..1c5c85c 100644 --- a/ctod/handlers/terrain.py +++ b/ctod/handlers/terrain.py @@ -24,6 +24,7 @@ def __init__(self, application, request, **kwargs): self.terrain_factory = kwargs.pop('terrain_factory') self.cog_processor = kwargs.pop('cog_processor') self.tile_cache_path = kwargs.pop('tile_cache_path') + self.cog_reader_pool = kwargs.pop('cog_reader_pool') super(TerrainHandler, self).__init__(application, request, **kwargs) async def get(self, z: int, x: int, y: int): @@ -63,7 +64,7 @@ async def get(self, z: int, x: int, y: int): return terrain_generator = self._get_terrain_generator(meshing_method) - self.terrain_request = TerrainRequest(tms, cog, z, x, y, resampling_method, self.cog_processor, terrain_generator, extensions["octvertexnormals"]) + self.terrain_request = TerrainRequest(tms, cog, z, x, y, resampling_method, self.cog_processor, terrain_generator, self.cog_reader_pool, extensions["octvertexnormals"]) quantized = await self.terrain_factory.handle_request(self.terrain_request) self._try_save_tile_to_cache(cog, meshing_method, resampling_method, z, x, y, quantized) diff --git a/ctod/server.py b/ctod/server.py index a437b2b..ac2627b 100644 --- a/ctod/server.py +++ b/ctod/server.py @@ -2,6 +2,8 @@ import logging import quantized_mesh_encoder.occlusion +from ctod.core import utils +from ctod.core.cog.cog_reader_pool import CogReaderPool from ctod.core.cog.processor.cog_processor_quantized_mesh_grid import CogProcessorQuantizedMeshGrid from ctod.core.factory.terrain_factory import TerrainFactory from ctod.handlers.index import IndexHandler @@ -11,11 +13,6 @@ from tornado import web -def patch_occlusion(): - """monkey patch quantized_mesh_encoder.occlusion with our own compute_magnitude""" - - quantized_mesh_encoder.occlusion.compute_magnitude = compute_magnitude - def log_request(handler): logging.debug("%d %s %.2fms", handler.get_status(), @@ -25,9 +22,10 @@ def log_request(handler): def make_server(tile_cache_path: str = None): """Create a Tornado web server.""" - patch_occlusion() + _patch_occlusion() terrain_factory = TerrainFactory() cog_processor_mesh_grid = CogProcessorQuantizedMeshGrid() + cog_reader_pool = CogReaderPool() # Start the periodic cache check in the background asyncio.ensure_future(terrain_factory.start_periodic_check()) @@ -42,6 +40,7 @@ def make_server(tile_cache_path: str = None): dict( terrain_factory=terrain_factory, cog_processor=cog_processor_mesh_grid, + cog_reader_pool=cog_reader_pool, tile_cache_path=tile_cache_path ), ), @@ -49,4 +48,9 @@ def make_server(tile_cache_path: str = None): template_path="./ctod/templates", static_path="./ctod/templates/static", log_function=log_request, - ) \ No newline at end of file + ) + +def _patch_occlusion(): + """monkey patch quantized_mesh_encoder.occlusion with our own compute_magnitude""" + + quantized_mesh_encoder.occlusion.compute_magnitude = compute_magnitude