Commit
wip: reuse configs so vrt/json files only need to be downloaded once
tebben committed Jan 31, 2024
1 parent 9d3e0e1 commit 28cc2b8
Showing 6 changed files with 347 additions and 175 deletions.
9 changes: 7 additions & 2 deletions ctod/core/cog/cog_reader_pool.py
@@ -1,5 +1,7 @@
 import asyncio
+import copy
 import logging
+from ctod.core.cog.dataset_configs import DatasetConfigs
 
 from ctod.core.utils import get_dataset_type
 from collections import defaultdict
@@ -20,6 +22,7 @@ def __init__(self, unsafe: bool = False, max_readers: int = 250):
             max_readers (int, optional): Amount of max readers in memory per cog path. Defaults to 250.
         """
 
+        self.configs = DatasetConfigs()
         self.unsafe = unsafe
         self.max_readers = max_readers
         self.readers = defaultdict(list)
@@ -40,10 +43,12 @@ async def get_reader(self, cog: str, tms: TileMatrixSet) -> CogReader:
 
         async with self.lock:
             if cog not in self.readers or len(self.readers[cog]) == 0:
+                config = self.configs.get_config(cog)
+
                 if type == "mosaic":
-                    reader = CogReaderMosaic(self, cog, tms, self.unsafe)
+                    reader = CogReaderMosaic(self, config, cog, tms, self.unsafe)
                 else:
-                    reader = CogReader(self, cog, tms, self.unsafe)
+                    reader = CogReader(self, config, cog, tms, self.unsafe)
             else:
                 reader = self.readers[cog].pop()
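
Note: the pool now builds reader configs through DatasetConfigs instead of
letting each reader fetch its own vrt/json. A minimal sketch of the intended
caching behavior (the local path is made up; a plain COG falls through to
the default config):

    from ctod.core.cog.dataset_configs import DatasetConfigs

    configs = DatasetConfigs()

    # First call builds the config (for vrt/mosaic datasets this downloads
    # the file); the second call is served from the in-memory cache.
    a = configs.get_config("./data/dem.tif")
    b = configs.get_config("./data/dem.tif")

    assert a == b       # same content
    assert a is not b   # deep copies, so readers cannot mutate the cache
    print(a)            # {'type': 'cog', 'path': './data/dem.tif'}
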
119 changes: 119 additions & 0 deletions ctod/core/cog/dataset_configs.py
@@ -0,0 +1,119 @@
+import copy
+import json
+import os
+import requests
+
+from ctod.core.utils import get_dataset_type
+from urllib.parse import urlparse, urljoin
+from lxml import etree
+
+
+class DatasetConfigs:
+    """Create and cache configs for datasets so remote vrt/json
+    files only need to be downloaded once.
+
+    ToDo: Make classes for config and add error checking
+    """
+
+    def __init__(self):
+        self.cached_configs = {}
+
+    def get_config(self, dataset_path: str):
+        """Return a deep copy of the cached config, creating it on first use."""
+
+        if dataset_path not in self.cached_configs:
+            config = self._create_config(dataset_path)
+            self.cached_configs[dataset_path] = config
+
+        return copy.deepcopy(self.cached_configs[dataset_path])
+
+    def _create_config(self, dataset_path: str):
+        dataset_type = get_dataset_type(dataset_path)
+        if dataset_type == "mosaic":
+            return self._create_mosaic_config(dataset_path)
+        elif dataset_type == "vrt":
+            return self._create_vrt_config(dataset_path)
+        else:
+            return self._create_default_config(dataset_path)
+
+    def _create_default_config(self, dataset_path: str):
+        return {"type": "cog", "path": dataset_path}
+
+    def _create_vrt_config(self, dataset_path: str):
+        if dataset_path.startswith(("http://", "https://")):
+            # Fetch the VRT content; parse the raw bytes so an XML
+            # encoding declaration does not trip up lxml
+            response = requests.get(dataset_path)
+            vrt_tree = etree.fromstring(response.content)
+
+            # Iterate over SourceFilename elements and modify them
+            for source_filename_elem in vrt_tree.xpath("//SourceFilename"):
+                # Set relativeToVRT attribute to 0
+                source_filename_elem.set("relativeToVRT", "0")
+
+                if source_filename_elem.text and "/vsicurl/" not in source_filename_elem.text:
+                    # Resolve the source path against the VRT url and
+                    # replace it with a full URL using /vsicurl/
+                    source_path = source_filename_elem.text.strip()
+                    base_url = self._get_base_url(dataset_path)
+                    full_url = urljoin(base_url, source_path)
+                    source_filename_elem.text = f"/vsicurl/{full_url}"
+
+            # Serialize the modified XML content back to string
+            modified_vrt_content = etree.tostring(vrt_tree, pretty_print=True, encoding="utf-8").decode()
+
+            return {"type": "vrt", "path": dataset_path, "vrt": modified_vrt_content}
+
+        else:
+            # Read the VRT content from a local file
+            with open(dataset_path, "rb") as f:
+                vrt_content = f.read()
+
+            # Parse the VRT content
+            vrt_tree = etree.fromstring(vrt_content)
+
+            # Prefix remote source filenames with /vsicurl/ so GDAL
+            # streams them over http
+            for source_filename_elem in vrt_tree.xpath("//SourceFilename"):
+                text = source_filename_elem.text
+                if text and "/vsicurl/" not in text and ("http://" in text or "https://" in text):
+                    source_filename_elem.text = f"/vsicurl/{text.strip()}"
+
+            # Serialize the modified XML content back to string
+            modified_vrt_content = etree.tostring(vrt_tree, pretty_print=True, encoding="utf-8").decode()
+
+            return {"type": "vrt", "path": dataset_path, "vrt": modified_vrt_content}
+
+    def _create_mosaic_config(self, dataset_path: str):
+        if dataset_path.startswith(("http://", "https://")):
+            try:
+                response = requests.get(dataset_path)
+                response.raise_for_status()  # Raise an exception for HTTP errors
+                datasets_json = response.json()
+            except requests.RequestException as e:
+                print(f"Error fetching mosaic settings from {dataset_path}: {e}")
+                return {}
+        else:
+            # It's a local file, attempt to read JSON from it
+            if not os.path.exists(dataset_path):
+                print(f"Error: Local file {dataset_path} does not exist")
+                return {}
+            try:
+                with open(dataset_path, "r") as f:
+                    datasets_json = json.load(f)
+            except Exception as e:
+                print(f"Error reading mosaic settings from {dataset_path}: {e}")
+                return {}
+
+        # Rewrite relative dataset paths to absolute URLs when the
+        # mosaic definition itself was fetched from a URL
+        if dataset_path.startswith(("http://", "https://")):
+            base_url = self._get_base_url(dataset_path)
+            for dataset in datasets_json.get("datasets", []):
+                path = dataset.get("path")
+                if path and not path.startswith(("http://", "https://")):
+                    dataset["path"] = urljoin(base_url, path)
+
+        datasets_json["type"] = "mosaic"
+        return datasets_json
+
+    def _get_base_url(self, url):
+        parsed_url = urlparse(url)
+        return f"{parsed_url.scheme}://{parsed_url.netloc}"
9 changes: 7 additions & 2 deletions ctod/core/cog/reader/cog_reader.py
@@ -13,8 +13,9 @@ class CogReader:
     avoid opening and closing the same file many times.
     """
 
-    def __init__(self, pool, cog: str, tms: TileMatrixSet, unsafe: bool = False):
+    def __init__(self, pool, config: Any, cog: str, tms: TileMatrixSet, unsafe: bool = False):
         self.pool = pool
+        self.config = config
         self.cog = cog
         self.tms = tms
         self.unsafe = unsafe
@@ -77,7 +78,11 @@ def return_reader(self):
     def _set_rio_reader(self):
         """Get the reader for the COG."""
 
-        self.rio_reader = Reader(self.cog, tms=self.tms)
+        if self.config["type"] == "vrt":
+            logging.info(f"VRT: {self.config['vrt']}")
+            self.rio_reader = Reader(self.config["vrt"], tms=self.tms)
+        else:
+            self.rio_reader = Reader(self.cog, tms=self.tms)
 
 
     def _set_safe_level(self):
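
Note: the vrt branch above hands the rewritten XML string straight to
rio-tiler's Reader, which relies on GDAL's VRT driver accepting the
<VRTDataset> XML content itself in place of a filename. A quick,
hypothetical sanity check of that assumption (the /vsicurl/ source is
made up):

    import rasterio

    vrt_xml = """<VRTDataset rasterXSize="256" rasterYSize="256">
      <VRTRasterBand dataType="Float32" band="1">
        <SimpleSource>
          <SourceFilename relativeToVRT="0">/vsicurl/https://example.com/dem.tif</SourceFilename>
        </SimpleSource>
      </VRTRasterBand>
    </VRTDataset>"""

    # No temporary .vrt file needed; GDAL parses the inline XML
    with rasterio.open(vrt_xml) as src:
        print(src.width, src.height)  # 256 256
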
32 changes: 4 additions & 28 deletions ctod/core/cog/reader/cog_reader_mosaic.py
@@ -14,13 +14,13 @@ class CogReaderMosaic:
     avoid opening and closing the same file many times.
     """
 
-    def __init__(self, pool, cog: str, tms: TileMatrixSet, unsafe: bool = False):
+    def __init__(self, pool, config: Any, cog: str, tms: TileMatrixSet, unsafe: bool = False):
         self.pool = pool
+        self.config = config
         self.cog = cog
         self.tms = tms
         self.unsafe = unsafe
         self.last_used = time.time()
-        self._download_json(cog)
 
     def close(self):
         """Close the reader."""
@@ -60,7 +60,7 @@ def download_tile(self, x: int, y: int, z: int, loop: asyncio.AbstractEventLoop,
 
         #logging.info(f"{z} {x} {y} {len(datasets)}\n {datasets} \n {tile_bounds}")
 
-        if not self._tile_intersects(tile_bounds, self.dataset["extent"]) or len(datasets) == 0:
+        if not self._tile_intersects(tile_bounds, self.config["extent"]) or len(datasets) == 0:
             return None
 
         kwargs["resampling_method"] = resampling_method
@@ -80,7 +80,7 @@ def return_reader(self):
 
     def _get_intersecting_datasets(self, tile_bounds: BoundingBox) -> list:
         intersecting_datasets = []
-        for dataset in self.dataset["datasets"]:
+        for dataset in self.config["datasets"]:
             if self._tile_intersects(tile_bounds, dataset["extent"]):
                 intersecting_datasets.append(dataset["path"])
@@ -104,27 +104,3 @@ def _tile_intersects(self, tile_bounds: BoundingBox, dataset_bounds: list) -> bool:
             return False
 
         return True
-
-    def _download_json(self, json_url):
-        # Download the JSON file
-        response = requests.get(json_url)
-
-        # Load the JSON content
-        datasets_json = response.json()
-
-        # Extract base URL
-        base_url = self._get_base_url(json_url)
-
-        # Extract datasets and their geometries
-
-        for dataset in datasets_json["datasets"]:
-            path = dataset['path']
-            absolute_path = urljoin(base_url, path)
-            dataset["path"] = absolute_path
-
-        #self.dataset = datasets_json["info"]
-        self.dataset = datasets_json
-
-    def _get_base_url(self, url):
-        parsed_url = urlparse(url)
-        return f"{parsed_url.scheme}://{parsed_url.netloc}"