Skip to content

Commit

Permalink
Merge pull request #59 from linkml/rdflib_loader_dumper
Browse files Browse the repository at this point in the history
New rdf dumper - bypasses existing jsonld based method and does a direct conversion from elements to rdflib
  • Loading branch information
cmungall authored Oct 21, 2021
2 parents 725e518 + 74d251e commit 0fd55c3
Show file tree
Hide file tree
Showing 10 changed files with 1,604 additions and 9 deletions.
2 changes: 2 additions & 0 deletions linkml_runtime/dumpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from linkml_runtime.dumpers.json_dumper import JSONDumper
from linkml_runtime.dumpers.rdf_dumper import RDFDumper
from linkml_runtime.dumpers.rdflib_dumper import RDFLibDumper
from linkml_runtime.dumpers.yaml_dumper import YAMLDumper
from linkml_runtime.dumpers.csv_dumper import CSVDumper

# Module-level dumper instances, created once at import time: one instance of
# each dumper class imported above, exposed under a lowercase name.
json_dumper = JSONDumper()
rdf_dumper = RDFDumper()
rdflib_dumper = RDFLibDumper()
yaml_dumper = YAMLDumper()
csv_dumper = CSVDumper()
145 changes: 145 additions & 0 deletions linkml_runtime/dumpers/rdflib_dumper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import logging
from abc import abstractmethod
from typing import Optional, Any, Dict

from rdflib import Graph, URIRef
from rdflib.term import Node, BNode, Literal
from rdflib.namespace import RDF


from linkml_runtime.dumpers.dumper_root import Dumper
from linkml_runtime.utils.schemaview import SchemaView, ElementName, PermissibleValue, PermissibleValueText
from linkml_runtime.utils.yamlutils import YAMLRoot


