Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: move ViccNormalizerDataExtension to mappings #425

Draft
wants to merge 3 commits into
base: issue-419
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions src/metakb/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from neo4j import Driver, ManagedTransaction

from metakb.database import get_driver
from metakb.normalizers import VICC_NORMALIZER_DATA, ViccDiseaseNormalizerData
from metakb.transformers.base import TherapyType
from metakb.transformers.base import NORMALIZER_PRIORITY_EXT_NAME, TherapyType

_logger = logging.getLogger(__name__)

Expand All @@ -33,34 +32,41 @@ def _create_parameterized_query(
def _add_mappings_and_exts_to_obj(obj: dict, obj_keys: list[str]) -> None:
"""Get mappings and extensions from object and add to `obj` and `obj_keys`

:param obj: Object to update with mappings and extensions (if found)
:param obj: Object to update with mappings and extensions (if found).
If ``obj`` has Disease, Gene, or Therapy ``conceptType``, then ``normalizer_id``
will also be added.
:param obj_keys: Parameterized queries. This will be mutated if mappings or
extensions exist
"""
mappings = obj.get("mappings", [])
if mappings:
concept_type = obj.get("conceptType")
if concept_type in {"Disease", "Gene", "Therapy"}:
normalizer_id = None
for mapping in obj["mappings"]:
extensions = mapping.get("extensions") or []
for ext in extensions:
if ext["name"] == NORMALIZER_PRIORITY_EXT_NAME and ext["value"]:
normalizer_id = mapping["coding"]["code"]
obj["normalizer_id"] = normalizer_id
obj_keys.append("normalizer_id:$normalizer_id")
break

if normalizer_id:
break

obj["mappings"] = json.dumps(mappings)
obj_keys.append("mappings:$mappings")

extensions = obj.get("extensions", [])
for ext in extensions:
if ext["name"] == VICC_NORMALIZER_DATA:
for normalized_field in ViccDiseaseNormalizerData.model_fields:
normalized_val = ext["value"].get(normalized_field)
if normalized_val is None:
continue

name = f"normalizer_{normalized_field}"
obj[name] = normalized_val
obj_keys.append(f"{name}:${name}")
name = "_".join(ext["name"].split()).lower()
val = ext["value"]
if isinstance(val, (dict | list)):
obj[name] = json.dumps(val)
else:
name = "_".join(ext["name"].split()).lower()
val = ext["value"]
if isinstance(val, (dict | list)):
obj[name] = json.dumps(val)
else:
obj[name] = val
obj_keys.append(f"{name}:${name}")
obj[name] = val
obj_keys.append(f"{name}:${name}")


def _add_method(tx: ManagedTransaction, method: dict, ids_in_stmts: set[str]) -> None:
Expand Down
24 changes: 0 additions & 24 deletions src/metakb/normalizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
from collections.abc import Iterable
from enum import Enum
from typing import Literal

from botocore.exceptions import TokenRetrievalError
from disease.cli import update_db as update_disease_db
Expand All @@ -23,7 +22,6 @@
from gene.database.database import AWS_ENV_VAR_NAME as GENE_AWS_ENV_VAR_NAME
from gene.query import QueryHandler as GeneQueryHandler
from gene.schemas import NormalizeService as NormalizedGene
from pydantic import BaseModel
from therapy.cli import update_normalizer_db as update_therapy_db
from therapy.database import create_db as create_therapy_db
from therapy.database.database import AWS_ENV_VAR_NAME as THERAPY_AWS_ENV_VAR_NAME
Expand All @@ -44,28 +42,6 @@
_logger = logging.getLogger(__name__)


class ViccNormalizerData(BaseModel, extra="forbid"):
"""Define model for representing VICC normalizer data

Fields that are not declared on the model are rejected (``extra="forbid"``).
"""

# Identifier of the normalized concept
id: str
# Label of the normalized concept
label: str


class ViccDiseaseNormalizerData(ViccNormalizerData, extra="forbid"):
"""Define model for representing VICC disease normalizer data

Extends ``ViccNormalizerData`` with an optional MONDO identifier.
"""

# MONDO identifier for the disease; ``None`` when not set
mondo_id: str | None = None


VICC_NORMALIZER_DATA = "vicc_normalizer_data"


class ViccNormalizerDataExtension(Extension):
"""Define model for representing VICC normalizer data as an Extension"""

# Extension name is pinned to the value of the ``VICC_NORMALIZER_DATA`` constant
name: Literal["vicc_normalizer_data"] = VICC_NORMALIZER_DATA


class ViccNormalizers:
"""Manage VICC concept normalization services.

Expand Down
40 changes: 11 additions & 29 deletions src/metakb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

from metakb.database import get_driver
from metakb.normalizers import (
ViccDiseaseNormalizerData,
ViccNormalizerData,
ViccNormalizerDataExtension,
ViccNormalizers,
)
from metakb.schemas.api import (
Expand Down Expand Up @@ -577,41 +574,23 @@ def _get_nested_stmt(

return PROP_TYPE_TO_CLASS[prop_type](**params)

@staticmethod
def _get_vicc_normalizer_extension(node: dict) -> ViccNormalizerDataExtension:
    """Build the VICC Normalizer extension for a concept node

    :param node: Therapy, disease, or gene node data. ``normalizer_id``,
        ``normalizer_label`` and ``conceptType`` are read as required keys;
        ``normalizer_mondo_id`` is read optionally for disease nodes
    :return: VICC Normalizer extension whose value is the dumped normalizer data
    """
    normalizer_fields = {
        "id": node["normalizer_id"],
        "label": node["normalizer_label"],
    }

    if node["conceptType"] != "Disease":
        model = ViccNormalizerData(**normalizer_fields)
    else:
        # Only disease nodes carry the (optional) MONDO identifier
        model = ViccDiseaseNormalizerData(
            **normalizer_fields, mondo_id=node.get("normalizer_mondo_id")
        )

    return ViccNormalizerDataExtension(value=model.model_dump())

def _get_disease(self, node: dict) -> MappableConcept:
"""Get disease data from a node with relationship ``HAS_TUMOR_TYPE``

:param node: Disease node data
:return: Disease mappable concept object
"""
node["mappings"] = _deserialize_field(node, "mappings")
extensions = [self._get_vicc_normalizer_extension(node)]
extensions = []
descr = node.get("description")
if descr:
extensions.append(Extension(name="description", value=descr))
aliases = node.get("aliases")
if aliases:
extensions.append(Extension(name="aliases", value=json.loads(aliases)))
node["extensions"] = extensions

if extensions:
node["extensions"] = extensions
return MappableConcept(**node)

def _get_variations(self, cv_id: str, relation: VariationRelation) -> list[dict]:
Expand Down Expand Up @@ -732,15 +711,16 @@ def _get_gene_context_qualifier(self, statement_id: str) -> MappableConcept | No

gene_node = results.records[0].data()["g"]
gene_node["mappings"] = _deserialize_field(gene_node, "mappings")
extensions = [self._get_vicc_normalizer_extension(gene_node)]
extensions = []
descr = gene_node.get("description")
if descr:
extensions.append(Extension(name="description", value=descr))
aliases = gene_node.get("aliases")
if aliases:
extensions.append(Extension(name="aliases", value=json.loads(aliases)))

gene_node["extensions"] = extensions
if extensions:
gene_node["extensions"] = extensions
return MappableConcept(**gene_node)

def _get_method_document(self, method_id: str) -> Document | None:
Expand Down Expand Up @@ -896,7 +876,7 @@ def _get_therapy(self, in_ta_params: dict) -> MappableConcept:
"""
ta_params = copy(in_ta_params)
ta_params["mappings"] = _deserialize_field(ta_params, "mappings")
extensions = [self._get_vicc_normalizer_extension(ta_params)]
extensions = []
regulatory_approval = ta_params.get("regulatory_approval")
if regulatory_approval:
regulatory_approval = json.loads(regulatory_approval)
Expand All @@ -906,7 +886,9 @@ def _get_therapy(self, in_ta_params: dict) -> MappableConcept:
aliases = ta_params.get("aliases")
if aliases:
extensions.append(Extension(name="aliases", value=json.loads(aliases)))
ta_params["extensions"] = extensions

if extensions:
ta_params["extensions"] = extensions
return MappableConcept(**ta_params)

async def batch_search_statements(
Expand Down
73 changes: 56 additions & 17 deletions src/metakb/transformers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
from metakb import APP_ROOT, DATE_FMT
from metakb.harvesters.base import _HarvestedData
from metakb.normalizers import (
ViccDiseaseNormalizerData,
ViccNormalizerData,
ViccNormalizerDataExtension,
ViccNormalizers,
)
from metakb.schemas.app import SourceName
Expand All @@ -57,6 +54,9 @@
NormalizedGene: "gene",
}

# Normalizer priority extension name
NORMALIZER_PRIORITY_EXT_NAME = "vicc_normalizer_priority"


class EcoLevel(str, Enum):
"""Define constraints for Evidence Ontology levels"""
Expand Down Expand Up @@ -526,34 +526,73 @@ def _add_therapy(
return therapy

@staticmethod
def _get_vicc_normalizer_extension(
def _get_vicc_normalizer_mappings(
normalized_id: str,
normalizer_resp: NormalizedDisease | NormalizedTherapy | NormalizedGene,
) -> ViccNormalizerDataExtension:
"""Get VICC Normalizer extension data
) -> list[ConceptMapping]:
"""Get VICC Normalizer concept mappings

:param normalized_id: Normalized ID from VICC normalizer
:param normalizer_resp: Response from VICC normalizer
:return: VICC Normalizer extension data
:return: List of VICC Normalizer data represented as concept mappings
"""
attr_name = NORMALIZER_INSTANCE_TO_ATTR[type(normalizer_resp)]
normalizer_resp_obj = getattr(normalizer_resp, attr_name)

params = {"id": normalized_id, "label": normalizer_resp_obj.label}
def _add_merged_id_ext(
    mapping: ConceptMapping,
    is_priority: bool,
    label: str | None = None,
) -> ConceptMapping:
    """Update ``mapping`` to include extension on whether mapping is from
    merged identifier

    :param mapping: ConceptMapping from vicc normalizer. This will be mutated.
    :param is_priority: ``True`` if concept mapping contains primaryCode that
        matches merged record primaryCode. ``False`` otherwise (meaning it comes
        from merged record mappings)
    :param label: Merged concept label, if found. When truthy, it overwrites
        ``mapping.coding.label``
    :return: The same ``mapping`` instance, with the normalizer priority
        extension added
    """
    merged_id_ext = Extension(
        name=NORMALIZER_PRIORITY_EXT_NAME, value=is_priority
    )
    # ``extensions`` may be unset or empty; start a new list in that case
    if mapping.extensions:
        mapping.extensions.append(merged_id_ext)
    else:
        mapping.extensions = [merged_id_ext]

    if label:
        mapping.coding.label = label

    return mapping

mappings: list[ConceptMapping] = []
attr_name = NORMALIZER_INSTANCE_TO_ATTR[type(normalizer_resp)]
normalizer_resp_obj = getattr(normalizer_resp, attr_name)
normalizer_mappings = normalizer_resp_obj.mappings or []
if isinstance(normalizer_resp, NormalizedDisease):
mappings = normalizer_resp_obj.mappings or []
for mapping in mappings:
for mapping in normalizer_mappings:
if (
DISEASE_SYSTEM_URI_TO_NAMESPACE.get(mapping.coding.system)
== DiseaseNamespacePrefix.MONDO.value
):
params["mondo_id"] = mapping.coding.code.root
break
ext_val = ViccDiseaseNormalizerData(**params)
mappings.append(_add_merged_id_ext(mapping, is_priority=False))
else:
if normalized_id == mapping.coding.code.root:
mappings.append(
_add_merged_id_ext(
mapping,
label=normalizer_resp_obj.label,
is_priority=True,
)
)
else:
ext_val = ViccNormalizerData(**params)
return ViccNormalizerDataExtension(value=ext_val.model_dump())
mappings.extend(
_add_merged_id_ext(
mapping, label=normalizer_resp_obj.label, is_priority=True
)
for mapping in normalizer_mappings
if normalized_id == mapping.coding.code.root
)
return mappings

def create_json(self, cdm_filepath: Path | None = None) -> None:
"""Create a composite JSON for transformed data.
Expand Down
Loading
Loading