Skip to content

Commit

Permalink
Added converts, flushed out tests
Browse files Browse the repository at this point in the history
  • Loading branch information
abodeuis committed Jul 9, 2024
1 parent 88504b4 commit 07e325c
Show file tree
Hide file tree
Showing 11 changed files with 88,453 additions and 33 deletions.
94 changes: 94 additions & 0 deletions cdrhook/cdr_endpoint_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from typing import List, Optional
from pydantic import BaseModel

# Returned by cog_system_versions endpoint
class SystemId(BaseModel):
name: str
version: str

class SystemVersionsEndpoint(BaseModel):
system_versions: List[SystemId]

# Returned by cog_area_extraction endpoint
class AreaExtractionCoords(BaseModel):
type: str
coordinates: List[List[List[float]]]

class AreaExtractionsEndpoint(BaseModel):
area_extraction_id : str
cog_id: str
reference_id: str
px_bbox : List[float]
px_geojson : AreaExtractionCoords
system :str
system_version :str
model_id : str
validated : bool
confidence : Optional[float] = None
category : str
text : str
projected_feature : List[str]

# Returned by cog_legend_items endpoint
class PxGeojson(BaseModel):
type: str
coordinates: List = []

class LegendItemsEndpoint(BaseModel):
legend_id: str
abbreviation: str
description: str
color: str
reference_id: str
label: str
pattern: str
px_bbox: List = []
px_geojson: PxGeojson
cog_id: str
category: str
system: str
system_version: str
_model_id: str
validated: bool
confidence: Optional[float] = None
map_unit_age_text: str
map_unit_lithology: str
map_unit_b_age: Optional[float] = None
map_unit_t_age: Optional[float] = None
point_extractions: List = []
polygon_extractions: List = []
line_extractions: List = []

# Returned by cog_metadata endpoint
class BestBoundsGeoJson(BaseModel):
type: str
coordinates: List[List[List[float]]]

class MetadataEndpoint(BaseModel):
citation: str
ngmdb_prod: str
scale: int
has_part_names: List[str]
ngmdb_item: int
cog_id: str
publisher: str
cog_url: str
provider_name: str
display_links_str: str
cog_size: int
authors: List[str]
provider_url: str
original_download_url: str
no_map: bool
thumbnail_url: str
state: Optional[str]
cog_name: str
publish_year: int
quadrangle: Optional[str]
alternate_name: str
keywords: List[str]
best_bounds_geojson: BestBoundsGeoJson
georeferenced_count : int
validated_count : int

# Map results endpoint is a cdr_schema map_result
5 changes: 0 additions & 5 deletions cdrhook/cdr_types.py

This file was deleted.

6 changes: 3 additions & 3 deletions cdrhook/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
from pydantic import BaseModel, Field, AnyUrl

