Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make induced_slot recursive #332

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 136 additions & 76 deletions linkml_runtime/utils/schemaview.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import os
import pdb
import uuid
import logging
import collections
from functools import lru_cache
from copy import copy, deepcopy
from collections import defaultdict, deque
from pathlib import Path
from typing import Mapping, Tuple, TypeVar
from typing import Mapping, Optional, Tuple, TypeVar
import warnings

from linkml_runtime.utils.namespaces import Namespaces
from deprecated.classic import deprecated
from linkml_runtime.utils.context_utils import parse_import_map, map_import
from linkml_runtime.utils.formatutils import is_empty, remove_empty_items
from linkml_runtime.utils.pattern import PatternResolver
from linkml_runtime.linkml_model.meta import *
from linkml_runtime.exceptions import OrderingError
Expand Down Expand Up @@ -144,6 +146,11 @@ class SchemaView(object):
modifications: int = 0
uuid: str = None

## private vars --------
# cached hash
_hash: Optional[int] = None


def __init__(self, schema: Union[str, Path, SchemaDefinition],
importmap: Optional[Dict[str, str]] = None, merge_imports: bool = False, base_dir: str = None):
if isinstance(schema, Path):
Expand All @@ -165,8 +172,10 @@ def __eq__(self, other):
return self.__key() == other.__key()
return NotImplemented

def __hash__(self):
return hash(self.__key())
def __hash__(self) -> int:
if self._hash is None:
self._hash = hash(self.__key())
return self._hash

@lru_cache(None)
def namespaces(self) -> Namespaces:
Expand Down Expand Up @@ -594,7 +603,7 @@ def get_class(self, class_name: CLASS_NAME, imports=True, strict=False) -> Class
return c

@lru_cache(None)
def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=False) -> SlotDefinition:
def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=False) -> Optional[SlotDefinition]:
"""
:param slot_name: name of the slot to be retrieved
:param imports: include import closure
Expand All @@ -614,6 +623,11 @@ def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=F
slot.owner = c.name
if strict and slot is None:
raise ValueError(f'No such slot as "{slot_name}"')
if slot is not None:
if slot.inlined_as_list:
slot.inlined = True
if slot.identifier or slot.key:
slot.required = True
return slot

@lru_cache(None)
Expand Down Expand Up @@ -1295,101 +1309,146 @@ def class_slots(self, class_name: CLASS_NAME, imports=True, direct=False, attrib
return slots_nr

@lru_cache(None)
def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports=True,
def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports: bool = True,
mangle_name=False) -> SlotDefinition:
"""
Given a slot, in the context of a particular class, yield a dynamic SlotDefinition that
has all properties materialized.

This makes use of schema slots, such as attributes, slot_usage. It also uses ancestor relationships
to infer missing values, for inheritable slots
to infer missing values, for inheritable slots.

Creates a new SlotDefinition object, so ``sv.induced_slot(slot) is sv.get_slot(slot)``
will always be ``False`` .

