diff --git a/mypy.ini b/mypy.ini index 9435d230b..1286a4332 100644 --- a/mypy.ini +++ b/mypy.ini @@ -4,6 +4,8 @@ ignore_missing_imports = False allow_untyped_globals = False files = xblock +exclude = + xblock.test # Ignore web_fragments typing until it has hints. [mypy-web_fragments.*] diff --git a/xblock/completable.py b/xblock/completable.py index cb98a3444..d3cfcba6f 100644 --- a/xblock/completable.py +++ b/xblock/completable.py @@ -1,6 +1,7 @@ """ This module defines CompletableXBlockMixin and completion mode enumeration. """ +from xblock.core import Blocklike, XBlockMixin class XBlockCompletionMode: @@ -12,7 +13,7 @@ class XBlockCompletionMode: EXCLUDED = "excluded" @classmethod - def get_mode(cls, block_class): + def get_mode(cls, block_class: Blocklike | type[Blocklike]) -> str: """ Return the effective completion mode for a given block. @@ -21,17 +22,17 @@ def get_mode(cls, block_class): return getattr(block_class, 'completion_mode', cls.COMPLETABLE) -class CompletableXBlockMixin: +class CompletableXBlockMixin(XBlockMixin): """ This mixin sets attributes and provides helper method to integrate XBlock with Completion API. """ - has_custom_completion = True - completion_mode = XBlockCompletionMode.COMPLETABLE + has_custom_completion: bool = True + completion_mode: str = XBlockCompletionMode.COMPLETABLE # To read more on the debate about using the terms percent vs ratio, see: # https://openedx.atlassian.net/wiki/spaces/OpenDev/pages/245465398/Naming+with+Percent+or+Ratio - def emit_completion(self, completion_percent): + def emit_completion(self, completion_percent: float) -> None: """ Emits completion event through Completion API. diff --git a/xblock/core.py b/xblock/core.py index fcea0f597..d0cb07c00 100644 --- a/xblock/core.py +++ b/xblock/core.py @@ -1,18 +1,23 @@ """ Base classes for all XBlock-like objects. Used by all XBlock Runtimes. """ +from __future__ import annotations + import copy import functools import inspect import json import logging import os +import typing as t import warnings from collections import OrderedDict, defaultdict import pkg_resources from lxml import etree -from webob import Response +from opaque_keys.edx.keys import LearningContextKey, UsageKey +from webob import Request, Response +from web_fragments.fragment import Fragment from xblock.exceptions import ( DisallowedFileError, @@ -21,11 +26,17 @@ KeyValueMultiSaveError, XBlockSaveError, ) -from xblock.fields import Field, List, Reference, ReferenceList, Scope, String +from xblock.field_data import FieldData +from xblock.fields import Field, List, Reference, ReferenceListNotNone, Scope, ScopeIds, String from xblock.internal import class_lazy from xblock.plugin import Plugin from xblock.validation import Validation + +if t.TYPE_CHECKING: + from xblock.runtime import Runtime + + # OrderedDict is used so that namespace attributes are put in predictable order # This allows for simple string equality assertions in tests and have no other effects XML_NAMESPACES = OrderedDict([ @@ -36,8 +47,6 @@ # __all__ controls what classes end up in the docs. __all__ = ['XBlock', 'XBlockAside'] -UNSET = object() - class _AutoNamedFieldsMetaclass(type): """ @@ -91,26 +100,28 @@ class Blocklike(metaclass=_AutoNamedFieldsMetaclass): (see XBlock and XBlockAside classes for details) """ - resources_dir = '' - public_dir = 'public' - i18n_js_namespace = None + resources_dir: str = '' + public_dir: str = 'public' + i18n_js_namespace: str | None = None + + entry_point: str # Should be overwritten by children classes @classmethod - def get_resources_dir(cls): + def get_resources_dir(cls) -> str: """ Gets the resource directory for this XBlock-like class. """ return cls.resources_dir @classmethod - def get_public_dir(cls): + def get_public_dir(cls) -> str: """ Gets the public directory for this XBlock-like class. """ return cls.public_dir @classmethod - def get_i18n_js_namespace(cls): + def get_i18n_js_namespace(cls) -> str | None: """ Gets the JavaScript translations namespace for this XBlock-like class. @@ -121,7 +132,7 @@ def get_i18n_js_namespace(cls): return cls.i18n_js_namespace @classmethod - def open_local_resource(cls, uri): + def open_local_resource(cls, uri: str | bytes) -> t.IO[bytes]: """ Open a local resource. @@ -159,8 +170,12 @@ def open_local_resource(cls, uri): return pkg_resources.resource_stream(cls.__module__, os.path.join(cls.resources_dir, uri)) + BlocklikeSubclass = t.TypeVar("BlocklikeSubclass", bound="Blocklike") + @classmethod - def json_handler(cls, func): + def json_handler( + cls, func: t.Callable[[BlocklikeSubclass, t.Any, str], t.Any] + ) -> t.Callable[[BlocklikeSubclass, Request, str], Response]: """ Wrap a handler to consume and produce JSON. @@ -179,7 +194,7 @@ def json_handler(cls, func): """ @cls.handler @functools.wraps(func) - def wrapper(self, request, suffix=''): + def wrapper(self: Blocklike.BlocklikeSubclass, request: Request, suffix: str = '') -> Response: """The wrapper function `json_handler` returns.""" if request.method != "POST": return JsonHandlerError(405, "Method must be POST").get_response(allow=["POST"]) @@ -198,17 +213,21 @@ def wrapper(self, request, suffix=''): return wrapper @classmethod - def handler(cls, func): + def handler( + cls, + func: t.Callable[[BlocklikeSubclass, Request, str], Response], + ) -> t.Callable[[BlocklikeSubclass, Request, str], Response]: """ A decorator to indicate a function is usable as a handler. The wrapped function must return a :class:`webob.Response` object. """ - func._is_xblock_handler = True # pylint: disable=protected-access + # pylint: disable=protected-access + func._is_xblock_handler = True # type: ignore[attr-defined] return func @classmethod - def needs(cls, *service_names): + def needs(cls, *service_names: str): """ A class decorator to indicate that an XBlock-like class needs particular services. """ @@ -219,7 +238,7 @@ def _decorator(cls_): return _decorator @classmethod - def wants(cls, *service_names): + def wants(cls, *service_names: str): """ A class decorator to indicate that a XBlock-like class wants particular services. """ @@ -230,7 +249,7 @@ def _decorator(cls_): return _decorator @classmethod - def service_declaration(cls, service_name): + def service_declaration(cls, service_name: str): """ Find and return a service declaration. @@ -247,39 +266,45 @@ def service_declaration(cls, service_name): return cls._combined_services.get(service_name) # pylint: disable=no-member @class_lazy - def _services_requested(cls): # pylint: disable=no-self-argument + def _services_requested(cls) -> dict[str, t.Any]: # pylint: disable=no-self-argument """ A per-class dictionary to store the services requested by a particular XBlock. """ return {} @class_lazy - def _combined_services(cls): # pylint: disable=no-self-argument + def _combined_services(cls) -> dict[str, t.Any]: # pylint: disable=no-self-argument """ A dictionary that collects all _services_requested by all ancestors of this XBlock class. """ # The class declares what services it desires. To deal with subclasses, # especially mixins, properly, we have to walk up the inheritance # hierarchy, and combine all the declared services into one dictionary. - combined = {} - for parent in reversed(cls.mro()): # pylint: disable=no-member + combined: dict[str, t.Any] = {} + for parent in reversed(cls._mro()): combined.update(getattr(parent, "_services_requested", {})) return combined + @classmethod + def _mro(cls) -> list[type[t.Self]]: + """ + Silly helper for getting this object's/class's resolution order (MRO) without upsetting mypy or pylint. + """ + return cls.mro() + @class_lazy - def fields(cls): # pylint: disable=no-self-argument + def fields(cls) -> dict[str, Field]: # pylint: disable=no-self-argument """ A dictionary mapping the attribute name to the Field object for all Field attributes of the class. """ - fields = {} + fields: dict[str, Field] = {} # Loop through all of the baseclasses of cls, in # the order that methods are resolved (Method Resolution Order / mro) # and find all of their defined fields. # # Only save the first such defined field (as expected for method resolution) - - bases = cls.mro() # pylint: disable=no-member - local = bases.pop(0) + bases = cls._mro() + local = bases.pop(0) # pylint: disable=no-member # pylint thinks `bases` is a tuple? # First, descend the MRO from the top down, updating the 'fields' dictionary # so that the dictionary always has the most specific version of fields in it @@ -294,17 +319,9 @@ def fields(cls): # pylint: disable=no-self-argument return fields @classmethod - def parse_xml(cls, node, runtime, keys): + def parse_xml(cls, node: etree._Element, runtime: Runtime, keys: ScopeIds) -> Blocklike: """ Use `node` to construct a new block. - - Arguments: - node (:class:`~xml.etree.ElementTree.Element`): The xml node to parse into an xblock. - - runtime (:class:`.Runtime`): The runtime to use while parsing. - - keys (:class:`.ScopeIds`): The keys identifying where this block - will store its data. """ block = runtime.construct_xblock_from_class(cls, keys) @@ -324,6 +341,8 @@ def parse_xml(cls, node, runtime, keys): # Attributes become fields. for name, value in list(node.items()): # lxml has no iteritems + if isinstance(name, bytes): + name = name.decode('utf-8') cls._set_field_if_present(block, name, value, {}) # Text content becomes "content", if such a field exists. @@ -337,7 +356,9 @@ def parse_xml(cls, node, runtime, keys): return block @classmethod - def _set_field_if_present(cls, block, name, value, attrs): + def _set_field_if_present( + cls, block: XBlock, name: str, value: t.Any, attrs: etree._Attrib | dict[str, str] + ) -> None: """ Sets the field block.name, if block have such a field. """ @@ -350,7 +371,14 @@ def _set_field_if_present(cls, block, name, value, attrs): else: logging.warning("%s does not contain field %s", type(block), name) - def __init__(self, scope_ids, field_data=None, *, runtime, **kwargs): + def __init__( + self, + scope_ids: ScopeIds, + field_data: FieldData | None = None, + *, + runtime: Runtime, + **kwargs + ): """ Arguments: @@ -382,13 +410,13 @@ def __init__(self, scope_ids, field_data=None, *, runtime, **kwargs): else: self._deprecated_per_instance_field_data = None # pylint: disable=invalid-name - self._field_data_cache = {} - self._dirty_fields = {} - self.scope_ids = scope_ids + self._field_data_cache: dict[str, t.Any] = {} + self._dirty_fields: dict[Field, t.Any] = {} + self.scope_ids: ScopeIds = scope_ids super().__init__(**kwargs) - def __repr__(self): + def __repr__(self) -> str: attrs = [] for field in self.fields.values(): try: @@ -411,7 +439,7 @@ def __repr__(self): ) @property - def usage_key(self): + def usage_key(self) -> UsageKey: """ A key identifying this particular usage of the XBlock-like, unique across all learning contexts in the system. @@ -420,7 +448,7 @@ def usage_key(self): return self.scope_ids.usage_id @property - def context_key(self): + def context_key(self) -> LearningContextKey | None: """ A key identifying the learning context (course, library, etc.) that contains this XBlock-like usage. @@ -436,7 +464,7 @@ def context_key(self): """ return getattr(self.scope_ids.usage_id, "context_key", None) - def index_dictionary(self): + def index_dictionary(self) -> dict[str, t.Any]: """ Return a dict containing information that could be used to feed a search index. @@ -467,7 +495,7 @@ def index_dictionary(self): return _index_dictionary - def handle(self, handler_name, request, suffix=''): + def handle(self, handler_name: str, request: Request, suffix: str = '') -> Response: """ Handle `request` with this block's runtime. """ @@ -535,7 +563,7 @@ def force_save_fields(self, field_names): for field in fields: self._reset_dirty_field(field) - def _get_fields_to_save(self): + def _get_fields_to_save(self) -> t.Iterable[str]: """ Get an xblock's dirty fields. """ @@ -544,13 +572,13 @@ def _get_fields_to_save(self): # pylint: disable=protected-access return [field.name for field in self._dirty_fields if field._is_dirty(self)] - def _clear_dirty_fields(self): + def _clear_dirty_fields(self) -> None: """ Remove all dirty fields from an XBlock. """ self._dirty_fields.clear() - def _reset_dirty_field(self, field): + def _reset_dirty_field(self, field: Field) -> None: """ Resets dirty field value with the value from the field data cache. """ @@ -559,11 +587,10 @@ def _reset_dirty_field(self, field): self._field_data_cache[field.name] ) - def add_xml_to_node(self, node): + def add_xml_to_node(self, node: etree._Element) -> None: """ For exporting, set data on `node` from ourselves. """ - # pylint: disable=E1101 # Set node.tag based on our class name. node.tag = self.xml_element_name() node.set('xblock-family', self.entry_point) @@ -580,22 +607,23 @@ def add_xml_to_node(self, node): if text is not None: node.text = text - def xml_element_name(self): + def xml_element_name(self) -> str: """ What XML element name should be used for this block? """ return self.scope_ids.block_type - def xml_text_content(self): + def xml_text_content(self) -> str | None: """ What is the text content for this block's XML node? """ - if 'content' in self.fields and self.content: # pylint: disable=unsupported-membership-test - return self.content + # pylint: disable=unsupported-membership-test + if 'content' in self.fields and self.content: # type: ignore[attr-defined] + return self.content # type: ignore[attr-defined] else: return None - def _add_field(self, node, field_name, field): + def _add_field(self, node: etree._Element, field_name: str, field: Field) -> None: """ Add xml representation of field to node. @@ -642,7 +670,7 @@ class _HasChildrenMetaclass(_AutoNamedFieldsMetaclass): """ def __new__(mcs, name, bases, attrs): if (attrs.get('has_children', False) or any(getattr(base, 'has_children', False) for base in bases)): - attrs['children'] = ReferenceList( + attrs['children'] = ReferenceListNotNone( help='The ids of the children of this XBlock', scope=Scope.children) else: @@ -672,7 +700,7 @@ class XBlock(Plugin, Blocklike, metaclass=_HasChildrenMetaclass): entry_point = 'xblock.v1' name = String(help="Short name for the block", scope=Scope.settings) - tags = List(help="Tags for this block", scope=Scope.settings) + tags: List[str] = List(help="Tags for this block", scope=Scope.settings) parent = Reference(help='The id of the parent of this XBlock', default=None, scope=Scope.parent) @@ -680,26 +708,25 @@ class XBlock(Plugin, Blocklike, metaclass=_HasChildrenMetaclass): # We just declare their types here to make static analyzers happy. # Note that children is only defined iff has_children is defined and True. has_children: bool - children: ReferenceList + children: ReferenceListNotNone @class_lazy - def _class_tags(cls): # pylint: disable=no-self-argument + def _class_tags(cls) -> set[str]: # pylint: disable=no-self-argument """ Collect the tags from all base classes. """ - class_tags = set() - - for base in cls.mro()[1:]: # pylint: disable=no-member + class_tags: set[str] = set() + for base in cls._mro()[1:]: class_tags.update(getattr(base, '_class_tags', set())) return class_tags @staticmethod - def tag(tags): + def tag(tags: str) -> t.Callable[[XBlock], XBlock]: """ Returns a function that adds the words in `tags` as class tags to this class. """ - def dec(cls): + def dec(cls: t.Self) -> t.Self: """Add the words in `tags` as class tags to this class.""" # Add in this class's tags cls._class_tags.update(tags.replace(",", " ").split()) # pylint: disable=protected-access @@ -707,7 +734,9 @@ def dec(cls): return dec @classmethod - def load_tagged_classes(cls, tag, fail_silently=True): + def load_tagged_classes( + cls, tag: str, fail_silently: bool = True + ) -> t.Iterable[tuple[str, type[XBlock]]]: """ Produce a sequence of all XBlock classes tagged with `tag`. @@ -725,17 +754,9 @@ def load_tagged_classes(cls, tag, fail_silently=True): yield name, class_ @classmethod - def parse_xml(cls, node, runtime, keys): + def parse_xml(cls, node: etree._Element, runtime: Runtime, keys: ScopeIds) -> XBlock: """ Use `node` to construct a new block. - - Arguments: - node (:class:`~xml.etree.ElementTree.Element`): The xml node to parse into an xblock. - - runtime (:class:`.Runtime`): The runtime to use while parsing. - - keys (:class:`.ScopeIds`): The keys identifying where this block - will store its data. """ block = runtime.construct_xblock_from_class(cls, keys) @@ -755,6 +776,8 @@ def parse_xml(cls, node, runtime, keys): # Attributes become fields. for name, value in list(node.items()): # lxml has no iteritems + if isinstance(name, bytes): + name = name.decode('utf-8') cls._set_field_if_present(block, name, value, {}) # Text content becomes "content", if such a field exists. @@ -769,10 +792,11 @@ def parse_xml(cls, node, runtime, keys): def __init__( self, - runtime, - field_data=None, - scope_ids=UNSET, - *args, # pylint: disable=keyword-arg-before-vararg + runtime: Runtime, + field_data: FieldData | None = None, + scope_ids: ScopeIds | None = None, + *, + for_parent: XBlock | None = None, **kwargs ): """ @@ -788,27 +812,26 @@ def __init__( scope_ids (:class:`.ScopeIds`): Identifiers needed to resolve scopes. """ - if scope_ids is UNSET: + if scope_ids is None: raise TypeError('scope_ids are required') # A cache of the parent block, retrieved from .parent - self._parent_block = None - self._parent_block_id = None - self._child_cache = {} + self._parent_block: XBlock | None = None + self._parent_block_id: UsageKey | None = None + self._child_cache: dict[UsageKey, XBlock] = {} - for_parent = kwargs.pop('for_parent', None) if for_parent is not None: self._parent_block = for_parent self._parent_block_id = for_parent.scope_ids.usage_id # Provide backwards compatibility for external access through _field_data - super().__init__(runtime=runtime, scope_ids=scope_ids, field_data=field_data, *args, **kwargs) + super().__init__(runtime=runtime, scope_ids=scope_ids, field_data=field_data, **kwargs) - def render(self, view, context=None): + def render(self, view: str, context: dict | None = None) -> Fragment: """Render `view` with this block's runtime and the supplied `context`""" return self.runtime.render(self, view, context) - def validate(self): + def validate(self) -> Validation: """ Ask this xblock to validate itself. Subclasses are expected to override this method, as there is currently only a no-op implementation. Any overriding method @@ -817,7 +840,7 @@ def validate(self): """ return Validation(self.scope_ids.usage_id) - def ugettext(self, text): + def ugettext(self, text: str) -> str: """ Translates message/text and returns it in a unicode string. Using runtime to get i18n service. @@ -826,15 +849,15 @@ def ugettext(self, text): runtime_ugettext = runtime_service.ugettext return runtime_ugettext(text) - def add_xml_to_node(self, node): + def add_xml_to_node(self, node: etree._Element) -> None: """ - For exporting, set data on etree.Element `node`. + For exporting, set data on etree._Element `node`. """ super().add_xml_to_node(node) # Add children for each of our children. self.add_children_to_node(node) - def get_parent(self): + def get_parent(self) -> XBlock | None: """Return the parent block of this block, or None if there isn't one.""" if not self.has_cached_parent: if self.parent is not None: @@ -845,11 +868,11 @@ def get_parent(self): return self._parent_block @property - def has_cached_parent(self): + def has_cached_parent(self) -> bool: """Return whether this block has a cached parent block.""" return self.parent is not None and self._parent_block_id == self.parent - def get_child(self, usage_id): + def get_child(self, usage_id: UsageKey) -> XBlock: """Return the child identified by ``usage_id``.""" if usage_id in self._child_cache: return self._child_cache[usage_id] @@ -858,7 +881,7 @@ def get_child(self, usage_id): self._child_cache[usage_id] = child_block return child_block - def get_children(self, usage_id_filter=None): + def get_children(self, usage_id_filter: t.Callable[[UsageKey], bool] | None = None) -> list[XBlock]: """ Return instantiated XBlocks for each of this blocks ``children``. """ @@ -871,15 +894,15 @@ def get_children(self, usage_id_filter=None): if usage_id_filter is None or usage_id_filter(usage_id) ] - def clear_child_cache(self): + def clear_child_cache(self) -> None: """ Reset the cache of children stored on this XBlock. """ self._child_cache.clear() - def add_children_to_node(self, node): + def add_children_to_node(self, node: etree._Element) -> None: """ - Add children to etree.Element `node`. + Add children to etree._Element `node`. """ if self.has_children: for child_id in self.children: @@ -887,7 +910,7 @@ def add_children_to_node(self, node): self.runtime.add_block_as_child_node(child, node) @classmethod - def supports(cls, *functionalities): + def supports(cls, *functionalities: str) -> t.Callable[[t.Callable], t.Callable]: """ A view decorator to indicate that an xBlock view has support for the given functionalities. @@ -896,20 +919,20 @@ def supports(cls, *functionalities): functionalities: String identifiers for the functionalities of the view. For example: "multi_device". """ - def _decorator(view): + def _decorator(view: t.Callable) -> t.Callable: """ Internal decorator that updates the given view's list of supported functionalities. """ # pylint: disable=protected-access if not hasattr(view, "_supports"): - view._supports = set() + view._supports = set() # type: ignore[attr-defined] for functionality in functionalities: - view._supports.add(functionality) + view._supports.add(functionality) # type: ignore[attr-defined] return view return _decorator - def has_support(self, view, functionality): + def has_support(self, view: t.Callable, functionality: str) -> bool: """ Returns whether the given view has support for the given functionality. @@ -930,6 +953,11 @@ def has_support(self, view, functionality): return hasattr(view, "_supports") and functionality in view._supports # pylint: disable=protected-access +# An XBlockAside's view method takes: itself, an XBlock, and optional context dict. +# It returns a Fragment. +AsideView = t.Callable[["XBlockAside", XBlock, t.Optional[dict]], Fragment] + + class XBlockAside(Plugin, Blocklike): """ Base class for XBlock-like objects that are rendered alongside :class:`.XBlock` views. @@ -947,7 +975,7 @@ class XBlockAside(Plugin, Blocklike): entry_point = "xblock_asides.v1" @classmethod - def aside_for(cls, view_name): + def aside_for(cls, view_name: str) -> t.Callable[[AsideView], AsideView]: """ A decorator to indicate a function is the aside view for the given view_name. @@ -991,7 +1019,7 @@ def _combined_asides(cls): # pylint: disable=no-self-argument combined_asides[view] = view_func.__name__ return combined_asides - def aside_view_declaration(self, view_name): + def aside_view_declaration(self, view_name: str) -> AsideView | None: """ Find and return a function object if one is an aside_view for the given view_name @@ -1009,7 +1037,7 @@ def aside_view_declaration(self, view_name): else: return None - def needs_serialization(self): + def needs_serialization(self) -> bool: """ Return True if the aside has any data to serialize to XML. diff --git a/xblock/django/request.py b/xblock/django/request.py index 067bdb80c..9bd5a08dc 100644 --- a/xblock/django/request.py +++ b/xblock/django/request.py @@ -1,22 +1,31 @@ """Helpers for WebOb requests and responses.""" +from __future__ import annotations + +import typing as t from itertools import chain, repeat from lazy import lazy import webob -from django.http import StreamingHttpResponse, HttpResponse -from webob.multidict import MultiDict, NestedMultiDict, NoVars +import webob.multidict +import django.core.files.uploadedfile +import django.http +import django.utils.datastructures -def webob_to_django_response(webob_response, streaming=False): +def webob_to_django_response( + webob_response: webob.Response, + streaming: bool = False +) -> django.http.HttpResponse | django.http.StreamingHttpResponse: """Returns a django response to the `webob_response`""" + django_response: django.http.HttpResponse | django.http.StreamingHttpResponse if streaming: - django_response = StreamingHttpResponse( + django_response = django.http.StreamingHttpResponse( webob_response.app_iter, content_type=webob_response.content_type, status=webob_response.status_code, ) else: - django_response = HttpResponse( + django_response = django.http.HttpResponse( webob_response.app_iter, content_type=webob_response.content_type, status=webob_response.status_code, @@ -26,15 +35,17 @@ def webob_to_django_response(webob_response, streaming=False): return django_response -def querydict_to_multidict(query_dict, wrap=None): +def querydict_to_multidict( + query_dict: django.utils.datastructures.MultiValueDict, + wrap: t.Callable[[t.Any], t.Any] | None = None +) -> webob.multidict.MultiDict: """ Returns a new `webob.MultiDict` from a `django.http.QueryDict`. If `wrap` is provided, it's used to wrap the values. - """ wrap = wrap or (lambda val: val) - return MultiDict(chain.from_iterable( + return webob.multidict.MultiDict(chain.from_iterable( zip(repeat(key), (wrap(v) for v in vals)) for key, vals in query_dict.lists() )) @@ -44,32 +55,35 @@ class DjangoUploadedFile: """ Looks like a cgi.FieldStorage, but wraps a Django UploadedFile. """ - def __init__(self, uploaded): + def __init__(self, uploaded: django.core.files.uploadedfile.UploadedFile): # FieldStorage needs a file attribute. - self.file = uploaded + self.file: t.Any = uploaded @property - def name(self): + def name(self) -> str: """The name of the input element used to upload the file.""" return self.file.field_name @property - def filename(self): + def filename(self) -> str: """The name of the uploaded file.""" return self.file.name class DjangoWebobRequest(webob.Request): """ - An implementation of the webob request api, backed - by a django request + An implementation of the webob request api, backed by a django request """ - def __init__(self, request): + # Note: + # This implementation is close enough to webob.Request for it to work OK, but it does + # make mypy complain that the type signatures are different, hence the 'type: ignore' pragmas. + + def __init__(self, request: django.http.HttpRequest): self._request = request super().__init__(self.environ) @lazy - def environ(self): + def environ(self) -> dict[str, str]: # type: ignore[override] """ Add path_info to the request's META dictionary. """ @@ -80,40 +94,40 @@ def environ(self): return environ @property - def GET(self): + def GET(self) -> webob.multidict.MultiDict: # type: ignore[override] """ Returns a new `webob.MultiDict` from the request's GET query. """ return querydict_to_multidict(self._request.GET) @property - def POST(self): + def POST(self) -> webob.multidict.MultiDict | webob.multidict.NoVars: # type: ignore[override] if self.method not in ('POST', 'PUT', 'PATCH'): - return NoVars('Not a form request') + return webob.multidict.NoVars('Not a form request') # Webob puts uploaded files into the POST dictionary, so here we # combine the Django POST data and uploaded FILES data into a single # dict. - return NestedMultiDict( + return webob.multidict.NestedMultiDict( querydict_to_multidict(self._request.POST), querydict_to_multidict(self._request.FILES, wrap=DjangoUploadedFile), ) @property - def body(self): + def body(self) -> bytes: # type: ignore[override] """ Return the content of the request body. """ return self._request.body - @property - def body_file(self): + @property # type: ignore[misc] + def body_file(self) -> django.http.HttpRequest: # type: ignore[override] """ Input stream of the request """ return self._request -def django_to_webob_request(django_request): +def django_to_webob_request(django_request: django.http.HttpRequest) -> webob.Request: """Returns a WebOb request to the `django_request`""" return DjangoWebobRequest(django_request) diff --git a/xblock/exceptions.py b/xblock/exceptions.py index 7f50b7e4e..dafeef01c 100644 --- a/xblock/exceptions.py +++ b/xblock/exceptions.py @@ -1,16 +1,25 @@ """ Module for all xblock exception classes """ +from __future__ import annotations + import json +import typing as t from webob import Response +from opaque_keys.edx.keys import UsageKey + +if t.TYPE_CHECKING: + from xblock.core import XBlock, Blocklike + from xblock.fields import Field, Scope + class XBlockNotFoundError(Exception): """ Raised to indicate that an XBlock could not be found with the requested usage_id """ - def __init__(self, usage_id): + def __init__(self, usage_id: UsageKey): # Exception is an old-style class, so can't use super Exception.__init__(self) self.message = f"Unable to load an xblock for usage_id {usage_id!r}" @@ -20,7 +29,12 @@ class XBlockSaveError(Exception): """ Raised to indicate an error in saving an XBlock """ - def __init__(self, saved_fields, dirty_fields, message=None): + def __init__( + self, + saved_fields: t.Iterable[Field], + dirty_fields: t.Iterable[Field], + message: str | None = None, + ): """ Create a new XBlockSaveError @@ -28,9 +42,6 @@ def __init__(self, saved_fields, dirty_fields, message=None): saved before the error occurred `dirty_fields` - a set of fields that were left dirty after the save """ - # Exception is an old-style class, so can't use super - Exception.__init__(self) - self.message = message self.saved_fields = saved_fields self.dirty_fields = dirty_fields @@ -40,16 +51,13 @@ class KeyValueMultiSaveError(Exception): """ Raised to indicated an error in saving multiple fields in a KeyValueStore """ - def __init__(self, saved_field_names): + def __init__(self, saved_field_names: t.Iterable[str]): """ Create a new KeyValueMultiSaveError `saved_field_names` - an iterable of field names (strings) that were successfully saved before the exception occurred """ - # Exception is an old-style class, so can't use super - Exception.__init__(self) - self.saved_field_names = saved_field_names @@ -57,8 +65,7 @@ class InvalidScopeError(Exception): """ Raised to indicated that operating on the supplied scope isn't allowed by a KeyValueStore """ - def __init__(self, invalid_scope, valid_scopes=None): - super().__init__() + def __init__(self, invalid_scope: str | Scope, valid_scopes: t.Iterable[str | Scope] = frozenset()): if valid_scopes: self.message = "Invalid scope: {}. Valid scopes are: {}".format( invalid_scope, @@ -72,15 +79,13 @@ class NoSuchViewError(Exception): """ Raised to indicate that the view requested was not found. """ - def __init__(self, block, view_name): + def __init__(self, block: XBlock, view_name: str): """ Create a new NoSuchViewError :param block: The XBlock without a view :param view_name: The name of the view that couldn't be found """ - # Can't use super because Exception is an old-style class - Exception.__init__(self) self.message = f"Unable to find view {view_name!r} on block {block!r}" @@ -109,12 +114,11 @@ class JsonHandlerError(Exception): Raised by a function decorated with XBlock.json_handler to indicate that an error response should be returned. """ - def __init__(self, status_code, message): - super().__init__() + def __init__(self, status_code: int, message: str | dict): self.status_code = status_code self.message = message - def get_response(self, **kwargs): + def get_response(self, **kwargs) -> Response: """ Returns a Response object containing this object's status code and a JSON object containing the key "error" with the value of this object's diff --git a/xblock/field_data.py b/xblock/field_data.py index 7d3d8abbf..491d7583b 100644 --- a/xblock/field_data.py +++ b/xblock/field_data.py @@ -4,12 +4,18 @@ provide varied persistence backends while keeping the API used by the `XBlock` simple. """ -import copy +from __future__ import annotations +import copy +import typing as t from abc import ABCMeta, abstractmethod from collections import defaultdict from xblock.exceptions import InvalidScopeError +from xblock.fields import Scope + +if t.TYPE_CHECKING: + from xblock.core import Blocklike class FieldData(metaclass=ABCMeta): @@ -18,7 +24,7 @@ class FieldData(metaclass=ABCMeta): """ @abstractmethod - def get(self, block, name): + def get(self, block: Blocklike, name: str) -> t.Any: """ Retrieve the value for the field named `name` for the XBlock `block`. @@ -34,7 +40,7 @@ def get(self, block, name): raise NotImplementedError @abstractmethod - def set(self, block, name, value): + def set(self, block: Blocklike, name: str, value: t.Any) -> None: """ Set the value of the field named `name` for XBlock `block`. @@ -49,7 +55,7 @@ def set(self, block, name, value): raise NotImplementedError @abstractmethod - def delete(self, block, name): + def delete(self, block: Blocklike, name: str) -> None: """ Reset the value of the field named `name` to the default for XBlock `block`. @@ -60,7 +66,7 @@ def delete(self, block, name): """ raise NotImplementedError - def has(self, block, name): + def has(self, block: Blocklike, name: str) -> bool: """ Return whether or not the field named `name` has a non-default value for the XBlock `block`. @@ -75,7 +81,7 @@ def has(self, block, name): except KeyError: return False - def set_many(self, block, update_dict): + def set_many(self, block: Blocklike, update_dict: dict[str, t.Any]) -> None: """ Update many fields on an XBlock simultaneously. @@ -87,7 +93,7 @@ def set_many(self, block, update_dict): for key, value in update_dict.items(): self.set(block, key, value) - def default(self, block, name): + def default(self, block: Blocklike, name: str) -> None: """ Get the default value for this field which may depend on context or may just be the field's global default. The default behavior is to raise KeyError which will cause the caller to return the field's @@ -100,6 +106,13 @@ def default(self, block, name): """ raise KeyError(repr(name)) + def save_block(self, block: Blocklike) -> None: + """ + Finalize/commit changes for the field data from the specified block. + + By default, this does nothing. + """ + class DictFieldData(FieldData): """ @@ -130,7 +143,7 @@ class SplitFieldData(FieldData): several backing FieldData objects. """ - def __init__(self, scope_mappings): + def __init__(self, scope_mappings: dict[Scope, FieldData]): """ `scope_mappings` defines :class:`~xblock.field_data.FieldData` objects to use for each scope. If a scope is not a key in `scope_mappings`, then using @@ -141,7 +154,7 @@ def __init__(self, scope_mappings): """ self._scope_mappings = scope_mappings - def _field_data(self, block, name): + def _field_data(self, block: Blocklike, name: str): """Return the field data for the field `name` on the :class:`~xblock.core.XBlock` `block`""" scope = block.fields[name].scope @@ -172,8 +185,7 @@ def has(self, block, name): def default(self, block, name): return self._field_data(block, name).default(block, name) - def save_block(self, block): - """ saving data """ + def save_block(self, block: Blocklike) -> None: field_datas = set(self._scope_mappings.values()) for field_data in field_datas: field_data.save_block(block) diff --git a/xblock/fields.py b/xblock/fields.py index 018944c21..b84883c9c 100644 --- a/xblock/fields.py +++ b/xblock/fields.py @@ -5,20 +5,29 @@ for each scope. """ -from collections import namedtuple +from __future__ import annotations + import copy -import datetime import hashlib import itertools import json import re import traceback +import typing as t import warnings +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum import dateutil.parser -from lxml import etree import pytz import yaml +from lxml import etree +from opaque_keys.edx.keys import DefinitionKey, UsageKey + + +if t.TYPE_CHECKING: + from xblock.core import Blocklike # __all__ controls what classes end up in the docs, and in what order. @@ -43,38 +52,50 @@ class ModifyingEnforceTypeWarning(DeprecationWarning): class Sentinel: """ - Class for implementing sentinel objects (only equal to themselves). + Base class for a 'sentinel': i.e., a special value object which is entirely identified by its name. """ - def __init__(self, name): + def __init__(self, name: str): + self._name = name + + @property + def name(self) -> str: """ - `name` is the name used to identify the sentinel (which will - be displayed as the __repr__) of the sentinel. + A string that uniquely and entirely identifies this sentinel. """ - self.name = name + return self._name - def __repr__(self): - return self.name + def __repr__(self) -> str: + return self._name - @property - def attr_name(self): - """ TODO: Look into namespace collisions. block.name_space == block_name.space + def __eq__(self, other: object) -> bool: """ - return self.name.lower().replace('.', '_') - - def __eq__(self, other): - """ Equality is based on being of the same class, and having same name + Two sentinel instances are equal if they have same name (even if their concrete classes differ). """ + # In the original XBlock API, all sentinels were directly constructed from Sentinel class; + # there were no subclasses. So, to preserve backwards compatibility, we must allow for equality + # regardless of the exact Sentinel subclass, unless we DEPR the public Sentinel class. + # For example, we need to ensure that `Sentinel("fields.NO_CACHE_VALUE") == NoCacheValue()`. return isinstance(other, Sentinel) and self.name == other.name - def __hash__(self): + @property + def attr_name(self) -> str: + """ + Same as `self.name`, but with dots (.) replace with underscores (_). + """ + # TODO: I don't think is used outside of the toy runtime. Consider DEPR'ing it next time + # breaking changes are made to the XBlock API. + # NOTE: Theoretically, two different scopes could have colliding `attr_name` values. + return self.name.lower().replace('.', '_') + + def __hash__(self) -> int: """ Use a hash of the name of the sentinel """ return hash(self.name) -class BlockScope: +class BlockScope(Sentinel, Enum): """ Enumeration of block scopes. @@ -88,29 +109,48 @@ class BlockScope: unusual, one block definition can be used in more than one place in a course. - TYPE: The data is related to all instances of this type of XBlock. + TYPE: The data is related to all instances of this type of Blocklike. ALL: The data is common to all blocks. This can be useful for storing information that is purely about the student. - """ - USAGE = Sentinel('BlockScope.USAGE') - DEFINITION = Sentinel('BlockScope.DEFINITION') - TYPE = Sentinel('BlockScope.TYPE') - ALL = Sentinel('BlockScope.ALL') + + # In the original version of the XBlock API, all BlockScopes and UserScopes were + # directly constructed as Sentinel objects. Later, we turned Sentinel into an + # abstract class, with concerete subclasses for different types of Sentinels, + # allowing for richer type definitions (e.g., ensuring that nobody can set a + # BlockScope variable to NO_CACHE_VALUE, etc.). With that change made, BlockScope + # and UserScope don't really need to inherit from Sentinel... Enum is sufficient. + # But, removing Sentinel from their inheritance chains could theoretically break + # some edge-cases uses of the XBlock API. In the future, through the DEPR process, + # it might make sense to removal Sentinel from the public XBlock API entirely; + # API users should only need to know about Sentinel's subclasses. + + USAGE = 'BlockScope.USAGE' + DEFINITION = 'BlockScope.DEFINITION' + TYPE = 'BlockScope.TYPE' + ALL = 'BlockScope.ALL' @classmethod - def scopes(cls): + def scopes(cls) -> list[BlockScope]: """ Return a list of valid/understood class scopes. """ - # Why do we need this? This should either - # * Be bubbled to the places where it is used (AcidXBlock). - # * Be automatic. Look for all members of a type. - return [cls.USAGE, cls.DEFINITION, cls.TYPE, cls.ALL] + return list(cls) + + @property + def name(self) -> str: # pylint: disable=function-redefined + """ + A string the uniquely and entirely identifies this object. + + Equivalent to `self.value`. + """ + # For backwards compatibility with Sentinel, name must return "BlockScope.", + # overriding the default Enum.name property which would just return "". + return self._name -class UserScope: +class UserScope(Sentinel, Enum): """ Enumeration of user scopes. @@ -118,11 +158,11 @@ class UserScope: :class:`.BlockScope` and a :class:`.UserScope` are combined to make a :class:`.Scope` for a field. - NONE: Identifies data agnostic to the user of the :class:`.XBlock`. The + NONE: Identifies data agnostic to the user of the :class:`.Blocklike`. The data is related to no particular user. All users see the same data. For instance, the definition of a problem. - ONE: Identifies data particular to a single user of the :class:`.XBlock`. + ONE: Identifies data particular to a single user of the :class:`.Blocklike`. For instance, a student's answer to a problem. ALL: Identifies data aggregated while the block is used by many users. @@ -131,25 +171,50 @@ class UserScope: submitted by all students. """ - NONE = Sentinel('UserScope.NONE') - ONE = Sentinel('UserScope.ONE') - ALL = Sentinel('UserScope.ALL') + # (See the comment in BlockScope for historical context on this class and Sentinel) + + NONE = 'UserScope.NONE' + ONE = 'UserScope.ONE' + ALL = 'UserScope.ALL' @classmethod - def scopes(cls): + def scopes(cls) -> list[UserScope]: """ Return a list of valid/understood class scopes. Why do we need this? I believe it is not used anywhere. """ - return [cls.NONE, cls.ONE, cls.ALL] + return list(cls) + + @property + def name(self) -> str: # pylint: disable=function-redefined + """ + A string the uniquely and entirely identifies this object. + Equivalent to `self.value`. + """ + # For backwards compatibility with Sentinel, name must return "UserScope.", + # overriding the default Enum.name property which would just return "". + return self._name -UNSET = Sentinel("fields.UNSET") -ScopeBase = namedtuple('ScopeBase', 'user block name') +class ParentScope(Sentinel): + """ + A sentinel identifying a special Scope for the `parent` XBlock field. + """ + def __init__(self): + super().__init__("Scopes.parent") + + +class ChildrenScope(Sentinel): + """ + A sentinel identifying a special Scope for the `children` XBlock field. + """ + def __init__(self): + super().__init__("Scopes.children") -class Scope(ScopeBase): +@dataclass +class Scope: """ Defines six types of scopes to be used: `content`, `settings`, `user_state`, `preferences`, `user_info`, and `user_state_summary`. @@ -182,15 +247,41 @@ class Scope(ScopeBase): the points scored by all users attempting a problem. """ - content = ScopeBase(UserScope.NONE, BlockScope.DEFINITION, 'content') - settings = ScopeBase(UserScope.NONE, BlockScope.USAGE, 'settings') - user_state = ScopeBase(UserScope.ONE, BlockScope.USAGE, 'user_state') - preferences = ScopeBase(UserScope.ONE, BlockScope.TYPE, 'preferences') - user_info = ScopeBase(UserScope.ONE, BlockScope.ALL, 'user_info') - user_state_summary = ScopeBase(UserScope.ALL, BlockScope.USAGE, 'user_state_summary') + # Fields + user: UserScope + block: BlockScope + name: str | None = None + + # Predefined User+Block scopes (declard here, defined below) + content: t.ClassVar[t.Self] + settings: t.ClassVar[t.Self] + user_state: t.ClassVar[t.Self] + preferences: t.ClassVar[t.Self] + user_info: t.ClassVar[t.Self] + user_state_summary: t.ClassVar[t.Self] + + # Predefined special scopes + children: t.ClassVar = ChildrenScope() + parent: t.ClassVar = ParentScope() + + def __post__init__(self): + """ + Set a fallback scope name if none is provided. + """ + self.name = self.name or f"{self.user}_{self.block}" + + @property + def scope_name(self) -> str: + """ + Alias to `self.name`. + + Note: `self.name` and `self.scope_name` will always be equivalent and non-None, but for historical reasons, + only `self.scope_name` can guarantee its non-None-ness to the type checker. + """ + return self.name # type: ignore @classmethod - def named_scopes(cls): + def named_scopes(cls) -> list[Scope]: """Return all named Scopes.""" return [ cls.content, @@ -202,7 +293,7 @@ def named_scopes(cls): ] @classmethod - def scopes(cls): + def scopes(cls) -> list[Scope]: """Return all possible Scopes.""" named_scopes = cls.named_scopes() return named_scopes + [ @@ -212,56 +303,103 @@ def scopes(cls): if cls(user, block) not in named_scopes ] - def __new__(cls, user, block, name=None): - """Create a new Scope, with an optional name.""" + def __str__(self) -> str: + return self.scope_name - if name is None: - name = f'{user}_{block}' + def __eq__(self, other: object) -> bool: + return isinstance(other, Scope) and self.user == other.user and self.block == other.block - return ScopeBase.__new__(cls, user, block, name) + def __hash__(self) -> int: + return hash(('xblock.fields.Scope', self.user, self.block)) - children = Sentinel('Scope.children') - parent = Sentinel('Scope.parent') - def __str__(self): - return self.name +Scope.content = Scope(UserScope.NONE, BlockScope.DEFINITION, 'content') +Scope.settings = Scope(UserScope.NONE, BlockScope.USAGE, 'settings') +Scope.user_state = Scope(UserScope.ONE, BlockScope.USAGE, 'user_state') +Scope.preferences = Scope(UserScope.ONE, BlockScope.TYPE, 'preferences') +Scope.user_info = Scope(UserScope.ONE, BlockScope.ALL, 'user_info') +Scope.user_state_summary = Scope(UserScope.ALL, BlockScope.USAGE, 'user_state_summary') - def __eq__(self, other): - return isinstance(other, Scope) and self.user == other.user and self.block == other.block - def __hash__(self): - return hash(('xblock.fields.Scope', self.user, self.block)) +# Originally, ScopeBase and Scope were two separate classes, so that Scope could +# override ScopeBase.__new__ to provide a default name. Now that we have dataclasses, +# such a hack is unneccessary. We merge the classes, but keep ScopeBase as an alias +# for backwards compatibility. +ScopeBase = Scope -class ScopeIds(namedtuple('ScopeIds', 'user_id block_type def_id usage_id')): +class ScopeIds(t.NamedTuple): """ A simple wrapper to collect all of the ids needed to correctly identify an XBlock (or other classes deriving from ScopedStorageMixin) to a FieldData. These identifiers match up with BlockScope and UserScope attributes, so that, for instance, the `def_id` identifies scopes that use BlockScope.DEFINITION. """ - __slots__ = () + user_id: int | str | None + block_type: str + def_id: DefinitionKey + usage_id: UsageKey + + +ScopeIds.__slots__ = () # type: ignore + + +class Unset(Sentinel): + """ + Indicates that default value has not been provided. + """ + def __init__(self): + super().__init__("fields.UNSET") + + +class UniqueIdPlaceholder(Sentinel): + """ + A special reference that can be used as a field's default in field + definition to signal that the field should default to a unique string value + calculated at runtime. + """ + def __init__(self): + super().__init__("fields.UNIQUE_ID") -# Define special reference that can be used as a field's default in field -# definition to signal that the field should default to a unique string value -# calculated at runtime. -UNIQUE_ID = Sentinel("fields.UNIQUE_ID") +class NoCacheValue(Sentinel): + """ + Placeholder ('nil') value to indicate when nothing has been stored + in the cache ("None" may be a valid value in the cache, so we cannot use it). + """ + def __init__(self): + super().__init__("fields.NO_CACHE_VALUE") -# define a placeholder ('nil') value to indicate when nothing has been stored -# in the cache ("None" may be a valid value in the cache, so we cannot use it). -NO_CACHE_VALUE = Sentinel("fields.NO_CACHE_VALUE") -# define a placeholder value that indicates that a value is explicitly dirty, -# because it was explicitly set -EXPLICITLY_SET = Sentinel("fields.EXPLICITLY_SET") +class ExplicitlySet(Sentinel): + """ + Placeholder value that indicates that a value is explicitly dirty, + because it was explicitly set. + """ + def __init__(self): + super().__init__("fields.EXPLICITLY_SET") + + +# For backwards API compatibility, define an instance of each Field-related sentinel. +# These could be be removed in favor of just constructing the class (e.g., every use of +# `UNIQUE_ID` could be replaced with `UniqueIdPlaceholder()`) but that would require a +# DEPR ticket. +UNSET = Unset() +UNIQUE_ID = UniqueIdPlaceholder() +NO_CACHE_VALUE = NoCacheValue() +EXPLICITLY_SET = ExplicitlySet() + # Fields that cannot have runtime-generated defaults. These are special, # because they define the structure of XBlock trees. NO_GENERATED_DEFAULTS = ('parent', 'children') +# Type parameters of Fields. These only matter for static type analysis (mypy). +FieldValue = t.TypeVar("FieldValue") # What does the field hold? +InnerFieldValue = t.TypeVar("InnerFieldValue") # For Dict/List/Set fields: What do they contain? + -class Field: +class Field(t.Generic[FieldValue]): """ A field class that can be used as a class attribute to define what data the class will want to refer to. @@ -305,23 +443,31 @@ class will want to refer to. runtime_options. """ - MUTABLE = True - _default = None - # Indicates if a field's None value should be sent to the XML representation. - none_to_xml = False - - __name__ = None + MUTABLE: bool = True + _default: FieldValue | None | UniqueIdPlaceholder = None + __name__: str | None = None - # We're OK redefining built-in `help` - def __init__(self, help=None, default=UNSET, scope=Scope.content, # pylint:disable=redefined-builtin - display_name=None, values=None, enforce_type=False, - xml_node=False, force_export=False, **kwargs): + # Indicates if a field's None value should be sent to the XML representation. + none_to_xml: bool = False + + def __init__( + self, + help: str | None = None, # pylint:disable=redefined-builtin + default: FieldValue | None | UniqueIdPlaceholder | Unset = Unset(), + scope: Scope = Scope.content, + display_name: str | None = None, + values: list[object] | None = None, + enforce_type: bool = False, + xml_node: bool = False, + force_export: bool = False, + **kwargs + ): self.warned = False self.help = help self._enable_enforce_type = enforce_type - if default is not UNSET: - if default is UNIQUE_ID: - self._default = UNIQUE_ID + if not isinstance(default, Unset): + if isinstance(default, UniqueIdPlaceholder): + self._default = UniqueIdPlaceholder() else: self._default = self._check_or_enforce_type(default) self.scope = scope @@ -332,7 +478,7 @@ def __init__(self, help=None, default=UNSET, scope=Scope.content, # pylint:disa self.force_export = force_export @property - def default(self): + def default(self) -> FieldValue | None | UniqueIdPlaceholder: """Returns the static value that this defaults to.""" if self.MUTABLE: return copy.deepcopy(self._default) @@ -340,20 +486,20 @@ def default(self): return self._default @staticmethod - def needs_name(field): + def needs_name(field) -> bool: """ - Returns whether the given ) is yet to be named. + Returns whether the given field is yet to be named. """ return not field.__name__ @property - def name(self): + def name(self) -> str: """Returns the name of this field.""" # This is set by ModelMetaclass return self.__name__ or 'unknown' @property - def values(self): + def values(self) -> t.Any: """ Returns the valid values for this class. This is useful for representing possible values in a UI. @@ -386,7 +532,7 @@ def values(self): return self._values @property - def display_name(self): + def display_name(self) -> str: """ Returns the display name for this class, suitable for use in a GUI. @@ -394,27 +540,26 @@ def display_name(self): """ return self._display_name if self._display_name is not None else self.name - def _get_cached_value(self, xblock): + def _get_cached_value(self, xblock: Blocklike) -> FieldValue | None | NoCacheValue: """ Return a value from the xblock's cache, or a marker value if either the cache doesn't exist or the value is not found in the cache. """ - return getattr(xblock, '_field_data_cache', {}).get(self.name, NO_CACHE_VALUE) + # pylint: disable=protected-access + return xblock._field_data_cache.get(self.name, NoCacheValue()) - def _set_cached_value(self, xblock, value): + def _set_cached_value(self, xblock: Blocklike, value: FieldValue | None): """Store a value in the xblock's cache, creating the cache if necessary.""" # pylint: disable=protected-access - if not hasattr(xblock, '_field_data_cache'): - xblock._field_data_cache = {} xblock._field_data_cache[self.name] = value - def _del_cached_value(self, xblock): + def _del_cached_value(self, xblock: Blocklike): """Remove a value from the xblock's cache, if the cache exists.""" # pylint: disable=protected-access - if hasattr(xblock, '_field_data_cache') and self.name in xblock._field_data_cache: + if self.name in xblock._field_data_cache: del xblock._field_data_cache[self.name] - def _mark_dirty(self, xblock, value): + def _mark_dirty(self, xblock: Blocklike, value: FieldValue | None | ExplicitlySet): """Set this field to dirty on the xblock.""" # pylint: disable=protected-access @@ -423,7 +568,7 @@ def _mark_dirty(self, xblock, value): if self not in xblock._dirty_fields: xblock._dirty_fields[self] = copy.deepcopy(value) - def _is_dirty(self, xblock): + def _is_dirty(self, xblock: Blocklike) -> bool: """ Return whether this field should be saved when xblock.save() is called """ @@ -432,15 +577,15 @@ def _is_dirty(self, xblock): return False baseline = xblock._dirty_fields[self] - return baseline is EXPLICITLY_SET or xblock._field_data_cache[self.name] != baseline + return isinstance(baseline, ExplicitlySet) or xblock._field_data_cache[self.name] != baseline - def _is_lazy(self, value): + def _is_lazy(self, value: FieldValue | None) -> bool: """ Detect if a value is being evaluated lazily by Django. """ return 'django.utils.functional.' in str(type(value)) - def _check_or_enforce_type(self, value): + def _check_or_enforce_type(self, value: t.Any) -> FieldValue | None: """ Depending on whether enforce_type is enabled call self.enforce_type and return the result or call it and trigger a silent warning if the result @@ -469,9 +614,9 @@ def _check_or_enforce_type(self, value): value, new_value) warnings.warn(message, ModifyingEnforceTypeWarning, stacklevel=3) - return value + return value # type: ignore - def _calculate_unique_id(self, xblock): + def _calculate_unique_id(self, xblock: Blocklike) -> str: """ Provide a default value for fields with `default=UNIQUE_ID`. @@ -481,7 +626,7 @@ def _calculate_unique_id(self, xblock): key = scope_key(self, xblock) return hashlib.sha1(key.encode('utf-8')).hexdigest() - def _get_default_value_to_cache(self, xblock): + def _get_default_value_to_cache(self, xblock: Blocklike) -> FieldValue | None: """ Perform special logic to provide a field's default value for caching. """ @@ -489,39 +634,48 @@ def _get_default_value_to_cache(self, xblock): # pylint: disable=protected-access return self.from_json(xblock._field_data.default(xblock, self.name)) except KeyError: - if self._default is UNIQUE_ID: + if isinstance(self.default, UniqueIdPlaceholder): return self._check_or_enforce_type(self._calculate_unique_id(xblock)) else: return self.default - def _sanitize(self, value): + def _sanitize(self, value: FieldValue | None) -> FieldValue | None: """ Allow the individual fields to sanitize the value being set -or- "get". For example, a String field wants to remove control characters. """ return value - def __get__(self, xblock, xblock_class): + def __get__(self, xblock: Blocklike, xblock_class: type[Blocklike]) -> FieldValue | None: """ Gets the value of this xblock. Prioritizes the cached value over obtaining the value from the field-data service. Thus if a cached value exists, that is the value that will be returned. """ if xblock is None: + # Special case: When accessing the field directly on the class, return the field, not the value. + # e.g., `MyXBlock.my_field` returns a `Field` instance. + # This is standard Python descriptor behavior. return self field_data = xblock._field_data - value = self._get_cached_value(xblock) - if value is NO_CACHE_VALUE: + cache_value = self._get_cached_value(xblock) + value: FieldValue | None + if isinstance(cache_value, NoCacheValue): if field_data.has(xblock, self.name): value = self.from_json(field_data.get(xblock, self.name)) elif self.name not in NO_GENERATED_DEFAULTS: # Cache default value value = self._get_default_value_to_cache(xblock) else: - value = self.default + # Special handling for 'parent' and 'children' fields. + # The type checker complains that `self.default` could be a UniqueIdPlaceholder, + # but that's not possible here. + value = self.default # type: ignore[assignment] self._set_cached_value(xblock, value) + else: + value = cache_value # If this is a mutable type, mark it as dirty, since mutations can occur without an # explicit call to __set__ (but they do require a call to __get__) @@ -530,7 +684,7 @@ def __get__(self, xblock, xblock_class): return self._sanitize(value) - def __set__(self, xblock, value): + def __set__(self, xblock: Blocklike, value: FieldValue | None) -> None: """ Sets the `xblock` to the given `value`. Setting a value does not update the underlying data store; the @@ -542,7 +696,7 @@ def __set__(self, xblock, value): """ value = self._check_or_enforce_type(value) value = self._sanitize(value) - cached_value = self._get_cached_value(xblock) + cached_value = self._get_cached_value(xblock) # note: could be NoCacheValue try: value_has_changed = cached_value != value except Exception: # pylint: disable=broad-except @@ -551,10 +705,10 @@ def __set__(self, xblock, value): value_has_changed = True if value_has_changed: # Mark the field as dirty and update the cache - self._mark_dirty(xblock, EXPLICITLY_SET) + self._mark_dirty(xblock, ExplicitlySet()) self._set_cached_value(xblock, value) - def __delete__(self, xblock): + def __delete__(self, xblock: Blocklike): """ Deletes `xblock` from the underlying data store. Deletes are not cached; they are performed immediately. @@ -598,7 +752,7 @@ def _warn_deprecated_outside_JSONField(self): # pylint: disable=invalid-name ) self.warned = True - def to_json(self, value): + def to_json(self, value: FieldValue | None) -> t.Any: """ Return value in the form of nested lists and dictionaries (suitable for passing to json.dumps). @@ -609,7 +763,7 @@ def to_json(self, value): self._warn_deprecated_outside_JSONField() return value - def from_json(self, value): + def from_json(self, value) -> FieldValue | None: """ Return value as a native full featured python type (the inverse of to_json) @@ -617,22 +771,21 @@ def from_json(self, value): object """ self._warn_deprecated_outside_JSONField() - return value + return value # type: ignore - def to_string(self, value): + def to_string(self, value: FieldValue | None) -> str: """ Return a JSON serialized string representation of the value. """ self._warn_deprecated_outside_JSONField() - value = json.dumps( + return json.dumps( self.to_json(value), indent=2, sort_keys=True, separators=(',', ': '), ) - return value - def from_string(self, serialized): + def from_string(self, serialized: str) -> FieldValue | None: """ Returns a native value from a YAML serialized string representation. Since YAML is a superset of JSON, this is the inverse of to_string.) @@ -641,7 +794,7 @@ def from_string(self, serialized): value = yaml.safe_load(serialized) return self.enforce_type(value) - def enforce_type(self, value): + def enforce_type(self, value: t.Any) -> FieldValue | None: """ Coerce the type of the value, if necessary @@ -651,52 +804,52 @@ def enforce_type(self, value): This must not have side effects, since it will be executed to trigger a DeprecationWarning even if enforce_type is disabled """ - return value + return value # type: ignore - def read_from(self, xblock): + def read_from(self, xblock: Blocklike) -> FieldValue | None: """ Retrieve the value for this field from the specified xblock """ return self.__get__(xblock, xblock.__class__) # pylint: disable=unnecessary-dunder-call - def read_json(self, xblock): + def read_json(self, xblock: Blocklike) -> FieldValue | None: """ Retrieve the serialized value for this field from the specified xblock """ self._warn_deprecated_outside_JSONField() return self.to_json(self.read_from(xblock)) - def write_to(self, xblock, value): + def write_to(self, xblock: Blocklike, value: FieldValue | None) -> None: """ Set the value for this field to value on the supplied xblock """ self.__set__(xblock, value) # pylint: disable=unnecessary-dunder-call - def delete_from(self, xblock): + def delete_from(self, xblock: Blocklike) -> None: """ Delete the value for this field from the supplied xblock """ self.__delete__(xblock) # pylint: disable=unnecessary-dunder-call - def is_set_on(self, xblock): + def is_set_on(self, xblock: Blocklike) -> bool: """ Return whether this field has a non-default value on the supplied xblock """ # pylint: disable=protected-access return self._is_dirty(xblock) or xblock._field_data.has(xblock, self.name) - def __hash__(self): + def __hash__(self) -> int: return hash(self.name) -class JSONField(Field): +class JSONField(Field, t.Generic[FieldValue]): """ Field type which has a convenient JSON representation. """ # for now; we'll bubble functions down when we finish deprecation in Field -class Integer(JSONField): +class Integer(JSONField[int]): """ A field that contains an integer. @@ -710,7 +863,7 @@ class Integer(JSONField): """ MUTABLE = False - def from_json(self, value): + def from_json(self, value) -> int | None: if value is None or value == '': return None return int(value) @@ -718,7 +871,7 @@ def from_json(self, value): enforce_type = from_json -class Float(JSONField): +class Float(JSONField[float]): """ A field that contains a float. @@ -729,7 +882,7 @@ class Float(JSONField): """ MUTABLE = False - def from_json(self, value): + def from_json(self, value) -> float | None: if value is None or value == '': return None return float(value) @@ -737,7 +890,7 @@ def from_json(self, value): enforce_type = from_json -class Boolean(JSONField): +class Boolean(JSONField[bool]): """ A field class for representing a boolean. @@ -766,7 +919,7 @@ def __init__(self, help=None, default=UNSET, scope=Scope.content, display_name=N values=({'display_name': "True", "value": True}, {'display_name': "False", "value": False}), **kwargs) - def from_json(self, value): + def from_json(self, value) -> bool | None: if isinstance(value, bytes): value = value.decode('ascii', errors='replace') if isinstance(value, str): @@ -777,7 +930,7 @@ def from_json(self, value): enforce_type = from_json -class Dict(JSONField): +class Dict(JSONField[t.Dict[str, InnerFieldValue]], t.Generic[InnerFieldValue]): """ A field class for representing a Python dict. @@ -786,7 +939,7 @@ class Dict(JSONField): """ _default = {} - def from_json(self, value): + def from_json(self, value) -> dict[str, InnerFieldValue] | None: if value is None or isinstance(value, dict): return value else: @@ -794,7 +947,7 @@ def from_json(self, value): enforce_type = from_json - def to_string(self, value): + def to_string(self, value) -> str: """ In python3, json.dumps() cannot sort keys of different types, so preconvert None to 'null'. @@ -807,7 +960,7 @@ def to_string(self, value): return super().to_string(value) -class List(JSONField): +class List(JSONField[t.List[InnerFieldValue]], t.Generic[InnerFieldValue]): """ A field class for representing a list. @@ -816,7 +969,7 @@ class List(JSONField): """ _default = [] - def from_json(self, value): + def from_json(self, value) -> list[InnerFieldValue] | None: if value is None or isinstance(value, list): return value else: @@ -825,7 +978,7 @@ def from_json(self, value): enforce_type = from_json -class Set(JSONField): +class Set(JSONField[t.List[InnerFieldValue]], t.Generic[InnerFieldValue]): """ A field class for representing a set. @@ -844,7 +997,7 @@ def __init__(self, *args, **kwargs): self._default = set(self._default) - def from_json(self, value): + def from_json(self, value) -> set[InnerFieldValue] | None: if value is None or isinstance(value, set): return value else: @@ -853,7 +1006,7 @@ def from_json(self, value): enforce_type = from_json -class String(JSONField): +class String(JSONField[str]): """ A field class for representing a string. @@ -863,7 +1016,7 @@ class String(JSONField): MUTABLE = False BAD_REGEX = re.compile('[\x00-\x08\x0b\x0c\x0e-\x1f\ud800-\udfff\ufffe\uffff]', flags=re.UNICODE) - def _sanitize(self, value): + def _sanitize(self, value) -> str | None: """ Remove the control characters that are not allowed in XML: https://www.w3.org/TR/xml/#charsets @@ -883,7 +1036,7 @@ def _sanitize(self, value): else: return value - def from_json(self, value): + def from_json(self, value) -> str | None: if value is None: return None elif isinstance(value, (bytes, str)): @@ -895,20 +1048,23 @@ def from_json(self, value): else: raise TypeError('Value stored in a String must be None or a string, found %s' % type(value)) - def from_string(self, serialized): + def from_string(self, serialized) -> str | None: """String gets serialized and deserialized without quote marks.""" return self.from_json(serialized) - def to_string(self, value): + def to_string(self, value) -> str: """String gets serialized and deserialized without quote marks.""" if isinstance(value, bytes): value = value.decode('utf-8') return self.to_json(value) - @property - def none_to_xml(self): - """Returns True to use a XML node for the field and represent None as an attribute.""" - return True + def enforce_type(self, value: t.Any) -> str | None: + """ + (no-op override just to make mypy happy about XMLString.enforce_type) + """ + return super().enforce_type(value) + + none_to_xml = True # Use an XML node for the field, and represent None as an attribute. enforce_type = from_json @@ -922,7 +1078,7 @@ class XMLString(String): an lxml.etree.XMLSyntaxError will be raised. """ - def to_json(self, value): + def to_json(self, value) -> t.Any: """ Serialize the data, ensuring that it is valid XML (or None). @@ -933,13 +1089,13 @@ def to_json(self, value): value = self.enforce_type(value) return super().to_json(value) - def enforce_type(self, value): + def enforce_type(self, value: t.Any) -> str | None: if value is not None: etree.XML(value) - return value + return value # type: ignore -class DateTime(JSONField): +class DateTime(JSONField[t.Union[datetime, timedelta]]): """ A field for representing a datetime. @@ -949,7 +1105,7 @@ class DateTime(JSONField): DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' - def from_json(self, value): + def from_json(self, value) -> datetime | timedelta | None: """ Parse the date from an ISO-formatted date string, or None. """ @@ -971,16 +1127,16 @@ def from_json(self, value): # Interpret raw numbers as a relative dates if isinstance(value, (int, float)): - value = datetime.timedelta(seconds=value) + value = timedelta(seconds=value) - if not isinstance(value, (datetime.datetime, datetime.timedelta)): + if not isinstance(value, (datetime, timedelta)): raise TypeError( "Value should be loaded from a string, a datetime object, a timedelta object, or None, not {}".format( type(value) ) ) - if isinstance(value, datetime.datetime): + if isinstance(value, datetime): if value.tzinfo is not None: return value.astimezone(pytz.utc) else: @@ -988,14 +1144,14 @@ def from_json(self, value): else: return value - def to_json(self, value): + def to_json(self, value) -> t.Any: """ Serialize the date as an ISO-formatted date string, or None. """ - if isinstance(value, datetime.datetime): + if isinstance(value, datetime): return value.strftime(self.DATETIME_FORMAT) - if isinstance(value, datetime.timedelta): + if isinstance(value, timedelta): return value.total_seconds() if value is None: @@ -1003,14 +1159,14 @@ def to_json(self, value): raise TypeError(f"Value stored must be a datetime or timedelta object, not {type(value)}") - def to_string(self, value): + def to_string(self, value) -> str: """DateTime fields get serialized without quote marks.""" return self.to_json(value) enforce_type = from_json -class Any(JSONField): +class Any(JSONField[t.Any]): """ A field class for representing any piece of data; type is not enforced. @@ -1020,7 +1176,7 @@ class Any(JSONField): """ -class Reference(JSONField): +class Reference(JSONField[UsageKey]): """ An xblock reference. That is, a pointer to another xblock. @@ -1029,7 +1185,7 @@ class Reference(JSONField): """ -class ReferenceList(List): +class ReferenceList(List[UsageKey]): """ An list of xblock references. That is, pointers to xblocks. @@ -1040,7 +1196,19 @@ class ReferenceList(List): # but since Reference doesn't stipulate a definition for from/to, that seems unnecessary at this time. -class ReferenceValueDict(Dict): +class ReferenceListNotNone(ReferenceList): + """ + An list of xblock references. Should not equal None. + + Functionally, this is exactly equivalent to ReferenceList. + To the type-checker, this adds that guarantee that accessing the field will always return + a list of UsageKeys, rather than None OR a list of UsageKeys. + """ + def __get__(self, xblock: Blocklike, xblock_class: type[Blocklike]) -> list[UsageKey]: + return super().__get__(xblock, xblock_class) # type: ignore + + +class ReferenceValueDict(Dict[UsageKey]): """ A dictionary where the values are xblock references. That is, pointers to xblocks. @@ -1051,8 +1219,9 @@ class ReferenceValueDict(Dict): # but since Reference doesn't stipulate a definition for from/to, that seems unnecessary at this time. -def scope_key(instance, xblock): - """Generate a unique key for a scope that can be used as a +def scope_key(instance: Field, xblock: Blocklike) -> str: + """ + Generate a unique key for a scope that can be used as a filename, in a URL, or in a KVS. Our goal is to have a pretty, human-readable 1:1 encoding. @@ -1117,11 +1286,10 @@ def scope_key(instance, xblock): key_list = [] - def encode(char): + def encode(char: str) -> str: """ Replace all non-alphanumeric characters with -n- where n is their Unicode codepoint. - TODO: Test for UTF8 which is not ASCII """ if char.isalnum(): return char diff --git a/xblock/internal.py b/xblock/internal.py index 087eb905c..436b4c23c 100644 --- a/xblock/internal.py +++ b/xblock/internal.py @@ -1,10 +1,21 @@ """ Internal machinery used to make building XBlock family base classes easier. """ +from __future__ import annotations + import functools +import typing as t + + +# The class upon which the property is declared; i.e., the supertype of all classes +# upon which the property will be defined. +BaseT = t.TypeVar('BaseT') + +# The return value of the property. +ReturnT = t.TypeVar('ReturnT') -class LazyClassProperty: +class LazyClassProperty(t.Generic[BaseT, ReturnT]): """ A descriptor that acts as a class-level @lazy. @@ -12,18 +23,30 @@ class LazyClassProperty: executing the decorated method once, and then storing the result in the class __dict__. """ - def __init__(self, constructor): - self.__constructor = constructor - self.__cache = {} - functools.wraps(self.__constructor)(self) - - def __get__(self, instance, owner): - if owner not in self.__cache: - # If __constructor iterates over members, then we don't want to call it + def __init__(self, wrapped: t.Callable[[BaseT], ReturnT]): + + # _wrapped is the original method on BaseT which we are converting to a classproperty. + # Crazy typechecking note: From the perspective of the type checker, the wrapped + # method's argument is an *instance* of BaseT (a "self" arg). In reality, we know that + # the wrapped method's first arg is used as a *subclass* of BaseT (a "cls" arg). So, we + # must subvert the typechecker here by statically casting from [BaseT] to [type[BaseT]]. + self._wrapped: t.Callable[[type[BaseT]], ReturnT] = wrapped # type: ignore[assignment] + + # The caches maps classes (BaseT or subclasses thereof) to return values. + self._cache: dict[type[BaseT], ReturnT] = {} + + # This line transfers the "metadata" (function name, docstring, arg names) from _wrapped + # to self (the descriptor, which becomes the class property). I couldn't get mypy to be + # happy about this, since it doesn't understand that self is callable. + functools.wraps(self._wrapped)(self) # type: ignore + + def __get__(self, _instance: BaseT | None, owner: type[BaseT]) -> ReturnT: + if owner not in self._cache: + # If _wrapped iterates over members, then we don't want to call it # again in an infinite loop. So, preseed the __cache with None. - self.__cache[owner] = None - self.__cache[owner] = self.__constructor(owner) - return self.__cache[owner] + self._cache[owner] = None # type: ignore + self._cache[owner] = self._wrapped(owner) + return self._cache[owner] class_lazy = LazyClassProperty # pylint: disable=invalid-name diff --git a/xblock/plugin.py b/xblock/plugin.py index 42f1ca6ce..4bbb876c6 100644 --- a/xblock/plugin.py +++ b/xblock/plugin.py @@ -3,16 +3,20 @@ This code is in the Runtime layer. """ +from __future__ import annotations + import functools import itertools import logging -import pkg_resources +import typing as t + +from pkg_resources import iter_entry_points, EntryPoint from xblock.internal import class_lazy log = logging.getLogger(__name__) -PLUGIN_CACHE = {} +PLUGIN_CACHE: dict[tuple[str, str], type[Plugin]] = {} class PluginMissingError(Exception): @@ -21,35 +25,33 @@ class PluginMissingError(Exception): class AmbiguousPluginError(Exception): """Raised when a class name produces more than one entry_point.""" - def __init__(self, all_entry_points): + def __init__(self, all_entry_points: list[EntryPoint]): classes = (entpt.load() for entpt in all_entry_points) desc = ", ".join("{0.__module__}.{0.__name__}".format(cls) for cls in classes) msg = f"Ambiguous entry points for {all_entry_points[0].name}: {desc}" super().__init__(msg) -def default_select(identifier, all_entry_points): # pylint: disable=inconsistent-return-statements +def default_select(identifier: str, all_entry_points: list[EntryPoint]) -> EntryPoint: """ - Raise an exception when we have ambiguous entry points. + Raise an exception when we have no entry points or ambiguous entry points. """ - - if len(all_entry_points) == 0: + if not all_entry_points: raise PluginMissingError(identifier) - if len(all_entry_points) == 1: - return all_entry_points[0] - elif len(all_entry_points) > 1: + if len(all_entry_points) > 1: raise AmbiguousPluginError(all_entry_points) + return all_entry_points[0] class Plugin: - """Base class for a system that uses entry_points to load plugins. + """ + Base class for a system that uses entry_points to load plugins. Implementing classes are expected to have the following attributes: `entry_point`: The name of the entry point to load plugins from. - """ - entry_point = None # Should be overwritten by children classes + entry_point: str # Should be overwritten by children classes @class_lazy def extra_entry_points(cls): # pylint: disable=no-self-argument @@ -62,7 +64,7 @@ def extra_entry_points(cls): # pylint: disable=no-self-argument return [] @classmethod - def _load_class_entry_point(cls, entry_point): + def _load_class_entry_point(cls, entry_point: EntryPoint) -> type[t.Self]: """ Load `entry_point`, and set the `entry_point.name` as the attribute `plugin_name` on the loaded object @@ -72,7 +74,12 @@ def _load_class_entry_point(cls, entry_point): return class_ @classmethod - def load_class(cls, identifier, default=None, select=None): + def load_class( + cls, + identifier: str, + default: type[t.Self] | None = None, + select: t.Callable[[str, list[EntryPoint]], EntryPoint] | None = None, + ) -> type[t.Self]: """Load a single class specified by identifier. If `identifier` specifies more than a single class, and `select` is not None, @@ -100,7 +107,7 @@ def select(identifier, all_entry_points): if select is None: select = default_select - all_entry_points = list(pkg_resources.iter_entry_points(cls.entry_point, name=identifier)) + all_entry_points = list(iter_entry_points(cls.entry_point, name=identifier)) for extra_identifier, extra_entry_point in iter(cls.extra_entry_points): if identifier == extra_identifier: all_entry_points.append(extra_entry_point) @@ -114,10 +121,16 @@ def select(identifier, all_entry_points): PLUGIN_CACHE[key] = cls._load_class_entry_point(selected_entry_point) - return PLUGIN_CACHE[key] + result = PLUGIN_CACHE[key] + if not issubclass(result, cls): + raise TypeError( + f"{cls.__name__}.load_class('{identifier}') found the class {result.__name__}; " + f"expected a subclass of {cls.__name__}." + ) + return result @classmethod - def load_classes(cls, fail_silently=True): + def load_classes(cls, fail_silently: bool = True) -> t.Iterable[tuple[str, type[t.Self]]]: """Load all the classes for a plugin. Produces a sequence containing the identifiers and their corresponding @@ -133,7 +146,7 @@ def load_classes(cls, fail_silently=True): contexts. Hence, the flag. """ all_classes = itertools.chain( - pkg_resources.iter_entry_points(cls.entry_point), + iter_entry_points(cls.entry_point), (entry_point for identifier, entry_point in iter(cls.extra_entry_points)), ) for class_ in all_classes: @@ -146,15 +159,15 @@ def load_classes(cls, fail_silently=True): raise @classmethod - def register_temp_plugin(cls, class_, identifier=None, dist='xblock'): - """Decorate a function to run with a temporary plugin available. + def register_temp_plugin(cls, class_: type, identifier: str | None = None, dist: str = 'xblock'): + """ + Decorate a function to run with a temporary plugin available. Use it like this in tests:: @register_temp_plugin(MyXBlockClass): def test_the_thing(): # Here I can load MyXBlockClass by name. - """ from unittest.mock import Mock # pylint: disable=import-outside-toplevel diff --git a/xblock/reference/plugins.py b/xblock/reference/plugins.py index 17825942e..1e728b48f 100644 --- a/xblock/reference/plugins.py +++ b/xblock/reference/plugins.py @@ -6,25 +6,7 @@ Much of this still needs to be organized. """ -try: - from django.core.exceptions import ImproperlyConfigured -except ImportError: - class ImproperlyConfigured(Exception): - ''' - If Django is installed, and djpyfs is installed, but we're not in a - Django app, we'll get this exception. We'd like to catch - it. But we don't want the try/except to fail even if we're - either in a proper Django app, or don't have Django installed - at all. - ''' - -try: - from djpyfs import djpyfs -except ImportError: - djpyfs = None -except ImproperlyConfigured: - print("Warning! Django is not correctly configured.") - djpyfs = None # pylint: disable=invalid-name +from djpyfs import djpyfs # type: ignore[import-untyped] from xblock.fields import Field, NO_CACHE_VALUE from xblock.fields import scope_key @@ -186,17 +168,7 @@ def load(self, instance, xblock): Get the filesystem for the field specified in 'instance' and the xblock in 'xblock' It is locally scoped. """ - - # TODO: Get xblock from context, once the plumbing is piped through - if djpyfs: - return djpyfs.get_filesystem(scope_key(instance, xblock)) - else: - # The reference implementation relies on djpyfs - # https://github.com/openedx/django-pyfs - # For Django runtimes, you may use this reference - # implementation. Otherwise, you will need to - # patch pyfilesystem yourself to implement get_url. - raise NotImplementedError("djpyfs not available") + return djpyfs.get_filesystem(scope_key(instance, xblock)) def __repr__(self): return "File system object" diff --git a/xblock/run_script.py b/xblock/run_script.py index 123b5c5bc..33d571fd5 100644 --- a/xblock/run_script.py +++ b/xblock/run_script.py @@ -1,10 +1,12 @@ """ Script execution for script fragments in content. """ +import typing as t + import textwrap -def run_script(pycode): +def run_script(pycode: str) -> dict[str, t.Any]: """Run the Python in `pycode`, and return a dict of the resulting globals.""" # Fix up the whitespace in pycode. if pycode[0] == "\n": @@ -13,7 +15,7 @@ def run_script(pycode): pycode = textwrap.dedent(pycode) # execute it. - globs = {} + globs: dict[str, t.Any] = {} exec(pycode, globs, globs) # pylint: disable=W0122 return globs diff --git a/xblock/runtime.py b/xblock/runtime.py index 8aa822dda..652a804c7 100644 --- a/xblock/runtime.py +++ b/xblock/runtime.py @@ -1,6 +1,8 @@ """ Machinery to make the common case easy when building new runtimes """ +from __future__ import annotations + from abc import ABCMeta, abstractmethod from collections import namedtuple import functools @@ -12,14 +14,16 @@ import logging import re import threading +import typing as t import warnings from lxml import etree import markupsafe +from opaque_keys.edx.keys import DefinitionKey, UsageKey from web_fragments.fragment import Fragment -from xblock.core import XBlock, XBlockAside, XML_NAMESPACES +from xblock.core import Blocklike, XBlock, XBlockAside, XML_NAMESPACES from xblock.fields import Field, BlockScope, Scope, ScopeIds, UserScope from xblock.field_data import FieldData from xblock.exceptions import ( @@ -38,40 +42,42 @@ class KeyValueStore(metaclass=ABCMeta): """The abstract interface for Key Value Stores.""" - class Key(namedtuple("Key", "scope, user_id, block_scope_id, field_name, block_family")): + class Key(t.NamedTuple): """ Keys are structured to retain information about the scope of the data. Stores can use this information however they like to store and retrieve data. """ - - def __new__(cls, scope, user_id, block_scope_id, field_name, block_family='xblock.v1'): - return super(KeyValueStore.Key, cls).__new__(cls, scope, user_id, block_scope_id, field_name, block_family) + scope: Scope + user_id: int | str | None + block_scope_id: UsageKey | DefinitionKey | str | None + field_name: str + block_family: str = "xblock.v1" @abstractmethod - def get(self, key): + def get(self, key: Key) -> t.Any: """Reads the value of the given `key` from storage.""" @abstractmethod - def set(self, key, value): + def set(self, key: Key, value: t.Any) -> None: """Sets `key` equal to `value` in storage.""" @abstractmethod - def delete(self, key): + def delete(self, key: Key) -> None: """Deletes `key` from storage.""" @abstractmethod - def has(self, key): + def has(self, key: Key) -> bool: """Returns whether or not `key` is present in storage.""" - def default(self, key): + def default(self, key: Key): """ Returns the context relevant default of the given `key` or raise KeyError which will result in the field's global default. """ raise KeyError(repr(key)) - def set_many(self, update_dict): + def set_many(self, update_dict: dict[Key, t.Any]) -> None: """ For each (`key, value`) in `update_dict`, set `key` to `value` in storage. @@ -90,8 +96,8 @@ class DictKeyValueStore(KeyValueStore): A `KeyValueStore` that stores everything into a Python dictionary. """ - def __init__(self, storage=None): - self.db_dict = storage if storage is not None else {} + def __init__(self, storage: dict[KeyValueStore.Key, t.Any] | None = None): + self.db_dict: dict[KeyValueStore.Key, t.Any] = storage if storage is not None else {} def get(self, key): return self.db_dict[key] @@ -115,19 +121,19 @@ class KvsFieldData(FieldData): that uses the correct scoped keys for the underlying KeyValueStore """ - def __init__(self, kvs, **kwargs): + def __init__(self, kvs: KeyValueStore, **kwargs): super().__init__(**kwargs) self._kvs = kvs - def __repr__(self): + def __repr__(self) -> str: return "{0.__class__.__name__}({0._kvs!r})".format(self) - def _getfield(self, block, name): + def _getfield(self, block: Blocklike, name: str) -> Field: """ Return the field with the given `name` from `block`. If no field with `name` exists in any namespace, raises a KeyError. - :param block: xblock to retrieve the field from + :param block: Blocklike to retrieve the field from :type block: :class:`~xblock.core.XBlock` :param name: name of the field to retrieve :type name: str @@ -143,7 +149,7 @@ def _getfield(self, block, name): # really doesn't name a field raise KeyError(name) - def _key(self, block, name): + def _key(self, block: Blocklike, name: str) -> KeyValueStore.Key: """ Resolves `name` to a key, in the following form: @@ -156,6 +162,7 @@ def _key(self, block, name): ) """ field = self._getfield(block, name) + block_id: UsageKey | DefinitionKey | str | None # Type depends on field scope. if field.scope in (Scope.children, Scope.parent): block_id = block.scope_ids.usage_id user_id = None @@ -179,13 +186,13 @@ def _key(self, block, name): key = KeyValueStore.Key( scope=field.scope, user_id=user_id, - block_scope_id=block_id, # pylint: disable=possibly-used-before-assignment + block_scope_id=block_id, field_name=name, block_family=block.entry_point, ) return key - def get(self, block, name): + def get(self, block: Blocklike, name: str) -> t.Any: """ Retrieve the value for the field named `name`. @@ -194,19 +201,19 @@ def get(self, block, name): """ return self._kvs.get(self._key(block, name)) - def set(self, block, name, value): + def set(self, block: Blocklike, name: str, value: t.Any) -> None: """ Set the value of the field named `name` """ self._kvs.set(self._key(block, name), value) - def delete(self, block, name): + def delete(self, block: Blocklike, name: str) -> None: """ Reset the value of the field named `name` to the default """ self._kvs.delete(self._key(block, name)) - def has(self, block, name): + def has(self, block: Blocklike, name: str) -> bool: """ Return whether or not the field named `name` has a non-default value """ @@ -215,7 +222,7 @@ def has(self, block, name): except KeyError: return False - def set_many(self, block, update_dict): + def set_many(self, block, update_dict: dict[str, t.Any]): """Update the underlying model with the correct values.""" updated_dict = {} @@ -225,7 +232,7 @@ def set_many(self, block, update_dict): self._kvs.set_many(updated_dict) - def default(self, block, name): + def default(self, block: Blocklike, name: str) -> t.Any: """ Ask the kvs for the default (default implementation which other classes may override). @@ -360,8 +367,8 @@ def create_definition(self, block_type, slug=None): class MemoryIdManager(IdReader, IdGenerator): """A simple dict-based implementation of IdReader and IdGenerator.""" - ASIDE_USAGE_ID = namedtuple('MemoryAsideUsageId', 'usage_id aside_type') - ASIDE_DEFINITION_ID = namedtuple('MemoryAsideDefinitionId', 'definition_id aside_type') + ASIDE_USAGE_ID = namedtuple('MemoryAsideUsageId', 'usage_id aside_type') # type: ignore[name-match] + ASIDE_DEFINITION_ID = namedtuple('MemoryAsideDefinitionId', 'definition_id aside_type') # type: ignore[name-match] def __init__(self): self._ids = itertools.count() @@ -723,6 +730,7 @@ def _usage_id_from_node(self, node, parent_id): node (lxml.etree.Element): The DOM node to interpret. parent_id: The usage ID of the parent block """ + block_type = node.tag # remove xblock-family from elements node.attrib.pop('xblock-family', None) @@ -1222,7 +1230,7 @@ def __delattr__(self, name): # Cache of Mixologist generated classes -_CLASS_CACHE = {} +_CLASS_CACHE: dict[tuple[type, tuple[type, ...]], type] = {} _CLASS_CACHE_LOCK = threading.RLock() diff --git a/xblock/scorable.py b/xblock/scorable.py index 4ddb34d72..7c7ec82ab 100644 --- a/xblock/scorable.py +++ b/xblock/scorable.py @@ -1,17 +1,24 @@ """ Scorable. """ -from collections import namedtuple +from __future__ import annotations + +import typing as t + import logging +from xblock.core import XBlockMixin + log = logging.getLogger(__name__) -Score = namedtuple('Score', ['raw_earned', 'raw_possible']) +class Score(t.NamedTuple): + raw_earned: float + raw_possible: float -class ScorableXBlockMixin: +class ScorableXBlockMixin(XBlockMixin): """ Mixin to handle functionality related to scoring. @@ -26,9 +33,9 @@ class ScorableXBlockMixin: "has_score" boolean attribute. We maintain that identifier here. """ - has_score = True + has_score: bool = True - def rescore(self, only_if_higher): + def rescore(self, only_if_higher: bool | None) -> None: """ Calculate a new raw score and save it to the block. If only_if_higher is True and the score didn't improve, keep the existing score. @@ -39,19 +46,21 @@ def rescore(self, only_if_higher): May also raise other errors in self.calculate_score(). Currently unconstrained. """ + # TODO: Determine whether only_if_higher==False is different then only_if_higher==None. + # If so, document it. If not, change its type from 'bool | None' to just 'bool'. _ = self.runtime.service(self, 'i18n').ugettext if not self.allows_rescore(): - raise TypeError(_('Problem does not support rescoring: {}').format(self.location)) + raise TypeError(_('Problem does not support rescoring: {}').format(self.usage_key)) if not self.has_submitted_answer(): - raise ValueError(_('Cannot rescore unanswered problem: {}').format(self.location)) + raise ValueError(_('Cannot rescore unanswered problem: {}').format(self.usage_key)) new_score = self.calculate_score() self._publish_grade(new_score, only_if_higher) - def allows_rescore(self): + def allows_rescore(self) -> bool: """ Boolean value: Can this problem be rescored? @@ -60,49 +69,37 @@ def allows_rescore(self): """ return True - def has_submitted_answer(self): + def has_submitted_answer(self) -> bool: """ Returns True if the problem has been answered by the runtime user. """ raise NotImplementedError - def get_score(self): + def get_score(self) -> Score: """ Return a raw score already persisted on the XBlock. Should not perform new calculations. - - Returns: - Score(raw_earned=float, raw_possible=float) """ raise NotImplementedError - def set_score(self, score): + def set_score(self, score: Score) -> None: """ Persist a score to the XBlock. The score is a named tuple with a raw_earned attribute and a raw_possible attribute, reflecting the raw earned score and the maximum raw score the student could have earned respectively. - - Arguments: - score: Score(raw_earned=float, raw_possible=float) - - Returns: - None """ raise NotImplementedError - def calculate_score(self): + def calculate_score(self) -> Score: """ Calculate a new raw score based on the state of the problem. This method should not modify the state of the XBlock. - - Returns: - Score(raw_earned=float, raw_possible=float) """ raise NotImplementedError - def _publish_grade(self, score, only_if_higher=None): + def _publish_grade(self, score: Score, only_if_higher: bool | None) -> None: """ Publish a grade to the runtime. """ diff --git a/xblock/test/test_core.py b/xblock/test/test_core.py index c6a6fed10..792e66f3e 100644 --- a/xblock/test/test_core.py +++ b/xblock/test/test_core.py @@ -534,26 +534,26 @@ class FieldTester(XBlock): def test_class_tags(): - xblock = XBlock(None, None, None) + xblock = XBlock(None, scope_ids=Mock()) assert xblock._class_tags == set() # pylint: disable=comparison-with-callable class Sub1Block(XBlock): """Toy XBlock""" - sub1block = Sub1Block(None, None, None) + sub1block = Sub1Block(None, scope_ids=Mock()) assert sub1block._class_tags == set() # pylint: disable=comparison-with-callable @XBlock.tag("cat dog") class Sub2Block(Sub1Block): """Toy XBlock""" - sub2block = Sub2Block(None, None, None) + sub2block = Sub2Block(None, scope_ids=Mock()) assert sub2block._class_tags == {"cat", "dog"} # pylint: disable=comparison-with-callable class Sub3Block(Sub2Block): """Toy XBlock""" - sub3block = Sub3Block(None, None, None) + sub3block = Sub3Block(None, scope_ids=Mock()) assert sub3block._class_tags == {"cat", "dog"} # pylint: disable=comparison-with-callable @XBlock.tag("mixin") @@ -563,7 +563,7 @@ class MixinBlock(XBlock): class Sub4Block(MixinBlock, Sub3Block): """Toy XBlock""" - sub4block = Sub4Block(None, None, None) + sub4block = Sub4Block(None, scope_ids=Mock()) assert sub4block._class_tags == { # pylint: disable=comparison-with-callable "cat", "dog", "mixin" } @@ -780,7 +780,7 @@ def test_services_decorators(): class NoServicesBlock(XBlock): """XBlock requesting no services""" - no_services_block = NoServicesBlock(None, None, None) + no_services_block = NoServicesBlock(None, scope_ids=Mock()) assert not NoServicesBlock._services_requested assert not no_services_block._services_requested diff --git a/xblock/test/test_core_capabilities.py b/xblock/test/test_core_capabilities.py index 0919a35ee..9376b21ff 100644 --- a/xblock/test/test_core_capabilities.py +++ b/xblock/test/test_core_capabilities.py @@ -181,7 +181,7 @@ def an_unsupported_view(self): """ # pragma: no cover - test_xblock = SupportsDecoratorTester(None, None, None) + test_xblock = SupportsDecoratorTester(None, None, mock.Mock()) for view_name, functionality, expected_result in ( ("functionality_supported_view", "a_functionality", True), @@ -213,7 +213,7 @@ def has_support(self, view, functionality): """ return functionality == "a_functionality" - test_xblock = HasSupportOverrideTester(None, None, None) + test_xblock = HasSupportOverrideTester(None, None, mock.Mock()) for view_name, functionality, expected_result in ( ("functionality_supported_view", "a_functionality", True), diff --git a/xblock/test/test_fields.py b/xblock/test/test_fields.py index 6f67f71eb..f039a10fb 100644 --- a/xblock/test/test_fields.py +++ b/xblock/test/test_fields.py @@ -609,12 +609,12 @@ class TestBlock(XBlock): # Change the user id. Unique ID default should change for field_b with # user_state scope, but not for field_a with scope=settings. runtime = TestRuntime(services={'field-data': DictFieldData({})}) - block = TestBlock(runtime, DictFieldData({}), sids._replace(user_id='alice')) + block = TestBlock(runtime, DictFieldData({}), sids._replace(user_id='alice')) # pylint: disable=no-member assert unique_a == block.field_a assert unique_b != block.field_b # Change the usage id. Unique ID default for both fields should change. runtime = TestRuntime(services={'field-data': DictFieldData({})}) - block = TestBlock(runtime, DictFieldData({}), sids._replace(usage_id='usage-2')) + block = TestBlock(runtime, DictFieldData({}), sids._replace(usage_id='usage-2')) # pylint: disable=no-member assert unique_a != block.field_a assert unique_b != block.field_b diff --git a/xblock/test/test_scorable.py b/xblock/test/test_scorable.py index bc3423805..5f78cb009 100644 --- a/xblock/test/test_scorable.py +++ b/xblock/test/test_scorable.py @@ -21,8 +21,8 @@ class StubScorableBlock(scorable.ScorableXBlockMixin): _allows_rescore = None def __init__(self, initial): + super().__init__(runtime=Mock(), field_data=Mock(), scope_ids=Mock()) self.result = initial - self.runtime = Mock() def allows_rescore(self): if self._allows_rescore is not None: diff --git a/xblock/utils/publish_event.py b/xblock/utils/publish_event.py index 570b182ec..b8c5393a9 100644 --- a/xblock/utils/publish_event.py +++ b/xblock/utils/publish_event.py @@ -1,23 +1,26 @@ """ PublishEventMixin: A mixin for publishing events from an XBlock """ +import typing as t -from xblock.core import XBlock +from xblock.core import XBlockMixin, XBlock -class PublishEventMixin: +class PublishEventMixin(XBlockMixin): """ A mixin for publishing events from an XBlock Requires the object to have a runtime.publish method. """ - additional_publish_event_data = {} + additional_publish_event_data: dict[str, t.Any] = {} @XBlock.json_handler - def publish_event(self, data, suffix=''): # pylint: disable=unused-argument + def publish_event(self, data, suffix='') -> dict[str, t.Any]: # pylint: disable=unused-argument """ AJAX handler to allow client-side code to publish a server-side event """ + if not isinstance(data, dict): + return {'result': 'error', 'message': 'Request data must be a JSON object'} try: event_type = data.pop('event_type') except KeyError: @@ -25,7 +28,7 @@ def publish_event(self, data, suffix=''): # pylint: disable=unused-argument return self.publish_event_from_dict(event_type, data) - def publish_event_from_dict(self, event_type, data): + def publish_event_from_dict(self, event_type, data: dict) -> dict[str, t.Any]: """ Combine 'data' with self.additional_publish_event_data and publish an event """ diff --git a/xblock/utils/resources.py b/xblock/utils/resources.py index 1066ffd59..541e717e7 100644 --- a/xblock/utils/resources.py +++ b/xblock/utils/resources.py @@ -9,8 +9,8 @@ import pkg_resources from django.template import Context, Template, Engine from django.template.backends.django import get_installed_libraries -from mako.lookup import TemplateLookup as MakoTemplateLookup -from mako.template import Template as MakoTemplate +from mako.lookup import TemplateLookup as MakoTemplateLookup # type: ignore[import-untyped] +from mako.template import Template as MakoTemplate # type: ignore[import-untyped] class ResourceLoader: diff --git a/xblock/utils/studio_editable.py b/xblock/utils/studio_editable.py index b705854cb..e0c1bff31 100644 --- a/xblock/utils/studio_editable.py +++ b/xblock/utils/studio_editable.py @@ -8,13 +8,15 @@ import logging +import typing as t +from dataclasses import dataclass import simplejson as json from web_fragments.fragment import Fragment -from xblock.core import XBlock, XBlockMixin +from xblock.core import Blocklike, XBlock, XBlockMixin from xblock.exceptions import JsonHandlerError, NoSuchViewError -from xblock.fields import Scope, JSONField, List, Integer, Float, Boolean, String, DateTime +from xblock.fields import Scope, JSONField, List, Integer, Float, Boolean, String, DateTime, Field from xblock.utils.resources import ResourceLoader from xblock.validation import Validation @@ -41,7 +43,12 @@ class FutureFields: XBlock may get persisted even if validation fails). """ - def __init__(self, new_fields_dict, newly_removed_fields, fallback_obj): + def __init__( + self, + new_fields_dict: dict[str, t.Any], + newly_removed_fields: t.Iterable[str], + fallback_obj: Blocklike, + ): """ Create an instance whose attributes come from new_fields_dict and fallback_obj. @@ -54,7 +61,7 @@ def __init__(self, new_fields_dict, newly_removed_fields, fallback_obj): self._blacklist = newly_removed_fields self._fallback_obj = fallback_obj - def __getattr__(self, name): + def __getattr__(self, name: str) -> object: try: return self._new_fields_dict[name] except KeyError: @@ -64,13 +71,14 @@ def __getattr__(self, name): return getattr(self._fallback_obj, name) -class StudioEditableXBlockMixin: +class StudioEditableXBlockMixin(XBlockMixin, XBlock): """ An XBlock mixin to provide a configuration UI for an XBlock in Studio. """ - editable_fields = () # Set this to a list of the names of fields to appear in the editor + # Set this to a list of the names of fields to appear in the editor + editable_fields: tuple[str, ...] = () - def studio_view(self, context): + def studio_view(self, context: dict[str, t.Any]) -> Fragment: """ Render a form for editing this XBlock """ @@ -92,10 +100,11 @@ def studio_view(self, context): fragment.initialize_js('StudioEditableXBlockMixin') return fragment - def _make_field_info(self, field_name, field): # pylint: disable=too-many-statements + def _make_field_info(self, field_name: str, field: Field) -> dict[str, t.Any]: """ Create the information that the template needs to render a form field for this field. """ + # pylint: disable=too-many-statements supported_field_types = ( (Integer, 'integer'), (Float, 'float'), @@ -199,12 +208,12 @@ def ugettext(text): return info @XBlock.json_handler - def submit_studio_edits(self, data, suffix=''): # pylint: disable=unused-argument + def submit_studio_edits(self, data: dict[str, t.Any], suffix='') -> t.Any: # pylint: disable=unused-argument """ AJAX handler for studio_view() Save button """ - values = {} # dict of new field values we are updating - to_reset = [] # list of field names to delete from this XBlock + values: dict[str, t.Any] = {} # dict of new field values we are updating + to_reset: list[str] = [] # list of field names to delete from this XBlock for field_name in self.editable_fields: field = self.fields[field_name] if field_name in data['values']: @@ -233,7 +242,7 @@ def submit_studio_edits(self, data, suffix=''): # pylint: disable=unused-argume else: raise JsonHandlerError(400, validation.to_json()) - def clean_studio_edits(self, data): + def clean_studio_edits(self, data: dict[str, t.Any]) -> None: """ Given POST data dictionary 'data', clean the data before validating it. e.g. fix capitalization, remove trailing spaces, etc. @@ -242,7 +251,7 @@ def clean_studio_edits(self, data): # if "name" in data: # data["name"] = data["name"].strip() - def validate_field_data(self, validation, data): + def validate_field_data(self, validation: Validation, data: Blocklike | FutureFields) -> None: """ Validate this block's field data. Instead of checking fields like self.name, check the fields set on data, e.g. data.name. This allows the same validation method to be re-used @@ -256,7 +265,7 @@ def validate_field_data(self, validation, data): # if data.count <=0: # validation.add(ValidationMessage(ValidationMessage.ERROR, u"Invalid count")) - def validate(self): + def validate(self) -> Validation: """ Validates the state of this XBlock. @@ -269,21 +278,27 @@ def validate(self): @XBlock.needs('mako') -class StudioContainerXBlockMixin(XBlockMixin): +class StudioContainerXBlockMixin(XBlockMixin, XBlock): """ An XBlock mixin to provide convenient use of an XBlock in Studio that wants to allow the user to assign children to it. """ has_author_view = True # Without this flag, studio will use student_view on newly-added blocks :/ - def render_children(self, context, fragment, can_reorder=True, can_add=False): + def render_children( + self, + context: dict[str, t.Any], + fragment: Fragment, + can_reorder: bool = True, + can_add: bool = False, + ) -> None: """ Renders the children of the module with HTML appropriate for Studio. If can_reorder is True, then the children will be rendered to support drag and drop. """ - contents = [] + contents: list[dict[str, t.Any]] = [] - child_context = {'reorderable_items': set()} + child_context: dict[str, t.Any] = {'reorderable_items': set()} if context: child_context.update(context) @@ -310,19 +325,22 @@ def render_children(self, context, fragment, can_reorder=True, can_add=False): 'can_reorder': can_reorder, })) - def author_view(self, context): + def student_view(self, context: dict[str, t.Any]) -> Fragment: + raise NotImplementedError("StudioContainer XBlocks must implement a student_view") + + def author_view(self, context: dict[str, t.Any]) -> Fragment: """ Display a the studio editor when the user has clicked "View" to see the container view, otherwise just show the normal 'author_preview_view' or 'student_view' preview. """ root_xblock = context.get('root_xblock') - if root_xblock and root_xblock.location == self.location: + if root_xblock and root_xblock.usage_key == self.usage_key: # User has clicked the "View" link. Show an editable preview of this block's children return self.author_edit_view(context) return self.author_preview_view(context) - def author_edit_view(self, context): + def author_edit_view(self, context: dict[str, t.Any]) -> Fragment: """ Child blocks can override this to control the view shown to authors in Studio when editing this block's children. @@ -331,7 +349,7 @@ def author_edit_view(self, context): self.render_children(context, fragment, can_reorder=True, can_add=False) return fragment - def author_preview_view(self, context): + def author_preview_view(self, context: dict[str, t.Any]) -> Fragment: """ Child blocks can override this to add a custom preview shown to authors in Studio when not editing this block's children. @@ -339,71 +357,48 @@ def author_preview_view(self, context): return self.student_view(context) +@dataclass(frozen=True) class NestedXBlockSpec: """ Class that allows detailed specification of allowed nested XBlocks. For use with StudioContainerWithNestedXBlocksMixin.allowed_nested_blocks """ + # An XBlock class. + block: type[XBlock] - def __init__( - self, block, single_instance=False, disabled=False, disabled_reason=None, boilerplate=None, - category=None, label=None, - ): - self._block = block - self._single_instance = single_instance - self._disabled = disabled - self._disabled_reason = disabled_reason - self._boilerplate = boilerplate - # Some blocks may not be nesting-aware, but can be nested anyway with a bit of help. - # For example, if you wanted to include an XBlock from a different project that didn't - # yet use XBlock utils, you could specify the category and studio label here. - self._category = category - self._label = label - - @property - def category(self): - """ Block category - used as a computer-readable name of an XBlock """ - return self._category or self._block.CATEGORY + # If True, only allow single nested instance of XBlock + single_instance: bool = False - @property - def label(self): - """ Block label - used as human-readable name of an XBlock """ - return self._label or self._block.STUDIO_LABEL + # If True, renders add buttons disabled - only use when XBlock can't be added at all (i.e. not available). + # To allow single instance of XBlock use single_instance property + disabled: bool = False - @property - def single_instance(self): - """ If True, only allow single nested instance of Xblock """ - return self._single_instance + # If block is disabled this property is used as add button title, giving some hint about why it is disabled + disabled_reason: str | None = None - @property - def disabled(self): - """ - If True, renders add buttons disabled - only use when XBlock can't be added at all (i.e. not available). - To allow single instance of XBlock use single_instance property - """ - return self._disabled + # If not None and not empty used as data-boilerplate attribute value + boilerplate: str | None = None - @property - def disabled_reason(self): - """ - If block is disabled this property is used as add button title, giving some hint about why it is disabled - """ - return self._disabled_reason + # Some blocks may not be nesting-aware, but can be nested anyway with a bit of help. + # For example, if you wanted to include an XBlock from a different project that didn't + # yet use XBlock utils, you could specify the category and studio label here. + # Otherwise, `category` and `label` will be filled in from `block` + category: str = "" # Computer-readable name of an XBlock + label: str = "" # Human-readable name of an XBlock - @property - def boilerplate(self): - """ Boilerplate - if not None and not empty used as data-boilerplate attribute value """ - return self._boilerplate + def __post_init__(self): + self.category = self.category or self.block.CATEGORY + self.label = self.label or self.block.STUDIO_LABEL -class XBlockWithPreviewMixin: +class XBlockWithPreviewMixin(XBlockMixin): """ An XBlock mixin providing simple preview view. It is to be used with StudioContainerWithNestedXBlocksMixin to avoid adding studio wrappers (title, edit button, etc.) to a block when it is rendered as child in parent's author_preview_view """ - def preview_view(self, context): + def preview_view(self, context: dict[str, t.Any]) -> Fragment: """ Preview view - used by StudioContainerWithNestedXBlocksMixin to render nested xblocks in preview context. Default implementation uses author_view if available, otherwise falls back to student_view @@ -414,22 +409,22 @@ def preview_view(self, context): return renderer(context) -class StudioContainerWithNestedXBlocksMixin(StudioContainerXBlockMixin): +class StudioContainerWithNestedXBlocksMixin(StudioContainerXBlockMixin): # pylint: disable=abstract-method """ An XBlock mixin providing interface for specifying allowed nested blocks and adding/previewing them in Studio. """ has_children = True - CHILD_PREVIEW_TEMPLATE = "templates/default_preview_view.html" + CHILD_PREVIEW_TEMPLATE: str = "templates/default_preview_view.html" @property - def loader(self): + def loader(self) -> ResourceLoader: """ Loader for loading and rendering assets stored in child XBlock package """ return loader @property - def allowed_nested_blocks(self): + def allowed_nested_blocks(self) -> list[type[XBlock] | NestedXBlockSpec]: """ Returns a list of allowed nested XBlocks. Each item can be either * An XBlock class @@ -441,7 +436,7 @@ def allowed_nested_blocks(self): """ return [] - def get_nested_blocks_spec(self): + def get_nested_blocks_spec(self) -> list[NestedXBlockSpec]: """ Converts allowed_nested_blocks items to NestedXBlockSpec to provide common interface """ @@ -450,7 +445,7 @@ def get_nested_blocks_spec(self): for block_spec in self.allowed_nested_blocks ] - def author_edit_view(self, context): + def author_edit_view(self, context: dict[str, t.Any]) -> Fragment: """ View for adding/editing nested blocks """ @@ -473,7 +468,7 @@ def author_edit_view(self, context): fragment.initialize_js('StudioContainerXBlockWithNestedXBlocksMixin') return fragment - def author_preview_view(self, context): + def author_preview_view(self, context: dict[str, t.Any]) -> Fragment: """ View for previewing contents in studio. """ @@ -494,7 +489,7 @@ def author_preview_view(self, context): fragment.add_content(self.loader.render_django_template(self.CHILD_PREVIEW_TEMPLATE, render_context)) return fragment - def _render_child_fragment(self, child, context, view='student_view'): + def _render_child_fragment(self, child: XBlock, context: dict[str, t.Any], view: str = 'student_view') -> Fragment: """ Helper method to overcome html block rendering quirks """ @@ -504,7 +499,7 @@ def _render_child_fragment(self, child, context, view='student_view'): if child.scope_ids.block_type == 'html' and getattr(self.runtime, 'is_author_mode', False): # html block doesn't support preview_view, and if we use student_view Studio will wrap # it in HTML that we don't want in the preview. So just render its HTML directly: - child_fragment = Fragment(child.data) + child_fragment = Fragment(child.data) # type: ignore[attr-defined] else: child_fragment = child.render('student_view', context) diff --git a/xblock/validation.py b/xblock/validation.py index b3c189a6b..2e25fa7a8 100644 --- a/xblock/validation.py +++ b/xblock/validation.py @@ -1,6 +1,9 @@ """ Validation information for an xblock instance. """ +from __future__ import annotations + +import typing as t class ValidationMessage: @@ -13,13 +16,13 @@ class ValidationMessage: TYPES = [WARNING, ERROR] - def __init__(self, message_type, message_text): + def __init__(self, message_type: str, message_text: str): """ Create a new message. Args: - message_type (unicode): The type associated with this message. Must be included in `TYPES`. - message_text (unicode): The textual message. + message_type: The type associated with this message. Must be included in `TYPES`. + message_text: The textual message. """ if message_type not in self.TYPES: raise TypeError("Unknown message_type: " + message_type) @@ -28,7 +31,7 @@ def __init__(self, message_type, message_text): self.type = message_type self.text = message_text - def to_json(self): + def to_json(self) -> dict[str, str]: """ Convert to a json-serializable representation. @@ -49,18 +52,18 @@ class Validation: where `True` signifies that the xblock passes validation. """ - def __init__(self, xblock_id): + def __init__(self, xblock_id: object): """ Create a `Validation` instance. Args: - xblock_id (object): An identification object that must support conversion to unicode. + xblock_id: An identification object that must support conversion to unicode. """ - self.messages = [] + self.messages: list[ValidationMessage] = [] self.xblock_id = xblock_id @property - def empty(self): + def empty(self) -> bool: """ Is this object empty (contains no messages)? @@ -69,42 +72,42 @@ def empty(self): """ return not self.messages - def __bool__(self): + def __bool__(self) -> bool: """ Extended to return True if `empty` returns True Returns: - bool: True iff this instance has no validation issues. + True iff this instance has no validation issues. """ return self.empty __nonzero__ = __bool__ - def add(self, message): + def add(self, message: ValidationMessage) -> None: """ Add a new validation message to this instance. Args: - message (ValidationMessage): A validation message to add to this instance's list of messages. + message: A validation message to add to this instance's list of messages. """ if not isinstance(message, ValidationMessage): raise TypeError("Argument must of type ValidationMessage") self.messages.append(message) - def add_messages(self, validation): + def add_messages(self, validation: Validation) -> None: """ Adds all the messages in the specified `Validation` object to this instance's messages array. Args: - validation (Validation): An object containing the messages to add to this instance's messages. + validation: An object containing the messages to add to this instance's messages. """ if not isinstance(validation, Validation): raise TypeError("Argument must be of type Validation") self.messages.extend(validation.messages) - def to_json(self): + def to_json(self) -> dict[str, t.Any]: """ Convert to a json-serializable representation.