diff --git a/.editorconfig b/.editorconfig index 6a1ab459..f8320761 100644 --- a/.editorconfig +++ b/.editorconfig @@ -6,4 +6,4 @@ indent_size = 4 end_of_line = lf insert_final_newline = true trim_trailing_whitespace = true -max_line_length = 79 +max_line_length = 140 diff --git a/SpiffWorkflow/task.py b/SpiffWorkflow/task.py index bf0d0f58..26a51d96 100644 --- a/SpiffWorkflow/task.py +++ b/SpiffWorkflow/task.py @@ -21,10 +21,15 @@ import logging import time -from uuid import uuid4 +from typing import Optional, Any, TYPE_CHECKING +from uuid import uuid4, UUID -from .util.task import TaskState, TaskFilter, TaskIterator -from .util.deep_merge import DeepMerge +if TYPE_CHECKING: + from . import Workflow + +from .specs import StartTask +from .specs.base import TaskSpec +from .util.task import TaskState, TaskIterator from .exceptions import WorkflowException logger = logging.getLogger('spiff.task') @@ -34,17 +39,17 @@ class Task(object): """Used internally for composing a tree that represents possible paths through a Workflow. Attributes: - id (UUID): a unique identifierfor this task - workflow (`Workflow`): the workflow associated with this task - parent (`Task`): the parent of this task - children (list(`Task`)): the children of this task - triggered (bool): True if the task is not part of output specification of the task spec - task_spec (`TaskSpec`): the spec associated with this task - thread_id (int): a thread id for this task - data (dict): a dictionary containing data for this task - internal_data (dict): a dictionary containing information relevant to the task state or execution - last_state_change (float): the timestamp when this task last changed state - thread_id (int): a thread identifier + id: A unique identifier for this task. + workflow: The workflow associated with this task. + parent: The parent of this task. + children (list(`Task`)): The children of this task. + triggered (bool): True if the task is not part of output specification of the task spec. + task_spec: The spec associated with this task. + thread_id (int): A thread id for this task. + data (dict): A dictionary containing data for this task. + internal_data (dict): A dictionary containing information relevant to the task state or execution. + last_state_change (float): The timestamp when this task last changed state. + thread_id (int): A thread identifier. Note: The `thread_id` attribute might be more accurately named `branch_id`, as it pertains only to workflow @@ -58,16 +63,23 @@ class Task(object): thread_id_pool = 0 # Pool for assigning a unique thread id to every new Task. - def __init__(self, workflow, task_spec, parent=None, state=TaskState.MAYBE, id=None): + def __init__( + self, + workflow: "Workflow", + task_spec: Optional[TaskSpec], + parent: Optional["Task"] = None, + state: int = TaskState.MAYBE, + id: Optional[UUID] = None + ): """ Args: - workflow (`Workflow`): the workflow this task should be added to - task_spec (`TaskSpec`): the spec associated with this task + workflow: The workflow this task should be added to. + task_spec: The spec associated with this task. Keyword Args: - parent (`Task`): the parent of this task - state (`TaskState`): the state of this task (default MAYBE) - id: an optional id (defaults to a random UUID) + parent: The parent of this task. + state: The state of this task (default MAYBE). + id: An optional id (defaults to a random UUID). """ self.id = id or uuid4() workflow.tasks[self.id] = self @@ -78,18 +90,18 @@ def __init__(self, workflow, task_spec, parent=None, state=TaskState.MAYBE, id=N self._state = state self.triggered = False - self.task_spec = task_spec + self.task_spec = task_spec if task_spec else StartTask(workflow.spec) self.thread_id = self.__class__.thread_id_pool self.data = {} self.internal_data = {} self.last_state_change = time.time() - if parent is not None: - self.parent._child_added_notify(self) + if _parent := self.parent: + _parent._child_added_notify(self) @property - def state(self): - """`TaskState`: this task's state + def state(self) -> int: + """`This task's state. Raises: `WorkflowException`: If setting the state results in a "backwards" state change. @@ -97,7 +109,7 @@ def state(self): return self._state @state.setter - def state(self, value): + def state(self, value: int) -> None: if value < self._state: raise WorkflowException( 'state went from %s to %s!' % (TaskState.get_name(self._state), TaskState.get_name(value)), @@ -106,26 +118,26 @@ def state(self, value): self._set_state(value) @property - def parent(self): - """`Task`: This task's parent task""" + def parent(self) -> Optional["Task"]: + """This task's parent task.""" return self.workflow.tasks.get(self._parent) @parent.setter - def parent(self, task): + def parent(self, task: Optional["Task"]) -> None: self._parent = task.id if task is not None else None - + @property - def children(self): - """list(`Task`): This task's child tasks""" + def children(self) -> list["Task"]: + """This task's child tasks.""" return [self.workflow.tasks.get(child) for child in self._children] - + @children.setter - def children(self, tasks): + def children(self, tasks: list["Task"]) -> None: self._children = [child.id for child in tasks] @property - def depth(self): - """The task's depth""" + def depth(self) -> int: + """The task's depth.""" depth = 0 task = self.parent while task is not None: @@ -133,46 +145,46 @@ def depth(self): task = task.parent return depth - def has_state(self, state): + def has_state(self, state: int) -> bool: """Check the state of this task. Args: - state (`TaskState`): the state to check + state (`TaskState`): The state to check. Returns: - bool: `True` is the task has the state or mask + `True` is the task has the state or mask. """ return (self._state & state) != 0 - def set_data(self, **kwargs): + def set_data(self, **kwargs) -> None: """Defines the given attribute/value pairs in this task's data.""" self.data.update(kwargs) - - def get_data(self, name, default=None): + + def get_data(self, name: str, default: Optional[Any] = None): """Returns the value of the data field with the given name, or the given default value if the data field does not exist. Args: - name (str): the dictionary key to return - default (obj): a default value to return if the key does not exist + name: The dictionary key to return. + default: A default value to return if the key does not exist. Returns: the value of the key, or the default """ return self.data.get(name, default) - def reset_branch(self, data): - """Removes all descendendants of this task and set this task to be runnable. + def reset_branch(self, data: Optional[dict]) -> list["Task"]: + """Removes all descendants of this task and set this task to be runnable. Args: - data (dict): set the task data to these values (if None, inherit from parent task) + data: Set the task data to these values (if None, inherit from parent task). Returns: - list(`Task`): tasks removed from the tree + Tasks removed from the tree. """ logger.info(f'Branch reset', extra=self.collect_log_extras()) self.internal_data = {} - self.data = deepcopy(self.parent.data) if data is None else data + self.data = deepcopy(self.parent.data) if data is None else data descendants = [t for t in self] self._drop_children(force=True) self._set_state(TaskState.FUTURE) @@ -180,11 +192,11 @@ def reset_branch(self, data): self.task_spec._update(self) return descendants[1:] if len(descendants) > 1 else [] - def is_descendant_of(self, task): + def is_descendant_of(self, task: "Task") -> bool: """Checks whether a task is an ancestor of this task. Args: - task (`Task`): the potential ancestor + task: The potential ancestor. Returns: bool: whether the task is an ancestor of this task @@ -195,14 +207,14 @@ def is_descendant_of(self, task): return True return self.parent.is_descendant_of(task) - def find_ancestor(self, spec_name): + def find_ancestor(self, spec_name: str) -> Optional["Task"]: """Search for an ancestor that has a task with a spec of the specified name. Args: - spec_name (str): the name of the spec associated with the task + spec_name: The name of the spec associated with the task. Returns: - `Task`: the first result (or None, if no matching task was found) + The first result (or None, if no matching task was found). """ if self.parent is None: return None @@ -210,18 +222,18 @@ def find_ancestor(self, spec_name): return self.parent return self.parent.find_ancestor(spec_name) - def _add_child(self, task_spec, state=TaskState.MAYBE): + def _add_child(self, task_spec: TaskSpec, state: int = TaskState.MAYBE) -> "Task": """Adds a new child and assigns the given TaskSpec to it. Args: - task_spec (`TaskSpec`): the spec associated with the child task - state (`TaskState`): the state to assign + task_spec: The spec associated with the child task. + state: The state to assign. Returns: - `Task`: the new child + The new child `Task`. Raises: - `WorkflowException`: if an invalid task task addition occurs + `WorkflowException`: if an invalid task addition occurs """ if self.has_state(TaskState.PREDICTED_MASK) and state & TaskState.PREDICTED_MASK == 0: raise WorkflowException('Attempt to add non-predicted child to predicted task', task_spec=self.task_spec) @@ -231,9 +243,9 @@ def _add_child(self, task_spec, state=TaskState.MAYBE): task._ready() return task - def _sync_children(self, task_specs, state=TaskState.MAYBE): - """Syncs the task's children with the given list of task specs. - + def _sync_children(self, task_specs: list[TaskSpec], state: int = TaskState.MAYBE) -> None: + """Syncs the task's children with the given list of task specs. + - Add one child for each given `TaskSpec`, unless that child already exists. - Remove all children for which there is no spec in the given list, unless it is a "triggered" task. @@ -241,8 +253,8 @@ def _sync_children(self, task_specs, state=TaskState.MAYBE): It is an error if the task has a non-predicted child that is not given in the TaskSpecs. Args: - task_spec (list(`TaskSpec`)): the list of task specs that may become children - state (`TaskState`): the state to assign + task_specs: The list of task specs that may become children. + state: The state to assign. """ if task_specs is None: raise ValueError('"task_specs" argument is None') @@ -271,12 +283,12 @@ def _sync_children(self, task_specs, state=TaskState.MAYBE): for task_spec in new_children: self._add_child(task_spec, state) - def _child_added_notify(self, child): + def _child_added_notify(self, child: "Task") -> None: """Called by another task to let us know that a child was added.""" self._children.append(child.id) - def _drop_children(self, force=False): - """Remove this task's children from the tree""" + def _drop_children(self, force: bool = False): + """Remove this task's children from the tree.""" drop = [] for child in self.children: @@ -288,7 +300,7 @@ def _drop_children(self, force=False): for task in drop: self.workflow._remove_task(task.id) - def _set_state(self, value): + def _set_state(self, value: int) -> None: """Force set the state on a task""" if value != self.state: @@ -302,7 +314,7 @@ def _set_state(self, value): else: logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.collect_log_extras()) - def _assign_new_thread_id(self, recursive=True): + def _assign_new_thread_id(self, recursive: bool = True) -> int: """Assigns a new thread id to the task.""" self.__class__.thread_id_pool += 1 @@ -313,26 +325,26 @@ def _assign_new_thread_id(self, recursive=True): child.thread_id = self.thread_id return self.thread_id - def _inherit_data(self): + def _inherit_data(self) -> None: """Copies the data from the parent.""" self.set_data(**deepcopy(self.parent.data)) - def _set_internal_data(self, **kwargs): + def _set_internal_data(self, **kwargs) -> None: """Defines the given attribute/value pairs in this task's internal data.""" self.internal_data.update(kwargs) - def _get_internal_data(self, name, default=None): - """Retrieves an internal data field""" + def _get_internal_data(self, name: str, default: Optional[Any] = None) -> Optional[Any]: + """Retrieves an internal data field.""" return self.internal_data.get(name, default) - def _ready(self): + def _ready(self) -> None: """Marks the task as ready for execution.""" if self.has_state(TaskState.COMPLETED) or self.has_state(TaskState.CANCELLED): return self._set_state(TaskState.READY) self.task_spec._on_ready(self) - def run(self): + def run(self) -> Optional[bool]: """Execute the task. Call's the task spec's `TaskSpec.run` method and checks the return value. @@ -343,7 +355,7 @@ def run(self): - `None`: mark the task STARTED Returns: - bool: the value returned by the `TaskSpec`'s run method + The value returned by the `TaskSpec`'s run method. See `TaskState` for more information about states. """ @@ -357,7 +369,7 @@ def run(self): self.complete() return retval - def cancel(self): + def cancel(self) -> None: """Cancels the item if it was not yet completed; recursively cancels its children.""" if self.has_state(TaskState.FINISHED_MASK): for child in self.children: @@ -367,24 +379,24 @@ def cancel(self): self._drop_children() self.task_spec._on_cancel(self) - def complete(self): + def complete(self) -> None: """Marks this task complete.""" self._set_state(TaskState.COMPLETED) self.task_spec._on_complete(self) - def error(self): + def error(self) -> None: """Marks this task as error.""" self._set_state(TaskState.ERROR) self.task_spec._on_error(self) - def trigger(self, *args): + def trigger(self, *args) -> None: """Call the `TaskSpec`'s trigger method. Args are passed directly to the task spec. """ self.task_spec._on_trigger(self, *args) - def collect_log_extras(self, dct=None): + def collect_log_extras(self, dct: Optional[dict] = None) -> dict: """Return logging details for this task""" extra = { 'workflow_spec': self.workflow.spec.name, @@ -405,19 +417,19 @@ def collect_log_extras(self, dct=None): }) return extra - def __iter__(self): + def __iter__(self) -> TaskIterator: return TaskIterator(self) - def __repr__(self): + def __repr__(self) -> str: return f'' # I will probably remove these methods at some point because I hate them - def get_dump(self, indent=0, recursive=True): + def get_dump(self, indent: int = 0, recursive: bool = True) -> str: """Returns the subtree as a string for debugging. Returns: - str: a tree view of the task (and optionally its children) + A tree view of the task (and optionally its children). """ dbg = (' ' * indent * 2) dbg += '%s/' % self.id @@ -432,6 +444,6 @@ def get_dump(self, indent=0, recursive=True): dbg += '\n' + child.get_dump(indent + 1) return dbg - def dump(self, indent=0): + def dump(self, indent: int = 0) -> None: """Prints the subtree as a string for debugging.""" - print(self.get_dump()) + print(self.get_dump(indent)) diff --git a/SpiffWorkflow/workflow.py b/SpiffWorkflow/workflow.py index a67d5f7b..be260411 100644 --- a/SpiffWorkflow/workflow.py +++ b/SpiffWorkflow/workflow.py @@ -18,20 +18,24 @@ # 02110-1301 USA import logging +from typing import Optional, Any +from uuid import UUID +from .serializer.base import Serializer +from .specs import WorkflowSpec from .task import Task from .util.task import TaskState, TaskIterator, TaskFilter from .util.compat import mutex from .util.event import Event -from .exceptions import TaskNotFoundException, WorkflowException +from .exceptions import TaskNotFoundException logger = logging.getLogger('spiff.workflow') -class Workflow(object): +class Workflow: """The instantiation of a `WorkflowSpec`. - Reprsents the state of a running workflow and its data. + Represents the state of a running workflow and its data. Attributes: spec (`WorkflowSpec`): the spec that describes this workflow instance @@ -44,11 +48,15 @@ class Workflow(object): completed_event (`Event`): an event holding callbacks to be run when the workflow completes """ - def __init__(self, workflow_spec, deserializing=False): + def __init__( + self, + workflow_spec: WorkflowSpec, + deserializing: bool = False, + ) -> None: """ Parameters: - workflow_spec (`WorkflowSpec`): the spec that describes this workflow - deserializing (bool): whether this workflow is being deserialized + workflow_spec: The spec that describes this workflow. + deserializing: Whether this workflow is being deserialized. """ self.spec = workflow_spec self.data = {} @@ -67,110 +75,110 @@ def __init__(self, workflow_spec, deserializing=False): logger.info('Initialized workflow', extra=self.collect_log_extras()) self.task_tree._ready() - def is_completed(self): + def is_completed(self) -> bool: """Checks whether the workflow is complete. Returns: - bool: True if the workflow has no unfinished tasks + True if the workflow has no unfinished tasks. """ if not self.completed: - iter = TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK) + _iter = TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK) try: - next(iter) + next(_iter) except StopIteration: self.completed = True return self.completed - def manual_input_required(self): + def manual_input_required(self) -> bool: """Checks whether the workflow requires manual input. Returns: - bool: True if the workflow cannot proceed until manual tasks are complete + True if the workflow cannot proceed until manual tasks are complete. """ - iter = TaskIterator(self.task_tree, state=TaskState.READY, manual=False) + _iter = TaskIterator(self.task_tree, state=TaskState.READY, manual=False) try: - next(iter) + next(_iter) except StopIteration: return True return False - def get_tasks(self, first_task=None, **kwargs): + def get_tasks(self, first_task: Optional[Task] = None, **kwargs) -> list[Task]: """Returns a list of `Task`s that meet the conditions specified `kwargs`, starting from the root by default. Notes: - Keyword args are passed directly to `get_tasks_iterator` + Keyword args are passed directly to `get_tasks_iterator`. Returns: - list(`Task`): the tasks that match the filtering conditions + The tasks that match the filtering conditions. """ - return [t for t in self.get_tasks_iterator(first_task, **kwargs)] + return [task for task in self.get_tasks_iterator(first_task, **kwargs)] - def get_next_task(self, first_task=None, **kwargs): + def get_next_task(self, first_task: Optional[Task] = None, **kwargs) -> Optional[Task]: """Returns the next task that meets the iteration conditions, starting from the root by default. Parameters: - first_task (`Task`): search beginning from this task + first_task: Search beginning from this task. Notes: - Other keyword args are passed directly into `get_tasks_iterator` + Other keyword args are passed directly into `get_tasks_iterator`. Returns: - `Task` or None: the first task that meets the conditions or None if no tasks match + The first task that meets the conditions or None if no tasks match """ - iter = self.get_tasks_iterator(first_task, **kwargs) + _iter = self.get_tasks_iterator(first_task, **kwargs) try: - return next(iter) + return next(_iter) except StopIteration: return None - def get_tasks_iterator(self, first_task=None, **kwargs): + def get_tasks_iterator(self, first_task: Optional[Task] = None, **kwargs) -> TaskIterator: """Returns an iterator of Tasks that meet the conditions specified `kwargs`, starting from the root by default. Parameters: - first_task (`Task`): search beginning from this task + first_task: Search beginning from this task. Notes: - Other keyword args are passed directly into `TaskIterator` + Other keyword args are passed directly into `TaskIterator`. Returns: - `TaskIterator`: an iterator over the matching tasks + An iterator over the matching tasks. """ return TaskIterator(first_task or self.task_tree, **kwargs) - def get_task_from_id(self, task_id): + def get_task_from_id(self, task_id: UUID) -> Task: """Returns the task with the given id. Args: - task_id: the id of the task to run + task_id: The id of the task to run. Returns: - `Task`: the task + The task. Raises: `TaskNotFoundException`: if the task does not exist """ if task_id not in self.tasks: raise TaskNotFoundException(f'A task with id {task_id} was not found', task_spec=self.spec) - return self.tasks.get(task_id) + return self.tasks[task_id] - def run_task_from_id(self, task_id): + def run_task_from_id(self, task_id: UUID) -> Optional[bool]: """Runs the task with the given id. Args: - task_id: the id of the task to run + task_id: The id of the task to run. """ task = self.get_task_from_id(task_id) return task.run() - def run_next(self, use_last_task=True, halt_on_manual=True): + def run_next(self, use_last_task: bool = True, halt_on_manual: bool = True) -> Optional[bool]: """Runs the next task, starting from the branch containing the last completed task by default. Parameters: - use_last_task (bool): start with the currently running branch - halt_on_manual (bool): do not run tasks with `TaskSpec`s that have the `manual` attribute set + use_last_task: Start with the currently running branch. + halt_on_manual: Do not run tasks with `TaskSpec`s that have the `manual` attribute set. Returns: - bool: True when a task runs sucessfully + True when a task runs successfully. """ first_task = self.last_task if use_last_task and self.last_task is not None else self.task_tree task_filter = TaskFilter( @@ -188,29 +196,29 @@ def run_next(self, use_last_task=True, halt_on_manual=True): else: return task.run() - def run_all(self, use_last_task=True, halt_on_manual=True): + def run_all(self, use_last_task: bool = True, halt_on_manual: bool = True) -> None: """Runs all possible tasks, starting from the current branch by default. Parameters: - use_last_task (bool): start with the currently running branch - halt_on_manual (bool): do not run tasks with `TaskSpec`s that have the `manual` attribute set + use_last_task: Start with the currently running branch. + halt_on_manual: Do not run tasks with `TaskSpec`s that have the `manual` attribute set. """ while self.run_next(use_last_task, halt_on_manual): pass - def update_waiting_tasks(self): + def update_waiting_tasks(self) -> None: """Update all tasks in the WAITING state""" for task in TaskIterator(self.task_tree, state=TaskState.WAITING): task.task_spec._update(task) - def cancel(self, success=False): + def cancel(self, success: bool = False) -> list[Task]: """Cancels all open tasks in the workflow. Args: - success (bool): the state of the workflow + success: The state of the workflow. Returns: - list(`Task`): the cancelled tasks + The cancelled tasks. """ self.success = success self.completed = True @@ -222,39 +230,39 @@ def cancel(self, success=False): task.cancel() return cancelled - def set_data(self, **kwargs): + def set_data(self, **kwargs) -> None: """Defines the given attribute/value pairs.""" self.data.update(kwargs) - def get_data(self, name, default=None): + def get_data(self, name: str, default: Optional[Any] = None) -> Optional[Any]: """Returns the value of the data field with the given name, or the given default value if the data field does not exist. Args: - name (str): the dictionary key to return - default (obj): a default value to return if the key does not exist + name: The dictionary key to return. + default: A default value to return if the key does not exist. Returns: - the value of the key, or the default + The value of the key, or the default. """ return self.data.get(name, default) - def reset_from_task_id(self, task_id, data=None): - """Removed all descendendants of this task and set this task to be runnable. + def reset_from_task_id(self, task_id: UUID, data: Optional[dict] = None) -> list[Task]: + """Removed all descendants of this task and set this task to be runnable. Args: - task_id: the id of the task to reset to - data (dict): optionally replace the data (if None, data will be copied from the parent task) + task_id: The id of the task to reset to. + data: Optionally replace the data (if None, data will be copied from the parent task). - Returns: extra.update( - list(`Task`): tasks removed from the tree + Returns: + Tasks removed from the tree. """ task = self.get_task_from_id(task_id) self.last_task = task.parent return task.reset_branch(data) - def collect_log_extras(self, dct=None): - """Return logging details for this workflow""" + def collect_log_extras(self, dct: Optional[dict] = None) -> dict: + """Return logging details for this workflow.""" extra = dct or {} extra.update({ 'workflow_spec': self.spec.name, @@ -265,13 +273,13 @@ def collect_log_extras(self, dct=None): extra.update({'tasks': [t.id for t in Workflow.get_tasks(self)]}) return extra - def _predict(self, mask=TaskState.NOT_FINISHED_MASK): - """Predict tasks with the provided mask""" + def _predict(self, mask: int = TaskState.NOT_FINISHED_MASK) -> None: + """Predict tasks with the provided mask.""" for task in Workflow.get_tasks(self, state=TaskState.NOT_FINISHED_MASK): task.task_spec._predict(task, mask=mask) - def _task_completed_notify(self, task): - """Called whenever a task completes""" + def _task_completed_notify(self, task: Task) -> None: + """Called whenever a task completes.""" self.last_task = task if task.task_spec.name == 'End': self._mark_complete(task) @@ -280,29 +288,29 @@ def _task_completed_notify(self, task): else: self.update_waiting_tasks() - def _remove_task(self, task_id): + def _remove_task(self, task_id: UUID) -> None: task = self.tasks[task_id] - for child in task.children: + for child in task.children: self._remove_task(child.id) task.parent._children.remove(task.id) self.tasks.pop(task_id) - def _mark_complete(self, task): + def _mark_complete(self, task: Task) -> None: logger.info('Workflow completed', extra=self.collect_log_extras()) self.data.update(task.data) self.completed = True - def _get_mutex(self, name): - """Get or create a mutex""" + def _get_mutex(self, name: str) -> mutex: + """Get or create a mutex.""" if name not in self.locks: self.locks[name] = mutex() return self.locks[name] - def get_task_mapping(self): + def get_task_mapping(self) -> dict: """I don't know that this does. Seriously, this returns a mapping of thread ids to tasks in that thread. It can be used to identify - tasks by branch and use this information for decision making (despite the flawed implementation + tasks by branch and use this information for decision-making (despite the flawed implementation mechanism; IMO, this should be maintained by the workflow rather than a class attribute). """ task_mapping = {} @@ -314,19 +322,19 @@ def get_task_mapping(self): task_mapping[task.thread_id] = thread_task_mapping return task_mapping - def get_dump(self): + def get_dump(self) -> str: """Returns a string representation of the task tree. Returns: - str: a tree view of the current workflow state + A tree view of the current workflow state. """ return self.task_tree.get_dump() - def dump(self): - """Print a dump of the current task tree""" + def dump(self) -> None: + """Print a dump of the current task tree.""" print(self.task_tree.dump()) - def serialize(self, serializer, **kwargs): + def serialize(self, serializer: Serializer, **kwargs) -> Any: """ Serializes a Workflow instance using the provided serializer. @@ -340,7 +348,7 @@ def serialize(self, serializer, **kwargs): return serializer.serialize_workflow(self, **kwargs) @classmethod - def deserialize(cls, serializer, s_state, **kwargs): + def deserialize(cls, serializer: Serializer, s_state: Any, **kwargs) -> "Workflow": """ Deserializes a Workflow instance using the provided serializer.