class RDFLibDumper(Dumper):
    """
    Dumps from elements (instances of a LinkML model) to an rdflib Graph

    Note: this should be used in place of rdf_dumper for now

    This requires a SchemaView object
    """
    def as_rdf_graph(self, element: YAMLRoot, schemaview: SchemaView, prefix_map: Dict[str, str] = None) -> Graph:
        """
        Dumps from element to an rdflib Graph,
        following a schema

        :param element: element to represent in RDF
        :param schemaview: view over the schema that guides the conversion
        :param prefix_map: optional prefix -> namespace URI map; each entry is written into
            the schemaview's namespaces and bound on the new graph's namespace manager
        :return: a new Graph holding the triples generated for element
        """
        g = Graph()
        logging.debug(f'PREFIXMAP={prefix_map}')
        if prefix_map:
            # Fetch the namespaces object once rather than once per prefix
            namespaces = schemaview.namespaces()
            for k, v in prefix_map.items():
                namespaces[k] = v
                g.namespace_manager.bind(k, URIRef(v))
        self.inject_triples(element, schemaview, g)
        return g

    def inject_triples(self, element: Any, schemaview: SchemaView, graph: Graph, target_type: ElementName = None) -> Node:
        """
        Inject triples from conversion of element into a Graph

        Enum values and typed values are returned as a single node and add no triples.
        Any other element is treated as an object: one triple is added per non-None
        slot value (recursing into each value), plus an rdf:type triple unless a
        slot that designates the type has already been emitted.

        :param element: element to represent in RDF
        :param schemaview: view over the schema that guides the conversion
        :param graph: graph that receives the generated triples
        :param target_type: name of the enum, type or class that element is expected to instantiate
        :return: root node as rdflib URIRef, BNode, or Literal
        """
        namespaces = schemaview.namespaces()
        slot_name_map = schemaview.slot_name_mappings()
        logging.debug(f'CONVERT: {element} // {type(element)} // {target_type}')

        # Case 1: enum value -> URI of its meaning if declared, otherwise its text
        if target_type in schemaview.all_enums():
            pv: PermissibleValue
            if isinstance(element, PermissibleValueText):
                e = schemaview.get_enum(target_type)
                pv = e.permissible_values[element]
            else:
                pv = element.code
            if pv.meaning is not None:
                return URIRef(schemaview.expand_curie(pv.meaning))
            return Literal(pv.text)

        # Case 2: typed value -> literal, with a datatype unless it is a plain string
        if target_type in schemaview.all_types():
            t = schemaview.get_type(target_type)
            dt_uri = t.uri
            if not dt_uri:
                logging.error(f'NO DT: {t}')
                return Literal(element)
            if dt_uri == 'xsd:string':
                return Literal(element)
            return Literal(element, datatype=namespaces.uri_for(dt_uri))

        # Case 3: object. An element with no public attributes is treated as a
        # reference and expanded from its string form.
        element_vars = {k: v for k, v in vars(element).items() if not k.startswith('_')}
        if not element_vars:
            return URIRef(schemaview.expand_curie(str(element)))
        cn = type(element).class_name
        id_slot = schemaview.get_identifier_slot(cn)
        if id_slot is not None:
            element_id = getattr(element, id_slot.name)
            logging.debug(f'ELEMENT_ID={element_id} // {id_slot.name}')
            element_uri = namespaces.uri_for(element_id)
        else:
            # no identifier slot: the object is represented by a blank node
            element_uri = BNode()

        type_added = False
        for attr_name, v_or_list in element_vars.items():
            if isinstance(v_or_list, list):
                candidates = v_or_list
            elif isinstance(v_or_list, dict):
                candidates = v_or_list.values()
            else:
                candidates = [v_or_list]
            values = [v for v in candidates if v is not None]
            if not values:
                continue
            # Resolve the slot once per attribute. Doing this inside the value loop
            # would look up the already-translated name for the second and later
            # values and report it as missing from the name map.
            if attr_name in slot_name_map:
                slot_name = slot_name_map[attr_name].name
            else:
                logging.error(f'Slot {attr_name} not in name map')
                slot_name = attr_name
            slot = schemaview.induced_slot(slot_name, cn)
            if slot.identifier:
                # the identifier is carried by the subject node itself
                continue
            slot_uri = URIRef(schemaview.get_uri(slot, expand=True))
            for v in values:
                v_node = self.inject_triples(v, schemaview, graph, slot.range)
                graph.add((element_uri, slot_uri, v_node))
            if slot.designates_type:
                type_added = True
        if not type_added:
            graph.add((element_uri, RDF.type, URIRef(schemaview.get_uri(cn, expand=True))))
        return element_uri

    def dump(self, element: YAMLRoot,
             to_file: str,
             schemaview: SchemaView = None,
             fmt: str = 'turtle', prefix_map: Dict[str, str] = None, **args) -> None:
        """
        Write element as rdf to to_file

        :param element: element to represent in RDF
        :param to_file: file to write to
        :param schemaview: view over the schema that guides the conversion
        :param fmt: rdflib serialization format name
        :param prefix_map: optional prefix -> namespace URI map
        :param args: accepted but not forwarded to the parent implementation
        :return:
        """
        super().dump(element, to_file, schemaview=schemaview, fmt=fmt, prefix_map=prefix_map)

    def dumps(self, element: YAMLRoot, schemaview: SchemaView = None,
              fmt: Optional[str] = 'turtle', prefix_map: Dict[str, str] = None) -> str:
        """
        Convert element into an RDF graph guided by the schema

        :param element: element to represent in RDF
        :param schemaview: view over the schema that guides the conversion
        :param fmt: rdflib serialization format name
        :param prefix_map: optional prefix -> namespace URI map
        :return: serialization of rdflib Graph containing element
        """
        serialized = self.as_rdf_graph(element, schemaview, prefix_map=prefix_map).serialize(format=fmt)
        # Graph.serialize returns bytes in rdflib < 6 and str from rdflib 6 onwards;
        # only decode when bytes were actually returned.
        if isinstance(serialized, bytes):
            return serialized.decode()
        return serialized

2 changes: 2 additions & 0 deletions linkml_runtime/loaders/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from linkml_runtime.loaders.json_loader import JSONLoader
from linkml_runtime.loaders.rdf_loader import RDFLoader
from linkml_runtime.loaders.rdflib_loader import RDFLibLoader
from linkml_runtime.loaders.yaml_loader import YAMLLoader
from linkml_runtime.loaders.csv_loader import CSVLoader

# Module-level loader instances, created once at import time: one instance of
# each loader class imported above, exposed under a lowercase name.
json_loader = JSONLoader()
rdf_loader = RDFLoader()
rdflib_loader = RDFLibLoader()
yaml_loader = YAMLLoader()
csv_loader = CSVLoader()
177 changes: 177 additions & 0 deletions linkml_runtime/loaders/rdflib_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import logging
from copy import copy
from dataclasses import dataclass
from typing import Optional, Any, Dict, Type, Union, TextIO, List, Tuple, Set

from hbreader import FileInfo
from rdflib import Graph, URIRef
from rdflib.term import Node, BNode, Literal
from rdflib.namespace import RDF

from linkml_runtime.linkml_model import ClassDefinitionName
from linkml_runtime.loaders.loader_root import Loader
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.schemaview import SchemaView, SlotDefinition
from linkml_runtime.utils.yamlutils import YAMLRoot