:param slot_name: slot to be queries
:param class_name: class used as context
:param imports: include imports closure
:return: dynamic slot constructed by inference
"""
if class_name:
cls = self.get_class(class_name, imports, strict=True)
induced_slot = self._induced_attribute(slot_name=slot_name, class_name=class_name, imports=imports)
else:
cls = None
induced_slot = self._induced_slot(slot_name=slot_name, imports=imports)
if induced_slot is None:
# try to get it from the attributes of whatever class we find first
attr = self.get_slot(slot_name=slot_name, imports=imports, attributes=True)
induced_slot = self._induced_attribute(attr.name, class_name=attr.owner, imports=imports)

if induced_slot is None:
raise ValueError(f"No such slot {slot_name} as an attribute of {class_name} ancestors "
"or as a slot definition in the schema")

# Set any values that need to take on defaults that aren't explicit in the metamodel
if 'range' not in induced_slot:
induced_slot['range'] = self.schema.default_range
if 'alias' not in induced_slot:
induced_slot['alias'] = underscore(slot_name)

# Set values based on logical conditions of the induced_slot
if induced_slot.get('inlined_as_list', False):
induced_slot['inlined'] = True
if induced_slot.get('identifier', False) or induced_slot.get('key', False):
induced_slot['required'] = True

# final modifications outside of schema declaration
if mangle_name and class_name:
induced_slot['name'] = f'{camelcase(class_name)}__{underscore(slot_name)}'

# set the domain of the slot
# FIXME: this is inaccurate because classes can inherit slots and attrs
# return when induced_classes is recursive and it's cheap to get a fully hydrated class
induced_slot['domain_of'] = []
for c in self.all_classes().values():
if induced_slot['name'] in c.slots or induced_slot['name'] in c.attributes:
if c.name not in induced_slot['domain_of']:
induced_slot['domain_of'].append(c.name)

induced_slot = SlotDefinition(**induced_slot)

return induced_slot

@lru_cache(None)
def _induced_slot(self, slot_name: SLOT_NAME, imports: bool =True) -> Optional[dict]:
"""
induced_slot when class_name is None - schema-level slot definitions only
"""
slot = self.get_slot(slot_name, imports, attributes=False)
if slot is None:
return slot

# propagate inheritable_slots from ancestors
# recursively propagate from n+1 layer up
induced_slot = {}
parent_names = self.slot_parents(slot_name=slot_name, imports=imports, mixins=True, is_a=True)
for parent_name in reversed(parent_names):
induced_parent = copy(self._induced_slot(parent_name, imports=imports))
induced_slot.update({k:v for k,v in induced_parent.items() if k in SlotDefinition._inherited_slots})

# apply props set on this slot last
induced_slot.update({k: v for k, v in slot.__dict__.items() if not is_empty(v)})
return induced_slot

@lru_cache(None)
def _induced_attribute(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None,
imports=True) -> Optional[dict]:
"""
induced_slot when class_name is given - could be either an attribute or a schema-level slot definition
"""
cls = self.get_class(class_name, imports, strict=True)
class_ancestor_names = self.class_ancestors(class_name, imports=imports, reflexive=True,
mixins=True)
class_parent_names = self.class_parents(class_name=class_name, imports=imports, mixins=True, is_a=True)
class_ancestors = [self.get_class(name, imports=imports) for name in class_ancestor_names]

# attributes take priority over schema-level slot definitions, IF
# the attributes is declared for the class or an ancestor
slot_comes_from_attribute = False
if cls:
slot = self.get_slot(slot_name, imports, attributes=False)
# traverse ancestors (reflexive), starting with
# the main class
for an in self.class_ancestors(class_name):
a = self.get_class(an, imports)
if slot_name in a.attributes:
slot = a.attributes[slot_name]
slot_comes_from_attribute = True
break
attribute = None
for ancestor in class_ancestors:
if slot_name in ancestor.attributes:
attribute = ancestor.attributes[slot_name]
break

if attribute is None:
induced_slot = copy(self._induced_slot(slot_name=slot_name, imports=imports))
if induced_slot is None:
return induced_slot
else:
slot = self.get_slot(slot_name, imports, attributes=True)

if slot is None:
raise ValueError(f"No such slot {slot_name} as an attribute of {class_name} ancestors "
"or as a slot definition in the schema")
# we'll update from the values of this attribute at the end
induced_slot = {}

# copy the slot, as it will be modified
induced_slot = copy(slot)
if not slot_comes_from_attribute:
slot_anc_names = self.slot_ancestors(slot_name, reflexive=True)
# inheritable slot: first propagate from ancestors
for anc_sn in reversed(slot_anc_names):
anc_slot = self.get_slot(anc_sn, attributes=False)
for metaslot_name in SlotDefinition._inherited_slots:
if getattr(anc_slot, metaslot_name, None):
setattr(induced_slot, metaslot_name, copy(getattr(anc_slot, metaslot_name)))
COMBINE = {
'maximum_value': lambda x, y: min(x, y),
'minimum_value': lambda x, y: max(x, y),
}
# iterate through all metaslots, and potentially populate metaslot value for induced slot
for metaslot_name in self._metaslots_for_slot():
# inheritance of slots; priority order
# slot-level assignment < ancestor slot_usage < self slot_usage
v = getattr(induced_slot, metaslot_name, None)
if not cls:
propagated_from = []
else:
propagated_from = self.class_ancestors(class_name, reflexive=True, mixins=True)
for an in reversed(propagated_from):
induced_slot.owner = an
a = self.get_class(an, imports)
anc_slot_usage = a.slot_usage.get(slot_name, {})
v2 = getattr(anc_slot_usage, metaslot_name, None)
if v is None:
v = v2
else:
if metaslot_name in COMBINE:
if v2 is not None:
v = COMBINE[metaslot_name](v, v2)
else:
if v2 is not None:
v = v2
logging.debug(f'{v} takes precedence over {v2} for {induced_slot.name}.{metaslot_name}')
if v is None:
if metaslot_name == 'range':
v = self.schema.default_range
if v is not None:
setattr(induced_slot, metaslot_name, v)
if slot.inlined_as_list:
slot.inlined = True
if slot.identifier or slot.key:
slot.required = True
if mangle_name:
mangled_name = f'{camelcase(class_name)}__{underscore(slot_name)}'
induced_slot.name = mangled_name
if not induced_slot.alias:
induced_slot.alias = underscore(slot_name)
for c in self.all_classes().values():
if induced_slot.name in c.slots or induced_slot.name in c.attributes:
if c.name not in induced_slot.domain_of:
induced_slot.domain_of.append(c.name)
# update recursively from parent classes
for parent_name in reversed(class_parent_names):
induced_parent = copy(self._induced_attribute(slot_name=slot_name, class_name=parent_name, imports=imports))
if induced_parent is None:
continue
# even tho parent should already be completed,
# merge values here too since we can have multiple parents via mixins
for combine_key, combine_func in COMBINE.items():
if (src_val := induced_slot.get(combine_key, None)) and (parent_val := induced_parent.get(combine_key, None)):
induced_parent[combine_key] = combine_func(src_val, parent_val)
induced_slot.update(induced_parent)

# now update from values set in this class on an attribute
if attribute is not None:
induced_slot.update({k: v for k, v in attribute.__dict__.items() if not is_empty(v)})

# update from any slot_usage from this class
if slot_usage := cls.slot_usage.get(slot_name, None):
if isinstance(slot_usage, SlotDefinition):
slot_usage = remove_empty_items(slot_usage)

# merge any fields that need to be merged for monotonicity reazons
for combine_key, combine_func in COMBINE.items():
if (src_val := induced_slot.get(combine_key, None)) and (parent_val := slot_usage.get(combine_key, None)):
slot_usage[combine_key] = combine_func(src_val, parent_val)

induced_slot.update(slot_usage)

return induced_slot

@lru_cache(None)
Expand Down Expand Up @@ -1825,6 +1884,7 @@ def copy_schema(self, new_name: str = None) -> SchemaDefinition:
return s2

def set_modified(self) -> None:
self._hash = None
self.modifications += 1

def materialize_patterns(self) -> None:
Expand Down
Loading