Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for civic assertions #422

Draft
wants to merge 11 commits into
base: issue-415
Choose a base branch
from
4 changes: 3 additions & 1 deletion src/metakb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _get_credentials(


_CONSTRAINTS = {
"coding_constraint": "CREATE CONSTRAINT coding_constraint IF NOT EXISTS FOR (c:Coding) REQUIRE (c.code, c.label, c.system) IS UNIQUE;",
"strength_constraint": "CREATE CONSTRAINT coding_constraint IF NOT EXISTS FOR (n:Strength) REQUIRE (n.label, n.primaryCode) IS UNIQUE;",
"gene_id_constraint": "CREATE CONSTRAINT gene_id_constraint IF NOT EXISTS FOR (n:Gene) REQUIRE n.id IS UNIQUE;",
"disease_id_constraint": "CREATE CONSTRAINT disease_id_constraint IF NOT EXISTS FOR (n:Disease) REQUIRE n.id IS UNIQUE;",
"therapy_id_constraint": "CREATE CONSTRAINT therapy_id_constraint IF NOT EXISTS FOR (n:Therapy) REQUIRE n.id IS UNIQUE;",
Expand All @@ -84,6 +84,8 @@ def _get_credentials(
"document_id_constraint": "CREATE CONSTRAINT document_id_constraint IF NOT EXISTS FOR (n:Document) REQUIRE n.id IS UNIQUE;",
"statement_id_constraint": "CREATE CONSTRAINT statement_id_constraint IF NOT EXISTS FOR (n:Statement) REQUIRE n.id IS UNIQUE;",
"method_id_constraint": "CREATE CONSTRAINT method_id_constraint IF NOT EXISTS FOR (n:Method) REQUIRE n.id IS UNIQUE;",
"classification_constraint": "CREATE CONSTRAINT classification_constraint IF NOT EXISTS FOR (n:Classification) REQUIRE n.primaryCode IS UNIQUE;",
"evidence_line_id_constraint": "CREATE CONSTRAINT evidence_line_id_constraint IF NOT EXISTS FOR (n:EvidenceLine) REQUIRE n.id IS UNIQUE;",
}


Expand Down
142 changes: 103 additions & 39 deletions src/metakb/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
import uuid
from pathlib import Path

from neo4j import Driver, ManagedTransaction
Expand Down Expand Up @@ -394,28 +395,17 @@ def _add_obj_id_to_set(obj: dict, ids_set: set[str]) -> None:
return ids_in_stmts


def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships
def _get_statement_query(statement: dict) -> str:
"""Generate the initial Cypher query to create a statement node and its
relationships, based on shared properties of evidence and assertion records.

:param tx: Transaction object provided to transaction functions
:param statement_in: Statement CDM object
:param statement: Statement record
:return: The base Cypher query string for creating the statement node and
relationships
"""
statement = statement_in.copy()
statement_type = statement["type"]
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "type")
)

match_line = ""
rel_line = ""

is_reported_in_docs = statement.get("reportedIn", [])
for ri_doc in is_reported_in_docs:
ri_doc_id = ri_doc["id"]
name = f"doc_{ri_doc_id.split(':')[-1]}"
match_line += f"MERGE ({name} {{ id: '{ri_doc_id}'}})\n"
rel_line += f"MERGE (s) -[:IS_REPORTED_IN] -> ({name})\n"

proposition = statement["proposition"]
statement["propositionType"] = proposition["type"]
match_line += "SET s.propositionType=$propositionType\n"
Expand All @@ -439,21 +429,6 @@ def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
match_line += f"MERGE (m {{ id: '{method_id}' }})\n"
rel_line += "MERGE (s) -[:IS_SPECIFIED_BY] -> (m)\n"

strength = statement.get("strength")
if strength:
strength_key_fields = ("primaryCode", "label")

strength_keys = _create_parameterized_query(
strength, strength_key_fields, entity_param_prefix="strength_"
)
for k in strength_key_fields:
v = strength.get(k)
if v:
statement[f"strength_{k}"] = v

match_line += f"MERGE (mc:MappableConcept {{ {strength_keys} }})\n"
rel_line += "MERGE (s) -[:HAS_STRENGTH] -> (mc)\n"