# Node kinds the loader treats as subjects of triples: IRIs and blank nodes
VALID_SUBJECT = Union[URIRef, BNode]
# Plain dict used as the intermediate representation of an object
ANYDICT = Dict[str, Any]

@dataclass
class Pointer:
    """
    Placeholder for an inlined object whose dict has not been built yet.

    The loader stores one of these in place of the object and swaps it for the
    real dict in a later pass.
    """
    # NOTE(review): annotated as str, but the loader constructs Pointer(o) with the
    # rdflib node taken from the triple and later uses it as a key into its
    # node -> dict map; confirm whether the annotation should be a node type.
    obj: str

class RDFLibLoader(Loader):
    """
    Loads objects from rdflib Graphs into the python target_class structure

    Note: this is a more complete replacement for rdf_loader
    """
    def from_rdf_graph(self, graph: Graph, schemaview: SchemaView, target_class: Type[YAMLRoot],
                       prefix_map: Dict[str, str] = None) -> List[YAMLRoot]:
        """
        Loads objects from graph into lists of the python target_class structure,
        recursively walking RDF graph from instances of target_class.

        :param graph: rdflib Graph that holds instances of target_class
        :param schemaview: schema to which graph conforms
        :param target_class: class which root nodes should instantiate
        :param prefix_map: optional prefix -> namespace URI map; each entry is written into
            the schemaview's namespaces and bound on the graph's namespace manager
        :return: all instances of target class type
        :raises Exception: if a predicate does not correspond to any slot in the schema,
            or if an inlined object cannot be resolved
        """
        namespaces = schemaview.namespaces()
        # data prefix map: supplements or overrides existing schema prefix map
        if prefix_map:
            for k, v in prefix_map.items():
                namespaces[k] = v
                graph.namespace_manager.bind(k, URIRef(v))

        # Step 1: Create stub root dicts
        target_class_uriref: URIRef = target_class.class_class_uri
        root_dicts: List[ANYDICT] = []
        root_subjects: List[VALID_SUBJECT] = list(graph.subjects(RDF.type, target_class_uriref))
        root_subject_set: Set[VALID_SUBJECT] = set(root_subjects)  ## O(1) membership test
        logging.debug(f'ROOTS = {root_subjects}')

        # Step 2: walk RDF graph starting from root subjects, constructing dict tree
        node_tuples_to_visit: List[Tuple[VALID_SUBJECT, ClassDefinitionName]]  ## nodes and their type still to visit
        node_tuples_to_visit = [(subject, target_class.class_name) for subject in root_subjects]
        uri_to_slot: Dict[URIRef, SlotDefinition]  ## lookup table for RDF predicates -> slots
        uri_to_slot = {URIRef(schemaview.get_uri(s, expand=True)): s for s in schemaview.all_slots().values()}
        # Nodes that have been queued at any point (visited or still waiting). Checking
        # this before queueing guarantees each node is processed exactly once; a root
        # that is also referenced from another node would otherwise be processed twice
        # and show up twice in the result.
        seen: Set[VALID_SUBJECT] = set(root_subjects)
        obj_map: Dict[VALID_SUBJECT, ANYDICT] = {}  ## map from an RDF node to its dict representation
        enum_names = schemaview.all_enums()
        class_names = schemaview.all_classes()
        while node_tuples_to_visit:
            subject, subject_class = node_tuples_to_visit.pop()
            dict_obj = self._get_id_dict(subject, schemaview, subject_class)
            if subject in root_subject_set:
                root_dicts.append(dict_obj)
            obj_map[subject] = dict_obj
            # process all triples for this node
            for (_, p, o) in graph.triples((subject, None, None)):
                logging.debug(f' Processing triple {subject} {p} {o}, subject type = {subject_class}')
                if p == RDF.type:
                    logging.debug(f'Ignoring RDF.type for {subject} {o}, we automatically infer this')
                    continue
                if p not in uri_to_slot:
                    raise Exception(f'No pred for {p} {type(p)}')
                slot = schemaview.induced_slot(uri_to_slot[p].name, subject_class)
                is_inlined = schemaview.is_inlined(slot)
                slot_name = underscore(slot.name)
                if isinstance(o, Literal):
                    v = o.value
                else:
                    v = namespaces.curie_for(o)
                    if v is None and isinstance(o, URIRef):
                        # no known prefix matches: keep the full IRI instead of
                        # dropping the value (same fallback as _get_id_dict)
                        v = str(o)
                if slot.range in enum_names:
                    # if a PV has a meaning URI declared, map this
                    # back to a text representation
                    e = schemaview.get_enum(slot.range)
                    for pv in e.permissible_values.values():
                        if v == pv.meaning or str(o) == pv.meaning:
                            v = pv.text
                            break
                if is_inlined:
                    # the object of the triple may not yet be processed;
                    # we store a pointer to o, and then replace this later
                    v = Pointer(o)
                if slot.multivalued:
                    if slot_name not in dict_obj:
                        dict_obj[slot_name] = []
                    dict_obj[slot_name].append(v)
                else:
                    dict_obj[slot_name] = v
                # if o instantiates a class and has not been queued yet,
                # add to list of nodes to be visited
                if o not in seen and slot.range in class_names:
                    seen.add(o)
                    node_tuples_to_visit.append((o, ClassDefinitionName(slot.range)))

        # Step 3: replace inline pointers with object dicts
        def repl(v):
            if isinstance(v, Pointer):
                # .get() so that a missing node reaches the explicit error below
                v2 = obj_map.get(v.obj)
                if v2 is None:
                    raise Exception(f'No mapping for pointer {v}')
                return v2
            else:
                return v

        objs_to_visit: List[ANYDICT] = copy(root_dicts)
        # Dicts are tracked by identity so that shared or cyclic inlined objects
        # are expanded once and the loop always terminates.
        expanded: Set[int] = set()
        while objs_to_visit:
            obj = objs_to_visit.pop()
            if id(obj) in expanded:
                continue
            expanded.add(id(obj))
            logging.debug(f'Replacing pointers for {obj}')
            # iterate over a snapshot because values are reassigned below
            for k, v in list(obj.items()):
                if v is None:
                    continue
                if isinstance(v, list):
                    v = [repl(v1) for v1 in v if v1 is not None]
                    objs_to_visit.extend(v1 for v1 in v if isinstance(v1, dict))
                else:
                    v = repl(v)
                    if isinstance(v, dict):
                        objs_to_visit.append(v)
                obj[k] = v

        # Final step: translate dicts into instances of target_class
        return [target_class(**x) for x in root_dicts]

    def _get_id_dict(self, node: VALID_SUBJECT, schemaview: SchemaView, cn: ClassDefinitionName) -> ANYDICT:
        """
        Create the initial dict for a node, holding only its identifier (if any)

        :param node: RDF node that is being converted to a dict
        :param schemaview: schema to which the graph conforms
        :param cn: name of the class that node instantiates
        :return: dict with the identifier slot set to the node's CURIE (or its full IRI
            if no prefix matches); empty dict if the class has no identifier slot
        :raises Exception: if node is a blank node but the class declares an identifier slot
        """
        id_slot = schemaview.get_identifier_slot(cn)
        if isinstance(node, BNode):
            if id_slot is not None:
                raise Exception(f'Unexpected blank node {node}, type {cn} expects {id_slot.name} identifier')
            return {}
        if id_slot is None:
            # The class has no identifier slot, so there is nowhere to record the IRI
            return {}
        id_val = schemaview.namespaces().curie_for(node)
        if id_val is None:
            id_val = str(node)
        return {id_slot.name: id_val}

    def load(self, source: Union[str, TextIO, Graph], target_class: Type[YAMLRoot], *,
             schemaview: SchemaView = None,
             prefix_map: Dict[str, str] = None,
             fmt: Optional[str] = 'turtle',
             metadata: Optional[FileInfo] = None) -> YAMLRoot:
        """
        Load the RDF in source into the python target_class structure

        The assumption of all loaders is that the source contains exactly one instance of the
        target class. To load from graphs with multiple instances, use from_rdf_graph

        :param source: RDF data source. Can be a file name, an open handle or an existing graph
        :param target_class: LinkML class to load the RDF into
        :param schemaview: view over schema to guide instantiation
        :param prefix_map: map of prefixes used in data
        :param fmt: format of source if it isn't an existing Graph
        :param metadata: source information. Used by some loaders to record where information came from
        :return: Instance of target_class
        :raises Exception: if the source does not contain exactly one instance of target_class
        """
        if isinstance(source, Graph):
            g = source
        else:
            g = Graph()
            g.parse(source, format=fmt)
        objs = self.from_rdf_graph(g, schemaview=schemaview, target_class=target_class, prefix_map=prefix_map)
        if len(objs) != 1:
            raise Exception(f'Got {len(objs)} of type {target_class} from source, expected exactly 1')
        return objs[0]



Loading

0 comments on commit 0fd55c3

Please sign in to comment.