class CdrConnector(BaseModel):
cdr_url : AnyUrl = Field(
default="https://api.cdr.land",
description="The URL of the CDR API")
system_name : str = Field(
description="The name of the system registering with the CDR")
system_version : str = Field(
Expand All @@ -27,6 +24,9 @@ class CdrConnector(BaseModel):
events : List[str] = Field(
default_factory=list,
description="The events to register for, leaving blank will register for all events")
cdr_url : AnyUrl = Field(
default="https://api.cdr.land",
description="The URL of the CDR API")
registration : Optional[str] = Field(
default=None,
description="The registration ID returned by the CDR")
Expand Down
54 changes: 54 additions & 0 deletions cdrhook/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import List
from cdrhook.cdr_endpoint_schemas import AreaExtractionsEndpoint, LegendItemsEndpoint, MetadataEndpoint
from cmaas_utils.types import Legend, MapUnit, MapUnitType, Layout, CMAAS_MapMetadata, Provenance

# This would require a lot of effort to convert and don't think it will be used.
# def convert_cdr_schema_map_to_cmass_map(cdr_map:MapResults) -> CMAAS_Map:
# map_data = CMAAS_Map(name="", cog_id=cdr_map.cog_id)
# map_data.metadata =
# map_data.layout =
# map_data.legend =
# return

def convert_cdr_schema_metadata_to_cmass_map_metadata(cdr_metadata:MetadataEndpoint) -> CMAAS_MapMetadata:
map_metadata = CMAAS_MapMetadata(provenance=Provenance(name='CDR', version='0.3.3'))
map_metadata.title = cdr_metadata.cog_name
map_metadata.authors = cdr_metadata.authors
map_metadata.publisher = cdr_metadata.publisher
map_metadata.source_url = cdr_metadata.cog_url
map_metadata.year = cdr_metadata.publish_year
map_metadata.scale = cdr_metadata.scale
#map_metadata.map_color =
#map_metadata.map_shape =
#map_metadata.physiographic_region
return map_metadata

def convert_cdr_schema_legend_items_to_cmass_legend(cdr_legend:List[LegendItemsEndpoint]) -> Legend:
legend = Legend(provenance=Provenance(name=cdr_legend[0].system, version=cdr_legend[0].system_version))
for item in cdr_legend:
map_unit = MapUnit(type=MapUnitType.from_str(item.category.lower()))
map_unit.label = item.label
map_unit.abbreviation = item.abbreviation
map_unit.description = item.description
map_unit.color = item.color
map_unit.pattern = item.pattern
#map_unit.overlay =
map_unit.bounding_box = item.px_bbox
legend.features.append(map_unit)
return legend

def convert_cdr_schema_area_extraction_to_layout(cdr_area_extraction:List[AreaExtractionsEndpoint]) -> Layout:
layout = Layout(provenance=Provenance(name=cdr_area_extraction[0].system, version=cdr_area_extraction[0].system_version))
for area in cdr_area_extraction:
if area.category == 'map_area':
layout.map = area.px_geojson.coordinates
if area.category == 'line_point_legend_area':
layout.line_legend = area.px_geojson.coordinates
layout.point_legend = area.px_geojson.coordinates
if area.category == 'polygon_legend_area':
layout.polygon_legend = area.px_geojson.coordinates
if area.category == 'cross_section':
layout.cross_section = area.px_geojson.coordinates
if area.category == 'correlation_diagram':
layout.correlation_diagram = area.px_geojson.coordinates
return layout
35 changes: 21 additions & 14 deletions cdrhook/retrieve.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,58 @@
import logging
import requests
from pydantic import BaseModel
from cdrhook.connector import CdrConnector
from cdrhook.cdr_types import SystemId
from cdrhook.cdr_endpoint_schemas import SystemId

# Generic retrieval
def retrieve_json(connection:CdrConnector, endpoint_url:str, headers:dict=None):
def retrieve_endpoint(connection:CdrConnector, endpoint_url:str, headers:dict=None):
if headers is None:
headers = {'Authorization': f'Bearer {connection.token}'}
logging.debug(f"Retrieving {endpoint_url}")
r = requests.get(endpoint_url, headers=headers)
r.raise_for_status()
return r.json()

def validate_endpoint(response:dict, schema:BaseModel):
# Validate the response against the model
return schema.model_validate(response)

# region Cog Endpoints
def retrieve_cog_metadata(connection:CdrConnector, cog_id:str):
def retrieve_cog_metadata(connection:CdrConnector, cog_id:str) -> dict:
# Get cog info
endpoint_url = f"{connection.cdr_url}/v1/maps/cog/meta/{cog_id}"
return retrieve_json(connection, endpoint_url)
return retrieve_endpoint(connection, endpoint_url)

def retrieve_cog_results(connection:CdrConnector, cog_id:str):
def retrieve_cog_results(connection:CdrConnector, cog_id:str) -> dict:
# Get results for a cog
endpoint_url = f"{connection.cdr_url}/v1/maps/cog/{cog_id}/results"
return retrieve_json(connection, endpoint_url)
response_data = retrieve_endpoint(connection, endpoint_url)
response_data['cog_id'] = cog_id # Need to add cog_id to the response to conform to cdr_schema
return response_data

def retrieve_cog_system_versions(connection:CdrConnector, cog_id:str):
def retrieve_cog_system_versions(connection:CdrConnector, cog_id:str) -> dict:
# Get all system_versions for extraction types per cog
endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/system_versions"
return retrieve_json(connection, endpoint_url)
return retrieve_endpoint(connection, endpoint_url)

def retrieve_cog_area_extraction(connection:CdrConnector, cog_id:str, system_id:SystemId=None):
def retrieve_cog_area_extraction(connection:CdrConnector, cog_id:str, system_id:SystemId=None) -> dict:
# Get all area extractions for a cog
endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/area_extractions"
if system_id is not None:
endpoint_url += f"?system_version={system_id.name}__{system_id.version}"
return retrieve_json(connection, endpoint_url)
return retrieve_endpoint(connection, endpoint_url)

def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None):
def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None) -> dict:
# Get all legend items for a cog
endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/legend_items"
if system_id is not None:
endpoint_url += f"?system_version={system_id.name}__{system_id.version}"
return retrieve_json(connection, endpoint_url)
return retrieve_endpoint(connection, endpoint_url)
# endregion Cog Endpoints

# region Event Endpoints
def retrieve_area_extraction_event(connection:CdrConnector, event_id:str):
def retrieve_area_extraction_event(connection:CdrConnector, event_id:str) -> dict:
endpoint_url = f"{connection.cdr_url}/v1/maps/extractions/{event_id}"
return retrieve_json(connection, endpoint_url)
return retrieve_endpoint(connection, endpoint_url)
# endregion Event Endpoints

15 changes: 15 additions & 0 deletions tests/data/mock_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from cdrhook.connector import CdrConnector

class MockConnector(CdrConnector):
# override
def register(self):
return '12345'
# override
def unregister(self):
return

def __eq__(self, other):
return self.__dict__ == other.__dict__

def get_mock_connector():
return MockConnector('mock_connector', '0.1', 'mock_token')
Loading

0 comments on commit 07e325c

Please sign in to comment.