variant_id = proposition["subjectVariant"]["id"]
match_line += f"MERGE (v:Variation {{ id: '{variant_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_VARIANT] -> (v)\n"
Expand All @@ -471,12 +446,96 @@ def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
match_line += f"MERGE (tt:Condition {{ id: '{tumor_type_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_TUMOR_TYPE] -> (tt)\n"

query = f"""
MERGE (s:{statement_type}:StudyStatement {{ {statement_keys} }})
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "type")
)

return f"""
MERGE (s:{statement['type']}:StudyStatement {{ {statement_keys} }})
{match_line}
{rel_line}
{rel_line}\n
"""


def _add_statement_evidence(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships for evidence records

:param tx: Transaction object provided to transaction functions
:param statement_in: Statement CDM object for evidence items
"""
statement = statement_in.copy()
query = _get_statement_query(statement)

is_reported_in_docs = statement.get("reportedIn", [])
for ri_doc in is_reported_in_docs:
ri_doc_id = ri_doc["id"]
name = f"doc_{ri_doc_id.split(':')[-1]}"
query += f"""
MERGE ({name} {{ id: '{ri_doc_id}'}})
MERGE (s) -[:IS_REPORTED_IN] -> ({name})
"""

strength = statement.get("strength")
if strength:
strength_key_fields = ("primaryCode", "label")

strength_keys = _create_parameterized_query(
strength, strength_key_fields, entity_param_prefix="strength_"
)
for k in strength_key_fields:
v = strength.get(k)
if v:
statement[f"strength_{k}"] = v

query += f"""
MERGE (strength:Strength {{ {strength_keys} }})
MERGE (s) -[:HAS_STRENGTH] -> (strength)
"""
tx.run(query, **statement)


