From 2ab941005d8114fa8491259510171f51cba8a0c7 Mon Sep 17 00:00:00 2001
From: mrsmrynk
Date: Sun, 3 Mar 2024 18:49:19 +0100
Subject: [PATCH] refactor postprocessing: Feather tile cache, type hints, validation

---
 requirements.txt                    |   2 +
 requirements_dev.txt                |   2 +
 src/postprocessing/__init__.py      |   2 +-
 src/postprocessing/postprocessor.py | 367 +++++++++++++++++-----------
 4 files changed, 231 insertions(+), 142 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 6dd01f4..3b7af4b 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,9 +5,11 @@ onnxruntime==1.16.3
 OWSLib==0.29.3
 pandas==2.1.4
 Pillow==10.1.0
+pyarrow==15.0.0
 pydantic==2.5.2
 PyYAML==6.0.1
 rasterio==1.3.9
 rtree==1.1.0
 Shapely==2.0.2
+simplification==0.7.10
 topojson==1.7
\ No newline at end of file
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 9936e2c..c7518fa 100755
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -7,6 +7,7 @@ onnxruntime==1.16.3
 OWSLib==0.29.3
 pandas==2.1.4
 Pillow==10.1.0
+pyarrow==15.0.0
 pydantic==2.5.2
 pytest==7.4.3
 pytest-cov==4.1.0
@@ -14,4 +15,5 @@ PyYAML==6.0.1
 rasterio==1.3.9
 rtree==1.1.0
 Shapely==2.0.2
+simplification==0.7.10
 topojson==1.7
\ No newline at end of file
diff --git a/src/postprocessing/__init__.py b/src/postprocessing/__init__.py
index 14ad6c6..a4468f4 100644
--- a/src/postprocessing/__init__.py
+++ b/src/postprocessing/__init__.py
@@ -1 +1 @@
-from .postprocessor import *
+from .postprocessor import Postprocessor
diff --git a/src/postprocessing/postprocessor.py b/src/postprocessing/postprocessor.py
index d8d48c7..59fbb37 100644
--- a/src/postprocessing/postprocessor.py
+++ b/src/postprocessing/postprocessor.py
@@ -1,240 +1,325 @@
 import re
-from collections import OrderedDict
 from pathlib import Path
+from typing import Any, Optional

 import geopandas as gpd
 import numpy as np
+import numpy.typing as npt
 import pandas as pd
 import rasterio as rio
 import rasterio.features
 import topojson as tp
-from shapely.geometry import box as Box  # PEP8 compliant
-from shapely.geometry import Polygon
+from shapely.geometry import box, Polygon

-import src.utils.settings as settings
+from src.utils.settings import RESOLUTION

 pd.options.mode.chained_assignment = None


 class Postprocessor:
-    ATTRIBUTES = OrderedDict()
-    ATTRIBUTES['class'] = 'str:7'
-    CLASS_MAP = {1: 'Hochbau', 2: 'Tiefbau'}
-    SCHEMA = {'properties': ATTRIBUTES,
-              'geometry': 'Polygon'}
+    FIELD = 'class'

     def __init__(self,
-                 output_dir_path,
-                 bounding_box,
-                 epsg_code,
-                 boundary_gdf):
+                 path_output_dir: Path,
+                 epsg_code: int) -> None:
         """
-        | Constructor method
+        | Initializer method

-        :param str or Path output_dir_path: path to the output directory
-        :param (int, int, int, int) bounding_box: bounding box (x_min, y_min, x_max, y_max)
-        :param int epsg_code: epsg code of the coordinate reference system
-        :param gpd.GeoDataFrame or None boundary_gdf: boundary geodataframe
+        :param path_output_dir: path to the output directory
+        :param epsg_code: EPSG code of the coordinate reference system
         :returns: None
-        :rtype: None
         """
-        self.cached_tiles_dir_path = Path(output_dir_path) / 'cached_tiles'
-        self.x_min, self.y_min, self.x_max, self.y_max = bounding_box
+        assert isinstance(path_output_dir, Path)
+
+        assert isinstance(epsg_code, int)
+
+        self.path_tiles_processed_dir = path_output_dir / 'tiles_processed'
         self.epsg_code = epsg_code
-        self.boundary_gdf = boundary_gdf

-    def create_cached_tiles_dir(self):
+    @staticmethod
+    def vectorize_mask(mask: npt.NDArray[np.uint8],
+                       coordinates: tuple[int, int]) -> list[dict[str, dict[str, Any]]]:
         """
-        | Creates a cached_tiles directory in the output directory.
+        | Returns the features of the mask.
+        | The features implement the __geo_interface__ protocol.

-        :returns: None
-        :rtype: None
+        :param mask: mask
+        :param coordinates: coordinates (x_min, y_max)
+        :returns: features
         """
-        self.cached_tiles_dir_path.mkdir(exist_ok=True)
+        assert isinstance(mask, np.ndarray)
+        assert mask.dtype == np.uint8
+        assert mask.ndim == 2
+
+        assert isinstance(coordinates, tuple)
+        assert len(coordinates) == 2
+        assert all(isinstance(coordinate, int) for coordinate in coordinates)
+
+        x_min, y_max = coordinates
+
+        transform = rio.transform.from_origin(west=x_min,
+                                              north=y_max,
+                                              xsize=RESOLUTION,
+                                              ysize=RESOLUTION)
+
+        features = [{'properties': {Postprocessor.FIELD: int(value)},
+                     'geometry': polygon}
+                    for polygon, value in rio.features.shapes(mask, transform=transform)
+                    if int(value) != 0]
+
+        return features

     def export_features(self,
-                        features,
-                        coordinates):
+                        features: list[dict[str, dict[str, Any]]],
+                        coordinates: tuple[int, int]) -> None:
         """
-        | Exports features of a tile as a shape file (.shp) in a subdirectory to the cached_tiles directory.
-        | Each subdirectory name is in the following schema: x_y
+        | Exports the features of the tile as a feather file to a subdirectory in the tiles_processed directory.

-        :param list[dict[str, dict[str, Any]]] features: features
-        :param (int, int) coordinates: coordinates (x, y)
+        :param features: features
+        :param coordinates: coordinates (x_min, y_max)
         :returns: None
-        :rtype: None
         """
-        sub_dir_path = self.cached_tiles_dir_path / f'{coordinates[0]}_{coordinates[1]}'
-        sub_dir_path.mkdir(exist_ok=True)
+        assert isinstance(features, list)
+
+        if __debug__:
+            for features_element in features:
+                assert isinstance(features_element, dict)
+                assert 'properties' in features_element
+                assert isinstance(features_element['properties'], dict)
+                assert Postprocessor.FIELD in features_element['properties']
+                assert isinstance(features_element['properties'][Postprocessor.FIELD], int)
+                assert 'geometry' in features_element
+                assert isinstance(features_element['geometry'], dict)
+
+        assert isinstance(coordinates, tuple)
+        assert len(coordinates) == 2
+        assert all(isinstance(coordinate, int) for coordinate in coordinates)
+
+        x_min, y_max = coordinates
+
+        path_tile_processed_dir = self.path_tiles_processed_dir / f'{x_min}_{y_max}'
+
+        path_tile_processed_dir.mkdir(exist_ok=True)

-        for path in sub_dir_path.iterdir():
+        for path in path_tile_processed_dir.iterdir():
             path.unlink()

         if features:
-            shape_file_path = sub_dir_path / f'{coordinates[0]}_{coordinates[1]}.shp'
-            gdf = gpd.GeoDataFrame.from_features(features, crs=f'EPSG:{self.epsg_code}')
-            gdf.to_file(str(shape_file_path), schema=Postprocessor.SCHEMA)
+            path_tile_processed = path_tile_processed_dir / f'{x_min}_{y_max}.feather'

-    def vectorize_mask(self,
-                       mask,
-                       coordinates):
-        """
-        | Exports a shape file (.shp) of the polygons in the vectorized mask given its coordinates
-          of the top left corner in a subdirectory to the cached_tiles directory.
-        :param np.ndarray[np.uint8] mask: mask
-        :param (int, int) coordinates: coordinates (x, y)
-        :returns: None
-        :rtype: None
-        """
-        transform = rio.transform.from_origin(west=coordinates[0],
-                                              north=coordinates[1],
-                                              xsize=settings.RESOLUTION,
-                                              ysize=settings.RESOLUTION)
-        vectorized_mask = rio.features.shapes(mask, transform=transform)
+            gdf = gpd.GeoDataFrame.from_features(features=features,
+                                                 crs=f'EPSG:{self.epsg_code}')

-        features = [{'properties': {'class': Postprocessor.CLASS_MAP.get(int(value))}, 'geometry': shape}
-                    for shape, value in vectorized_mask if int(value) != 0]
-        self.export_features(features=features,
-                             coordinates=coordinates)
+            gdf[Postprocessor.FIELD] = gdf[Postprocessor.FIELD].astype(np.uint8)
+            gdf.to_feather(path_tile_processed)

-    def concatenate_cached_tiles(self, coordinates):
+    def concatenate_processed_tiles(self,
+                                    coordinates: npt.NDArray[np.int32],
+                                    value_map: Optional[dict[int, str]] = None) -> gpd.GeoDataFrame:
         """
-        | Returns a geodataframe of the concatenated cached tiles.
+        | Returns a geodataframe of the concatenated processed tiles.

-        :param np.ndarray[np.int32] coordinates: coordinates (x_min, y_max) of each tile
-        :returns: concatenated cached tiles
-        :rtype: gpd.GeoDataFrame
+        :param coordinates: coordinates (x_min, y_max) of each tile
+        :param value_map: mapping from raster values to class names
+        :returns: concatenated processed tiles
         """
-        assert isinstance(coordinates, np.ndarray) and coordinates.dtype == np.int32
-        assert len(coordinates.shape) == 2
+        assert isinstance(coordinates, np.ndarray)
+        assert coordinates.dtype == np.int32
+        assert coordinates.ndim == 2
         assert coordinates.shape[-1] == 2

-        cached_tiles = []
+        assert isinstance(value_map, dict) or value_map is None
+
+        if value_map is not None:
+            assert all(isinstance(value_int, int) for value_int in value_map.keys())
+            assert all(isinstance(value_str, str) for value_str in value_map.values())
+
+        tiles_processed = []

         pattern = re.compile(r'^(-?\d+)_(-?\d+)$')

-        for path_cached_tile_dir in self.cached_tiles_dir_path.iterdir():
-            match = pattern.search(path_cached_tile_dir.name)
+        for path_tile_processed_dir in self.path_tiles_processed_dir.iterdir():
+            if not path_tile_processed_dir.is_dir():
+                continue
+
+            match = pattern.search(path_tile_processed_dir.name)

             if match:
                 x_min = int(match.group(1))
                 y_max = int(match.group(2))
-                coordinates_cached = np.array([x_min, y_max], dtype=np.int32)
+                coordinates_processed = np.array([x_min, y_max], dtype=np.int32)

-                if np.any(np.all(coordinates == coordinates_cached, axis=1)):
-                    path_cached_tile = (self.cached_tiles_dir_path /
-                                        f'{x_min}_{y_max}' /
-                                        f'{x_min}_{y_max}.gpkg')
+                if np.any(np.all(coordinates == coordinates_processed, axis=1)):
+                    path_tile_processed = (self.path_tiles_processed_dir /
+                                           f'{x_min}_{y_max}' /
+                                           f'{x_min}_{y_max}.feather')

-                    if path_cached_tile.is_file():
-                        cached_tile = gpd.read_file(path_cached_tile)
-                        cached_tiles.append(cached_tile)
+                    if path_tile_processed.is_file():
+                        tile_processed = gpd.read_feather(path_tile_processed)
+                        tiles_processed.append(tile_processed)
+                    else:
+                        pass  # TODO: log warning

-        cached_tiles_concatenated = gpd.GeoDataFrame(pd.concat(cached_tiles, ignore_index=True),
-                                                     crs=f'EPSG:{self.epsg_code}')
+        tiles_processed_concatenated = gpd.GeoDataFrame(pd.concat(tiles_processed, ignore_index=True),
+                                                        crs=f'EPSG:{self.epsg_code}')

-        return cached_tiles_concatenated
+        if value_map is not None:
+            tiles_processed_concatenated[Postprocessor.FIELD] = (
+                tiles_processed_concatenated[Postprocessor.FIELD].map(value_map).astype('category'))
+
+        return tiles_processed_concatenated

     @staticmethod
-    def sieve_gdf(gdf, sieve_size):
+    def sieve_gdf(gdf: gpd.GeoDataFrame,
+                  sieve_size: int) -> gpd.GeoDataFrame:
         """
-        | Returns a sieved geodataframe.
+        | Returns the sieved geodataframe.

-        :param gpd.GeoDataFrame gdf: geodataframe
-        :param int sieve_size: sieve size in square meters (minimum area of polygons to retain)
+        :param gdf: geodataframe
+        :param sieve_size: sieve size in square meters (minimum area of polygons to retain)
         :returns: sieved geodataframe
-        :rtype: gpd.GeoDataFrame
         """
+        assert isinstance(gdf, gpd.GeoDataFrame)
+
         if gdf.empty:
             return gdf

-        mask = gdf.area >= sieve_size
-        sieved_gdf = gdf.loc[mask]
-        sieved_gdf.reset_index(drop=True,
+        assert all(gdf['geometry'].geom_type == 'Polygon')
+        assert all(gdf['geometry'].is_valid)
+
+        assert isinstance(sieve_size, int)
+        assert sieve_size >= 0
+
+        if sieve_size == 0:
+            return gdf
+
+        gdf_sieved = gdf[gdf['geometry'].area >= sieve_size]
+
+        gdf_sieved.reset_index(drop=True,
                                inplace=True)
-        return sieved_gdf
+
+        return gdf_sieved

     @staticmethod
-    def fill_polygon(polygon, hole_size):
+    def fill_polygon(polygon: Polygon,
+                     hole_size: int) -> Polygon:
         """
-        | Returns a polygon without holes.
-        |
-        | Based on:
-        | https://gis.stackexchange.com/a/409398
+        | Returns the polygon without holes.

-        :param Polygon polygon: polygon
-        :param int hole_size: hole size in square meters (maximum area of holes in the polygons to retain)
+        :param polygon: polygon
+        :param hole_size: hole size in square meters (maximum area of holes in the polygons to retain)
         :returns: filled polygon
-        :rtype: Polygon
-        """
-        if polygon.interiors:
-            interiors = []
-            for interior in polygon.interiors:
-                polygon_interior = Polygon(interior)
-                if polygon_interior.area >= hole_size:
-                    interiors.append(interior)
-            return Polygon(polygon.exterior.coords, holes=interiors)
-        else:
+        """
+        assert isinstance(polygon, Polygon)
+        assert polygon.is_valid
+
+        if not polygon.interiors:
             return polygon

+        assert isinstance(hole_size, int)
+        assert hole_size >= 0
+
+        if hole_size == 0:
+            return Polygon(polygon.exterior.coords)
+
+        interiors = [interior
+                     for interior in polygon.interiors
+                     if Polygon(interior).area >= hole_size]
+
+        return Polygon(polygon.exterior.coords, holes=interiors)
+
     @staticmethod
-    def fill_gdf(gdf, hole_size):
+    def fill_gdf(gdf: gpd.GeoDataFrame,
+                 hole_size: int) -> gpd.GeoDataFrame:
         """
-        | Returns a geodataframe without holes in the polygons.
-        | The hole size of buildings is doubled.
-        |
-        | Based on:
-        | https://gis.stackexchange.com/a/409398
-        | https://stackoverflow.com/a/61466689
+        | Returns the filled geodataframe.

-        :param gpd.GeoDataFrame gdf: geodataframe
-        :param int hole_size: hole size in square meters (maximum area of holes in the polygons to retain)
+        :param gdf: geodataframe
+        :param hole_size: hole size in square meters (maximum area of holes in the polygons to retain)
         :returns: filled geodataframe
-        :rtype: gpd.GeoDataFrame
         """
+        assert isinstance(gdf, gpd.GeoDataFrame)
+
         if gdf.empty:
             return gdf

-        gdf['geometry'] = gdf.apply(lambda x:
-                                    Postprocessor.fill_polygon(x['geometry'],
-                                                               hole_size=2 * hole_size)
-                                    if x['class'] == 'Hochbau'
-                                    else Postprocessor.fill_polygon(x['geometry'],
-                                                                    hole_size=hole_size),
-                                    axis=1)
+        assert all(gdf['geometry'].geom_type == 'Polygon')
+        assert all(gdf['geometry'].is_valid)
+
+        assert isinstance(hole_size, int)
+        assert hole_size >= 0
+
+        gdf['geometry'] = gdf.apply(lambda x: Postprocessor.fill_polygon(x['geometry'], hole_size=hole_size), axis=1)
+
         return gdf

-    def simplify_gdf(self, gdf):
+    def simplify_gdf(self,
+                     gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
         """
-        | Returns a geodataframe with simplified polygons (Douglas-Peucker algorithm is used).
+        | Returns the simplified geodataframe.
+        | The Douglas-Peucker algorithm is used.

-        :param gpd.GeoDataFrame gdf: geodataframe
+        :param gdf: geodataframe
         :returns: simplified geodataframe
-        :rtype: gpd.GeoDataFrame
         """
+        assert isinstance(gdf, gpd.GeoDataFrame)
+
         if gdf.empty:
             return gdf

+        assert all(gdf['geometry'].geom_type == 'Polygon')
+        assert all(gdf['geometry'].is_valid)
+        assert gdf.crs == f'EPSG:{self.epsg_code}'
+
         topo = tp.Topology(gdf, prequantize=False)
-        simplified_gdf = topo.toposimplify(settings.RESOLUTION).to_gdf(crs=f'EPSG:{self.epsg_code}')
-        return simplified_gdf
+        topo_simplified = topo.toposimplify(epsilon=RESOLUTION,
+                                            simplify_with='simplification')

-    def clip_gdf(self, gdf):
+        return topo_simplified.to_gdf(crs=f'EPSG:{self.epsg_code}')
+
+    def clip_gdf(self,
+                 gdf: gpd.GeoDataFrame,
+                 bounding_box: tuple[int, int, int, int],
+                 boundary: Optional[gpd.GeoDataFrame] = None) -> gpd.GeoDataFrame:
         """
-        | Returns a clipped geodataframe.
+        | Returns the clipped geodataframe.

-        :param gpd.GeoDataFrame gdf: geodataframe
+        :param gdf: geodataframe
+        :param bounding_box: bounding box (x_min, y_min, x_max, y_max)
+        :param boundary: boundary geodataframe
         :returns: clipped geodataframe
-        :rtype: gpd.GeoDataFrame
         """
-        if self.boundary_gdf is None:
-            clipped_gdf = gpd.clip(gdf,
-                                   mask=Box(self.x_min, self.y_min, self.x_max, self.y_max),
+        assert isinstance(gdf, gpd.GeoDataFrame)
+
+        if gdf.empty:
+            return gdf
+
+        assert all(gdf['geometry'].geom_type == 'Polygon')
+        assert all(gdf['geometry'].is_valid)
+        assert gdf.crs == f'EPSG:{self.epsg_code}'
+
+        assert isinstance(bounding_box, tuple)
+        assert len(bounding_box) == 4
+        assert all(isinstance(coordinate, int) for coordinate in bounding_box)
+
+        assert isinstance(boundary, gpd.GeoDataFrame) or boundary is None
+
+        if boundary is not None:
+            assert not boundary.empty
+            assert all(boundary['geometry'].geom_type == 'Polygon')
+            assert all(boundary['geometry'].is_valid)
+            assert boundary.crs == f'EPSG:{self.epsg_code}'
+
+        x_min, y_min, x_max, y_max = bounding_box
+
+        if boundary is None:
+            gdf_clipped = gpd.clip(gdf,
+                                   mask=box(x_min, y_min, x_max, y_max),
                                    keep_geom_type=True).reset_index(drop=True)
         else:
-            clipped_gdf = gpd.clip(gdf,
-                                   mask=self.boundary_gdf['geometry'],
+            gdf_clipped = gpd.clip(gdf,
+                                   mask=boundary['geometry'],
                                    keep_geom_type=True).reset_index(drop=True)
-        return clipped_gdf
+
+        return gdf_clipped
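
--
Reviewer note, not part of the patch: below is a minimal end-to-end sketch of the
refactored API (vectorize, export, concatenate, sieve, fill, simplify, clip). The
output path, tile coordinates, tile size, EPSG code 25832 and the sieve/hole sizes
are illustrative assumptions; the value map mirrors the CLASS_MAP this refactor
removes, and RESOLUTION is still read from src.utils.settings. Since
create_cached_tiles_dir() is gone, the sketch creates the tiles_processed
directory itself.

    from pathlib import Path

    import numpy as np

    from src.postprocessing import Postprocessor

    path_output_dir = Path('output')  # illustrative output directory
    (path_output_dir / 'tiles_processed').mkdir(parents=True, exist_ok=True)

    postprocessor = Postprocessor(path_output_dir=path_output_dir,
                                  epsg_code=25832)  # illustrative EPSG code

    # toy tile: 0 = background, 1 and 2 = the former CLASS_MAP values
    mask = np.zeros((128, 128), dtype=np.uint8)
    mask[8:40, 8:40] = 1
    mask[64:96, 64:96] = 2

    coordinates = (363000, 5715000)  # illustrative (x_min, y_max) of the tile

    features = Postprocessor.vectorize_mask(mask=mask,
                                            coordinates=coordinates)
    postprocessor.export_features(features=features,
                                  coordinates=coordinates)

    gdf = postprocessor.concatenate_processed_tiles(
        coordinates=np.array([coordinates], dtype=np.int32),
        value_map={1: 'Hochbau', 2: 'Tiefbau'})

    gdf = Postprocessor.sieve_gdf(gdf, sieve_size=2)
    gdf = Postprocessor.fill_gdf(gdf, hole_size=2)
    gdf = postprocessor.simplify_gdf(gdf)

    # assumes RESOLUTION == 0.5, i.e. the 128 px tile spans 64 m
    gdf = postprocessor.clip_gdf(gdf,
                                 bounding_box=(363000, 5714936, 363064, 5715000),
                                 boundary=None)  # or a boundary geodataframe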