def _add_statement_assertion(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships for assertion records

:param tx: Transaction object provided to transaction functions
:param statement_in: Statement CDM object for assertions
"""
statement = statement_in.copy()
query = _get_statement_query(statement)

classification = statement["classification"]
classification_keys = [
_create_parameterized_query(
classification, ("primaryCode",), entity_param_prefix="classification_"
)
]
statement["classification_primaryCode"] = classification["primaryCode"]
_add_mappings_and_exts_to_obj(classification, classification_keys)
statement.update(classification)
classification_keys = ", ".join(classification_keys)

query += f"""
MERGE (classification:Classification {{ {classification_keys} }})
MERGE (s) -[:HAS_CLASSIFICATION] -> (classification)
"""

evidence_lines = statement.get("hasEvidenceLines", [])
if evidence_lines:
for el in evidence_lines:
el["evidence_line_id"] = str(uuid.uuid4())
el["evidence_item_ids"] = [ev["id"] for ev in el["hasEvidenceItems"]]

query += """
WITH s
UNWIND $hasEvidenceLines AS el
MERGE (evidence_line:EvidenceLine {id: el.evidence_line_id, direction: el.directionOfEvidenceProvided})
MERGE (s)-[:HAS_EVIDENCE_LINE]->(evidence_line)
WITH evidence_line, el.evidence_item_ids AS evidence_item_ids
UNWIND evidence_item_ids AS evidence_item_id
MERGE (evidence:Statement {id: evidence_item_id})
MERGE (evidence_line)-[:HAS_EVIDENCE_ITEM]->(evidence)
"""

tx.run(query, **statement)


Expand All @@ -488,8 +547,9 @@ def add_transformed_data(driver: Driver, data: dict) -> None:
"""
# Used to keep track of IDs that are in statements. This is used to prevent adding
# nodes that aren't associated to statements
statements = data.get("statements", [])
ids_in_stmts = _get_ids_from_stmts(statements)
statements_evidence = data.get("statements_evidence", [])
statements_assertions = data.get("statements_assertions", [])
ids_in_stmts = _get_ids_from_stmts(statements_evidence + statements_assertions)

with driver.session() as session:
loaded_stmt_count = 0
Expand All @@ -511,8 +571,12 @@ def add_transformed_data(driver: Driver, data: dict) -> None:
session.execute_write(_add_therapy_or_group, tp, ids_in_stmts)

# This should always be done last
for statement in statements:
session.execute_write(_add_statement, statement)
for statement_evidence_item in statements_evidence:
session.execute_write(_add_statement_evidence, statement_evidence_item)
loaded_stmt_count += 1

for statement_assertion in statements_assertions:
session.execute_write(_add_statement_assertion, statement_assertion)
loaded_stmt_count += 1

_logger.info("Successfully loaded %s statements.", loaded_stmt_count)
Expand Down
48 changes: 45 additions & 3 deletions src/metakb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
VariantPrognosticStudyStatement,
VariantTherapeuticResponseStudyStatement,
)
from ga4gh.va_spec.base import Document, Method, TherapyGroup
from ga4gh.va_spec.base import Direction, Document, EvidenceLine, Method, TherapyGroup
from ga4gh.vrs.models import Expression, Variation
from neo4j import Driver
from neo4j.graph import Node
Expand Down Expand Up @@ -522,7 +522,6 @@ def _get_nested_stmt(
condition_key: None,
},
"strength": None,
"reportedIn": [],
"specifiedBy": None,
}
params.update(stmt_node)
Expand Down Expand Up @@ -560,13 +559,17 @@ def _get_nested_stmt(
node["subtype"] = json.loads(node["subtype"])
params["specifiedBy"] = Method(**node)
elif rel_type == "IS_REPORTED_IN":
params["reportedIn"].append(self._get_document(node))
params["reportedIn"] = [self._get_document(node)]
elif rel_type == "HAS_STRENGTH":
params["strength"] = MappableConcept(**node)
elif rel_type == "HAS_THERAPEUTIC":
params["proposition"]["objectTherapeutic"] = self._get_therapy_or_group(
node
)
elif rel_type == "HAS_CLASSIFICATION":
params["classification"] = self._get_classification(node)
elif rel_type == "HAS_EVIDENCE_LINE":
params["hasEvidenceLines"] = self._get_evidence_lines(statement_id)
else:
logger.warning("relation type not supported: %s", rel_type)

Expand Down Expand Up @@ -816,6 +819,45 @@ def _get_therapy_or_group(

return therapy

@staticmethod
def _get_classification(node: dict) -> MappableConcept:
"""Get classification data from a node with relationship ``HAS_CLASSIFICATION``

:param node: CLassification node data. This will be mutated
:return: Classification data
"""
civic_amp_level = node.pop("civic_amp_level")
if civic_amp_level:
node["extensions"] = [
Extension(name="civic_amp_level", value=civic_amp_level)
]
return MappableConcept(**node)

def _get_evidence_lines(self, statement_id: int) -> list[EvidenceLine]:
"""Get EvidenceLine data from a node with relationship ``HAS_CLASSIFICATION``

:param statement_id: Statement ID to get evidence lines for
:return: EvidenceLine data for a given ``statement_id``
"""
evidence_lines = []

query = f"""
MATCH (s:Statement {{id: '{statement_id}'}}) -[:HAS_EVIDENCE_LINE] -> (el:EvidenceLine)
OPTIONAL MATCH (el) -[:HAS_EVIDENCE_ITEM] -> (ev:Statement)
RETURN DISTINCT el, ev
"""
results = self.driver.execute_query(query).records
for r in results:
r_params = r.data()
evidence_lines.append(
EvidenceLine(
hasEvidenceItems=[self._get_nested_stmt(r_params["ev"])],
directionOfEvidenceProvided=Direction(r_params["el"]["direction"]),
)
)

return evidence_lines

def _get_therapies(
self,
tp_id: str,
Expand Down
11 changes: 8 additions & 3 deletions src/metakb/transformers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ga4gh.va_spec.base import Document, Method, TherapyGroup
from ga4gh.vrs.models import Allele
from gene.schemas import NormalizeService as NormalizedGene
from pydantic import BaseModel, StrictStr, ValidationError
from pydantic import BaseModel, Field, StrictStr, ValidationError
from therapy.schemas import NormalizationService as NormalizedTherapy

from metakb import APP_ROOT, DATE_FMT
Expand Down Expand Up @@ -111,11 +111,16 @@ class ViccConceptVocab(BaseModel):
class TransformedData(BaseModel):
"""Define model for transformed data"""

statements: list[
statements_evidence: list[
VariantTherapeuticResponseStudyStatement
| VariantPrognosticStudyStatement
| VariantDiagnosticStudyStatement
] = []
] = Field([], description="Statement objects for evidence records")
statements_assertions: list[
VariantTherapeuticResponseStudyStatement
| VariantPrognosticStudyStatement
| VariantDiagnosticStudyStatement
] = Field([], description="Statement objects for assertion records")
categorical_variants: list[CategoricalVariant] = []
variations: list[Allele] = []
genes: list[MappableConcept] = []
Expand Down
Loading
Loading