diff --git a/.github/workflows/test-python-package.yml b/.github/workflows/test-python-package.yml index fb6e9f6d..64c3def3 100644 --- a/.github/workflows/test-python-package.yml +++ b/.github/workflows/test-python-package.yml @@ -2,9 +2,8 @@ name: Test pyzeebe on: push: - branches: [ master, development, feature/*, bugfix/*, maintenance/*, pre-release/* ] pull_request: - branches: [ master, development, feature/*, bugfix/*, maintenance/*, pre-release/* ] + branches: [ master, development ] jobs: build: diff --git a/.github/workflows/test-zeebe-integration.yml b/.github/workflows/test-zeebe-integration.yml index de0b5705..6fd8623a 100644 --- a/.github/workflows/test-zeebe-integration.yml +++ b/.github/workflows/test-zeebe-integration.yml @@ -2,9 +2,8 @@ name: Integration test pyzeebe on: push: - branches: [ master, development, feature/*, bugfix/*, maintenance/*, pre-release/* ] pull_request: - branches: [ master, development, feature/*, bugfix/*, maintenance/*, pre-release/* ] + branches: [ master, development ] jobs: test: diff --git a/docs/exceptions.rst b/docs/exceptions.rst index eb4cbfc4..3f74518d 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -1,43 +1,43 @@ ========== -Exceptions +errors ========== -All ``pyzeebe`` exceptions inherit from :py:class:`PyZeebeException` +All ``pyzeebe`` exceptions inherit from :py:class:`PyZeebeError` -.. autoexception:: pyzeebe.exceptions.PyZeebeException +.. autoexception:: pyzeebe.errors.PyZeebeError -.. autoexception:: pyzeebe.exceptions.TaskNotFound +.. autoexception:: pyzeebe.errors.TaskNotFoundError -.. autoexception:: pyzeebe.exceptions.NoVariableNameGiven +.. autoexception:: pyzeebe.errors.NoVariableNameGivenError -.. autoexception:: pyzeebe.exceptions.NoZeebeAdapter +.. autoexception:: pyzeebe.errors.NoZeebeAdapterError -.. autoexception:: pyzeebe.exceptions.DuplicateTaskType +.. autoexception:: pyzeebe.errors.DuplicateTaskTypeError -.. autoexception:: pyzeebe.exceptions.ActivateJobsRequestInvalid +.. autoexception:: pyzeebe.errors.ActivateJobsRequestInvalidError -.. autoexception:: pyzeebe.exceptions.JobAlreadyDeactivated +.. autoexception:: pyzeebe.errors.JobAlreadyDeactivatedError -.. autoexception:: pyzeebe.exceptions.JobNotFound +.. autoexception:: pyzeebe.errors.JobNotFoundError -.. autoexception:: pyzeebe.exceptions.MessageAlreadyExists +.. autoexception:: pyzeebe.errors.MessageAlreadyExistsError -.. autoexception:: pyzeebe.exceptions.WorkflowNotFound +.. autoexception:: pyzeebe.errors.WorkflowNotFoundError -.. autoexception:: pyzeebe.exceptions.WorkflowInstanceNotFound +.. autoexception:: pyzeebe.errors.WorkflowInstanceNotFoundError -.. autoexception:: pyzeebe.exceptions.WorkflowHasNoStartEvent +.. autoexception:: pyzeebe.errors.WorkflowHasNoStartEventError -.. autoexception:: pyzeebe.exceptions.WorkflowInvalid +.. autoexception:: pyzeebe.errors.WorkflowInvalidError -.. autoexception:: pyzeebe.exceptions.InvalidJSON +.. autoexception:: pyzeebe.errors.InvalidJSONError -.. autoexception:: pyzeebe.exceptions.ZeebeBackPressure +.. autoexception:: pyzeebe.errors.ZeebeBackPressureError -.. autoexception:: pyzeebe.exceptions.ZeebeGatewayUnavailable +.. autoexception:: pyzeebe.errors.ZeebeGatewayUnavailableError -.. autoexception:: pyzeebe.exceptions.ZeebeInternalError +.. autoexception:: pyzeebe.errors.ZeebeInternalError -.. autoexception:: pyzeebe.exceptions.InvalidOAuthCredentials +.. autoexception:: pyzeebe.errors.InvalidOAuthCredentialsError -.. autoexception:: pyzeebe.exceptions.InvalidCamundaCloudCredentials +.. autoexception:: pyzeebe.errors.InvalidCamundaCloudCredentialsError diff --git a/docs/worker_reference.rst b/docs/worker_reference.rst index 8ac5e46a..4cb774f8 100644 --- a/docs/worker_reference.rst +++ b/docs/worker_reference.rst @@ -2,9 +2,10 @@ Worker Reference ================ -The :py:class:`ZeebeTaskHandler` class from which both :py:class:`ZeebeWorker` and :py:class:`ZeebeTaskRouter` inherit. +The :py:class:`ZeebeWorker` class inherits from :py:class:`ZeebeTaskRouter` class. +This means that all methods that :py:class:`ZeebeTaskRouter` has will also appear in :py:class:`ZeebeWorker`. -.. autoclass:: pyzeebe.worker.task_handler.ZeebeTaskHandler +.. autoclass:: pyzeebe.ZeebeTaskRouter :members: :undoc-members: @@ -13,9 +14,6 @@ The :py:class:`ZeebeTaskHandler` class from which both :py:class:`ZeebeWorker` a :members: :undoc-members: -.. autoclass:: pyzeebe.ZeebeTaskRouter - :members: - :undoc-members: .. autoclass:: pyzeebe.Job :members: diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index bb4ae620..27461110 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -1,12 +1,13 @@ __version__ = "2.3.1" -from pyzeebe import exceptions +from pyzeebe import errors from pyzeebe.client.client import ZeebeClient from pyzeebe.credentials.camunda_cloud_credentials import CamundaCloudCredentials from pyzeebe.credentials.oauth_credentials import OAuthCredentials from pyzeebe.job.job import Job from pyzeebe.job.job_status import JobStatus from pyzeebe.task.exception_handler import ExceptionHandler -from pyzeebe.task.task_decorator import TaskDecorator -from pyzeebe.worker.task_router import ZeebeTaskRouter +from pyzeebe.task.task_config import TaskConfig +from pyzeebe.task.types import TaskDecorator +from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler from pyzeebe.worker.worker import ZeebeWorker diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index c9973835..eeefb74e 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -35,11 +35,11 @@ def run_workflow(self, bpmn_process_id: str, variables: Dict = None, version: in int: workflow_instance_key, the unique id of the running workflow generated by Zeebe. Raises: - WorkflowNotFound: No workflow with bpmn_process_id exists - InvalidJSON: variables is not JSONable - WorkflowHasNoStartEvent: The specified workflow does not have a start event - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + WorkflowNotFoundError: No workflow with bpmn_process_id exists + InvalidJSONError: variables is not JSONable + WorkflowHasNoStartEventError: The specified workflow does not have a start event + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ @@ -62,11 +62,11 @@ def run_workflow_with_result(self, bpmn_process_id: str, variables: Dict = None, tuple: (The workflow instance key, A dictionary of the end state of the workflow instance) Raises: - WorkflowNotFound: No workflow with bpmn_process_id exists - InvalidJSON: variables is not JSONable - WorkflowHasNoStartEvent: The specified workflow does not have a start event - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + WorkflowNotFoundError: No workflow with bpmn_process_id exists + InvalidJSONError: variables is not JSONable + WorkflowHasNoStartEventError: The specified workflow does not have a start event + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ @@ -86,9 +86,9 @@ def cancel_workflow_instance(self, workflow_instance_key: int) -> int: int: The workflow_instance_key Raises: - WorkflowInstanceNotFound: If no workflow instance with workflow_instance_key exists - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + WorkflowInstanceNotFoundError: If no workflow instance with workflow_instance_key exists + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ @@ -103,9 +103,9 @@ def deploy_workflow(self, *workflow_file_path: str) -> None: workflow_file_path (str): The file path to a workflow definition file (bpmn/yaml) Raises: - WorkflowInvalid: If one of the workflow file definitions is invalid - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + WorkflowInvalidError: If one of the workflow file definitions is invalid + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ @@ -125,9 +125,9 @@ def publish_message(self, name: str, correlation_key: str, variables: Dict = Non active, a MessageAlreadyExists will be raised. Raises: - MessageAlreadyExist: If a message with message_id already exists - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + MessageAlreadyExistError: If a message with message_id already exists + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ diff --git a/pyzeebe/credentials/camunda_cloud_credentials.py b/pyzeebe/credentials/camunda_cloud_credentials.py index 3758b2c8..40e6477f 100644 --- a/pyzeebe/credentials/camunda_cloud_credentials.py +++ b/pyzeebe/credentials/camunda_cloud_credentials.py @@ -1,5 +1,5 @@ from pyzeebe.credentials.oauth_credentials import OAuthCredentials -from pyzeebe.exceptions import InvalidOAuthCredentials, InvalidCamundaCloudCredentials +from pyzeebe.errors import InvalidOAuthCredentialsError, InvalidCamundaCloudCredentialsError class CamundaCloudCredentials(OAuthCredentials): @@ -7,8 +7,8 @@ def __init__(self, client_id: str, client_secret: str, cluster_id: str): try: super().__init__(url="https://login.cloud.camunda.io/oauth/token", client_id=client_id, client_secret=client_secret, audience=f"{cluster_id}.zeebe.camunda.io") - except InvalidOAuthCredentials: - raise InvalidCamundaCloudCredentials(client_id=client_id, cluster_id=cluster_id) + except InvalidOAuthCredentialsError: + raise InvalidCamundaCloudCredentialsError(client_id=client_id, cluster_id=cluster_id) def get_connection_uri(self) -> str: return f"{self.audience}:443" diff --git a/pyzeebe/credentials/oauth_credentials.py b/pyzeebe/credentials/oauth_credentials.py index 48374635..0c0985a7 100644 --- a/pyzeebe/credentials/oauth_credentials.py +++ b/pyzeebe/credentials/oauth_credentials.py @@ -4,7 +4,7 @@ from requests_oauthlib import OAuth2Session from pyzeebe.credentials.base_credentials import BaseCredentials -from pyzeebe.exceptions import InvalidOAuthCredentials +from pyzeebe.errors import InvalidOAuthCredentialsError class OAuthCredentials(BaseCredentials): @@ -34,7 +34,7 @@ def get_access_token(url: str, client_id: str, client_secret: str, audience: str response.raise_for_status() return response.json()["access_token"] except HTTPError: - raise InvalidOAuthCredentials(url=url, client_id=client_id, audience=audience) + raise InvalidOAuthCredentialsError(url=url, client_id=client_id, audience=audience) def get_connection_uri(self) -> str: return None diff --git a/pyzeebe/decorators/__init__.py b/pyzeebe/decorators/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pyzeebe/decorators/zeebe_decorator_base.py b/pyzeebe/decorators/zeebe_decorator_base.py deleted file mode 100644 index 3cd39ce7..00000000 --- a/pyzeebe/decorators/zeebe_decorator_base.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import List - -from pyzeebe.task.task_decorator import TaskDecorator - - -class ZeebeDecoratorBase(object): - def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator] = None): - self._before: List[TaskDecorator] = before or [] - self._after: List[TaskDecorator] = after or [] - - def before(self, *decorators: TaskDecorator) -> None: - self._before.extend(decorators) - - def after(self, *decorators: TaskDecorator) -> None: - self._after.extend(decorators) diff --git a/pyzeebe/errors/__init__.py b/pyzeebe/errors/__init__.py new file mode 100644 index 00000000..dd348601 --- /dev/null +++ b/pyzeebe/errors/__init__.py @@ -0,0 +1,6 @@ +from .credentials_errors import * +from .job_errors import * +from .message_errors import * +from .pyzeebe_errors import * +from .workflow_errors import * +from .zeebe_errors import * diff --git a/pyzeebe/exceptions/credentials_exceptions.py b/pyzeebe/errors/credentials_errors.py similarity index 68% rename from pyzeebe/exceptions/credentials_exceptions.py rename to pyzeebe/errors/credentials_errors.py index 5fe5a1ab..9c41e3c4 100644 --- a/pyzeebe/exceptions/credentials_exceptions.py +++ b/pyzeebe/errors/credentials_errors.py @@ -1,12 +1,12 @@ -from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException +from pyzeebe.errors.pyzeebe_errors import PyZeebeError -class InvalidOAuthCredentials(PyZeebeException): +class InvalidOAuthCredentialsError(PyZeebeError): def __init__(self, url: str, client_id: str, audience: str): super().__init__( f"Invalid OAuth credentials supplied for {url} with audience {audience} and client id {client_id}") -class InvalidCamundaCloudCredentials(PyZeebeException): +class InvalidCamundaCloudCredentialsError(PyZeebeError): def __init__(self, client_id: str, cluster_id: str): super().__init__(f"Invalid credentials supplied for cluster {cluster_id} with client {client_id}") diff --git a/pyzeebe/exceptions/job_exceptions.py b/pyzeebe/errors/job_errors.py similarity index 80% rename from pyzeebe/exceptions/job_exceptions.py rename to pyzeebe/errors/job_errors.py index feb27a42..849b430a 100644 --- a/pyzeebe/exceptions/job_exceptions.py +++ b/pyzeebe/errors/job_errors.py @@ -1,7 +1,7 @@ -from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException +from pyzeebe.errors.pyzeebe_errors import PyZeebeError -class ActivateJobsRequestInvalid(PyZeebeException): +class ActivateJobsRequestInvalidError(PyZeebeError): def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int): msg = "Failed to activate jobs. Reasons:" if task_type == "" or task_type is None: @@ -16,13 +16,13 @@ def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activa super().__init__(msg) -class JobAlreadyDeactivated(PyZeebeException): +class JobAlreadyDeactivatedError(PyZeebeError): def __init__(self, job_key: int): super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)") self.job_key = job_key -class JobNotFound(PyZeebeException): +class JobNotFoundError(PyZeebeError): def __init__(self, job_key: int): super().__init__(f"Job {job_key} not found") self.job_key = job_key diff --git a/pyzeebe/errors/message_errors.py b/pyzeebe/errors/message_errors.py new file mode 100644 index 00000000..fc22a962 --- /dev/null +++ b/pyzeebe/errors/message_errors.py @@ -0,0 +1,5 @@ +from pyzeebe.errors.pyzeebe_errors import PyZeebeError + + +class MessageAlreadyExistsError(PyZeebeError): + pass diff --git a/pyzeebe/exceptions/pyzeebe_exceptions.py b/pyzeebe/errors/pyzeebe_errors.py similarity index 57% rename from pyzeebe/exceptions/pyzeebe_exceptions.py rename to pyzeebe/errors/pyzeebe_errors.py index ecc25afb..89118fb1 100644 --- a/pyzeebe/exceptions/pyzeebe_exceptions.py +++ b/pyzeebe/errors/pyzeebe_errors.py @@ -1,25 +1,26 @@ -class PyZeebeException(Exception): +class PyZeebeError(Exception): pass -class TaskNotFound(PyZeebeException): +class TaskNotFoundError(PyZeebeError): pass -class NoVariableNameGiven(PyZeebeException): +class NoVariableNameGivenError(PyZeebeError): def __init__(self, task_type: str): super().__init__(f"No variable name given for single_value task {task_type}") self.task_type = task_type -class NoZeebeAdapter(PyZeebeException): +class NoZeebeAdapterError(PyZeebeError): pass -class DuplicateTaskType(PyZeebeException): +class DuplicateTaskTypeError(PyZeebeError): def __init__(self, task_type: str): super().__init__(f"Task with type {task_type} already exists") self.task_type = task_type -class MaxConsecutiveTaskThreadError(PyZeebeException): + +class MaxConsecutiveTaskThreadError(PyZeebeError): pass diff --git a/pyzeebe/exceptions/workflow_exceptions.py b/pyzeebe/errors/workflow_errors.py similarity index 70% rename from pyzeebe/exceptions/workflow_exceptions.py rename to pyzeebe/errors/workflow_errors.py index 688e6cfd..f6bb55fd 100644 --- a/pyzeebe/exceptions/workflow_exceptions.py +++ b/pyzeebe/errors/workflow_errors.py @@ -1,7 +1,7 @@ -from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException +from pyzeebe.errors.pyzeebe_errors import PyZeebeError -class WorkflowNotFound(PyZeebeException): +class WorkflowNotFoundError(PyZeebeError): def __init__(self, bpmn_process_id: str, version: int): super().__init__( f"Workflow definition: {bpmn_process_id} with {version} was not found") @@ -9,21 +9,21 @@ def __init__(self, bpmn_process_id: str, version: int): self.version = version -class WorkflowInstanceNotFound(PyZeebeException): +class WorkflowInstanceNotFoundError(PyZeebeError): def __init__(self, workflow_instance_key: int): super().__init__(f"Workflow instance key: {workflow_instance_key} was not found") self.workflow_instance_key = workflow_instance_key -class WorkflowHasNoStartEvent(PyZeebeException): +class WorkflowHasNoStartEventError(PyZeebeError): def __init__(self, bpmn_process_id: str): super().__init__(f"Workflow {bpmn_process_id} has no start event that can be called manually") self.bpmn_process_id = bpmn_process_id -class WorkflowInvalid(PyZeebeException): +class WorkflowInvalidError(PyZeebeError): pass -class InvalidJSON(PyZeebeException): +class InvalidJSONError(PyZeebeError): pass diff --git a/pyzeebe/errors/zeebe_errors.py b/pyzeebe/errors/zeebe_errors.py new file mode 100644 index 00000000..e2bf3cc5 --- /dev/null +++ b/pyzeebe/errors/zeebe_errors.py @@ -0,0 +1,13 @@ +from pyzeebe.errors.pyzeebe_errors import PyZeebeError + + +class ZeebeBackPressureError(PyZeebeError): + pass + + +class ZeebeGatewayUnavailableError(PyZeebeError): + pass + + +class ZeebeInternalError(PyZeebeError): + pass diff --git a/pyzeebe/exceptions/__init__.py b/pyzeebe/exceptions/__init__.py deleted file mode 100644 index 7e2ff3a4..00000000 --- a/pyzeebe/exceptions/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from .credentials_exceptions import * -from .job_exceptions import * -from .message_exceptions import * -from .pyzeebe_exceptions import * -from .workflow_exceptions import * -from .zeebe_exceptions import * diff --git a/pyzeebe/exceptions/message_exceptions.py b/pyzeebe/exceptions/message_exceptions.py deleted file mode 100644 index d93c5068..00000000 --- a/pyzeebe/exceptions/message_exceptions.py +++ /dev/null @@ -1,5 +0,0 @@ -from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException - - -class MessageAlreadyExists(PyZeebeException): - pass diff --git a/pyzeebe/exceptions/zeebe_exceptions.py b/pyzeebe/exceptions/zeebe_exceptions.py deleted file mode 100644 index 7fe37a3d..00000000 --- a/pyzeebe/exceptions/zeebe_exceptions.py +++ /dev/null @@ -1,13 +0,0 @@ -from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException - - -class ZeebeBackPressure(PyZeebeException): - pass - - -class ZeebeGatewayUnavailable(PyZeebeException): - pass - - -class ZeebeInternalError(PyZeebeException): - pass diff --git a/pyzeebe/grpc_internals/zeebe_adapter_base.py b/pyzeebe/grpc_internals/zeebe_adapter_base.py index 28eb6ef2..db79b3f9 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_base.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_base.py @@ -5,7 +5,7 @@ from zeebe_grpc.gateway_pb2_grpc import GatewayStub from pyzeebe.credentials.base_credentials import BaseCredentials -from pyzeebe.exceptions import ZeebeBackPressure, ZeebeGatewayUnavailable, ZeebeInternalError +from pyzeebe.errors import ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError logger = logging.getLogger(__name__) @@ -81,12 +81,12 @@ def _should_retry(self): def _common_zeebe_grpc_errors(self, rpc_error: grpc.RpcError): if self.is_error_status(rpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED): - raise ZeebeBackPressure() + raise ZeebeBackPressureError() elif self.is_error_status(rpc_error, grpc.StatusCode.UNAVAILABLE): self._current_connection_retries += 1 if not self._should_retry(): self._close() - raise ZeebeGatewayUnavailable() + raise ZeebeGatewayUnavailableError() elif self.is_error_status(rpc_error, grpc.StatusCode.INTERNAL): self._current_connection_retries += 1 if not self._should_retry(): diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index 3436ff11..86b29f8a 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -6,7 +6,7 @@ from zeebe_grpc.gateway_pb2 import ActivateJobsRequest, CompleteJobRequest, CompleteJobResponse, FailJobRequest, \ FailJobResponse, ThrowErrorRequest, ThrowErrorResponse -from pyzeebe.exceptions import ActivateJobsRequestInvalid, JobAlreadyDeactivated, JobNotFound +from pyzeebe.errors import ActivateJobsRequestInvalidError, JobAlreadyDeactivatedError, JobNotFoundError from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase from pyzeebe.job.job import Job @@ -27,7 +27,7 @@ def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_a yield job except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): - raise ActivateJobsRequestInvalid(task_type, worker, timeout, max_jobs_to_activate) + raise ActivateJobsRequestInvalidError(task_type, worker, timeout, max_jobs_to_activate) else: self._common_zeebe_grpc_errors(rpc_error) @@ -51,9 +51,9 @@ def complete_job(self, job_key: int, variables: Dict) -> CompleteJobResponse: return self._gateway_stub.CompleteJob(CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables))) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): - raise JobNotFound(job_key=job_key) + raise JobNotFoundError(job_key=job_key) elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): - raise JobAlreadyDeactivated(job_key=job_key) + raise JobAlreadyDeactivatedError(job_key=job_key) else: self._common_zeebe_grpc_errors(rpc_error) @@ -62,9 +62,9 @@ def fail_job(self, job_key: int, message: str) -> FailJobResponse: return self._gateway_stub.FailJob(FailJobRequest(jobKey=job_key, errorMessage=message)) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): - raise JobNotFound(job_key=job_key) + raise JobNotFoundError(job_key=job_key) elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): - raise JobAlreadyDeactivated(job_key=job_key) + raise JobAlreadyDeactivatedError(job_key=job_key) else: self._common_zeebe_grpc_errors(rpc_error) @@ -74,8 +74,8 @@ def throw_error(self, job_key: int, message: str) -> ThrowErrorResponse: ThrowErrorRequest(jobKey=job_key, errorMessage=message)) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): - raise JobNotFound(job_key=job_key) + raise JobNotFoundError(job_key=job_key) elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): - raise JobAlreadyDeactivated(job_key=job_key) + raise JobAlreadyDeactivatedError(job_key=job_key) else: self._common_zeebe_grpc_errors(rpc_error) diff --git a/pyzeebe/grpc_internals/zeebe_message_adapter.py b/pyzeebe/grpc_internals/zeebe_message_adapter.py index 01d9a75e..8bf16aee 100644 --- a/pyzeebe/grpc_internals/zeebe_message_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_message_adapter.py @@ -4,7 +4,7 @@ import grpc from zeebe_grpc.gateway_pb2 import PublishMessageRequest, PublishMessageResponse -from pyzeebe.exceptions import MessageAlreadyExists +from pyzeebe.errors import MessageAlreadyExistsError from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase @@ -17,6 +17,6 @@ def publish_message(self, name: str, correlation_key: str, time_to_live_in_milli timeToLive=time_to_live_in_milliseconds, variables=json.dumps(variables))) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.ALREADY_EXISTS): - raise MessageAlreadyExists() + raise MessageAlreadyExistsError() else: self._common_zeebe_grpc_errors(rpc_error) diff --git a/pyzeebe/grpc_internals/zeebe_workflow_adapter.py b/pyzeebe/grpc_internals/zeebe_workflow_adapter.py index 473d7261..c474ef05 100644 --- a/pyzeebe/grpc_internals/zeebe_workflow_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_workflow_adapter.py @@ -6,8 +6,8 @@ from zeebe_grpc.gateway_pb2 import CreateWorkflowInstanceRequest, CreateWorkflowInstanceWithResultRequest, \ CancelWorkflowInstanceRequest, WorkflowRequestObject, DeployWorkflowRequest, DeployWorkflowResponse -from pyzeebe.exceptions import InvalidJSON, WorkflowNotFound, WorkflowInstanceNotFound, WorkflowHasNoStartEvent, \ - WorkflowInvalid +from pyzeebe.errors import InvalidJSONError, WorkflowNotFoundError, WorkflowInstanceNotFoundError, WorkflowHasNoStartEventError, \ + WorkflowInvalidError from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase @@ -36,12 +36,12 @@ def create_workflow_instance_with_result(self, bpmn_process_id: str, version: in def _create_workflow_errors(self, rpc_error: grpc.RpcError, bpmn_process_id: str, version: int, variables: Dict) -> None: if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): - raise WorkflowNotFound(bpmn_process_id=bpmn_process_id, version=version) + raise WorkflowNotFoundError(bpmn_process_id=bpmn_process_id, version=version) elif self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): - raise InvalidJSON( + raise InvalidJSONError( f"Cannot start workflow: {bpmn_process_id} with version {version}. Variables: {variables}") elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): - raise WorkflowHasNoStartEvent(bpmn_process_id=bpmn_process_id) + raise WorkflowHasNoStartEventError(bpmn_process_id=bpmn_process_id) else: self._common_zeebe_grpc_errors(rpc_error) @@ -51,7 +51,7 @@ def cancel_workflow_instance(self, workflow_instance_key: int) -> None: CancelWorkflowInstanceRequest(workflowInstanceKey=workflow_instance_key)) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): - raise WorkflowInstanceNotFound(workflow_instance_key=workflow_instance_key) + raise WorkflowInstanceNotFoundError(workflow_instance_key=workflow_instance_key) else: self._common_zeebe_grpc_errors(rpc_error) @@ -61,7 +61,7 @@ def deploy_workflow(self, *workflow_file_path: str) -> DeployWorkflowResponse: DeployWorkflowRequest(workflows=map(self._get_workflow_request_object, workflow_file_path))) except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): - raise WorkflowInvalid() + raise WorkflowInvalidError() else: self._common_zeebe_grpc_errors(rpc_error) diff --git a/pyzeebe/job/job.py b/pyzeebe/job/job.py index 741eddd1..c9490aab 100644 --- a/pyzeebe/job/job.py +++ b/pyzeebe/job/job.py @@ -1,6 +1,6 @@ from typing import Dict -from pyzeebe.exceptions import NoZeebeAdapter +from pyzeebe.errors import NoZeebeAdapterError from pyzeebe.job.job_status import JobStatus @@ -30,16 +30,16 @@ def set_success_status(self) -> None: Success status means that the job has been completed as intended. Raises: - NoZeebeAdapter: If the job does not have a configured ZeebeAdapter - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ if self.zeebe_adapter: self.zeebe_adapter.complete_job(job_key=self.key, variables=self.variables) else: - raise NoZeebeAdapter() + raise NoZeebeAdapterError() def set_failure_status(self, message: str) -> None: """ @@ -50,16 +50,16 @@ def set_failure_status(self, message: str) -> None: message (str): The failure message that Zeebe will receive Raises: - NoZeebeAdapter: If the job does not have a configured ZeebeAdapter - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ if self.zeebe_adapter: self.zeebe_adapter.fail_job(job_key=self.key, message=message) else: - raise NoZeebeAdapter() + raise NoZeebeAdapterError() def set_error_status(self, message: str) -> None: """ @@ -70,16 +70,16 @@ def set_error_status(self, message: str) -> None: message (str): The error message that Zeebe will receive Raises: - NoZeebeAdapter: If the job does not have a configured ZeebeAdapter - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + NoZeebeAdapterError: If the job does not have a configured ZeebeAdapter + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ if self.zeebe_adapter: self.zeebe_adapter.throw_error(job_key=self.key, message=message) else: - raise NoZeebeAdapter() + raise NoZeebeAdapterError() def __repr__(self): return str({"jobKey": self.key, "taskType": self.type, "workflowInstanceKey": self.workflow_instance_key, diff --git a/pyzeebe/task/task.py b/pyzeebe/task/task.py index 9780c210..c4bdb7e3 100644 --- a/pyzeebe/task/task.py +++ b/pyzeebe/task/task.py @@ -1,24 +1,19 @@ -from typing import Callable, List, Dict +from typing import Callable -from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase -from pyzeebe.job.job import Job -from pyzeebe.task.exception_handler import ExceptionHandler +from pyzeebe.task.task_config import TaskConfig +from pyzeebe.task.types import JobHandler -class Task(ZeebeDecoratorBase): - def __init__(self, task_type: str, task_handler: Callable[..., Dict], exception_handler: ExceptionHandler, - timeout: int = 10000, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None, - before: List = None, after: List = None): - super().__init__(before=before, after=after) +class Task: + def __init__(self, original_function: Callable, job_handler: JobHandler, config: TaskConfig): + self.original_function = original_function + self.job_handler = job_handler + self.config = config - self.type = task_type - self.inner_function = task_handler - self.exception_handler = exception_handler - self.timeout = timeout - self.max_jobs_to_activate = max_jobs_to_activate - self.variables_to_fetch = variables_to_fetch or [] - self.handler: Callable[[Job], Job] = None + @property + def type(self): + return self.config.type - def __repr__(self) -> str: - return str({"type": self.type, "timeout": self.timeout, "max_jobs_to_activate": self.max_jobs_to_activate, - "variables_to_fetch": self.variables_to_fetch}) + def __repr__(self): + return f"Task(config= {self.config}, original_function={self.original_function}, " \ + f"job_handler={self.job_handler})" diff --git a/pyzeebe/task/task_builder.py b/pyzeebe/task/task_builder.py new file mode 100644 index 00000000..2d4d1fdb --- /dev/null +++ b/pyzeebe/task/task_builder.py @@ -0,0 +1,75 @@ +import inspect +import logging +from typing import List, Callable, Dict, Tuple + +from pyzeebe import Job, TaskDecorator +from pyzeebe.task.task import Task +from pyzeebe.task.task_config import TaskConfig +from pyzeebe.task.types import DecoratorRunner, JobHandler + +logger = logging.getLogger(__name__) + + +def build_task(task_function: Callable, task_config: TaskConfig) -> Task: + if task_config.single_value: + task_function = convert_to_dict_function( + task_function, task_config.variable_name) + + return Task(task_function, build_job_handler(task_function, task_config), task_config) + + +def build_job_handler(task_function: Callable, task_config: TaskConfig) -> JobHandler: + before_decorator_runner = create_decorator_runner(task_config.before) + after_decorator_runner = create_decorator_runner(task_config.after) + + def job_handler(job: Job) -> Job: + job = before_decorator_runner(job) + job.variables, succeeded = run_original_task_function( + task_function, task_config, job) + job = after_decorator_runner(job) + if succeeded: + job.set_success_status() + return job + + return job_handler + + +def run_original_task_function(task_function: Callable, task_config: TaskConfig, job: Job) -> Tuple[Dict, bool]: + try: + return task_function(**job.variables), True + except Exception as e: + logger.debug(f"Failed job: {job}. Error: {e}.") + task_config.exception_handler(e, job) + return job.variables, False + + +def create_decorator_runner(decorators: List[TaskDecorator]) -> DecoratorRunner: + def decorator_runner(job: Job): + for decorator in decorators: + job = run_decorator(decorator, job) + return job + + return decorator_runner + + +def run_decorator(decorator: TaskDecorator, job: Job) -> Job: + try: + return decorator(job) + except Exception as e: + logger.warning(f"Failed to run decorator {decorator}. Exception: {e}") + return job + + +def convert_to_dict_function(single_value_function: Callable, variable_name: str) -> Callable[..., Dict]: + def inner_fn(*args, **kwargs): + return {variable_name: single_value_function(*args, **kwargs)} + + return inner_fn + + +def get_parameters_from_function(task_function: Callable) -> List[str]: + function_signature = inspect.signature(task_function) + for _, parameter in function_signature.parameters.items(): + if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): + return [] + return list(function_signature.parameters) diff --git a/pyzeebe/task/task_config.py b/pyzeebe/task/task_config.py new file mode 100644 index 00000000..0ba9f529 --- /dev/null +++ b/pyzeebe/task/task_config.py @@ -0,0 +1,31 @@ +from typing import List + +from pyzeebe.errors import NoVariableNameGivenError +from pyzeebe.task.exception_handler import ExceptionHandler +from pyzeebe.task.types import TaskDecorator + + +class TaskConfig: + def __init__(self, type: str, exception_handler: ExceptionHandler, + timeout_ms: int, max_jobs_to_activate: int, + variables_to_fetch: List[str], + single_value: bool, variable_name: str, before: List[TaskDecorator], + after: List[TaskDecorator]): + if single_value and not variable_name: + raise NoVariableNameGivenError(type) + + self.type = type + self.exception_handler = exception_handler + self.timeout_ms = timeout_ms + self.max_jobs_to_activate = max_jobs_to_activate + self.variables_to_fetch = variables_to_fetch + self.single_value = single_value + self.variable_name = variable_name + self.before = before + self.after = after + + def __repr__(self): + return f"TaskConfig(type={self.type}, exception_handler={self.exception_handler}, " \ + f"timeout_ms={self.timeout_ms}, max_jobs_to_activate={self.max_jobs_to_activate}, " \ + f"variables_to_fetch={self.variables_to_fetch}, single_value={self.single_value}, " \ + f"variable_name={self.variable_name}, before={self.before}, after={self.after})" diff --git a/pyzeebe/task/task_decorator.py b/pyzeebe/task/task_decorator.py deleted file mode 100644 index cdc18ed6..00000000 --- a/pyzeebe/task/task_decorator.py +++ /dev/null @@ -1,5 +0,0 @@ -from typing import Callable - -from pyzeebe.job.job import Job - -TaskDecorator = Callable[[Job], Job] diff --git a/pyzeebe/task/types.py b/pyzeebe/task/types.py new file mode 100644 index 00000000..a018c192 --- /dev/null +++ b/pyzeebe/task/types.py @@ -0,0 +1,7 @@ +from typing import Callable + +from pyzeebe import Job + +DecoratorRunner = Callable[[Job], Job] +JobHandler = Callable[[Job], Job] +TaskDecorator = Callable[[Job], Job] diff --git a/pyzeebe/worker/task_handler.py b/pyzeebe/worker/task_handler.py deleted file mode 100644 index a0b9be49..00000000 --- a/pyzeebe/worker/task_handler.py +++ /dev/null @@ -1,145 +0,0 @@ -import logging -from abc import abstractmethod -from typing import Tuple, List, Callable, Dict - -from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase -from pyzeebe.exceptions import NoVariableNameGiven, TaskNotFound, DuplicateTaskType -from pyzeebe.job.job import Job -from pyzeebe.task.exception_handler import ExceptionHandler -from pyzeebe.task.task import Task -from pyzeebe.task.task_decorator import TaskDecorator - -logger = logging.getLogger(__name__) - - -def default_exception_handler(e: Exception, job: Job) -> None: - logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.") - job.set_failure_status(f"Failed job. Error: {e}") - - -class ZeebeTaskHandler(ZeebeDecoratorBase): - def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator] = None): - """ - Args: - before (List[TaskDecorator]): Decorators to be performed before each task - after (List[TaskDecorator]): Decorators to be performed after each task - """ - super().__init__(before, after) - self.tasks: List[Task] = [] - - def task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler, - variables_to_fetch: List[str] = None, timeout: int = 10000, max_jobs_to_activate: int = 32, - before: List[TaskDecorator] = None, after: List[TaskDecorator] = None, single_value: bool = False, - variable_name: str = None): - """ - Decorator to create a task - - Args: - before (List[TaskDecorator]): All decorators which should be performed before the task. - after (List[TaskDecorator]): All decorators which should be performed after the task. - timeout (int): How long Zeebe should wait before the job is retried. Default: 10000 milliseconds - single_value (bool): If the function returns a single value (int, string, list) and not a dictionary set - this to True. Default: False - variable_name (str): If single_value then this will be the variable name given to zeebe: - { : } - timeout (int): Maximum duration of the task in milliseconds. If the timeout is surpasses Zeebe will give up - on the job and retry it. Default: 10000 - max_jobs_to_activate (int): Maximum jobs the worker will execute in parallel (of this task). Default: 32 - - Raises: - DuplicateTaskType: If a task from the router already exists in the worker - - """ - self._is_task_duplicate(task_type) - - if single_value and not variable_name: - raise NoVariableNameGiven(task_type=task_type) - - elif single_value and variable_name: - return self._non_dict_task(task_type=task_type, variable_name=variable_name, timeout=timeout, - max_jobs_to_activate=max_jobs_to_activate, exception_handler=exception_handler, - before=before, after=after, variables_to_fetch=variables_to_fetch) - - else: - return self._dict_task(task_type=task_type, exception_handler=exception_handler, before=before, after=after, - timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, - variables_to_fetch=variables_to_fetch) - - @abstractmethod - def _dict_task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler, - timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - raise NotImplemented() - - @abstractmethod - def _non_dict_task(self, task_type: str, variable_name: str, - exception_handler: ExceptionHandler = default_exception_handler, timeout: int = 10000, - max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - raise NotImplemented() - - @staticmethod - def _single_value_function_to_dict(variable_name: str, fn: Callable) -> Callable[..., Dict]: - def inner_fn(*args, **kwargs): - return {variable_name: fn(*args, **kwargs)} - - return inner_fn - - @staticmethod - def _get_parameters_from_function(fn: Callable) -> List[str]: - parameters = fn.__code__.co_varnames - if "args" in parameters: - return [] - elif "kwargs" in parameters: - return [] - else: - return list(parameters) - - def _is_task_duplicate(self, task_type: str) -> None: - try: - self.get_task(task_type) - raise DuplicateTaskType(task_type) - except TaskNotFound: - return - - def remove_task(self, task_type: str) -> Task: - """ - Remove a task - - Args: - task_type (str): The type of the wanted task - - Returns: - Task: The task that was removed - - Raises: - TaskNotFound: If no task with specified type exists - - """ - task_index = self._get_task_index(task_type) - return self.tasks.pop(task_index) - - def get_task(self, task_type: str) -> Task: - """ - Get a task by its type - - Args: - task_type (str): The type of the wanted task - - Returns: - Task: The wanted task - - Raises: - TaskNotFound: If no task with specified type exists - - """ - return self._get_task_and_index(task_type)[0] - - def _get_task_index(self, task_type: str) -> int: - return self._get_task_and_index(task_type)[-1] - - def _get_task_and_index(self, task_type: str) -> Tuple[Task, int]: - for index, task in enumerate(self.tasks): - if task.type == task_type: - return task, index - raise TaskNotFound(f"Could not find task {task_type}") diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index a5db9673..17a35918 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -1,63 +1,154 @@ -from typing import List, Callable, Dict +import logging +from typing import Callable, List, Tuple, Optional +from pyzeebe import TaskDecorator +from pyzeebe.errors import (DuplicateTaskTypeError, NoVariableNameGivenError, + TaskNotFoundError) +from pyzeebe.job.job import Job +from pyzeebe.task import task_builder from pyzeebe.task.exception_handler import ExceptionHandler from pyzeebe.task.task import Task -from pyzeebe.task.task_decorator import TaskDecorator -from pyzeebe.worker.task_handler import ZeebeTaskHandler, default_exception_handler +from pyzeebe.task.task_config import TaskConfig +logger = logging.getLogger(__name__) -class ZeebeTaskRouter(ZeebeTaskHandler): - def _dict_task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler, - timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - def wrapper(fn: Callable[..., Dict]): - nonlocal variables_to_fetch - if not variables_to_fetch: - variables_to_fetch = self._get_parameters_from_function(fn) - task = self._create_task(task_type=task_type, task_handler=fn, exception_handler=exception_handler, - timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, before=before, - after=after, variables_to_fetch=variables_to_fetch) +def default_exception_handler(e: Exception, job: Job) -> None: + logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.") + job.set_failure_status(f"Failed job. Error: {e}") - self.tasks.append(task) - return fn - return wrapper +class ZeebeTaskRouter: + def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator] = None): + """ + Args: + before (List[TaskDecorator]): Decorators to be performed before each task + after (List[TaskDecorator]): Decorators to be performed after each task + """ + self._before: List[TaskDecorator] = before or [] + self._after: List[TaskDecorator] = after or [] + self.tasks: List[Task] = [] - def _non_dict_task(self, task_type: str, variable_name: str, - exception_handler: ExceptionHandler = default_exception_handler, timeout: int = 10000, - max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - def wrapper(fn: Callable[..., Dict]): - nonlocal variables_to_fetch - if not variables_to_fetch: - variables_to_fetch = self._get_parameters_from_function(fn) + def task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler, + variables_to_fetch: Optional[List[str]] = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, + before: List[TaskDecorator] = None, after: List[TaskDecorator] = None, single_value: bool = False, + variable_name: str = None): + """ + Decorator to create a task + Args: + task_type (str): The task type + exception_handler (ExceptionHandler): Handler that will be called when a job fails. + variables_to_fetch (Optional[List[str]]): The variables to request from Zeebe when activating jobs. + timeout_ms (int): Maximum duration of the task in milliseconds. If the timeout is surpassed Zeebe will give up + on the worker and retry it. Default: 10000 (10 seconds). + max_jobs_to_activate (int): Maximum jobs the worker will execute in parallel (of this task). Default: 32 + before (List[TaskDecorator]): All decorators which should be performed before the task. + after (List[TaskDecorator]): All decorators which should be performed after the task. + single_value (bool): If the function returns a single value (int, string, list) and not a dictionary set + this to True. Default: False + variable_name (str): If single_value then this will be the variable name given to zeebe: + { : } + Raises: + DuplicateTaskTypeError: If a task from the router already exists in the worker + NoVariableNameGivenError: When single_value is set, but no variable_name is given + """ + def task_wrapper(task_function: Callable): + config = TaskConfig( + task_type, + exception_handler, + timeout_ms, + max_jobs_to_activate, + variables_to_fetch or task_builder.get_parameters_from_function( + task_function), + single_value, + variable_name or "", + before or [], + after or [] + ) + config_with_decorators = self._add_decorators_to_config(config) - dict_fn = self._single_value_function_to_dict(variable_name=variable_name, fn=fn) + task = task_builder.build_task( + task_function, config_with_decorators + ) + self._add_task(task) + return task_function - task = self._create_task(task_type=task_type, task_handler=dict_fn, exception_handler=exception_handler, - timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, before=before, - after=after, variables_to_fetch=variables_to_fetch) + return task_wrapper - self.tasks.append(task) - return fn + def _add_task(self, task: Task): + self._is_task_duplicate(task.type) + self.tasks.append(task) - return wrapper + def _add_decorators_to_config(self, config: TaskConfig) -> TaskConfig: + before_decorators = self._before.copy() + before_decorators.extend(config.before) + config.before = before_decorators + config.after.extend(self._after) + return config - def _create_task(self, task_type: str, task_handler: Callable, exception_handler: ExceptionHandler, - timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None) -> Task: - task = Task(task_type=task_type, task_handler=task_handler, exception_handler=exception_handler, - timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, variables_to_fetch=variables_to_fetch) - return self._add_decorators_to_task(task, before or [], after or []) + def _is_task_duplicate(self, task_type: str) -> None: + try: + self.get_task(task_type) + raise DuplicateTaskTypeError(task_type) + except TaskNotFoundError: + return - def _add_decorators_to_task(self, task: Task, before: List[TaskDecorator], - after: List[TaskDecorator]) -> Task: - before_decorators = self._before.copy() - before_decorators.extend(before) + def before(self, *decorators: TaskDecorator) -> None: + """ + Add decorators to be performed before a job is run + + Args: + decorators (Iterable[TaskDecorator]): The decorators to be performed before each job is run + """ + self._before.extend(decorators) + + def after(self, *decorators: TaskDecorator) -> None: + """ + Add decorators to be performed after a job is run + + Args: + decorators (Iterable[TaskDecorator]): The decorators to be performed after each job is run + """ + self._after.extend(decorators) + + def remove_task(self, task_type: str) -> Task: + """ + Remove a task + + Args: + task_type (str): The type of the wanted task + + Returns: + Task: The task that was removed + + Raises: + TaskNotFoundError: If no task with specified type exists + + """ + task_index = self._get_task_index(task_type) + return self.tasks.pop(task_index) + + def get_task(self, task_type: str) -> Task: + """ + Get a task by its type + + Args: + task_type (str): The type of the wanted task + + Returns: + Task: The wanted task + + Raises: + TaskNotFoundError: If no task with specified type exists + + """ + return self._get_task_and_index(task_type)[0] - after.extend(self._after) + def _get_task_index(self, task_type: str) -> int: + return self._get_task_and_index(task_type)[1] - task.before(*before_decorators) - task.after(*after) - return task + def _get_task_and_index(self, task_type: str) -> Tuple[Task, int]: + for index, task in enumerate(self.tasks): + if task.type == task_type: + return task, index + raise TaskNotFoundError(f"Could not find task {task_type}") diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 715b44df..32a28686 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -2,22 +2,20 @@ import socket import time from threading import Thread, Event -from typing import List, Callable, Generator, Tuple, Dict, Union +from typing import List, Generator, Dict +from pyzeebe import TaskDecorator from pyzeebe.credentials.base_credentials import BaseCredentials -from pyzeebe.exceptions.pyzeebe_exceptions import MaxConsecutiveTaskThreadError +from pyzeebe.errors.pyzeebe_errors import MaxConsecutiveTaskThreadError from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.job.job import Job -from pyzeebe.task.exception_handler import ExceptionHandler from pyzeebe.task.task import Task -from pyzeebe.task.task_decorator import TaskDecorator -from pyzeebe.worker.task_handler import ZeebeTaskHandler, default_exception_handler from pyzeebe.worker.task_router import ZeebeTaskRouter logger = logging.getLogger(__name__) -class ZeebeWorker(ZeebeTaskHandler): +class ZeebeWorker(ZeebeTaskRouter): """A zeebe worker that can connect to a zeebe instance and perform tasks.""" def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = None, port: int = None, @@ -44,7 +42,7 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N self.stop_event = Event() self._task_threads: Dict[str, Thread] = {} self.watcher_max_errors_factor = watcher_max_errors_factor - self._watcher_thread = None + self._watcher_thread = None def work(self, watch: bool = False) -> None: """ @@ -54,9 +52,9 @@ def work(self, watch: bool = False) -> None: watch (bool): Start a watcher thread that restarts task threads on error Raises: - ActivateJobsRequestInvalid: If one of the worker's task has invalid types - ZeebeBackPressure: If Zeebe is currently in back pressure (too many requests) - ZeebeGatewayUnavailable: If the Zeebe gateway is unavailable + ActivateJobsRequestInvalidError: If one of the worker's task has invalid types + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error """ @@ -171,7 +169,7 @@ def _handle_task(self, task: Task) -> None: def _handle_jobs(self, task: Task) -> None: for job in self._get_jobs(task): - thread = Thread(target=task.handler, + thread = Thread(target=task.job_handler, args=(job,), name=f"{self.__class__.__name__}-Job-{job.type}") logger.debug(f"Running job: {job}") @@ -179,9 +177,9 @@ def _handle_jobs(self, task: Task) -> None: def _get_jobs(self, task: Task) -> Generator[Job, None, None]: logger.debug(f"Activating jobs for task: {task}") - return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.timeout, - max_jobs_to_activate=task.max_jobs_to_activate, - variables_to_fetch=task.variables_to_fetch, + return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.config.timeout_ms, + max_jobs_to_activate=task.config.max_jobs_to_activate, + variables_to_fetch=task.config.variables_to_fetch, request_timeout=self.request_timeout) def include_router(self, *routers: ZeebeTaskRouter) -> None: @@ -189,111 +187,10 @@ def include_router(self, *routers: ZeebeTaskRouter) -> None: Adds all router's tasks to the worker. Raises: - DuplicateTaskType: If a task from the router already exists in the worker + DuplicateTaskTypeError: If a task from the router already exists in the worker """ for router in routers: for task in router.tasks: + task.config = self._add_decorators_to_config(task.config) self._add_task(task) - - def _dict_task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler, - timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - def wrapper(fn: Callable[..., Dict]): - nonlocal variables_to_fetch - if not variables_to_fetch: - variables_to_fetch = self._get_parameters_from_function(fn) - - task = Task(task_type=task_type, task_handler=fn, exception_handler=exception_handler, timeout=timeout, - max_jobs_to_activate=max_jobs_to_activate, before=before, after=after, - variables_to_fetch=variables_to_fetch) - self._add_task(task) - - return fn - - return wrapper - - def _non_dict_task(self, task_type: str, variable_name: str, - exception_handler: ExceptionHandler = default_exception_handler, timeout: int = 10000, - max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None, - after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None): - def wrapper(fn: Callable[..., Union[str, bool, int, List]]): - nonlocal variables_to_fetch - if not variables_to_fetch: - variables_to_fetch = self._get_parameters_from_function(fn) - - dict_fn = self._single_value_function_to_dict(variable_name=variable_name, fn=fn) - - task = Task(task_type=task_type, task_handler=dict_fn, exception_handler=exception_handler, timeout=timeout, - max_jobs_to_activate=max_jobs_to_activate, before=before, after=after, - variables_to_fetch=variables_to_fetch) - self._add_task(task) - - return fn - - return wrapper - - def _add_task(self, task: Task) -> None: - self._is_task_duplicate(task.type) - task.handler = self._create_task_handler(task) - self.tasks.append(task) - - def _create_task_handler(self, task: Task) -> Callable[[Job], Job]: - before_decorator_runner = self._create_before_decorator_runner(task) - after_decorator_runner = self._create_after_decorator_runner(task) - - def task_handler(job: Job) -> Job: - job = before_decorator_runner(job) - job, task_succeeded = self._run_task_inner_function(task, job) - job = after_decorator_runner(job) - if task_succeeded: - self._complete_job(job) - return job - - return task_handler - - @staticmethod - def _run_task_inner_function(task: Task, job: Job) -> Tuple[Job, bool]: - task_succeeded = False - try: - job.variables = task.inner_function(**job.variables) - task_succeeded = True - except Exception as e: - logger.debug(f"Failed job: {job}. Error: {e}.") - task.exception_handler(e, job) - finally: - return job, task_succeeded - - def _complete_job(self, job: Job) -> None: - try: - logger.debug(f"Completing job: {job}") - self.zeebe_adapter.complete_job(job_key=job.key, variables=job.variables) - except Exception as e: - logger.warning(f"Failed to complete job: {job}. Error: {e}") - - def _create_before_decorator_runner(self, task: Task) -> Callable[[Job], Job]: - decorators = task._before.copy() - decorators.extend(self._before) - return self._create_decorator_runner(decorators) - - def _create_after_decorator_runner(self, task: Task) -> Callable[[Job], Job]: - decorators = self._after.copy() - decorators.extend(task._after) - return self._create_decorator_runner(decorators) - - @staticmethod - def _create_decorator_runner(decorators: List[TaskDecorator]) -> Callable[[Job], Job]: - def decorator_runner(job: Job): - for decorator in decorators: - job = ZeebeWorker._run_decorator(decorator, job) - return job - - return decorator_runner - - @staticmethod - def _run_decorator(decorator: TaskDecorator, job: Job) -> Job: - try: - return decorator(job) - except Exception as e: - logger.warning(f"Failed to run decorator {decorator}. Error: {e}") - return job diff --git a/tests/conftest.py b/tests/conftest.py index b1f43ef6..865c7a7f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,11 @@ import pytest -from pyzeebe import ZeebeClient, ZeebeWorker, ZeebeTaskRouter, Job +from pyzeebe import ZeebeClient, ZeebeWorker, Job from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter -from pyzeebe.task.task import Task -from pyzeebe.worker.task_handler import ZeebeTaskHandler +from pyzeebe.task import task_builder +from pyzeebe.task.task_config import TaskConfig +from pyzeebe.worker.task_router import ZeebeTaskRouter from tests.unit.utils.gateway_mock import GatewayMock from tests.unit.utils.random_utils import random_job @@ -18,6 +19,14 @@ def job_with_adapter(zeebe_adapter): return random_job(zeebe_adapter=zeebe_adapter) +@pytest.fixture +def mocked_job_with_adapter(job_with_adapter): + job_with_adapter.set_success_status = MagicMock() + job_with_adapter.set_failure_status = MagicMock() + job_with_adapter.set_error_status = MagicMock() + return job_with_adapter + + @pytest.fixture def job_without_adapter(): return random_job() @@ -48,8 +57,29 @@ def zeebe_worker(zeebe_adapter): @pytest.fixture -def task(task_type): - return Task(task_type, MagicMock(wraps=lambda x: dict(x=x)), MagicMock(wraps=lambda x, y, z: x)) +def task(original_task_function, task_config): + return task_builder.build_task(original_task_function, task_config) + + +@pytest.fixture +def first_active_job(task, job_from_task, grpc_servicer) -> str: + grpc_servicer.active_jobs[job_from_task.key] = job_from_task + return job_from_task + + +@pytest.fixture +def task_config(task_type): + return TaskConfig( + type=task_type, + exception_handler=MagicMock(), + timeout_ms=10000, + max_jobs_to_activate=32, + variables_to_fetch=[], + single_value=False, + variable_name="", + before=[], + after=[] + ) @pytest.fixture @@ -57,6 +87,16 @@ def task_type(): return str(uuid4()) +@pytest.fixture +def original_task_function(): + def original_function(): + pass + + mock = MagicMock(wraps=original_function) + mock.__code__ = original_function.__code__ + return mock + + @pytest.fixture def stop_after_test(): stop_test = Event() @@ -92,11 +132,6 @@ def routers(): return [ZeebeTaskRouter() for _ in range(0, randint(2, 100))] -@pytest.fixture -def task_handler(): - return ZeebeTaskHandler() - - @pytest.fixture def decorator(): def simple_decorator(job: Job) -> Job: diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index 59141bfc..576a5245 100644 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -4,32 +4,36 @@ import pytest -from pyzeebe import ZeebeWorker, ZeebeClient, Job -from pyzeebe.exceptions import WorkflowNotFound +from pyzeebe import ZeebeClient, ZeebeWorker, Job +from pyzeebe.errors import WorkflowNotFoundError -zeebe_client: ZeebeClient -zeebe_worker = ZeebeWorker() +@pytest.fixture(scope="session") +def zeebe_client(): + return ZeebeClient() -def exception_handler(exc: Exception, job: Job) -> None: - job.set_error_status(f"Failed to run task {job.type}. Reason: {exc}") +@pytest.fixture(scope="session") +def zeebe_worker(): + worker = ZeebeWorker() -@zeebe_worker.task(task_type="test", exception_handler=exception_handler) -def task_handler(should_throw: bool, input: str) -> Dict: - if should_throw: - raise Exception("Error thrown") - else: - return {"output": input + str(uuid4())} + def exception_handler(exc: Exception, job: Job) -> None: + job.set_error_status(f"Failed to run task {job.type}. Reason: {exc}") + @worker.task("test", exception_handler) + def task_handler(should_throw: bool, input: str) -> Dict: + if should_throw: + raise Exception("Error thrown") + else: + return {"output": input + str(uuid4())} + + return worker -@pytest.fixture(scope="module", autouse=True) -def setup(): - global zeebe_client, task_handler +@pytest.fixture(scope="module", autouse=True) +def setup(zeebe_worker, zeebe_client): zeebe_worker.work(watch=True) - zeebe_client = ZeebeClient() try: integration_tests_path = os.path.join("tests", "integration") zeebe_client.deploy_workflow( @@ -38,12 +42,12 @@ def setup(): except FileNotFoundError: zeebe_client.deploy_workflow("test.bpmn") - yield zeebe_client + yield zeebe_worker.stop(wait=True) assert not zeebe_worker._watcher_thread.is_alive() -def test_run_workflow(): +def test_run_workflow(zeebe_client: ZeebeClient): workflow_key = zeebe_client.run_workflow( "test", {"input": str(uuid4()), "should_throw": False} @@ -51,12 +55,12 @@ def test_run_workflow(): assert isinstance(workflow_key, int) -def test_non_existent_workflow(): - with pytest.raises(WorkflowNotFound): +def test_non_existent_workflow(zeebe_client: ZeebeClient): + with pytest.raises(WorkflowNotFoundError): zeebe_client.run_workflow(str(uuid4())) -def test_run_workflow_with_result(): +def test_run_workflow_with_result(zeebe_client: ZeebeClient): input = str(uuid4()) workflow_instance_key, workflow_result = zeebe_client.run_workflow_with_result( "test", @@ -67,7 +71,7 @@ def test_run_workflow_with_result(): assert workflow_result["output"].startswith(input) -def test_cancel_workflow(): +def test_cancel_workflow(zeebe_client: ZeebeClient): workflow_key = zeebe_client.run_workflow( "test", {"input": str(uuid4()), "should_throw": False} diff --git a/tests/unit/client/client_test.py b/tests/unit/client/client_test.py index b1c62a8a..77ba31ff 100644 --- a/tests/unit/client/client_test.py +++ b/tests/unit/client/client_test.py @@ -4,7 +4,7 @@ import pytest -from pyzeebe.exceptions import WorkflowNotFound +from pyzeebe.errors import WorkflowNotFoundError def test_run_workflow(zeebe_client, grpc_servicer): @@ -46,12 +46,12 @@ def test_deploy_workflow(zeebe_client): def test_run_non_existent_workflow(zeebe_client): - with pytest.raises(WorkflowNotFound): + with pytest.raises(WorkflowNotFoundError): zeebe_client.run_workflow(bpmn_process_id=str(uuid4())) def test_run_non_existent_workflow_with_result(zeebe_client): - with pytest.raises(WorkflowNotFound): + with pytest.raises(WorkflowNotFoundError): zeebe_client.run_workflow_with_result(bpmn_process_id=str(uuid4())) diff --git a/tests/unit/credentials/camunda_cloud_credentials_test.py b/tests/unit/credentials/camunda_cloud_credentials_test.py index 51d104c5..933d6b87 100644 --- a/tests/unit/credentials/camunda_cloud_credentials_test.py +++ b/tests/unit/credentials/camunda_cloud_credentials_test.py @@ -4,7 +4,7 @@ import pytest from pyzeebe.credentials.camunda_cloud_credentials import CamundaCloudCredentials -from pyzeebe.exceptions import InvalidOAuthCredentials, InvalidCamundaCloudCredentials +from pyzeebe.errors import InvalidOAuthCredentialsError, InvalidCamundaCloudCredentialsError def test_init(): @@ -20,7 +20,7 @@ def test_init(): def test_invalid_credentials(): CamundaCloudCredentials.get_access_token = MagicMock( - side_effect=InvalidOAuthCredentials(str(uuid4()), str(uuid4()), str(uuid4()))) + side_effect=InvalidOAuthCredentialsError(str(uuid4()), str(uuid4()), str(uuid4()))) - with pytest.raises(InvalidCamundaCloudCredentials): + with pytest.raises(InvalidCamundaCloudCredentialsError): CamundaCloudCredentials(str(uuid4()), str(uuid4()), str(uuid4())) diff --git a/tests/unit/credentials/oauth_credentials_test.py b/tests/unit/credentials/oauth_credentials_test.py index 23b33bfb..c2ea1706 100644 --- a/tests/unit/credentials/oauth_credentials_test.py +++ b/tests/unit/credentials/oauth_credentials_test.py @@ -5,7 +5,7 @@ from requests import HTTPError from pyzeebe.credentials.oauth_credentials import OAuthCredentials -from pyzeebe.exceptions import InvalidOAuthCredentials +from pyzeebe.errors import InvalidOAuthCredentialsError def test_get_access_token(): @@ -27,6 +27,6 @@ def test_get_invalid_access_token(): with patch("requests_oauthlib.OAuth2Session.post") as post_mock: post_mock.side_effect = HTTPError() - with pytest.raises(InvalidOAuthCredentials): + with pytest.raises(InvalidOAuthCredentialsError): OAuthCredentials.get_access_token(url=f"https://{str(uuid4())}/oauth/token", client_id=str(uuid4()), client_secret=str(uuid4()), audience=str(uuid4())) diff --git a/tests/unit/decorators/__init__.py b/tests/unit/decorators/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/unit/decorators/zeebe_decorator_base_test.py b/tests/unit/decorators/zeebe_decorator_base_test.py deleted file mode 100644 index 304ed4cd..00000000 --- a/tests/unit/decorators/zeebe_decorator_base_test.py +++ /dev/null @@ -1,50 +0,0 @@ -from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase -from tests.unit.utils.random_utils import random_job - - -def test_add_before(): - base_decorator = ZeebeDecoratorBase() - base_decorator.before(lambda x: x) - assert len(base_decorator._before) == 1 - - -def test_add_after(): - base_decorator = ZeebeDecoratorBase() - base_decorator.after(lambda x: x) - assert len(base_decorator._after) == 1 - - -def test_add_before_plus_constructor(): - def constructor_decorator(x): - return x - - def function_decorator(x): - return x - - job = random_job() - - assert constructor_decorator(job) == job - assert function_decorator(job) == job - - base_decorator = ZeebeDecoratorBase(before=[constructor_decorator]) - base_decorator.before(function_decorator) - assert len(base_decorator._before) == 2 - assert base_decorator._before == [constructor_decorator, function_decorator] - - -def test_add_after_plus_constructor(): - def constructor_decorator(x): - return x - - def function_decorator(x): - return x - - job = random_job() - - assert constructor_decorator(job) == job - assert function_decorator(job) == job - - base_decorator = ZeebeDecoratorBase(after=[constructor_decorator]) - base_decorator.after(function_decorator) - assert len(base_decorator._after) == 2 - assert base_decorator._after == [constructor_decorator, function_decorator] diff --git a/tests/unit/grpc_internals/zeebe_adapter_base_test.py b/tests/unit/grpc_internals/zeebe_adapter_base_test.py index e5c5b0ec..bea811b5 100644 --- a/tests/unit/grpc_internals/zeebe_adapter_base_test.py +++ b/tests/unit/grpc_internals/zeebe_adapter_base_test.py @@ -7,7 +7,7 @@ from pyzeebe.credentials.camunda_cloud_credentials import CamundaCloudCredentials from pyzeebe.credentials.oauth_credentials import OAuthCredentials -from pyzeebe.exceptions import ZeebeBackPressure, ZeebeGatewayUnavailable, ZeebeInternalError +from pyzeebe.errors import ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase from tests.unit.utils.grpc_utils import GRPCStatusCode from tests.unit.utils.random_utils import RANDOM_RANGE @@ -143,14 +143,14 @@ def test_common_zeebe_grpc_error_internal(zeebe_adapter): def test_common_zeebe_grpc_error_back_pressure(zeebe_adapter): error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.RESOURCE_EXHAUSTED) - with pytest.raises(ZeebeBackPressure): + with pytest.raises(ZeebeBackPressureError): zeebe_adapter._common_zeebe_grpc_errors(error) def test_common_zeebe_grpc_error_gateway_unavailable(zeebe_adapter): error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.UNAVAILABLE) - with pytest.raises(ZeebeGatewayUnavailable): + with pytest.raises(ZeebeGatewayUnavailableError): zeebe_adapter._common_zeebe_grpc_errors(error) @@ -166,7 +166,7 @@ def test_close_after_retried_unavailable(zeebe_adapter): error._state = GRPCStatusCode(grpc.StatusCode.UNAVAILABLE) zeebe_adapter._close = MagicMock() zeebe_adapter._max_connection_retries = 1 - with pytest.raises(ZeebeGatewayUnavailable): + with pytest.raises(ZeebeGatewayUnavailableError): zeebe_adapter._common_zeebe_grpc_errors(error) zeebe_adapter._close.assert_called_once() diff --git a/tests/unit/grpc_internals/zeebe_job_adapter_test.py b/tests/unit/grpc_internals/zeebe_job_adapter_test.py index 1551f26e..79418927 100644 --- a/tests/unit/grpc_internals/zeebe_job_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_job_adapter_test.py @@ -6,22 +6,16 @@ import pytest from zeebe_grpc.gateway_pb2 import * -from pyzeebe.exceptions import ActivateJobsRequestInvalid, JobAlreadyDeactivated, JobNotFound +from pyzeebe.errors import ActivateJobsRequestInvalidError, JobAlreadyDeactivatedError, JobNotFoundError from pyzeebe.job.job import Job from pyzeebe.task.task import Task from tests.unit.utils.grpc_utils import GRPCStatusCode from tests.unit.utils.random_utils import RANDOM_RANGE, random_job -def create_random_task_and_activate(grpc_servicer, task_type: str = None) -> str: - if task_type: - mock_task_type = task_type - else: - mock_task_type = str(uuid4()) - task = Task(task_type=mock_task_type, task_handler=lambda x: x, exception_handler=lambda x: x) +def activate_task(grpc_servicer, task: Task): job = random_job(task) grpc_servicer.active_jobs[job.key] = job - return mock_task_type def get_first_active_job(task_type, zeebe_adapter) -> Job: @@ -29,14 +23,14 @@ def get_first_active_job(task_type, zeebe_adapter) -> Job: timeout=100, variables_to_fetch=[], worker=str(uuid4()))) -def test_activate_jobs(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) +def test_activate_jobs(zeebe_adapter, grpc_servicer, task): + activate_task(grpc_servicer, task) active_jobs_count = randint(4, 100) counter = 0 for i in range(0, active_jobs_count): - create_random_task_and_activate(grpc_servicer, task_type) + activate_task(grpc_servicer, task) - for job in zeebe_adapter.activate_jobs(task_type=task_type, worker=str(uuid4()), timeout=randint(10, 100), + for job in zeebe_adapter.activate_jobs(task_type=task.type, worker=str(uuid4()), timeout=randint(10, 100), request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[]): counter += 1 assert isinstance(job, Job) @@ -44,26 +38,26 @@ def test_activate_jobs(zeebe_adapter, grpc_servicer): def test_activate_jobs_invalid_worker(zeebe_adapter): - with pytest.raises(ActivateJobsRequestInvalid): + with pytest.raises(ActivateJobsRequestInvalidError): next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=None, timeout=randint(10, 100), request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[])) def test_activate_jobs_invalid_job_timeout(zeebe_adapter): - with pytest.raises(ActivateJobsRequestInvalid): + with pytest.raises(ActivateJobsRequestInvalidError): next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=str(uuid4()), timeout=0, request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[])) def test_activate_jobs_invalid_task_type(zeebe_adapter): - with pytest.raises(ActivateJobsRequestInvalid): + with pytest.raises(ActivateJobsRequestInvalidError): next(zeebe_adapter.activate_jobs(task_type=None, worker=str(uuid4()), timeout=randint(10, 100), request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[])) def test_activate_jobs_invalid_max_jobs(zeebe_adapter): - with pytest.raises(ActivateJobsRequestInvalid): + with pytest.raises(ActivateJobsRequestInvalidError): next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=str(uuid4()), timeout=randint(10, 100), request_timeout=100, max_jobs_to_activate=0, variables_to_fetch=[])) @@ -82,103 +76,87 @@ def test_activate_jobs_common_errors_called(zeebe_adapter): zeebe_adapter._common_zeebe_grpc_errors.assert_called() -def test_complete_job(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - response = zeebe_adapter.complete_job(job_key=job.key, variables={}) +def test_complete_job(zeebe_adapter, first_active_job: Job): + response = zeebe_adapter.complete_job(job_key=first_active_job.key, variables={}) assert isinstance(response, CompleteJobResponse) def test_complete_job_not_found(zeebe_adapter): - with pytest.raises(JobNotFound): + with pytest.raises(JobNotFoundError): zeebe_adapter.complete_job(job_key=randint(0, RANDOM_RANGE), variables={}) -def test_complete_job_already_completed(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - zeebe_adapter.complete_job(job_key=job.key, variables={}) - with pytest.raises(JobAlreadyDeactivated): - zeebe_adapter.complete_job(job_key=job.key, variables={}) +def test_complete_job_already_completed(zeebe_adapter, first_active_job: Job): + zeebe_adapter.complete_job(job_key=first_active_job.key, variables={}) + with pytest.raises(JobAlreadyDeactivatedError): + zeebe_adapter.complete_job(job_key=first_active_job.key, variables={}) -def test_complete_job_common_errors_called(zeebe_adapter, grpc_servicer): +def test_complete_job_common_errors_called(zeebe_adapter, first_active_job: Job): zeebe_adapter._common_zeebe_grpc_errors = MagicMock() error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.INTERNAL) zeebe_adapter._gateway_stub.CompleteJob = MagicMock(side_effect=error) - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - zeebe_adapter.complete_job(job_key=job.key, variables={}) + zeebe_adapter.complete_job(job_key=first_active_job.key, variables={}) zeebe_adapter._common_zeebe_grpc_errors.assert_called() -def test_fail_job(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - response = zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) +def test_fail_job(zeebe_adapter, first_active_job: Job): + response = zeebe_adapter.fail_job(job_key=first_active_job.key, message=str(uuid4())) assert isinstance(response, FailJobResponse) def test_fail_job_not_found(zeebe_adapter): - with pytest.raises(JobNotFound): + with pytest.raises(JobNotFoundError): zeebe_adapter.fail_job(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) -def test_fail_job_already_failed(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) - with pytest.raises(JobAlreadyDeactivated): - zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) +def test_fail_job_already_failed(zeebe_adapter, first_active_job: Job): + zeebe_adapter.fail_job(job_key=first_active_job.key, message=str(uuid4())) + with pytest.raises(JobAlreadyDeactivatedError): + zeebe_adapter.fail_job(job_key=first_active_job.key, message=str(uuid4())) -def test_fail_job_common_errors_called(zeebe_adapter, grpc_servicer): +def test_fail_job_common_errors_called(zeebe_adapter, first_active_job: Job): zeebe_adapter._common_zeebe_grpc_errors = MagicMock() error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.INTERNAL) zeebe_adapter._gateway_stub.FailJob = MagicMock(side_effect=error) - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) + zeebe_adapter.fail_job(job_key=first_active_job.key, message=str(uuid4())) zeebe_adapter._common_zeebe_grpc_errors.assert_called() -def test_throw_error(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - response = zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) +def test_throw_error(zeebe_adapter, first_active_job: Job): + response = zeebe_adapter.throw_error(job_key=first_active_job.key, message=str(uuid4())) assert isinstance(response, ThrowErrorResponse) def test_throw_error_job_not_found(zeebe_adapter): - with pytest.raises(JobNotFound): + with pytest.raises(JobNotFoundError): zeebe_adapter.throw_error(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) -def test_throw_error_already_thrown(zeebe_adapter, grpc_servicer): - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) - zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) - with pytest.raises(JobAlreadyDeactivated): - zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) +def test_throw_error_already_thrown(zeebe_adapter, first_active_job: Job): + zeebe_adapter.throw_error(job_key=first_active_job.key, message=str(uuid4())) + with pytest.raises(JobAlreadyDeactivatedError): + zeebe_adapter.throw_error(job_key=first_active_job.key, message=str(uuid4())) -def test_throw_error_common_errors_called(zeebe_adapter, grpc_servicer): +def test_throw_error_common_errors_called(zeebe_adapter, grpc_servicer, task): zeebe_adapter._common_zeebe_grpc_errors = MagicMock() error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.INTERNAL) zeebe_adapter._gateway_stub.ThrowError = MagicMock(side_effect=error) - task_type = create_random_task_and_activate(grpc_servicer) - job = get_first_active_job(task_type, zeebe_adapter) + activate_task(grpc_servicer, task) + job = get_first_active_job(task.type, zeebe_adapter) zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) zeebe_adapter._common_zeebe_grpc_errors.assert_called() diff --git a/tests/unit/grpc_internals/zeebe_message_adapter_test.py b/tests/unit/grpc_internals/zeebe_message_adapter_test.py index 43ccb03e..8f4dab66 100644 --- a/tests/unit/grpc_internals/zeebe_message_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_message_adapter_test.py @@ -6,7 +6,7 @@ import pytest from zeebe_grpc.gateway_pb2 import * -from pyzeebe.exceptions import MessageAlreadyExists +from pyzeebe.errors import MessageAlreadyExistsError from tests.unit.utils.grpc_utils import GRPCStatusCode from tests.unit.utils.random_utils import RANDOM_RANGE @@ -37,7 +37,7 @@ def test_punlish_message_invalid_time_to_live(zeebe_adapter): def test_publish_message_already_exists(zeebe_adapter): message_id = str(uuid4()) - with pytest.raises(MessageAlreadyExists): + with pytest.raises(MessageAlreadyExistsError): zeebe_adapter.publish_message(message_id=message_id, name=str(uuid4()), variables={}, correlation_key=str(uuid4()), time_to_live_in_milliseconds=randint(0, RANDOM_RANGE)) diff --git a/tests/unit/grpc_internals/zeebe_workflow_adapter_test.py b/tests/unit/grpc_internals/zeebe_workflow_adapter_test.py index e6efa47b..fdf8da1d 100644 --- a/tests/unit/grpc_internals/zeebe_workflow_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_workflow_adapter_test.py @@ -6,8 +6,8 @@ import grpc import pytest -from pyzeebe.exceptions import InvalidJSON, WorkflowNotFound, WorkflowInstanceNotFound, WorkflowHasNoStartEvent, \ - WorkflowInvalid +from pyzeebe.errors import InvalidJSONError, WorkflowNotFoundError, WorkflowInstanceNotFoundError, WorkflowHasNoStartEventError, \ + WorkflowInvalidError from tests.unit.utils.grpc_utils import GRPCStatusCode from tests.unit.utils.random_utils import RANDOM_RANGE @@ -79,7 +79,7 @@ def test_cancel_workflow_instance_already_cancelled(zeebe_adapter): zeebe_adapter._gateway_stub.CancelWorkflowInstance = MagicMock(side_effect=error) - with pytest.raises(WorkflowInstanceNotFound): + with pytest.raises(WorkflowInstanceNotFoundError): zeebe_adapter.cancel_workflow_instance(workflow_instance_key=randint(0, RANDOM_RANGE)) @@ -104,7 +104,7 @@ def test_deploy_workflow_workflow_invalid(zeebe_adapter): zeebe_adapter._gateway_stub.DeployWorkflow = MagicMock(side_effect=error) - with pytest.raises(WorkflowInvalid): + with pytest.raises(WorkflowInvalidError): zeebe_adapter.deploy_workflow() @@ -134,21 +134,21 @@ def test_get_workflow_request_object(zeebe_adapter): def test_create_workflow_errors_not_found(zeebe_adapter): error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.NOT_FOUND) - with pytest.raises(WorkflowNotFound): + with pytest.raises(WorkflowNotFoundError): zeebe_adapter._create_workflow_errors(error, str(uuid4()), randint(0, 10, ), {}) def test_create_workflow_errors_invalid_json(zeebe_adapter): error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.INVALID_ARGUMENT) - with pytest.raises(InvalidJSON): + with pytest.raises(InvalidJSONError): zeebe_adapter._create_workflow_errors(error, str(uuid4()), randint(0, 10, ), {}) def test_create_workflow_errors_workflow_has_no_start_event(zeebe_adapter): error = grpc.RpcError() error._state = GRPCStatusCode(grpc.StatusCode.FAILED_PRECONDITION) - with pytest.raises(WorkflowHasNoStartEvent): + with pytest.raises(WorkflowHasNoStartEventError): zeebe_adapter._create_workflow_errors(error, str(uuid4()), randint(0, 10, ), {}) diff --git a/tests/unit/job/job_test.py b/tests/unit/job/job_test.py index 49a5daed..6f190ee3 100644 --- a/tests/unit/job/job_test.py +++ b/tests/unit/job/job_test.py @@ -3,7 +3,7 @@ import pytest -from pyzeebe.exceptions import NoZeebeAdapter +from pyzeebe.errors import NoZeebeAdapterError def test_success(job_with_adapter): @@ -13,7 +13,7 @@ def test_success(job_with_adapter): def test_success_no_zeebe_adapter(job_without_adapter): - with pytest.raises(NoZeebeAdapter): + with pytest.raises(NoZeebeAdapterError): job_without_adapter.set_success_status() @@ -25,7 +25,7 @@ def test_error(job_with_adapter): def test_error_no_zeebe_adapter(job_without_adapter): - with pytest.raises(NoZeebeAdapter): + with pytest.raises(NoZeebeAdapterError): message = str(uuid4()) job_without_adapter.set_error_status(message) @@ -38,6 +38,6 @@ def test_failure(job_with_adapter): def test_failure_no_zeebe_adapter(job_without_adapter): - with pytest.raises(NoZeebeAdapter): + with pytest.raises(NoZeebeAdapterError): message = str(uuid4()) job_without_adapter.set_failure_status(message) diff --git a/tests/unit/task/task_builder_test.py b/tests/unit/task/task_builder_test.py new file mode 100644 index 00000000..2e398fd3 --- /dev/null +++ b/tests/unit/task/task_builder_test.py @@ -0,0 +1,170 @@ +from typing import Callable, List + +import pytest + +from pyzeebe import Job, TaskDecorator +from pyzeebe.task import task_builder +from pyzeebe.task.task import Task +from pyzeebe.task.task_config import TaskConfig +from tests.unit.utils import dummy_functions + + +class TestBuildTask: + @pytest.fixture + def single_value_task_config(self, task_config: TaskConfig): + task_config.single_value = True + task_config.variable_name = "y" + + return task_config + + def test_returns_task(self, original_task_function: Callable, task_config: TaskConfig): + task = task_builder.build_task(original_task_function, task_config) + + assert isinstance(task, Task) + + def test_single_value_func(self, single_value_task_config: TaskConfig, mocked_job_with_adapter: Job): + task = task_builder.build_task(lambda: 1, single_value_task_config) + job = task.job_handler(mocked_job_with_adapter) + + assert job.variables.pop("y") == 1 + + def test_no_additional_variables_are_added_to_result(self, single_value_task_config: TaskConfig, mocked_job_with_adapter: Job): + mocked_job_with_adapter.variables = {"x": 1} + + task = task_builder.build_task(lambda x: x, single_value_task_config) + job = task.job_handler(mocked_job_with_adapter) + + assert len(job.variables.keys()) == 1 + assert set(job.variables.keys()) == {"y"} + + +class TestBuildJobHandler: + def test_returned_task_is_callable(self, original_task_function: Callable, task_config: TaskConfig): + task = task_builder.build_job_handler( + original_task_function, task_config) + assert callable(task) + + def test_exception_handler_called(self, original_task_function: Callable, task_config: TaskConfig, + mocked_job_with_adapter: Job): + exception = Exception() + original_task_function.side_effect = exception + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + task_config.exception_handler.assert_called_with( + exception, mocked_job_with_adapter) + + def test_parameters_are_provided_to_task(self, original_task_function: Callable, task_config: TaskConfig, + mocked_job_with_adapter: Job): + mocked_job_with_adapter.variables = {"x": 1} + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + original_task_function.assert_called_with(x=1) + + def test_variables_are_added_to_result(self, original_task_function: Callable, task_config: TaskConfig, + mocked_job_with_adapter: Job): + original_task_function.return_value = {"x": 1} + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job = job_handler(mocked_job_with_adapter) + + assert job.variables.pop("x") == 1 + + def test_complete_job_called(self, original_task_function: Callable, task_config: TaskConfig, + mocked_job_with_adapter: Job): + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + mocked_job_with_adapter.set_success_status.assert_called_once() + + def test_returned_task_runs_original_function(self, original_task_function: Callable, task_config: TaskConfig, + mocked_job_with_adapter: Job): + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + original_task_function.assert_called_once() + + def test_before_decorator_called(self, original_task_function: Callable, decorator: TaskDecorator, + task_config: TaskConfig, + mocked_job_with_adapter: Job): + task_config.before.append(decorator) + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + task_config.before.pop().assert_called_once() + + def test_after_decorator_called(self, original_task_function: Callable, decorator: TaskDecorator, + task_config: TaskConfig, + mocked_job_with_adapter: Job): + task_config.after.append(decorator) + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + task_config.after.pop().assert_called_once() + + def test_failing_decorator_continues(self, original_task_function: Callable, decorator: TaskDecorator, + task_config: TaskConfig, mocked_job_with_adapter: Job): + decorator.side_effect = Exception() + task_config.before.append(decorator) + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job_handler(mocked_job_with_adapter) + + decorator.assert_called_once() + task_config.exception_handler.assert_not_called() + + def test_decorator_variables_are_added(self, original_task_function: Callable, decorator: TaskDecorator, + task_config: TaskConfig, mocked_job_with_adapter: Job): + mocked_job_with_adapter.variables = {"x": 2} + decorator_return_value = mocked_job_with_adapter + decorator.return_value = decorator_return_value + job_handler = task_builder.build_job_handler( + original_task_function, task_config) + + job = job_handler(mocked_job_with_adapter) + + assert "x" in job.variables + + +class TestConvertToDictFunction: + def test_converting_to_dict(self): + dict_function = task_builder.convert_to_dict_function(lambda x: x, "x") + + assert {"x": 1} == dict_function(1) + + +class TestGetFunctionParameters: + @pytest.mark.parametrize("fn,expected", [ + (dummy_functions.no_param, []), + (dummy_functions.one_param, ["x"]), + (dummy_functions.multiple_params, ["x", "y", "z"]), + (dummy_functions.one_keyword_param, ["x"]), + (dummy_functions.multiple_keyword_param, ["x", "y", "z"]), + (dummy_functions.positional_and_keyword_params, ["x", "y"]), + (dummy_functions.args_param, []), + (dummy_functions.kwargs_param, []), + (dummy_functions.standard_named_params, ["args", "kwargs"]), + (dummy_functions.lambda_no_params, []), + (dummy_functions.lambda_one_param, ["x"]), + (dummy_functions.lambda_multiple_params, ["x", "y", "z"]), + (dummy_functions.lambda_one_keyword_param, ["x"]), + (dummy_functions.lambda_multiple_keyword_params, ["x", "y", "z"]), + (dummy_functions.lambda_positional_and_keyword_params, ["x", "y"]) + ]) + def test_get_params(self, fn: Callable, expected: List[str]): + assert task_builder.get_parameters_from_function(fn) == expected diff --git a/tests/unit/task/task_test.py b/tests/unit/task/task_test.py deleted file mode 100644 index a3a7a6f1..00000000 --- a/tests/unit/task/task_test.py +++ /dev/null @@ -1,43 +0,0 @@ -import uuid - -from pyzeebe.task.task import Task - - -def test_add_before(): - base_decorator = Task(task_type=str(uuid.uuid4()), task_handler=lambda x: x, exception_handler=lambda x: x) - base_decorator.before(lambda x: x) - assert len(base_decorator._before) == 1 - - -def test_add_after(): - base_decorator = Task(task_type=str(uuid.uuid4()), task_handler=lambda x: x, exception_handler=lambda x: x) - base_decorator.after(lambda x: x) - assert len(base_decorator._after) == 1 - - -def test_add_before_plus_constructor(): - def constructor_decorator(x): - return x + 1 - - def function_decorator(x): - return x - - base_decorator = Task(task_type=str(uuid.uuid4()), task_handler=lambda x: x, exception_handler=lambda x: x, - before=[constructor_decorator]) - base_decorator.before(function_decorator) - assert len(base_decorator._before) == 2 - assert base_decorator._before == [constructor_decorator, function_decorator] - - -def test_add_after_plus_constructor(): - def constructor_decorator(x): - return x + 1 - - def function_decorator(x): - return x - - base_decorator = Task(task_type=str(uuid.uuid4()), task_handler=lambda x: x, exception_handler=lambda x: x, - after=[constructor_decorator]) - base_decorator.after(function_decorator) - assert len(base_decorator._after) == 2 - assert base_decorator._after == [constructor_decorator, function_decorator] diff --git a/tests/unit/utils/dummy_functions.py b/tests/unit/utils/dummy_functions.py new file mode 100644 index 00000000..b0768eb6 --- /dev/null +++ b/tests/unit/utils/dummy_functions.py @@ -0,0 +1,42 @@ +def no_param(): + pass + + +def one_param(x): + pass + + +def multiple_params(x, y, z): + pass + + +def one_keyword_param(x=0): + pass + + +def multiple_keyword_param(x=0, y=0, z=0): + pass + + +def positional_and_keyword_params(x, y=0): + pass + + +def args_param(*args): + pass + + +def kwargs_param(**kwargs): + pass + + +def standard_named_params(args, kwargs): + pass + + +lambda_no_params = lambda: None +lambda_one_param = lambda x: None +lambda_multiple_params = lambda x, y, z: None +lambda_one_keyword_param = lambda x=0: None +lambda_multiple_keyword_params = lambda x=0, y=0, z=0: None +lambda_positional_and_keyword_params = lambda x, y=0: None diff --git a/tests/unit/utils/random_utils.py b/tests/unit/utils/random_utils.py index b0730717..4f95a155 100644 --- a/tests/unit/utils/random_utils.py +++ b/tests/unit/utils/random_utils.py @@ -3,13 +3,15 @@ from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.job.job import Job +from pyzeebe.task import task_builder from pyzeebe.task.task import Task +from pyzeebe.task.task_config import TaskConfig RANDOM_RANGE = 1000000000 -def random_job(task: Task = Task(task_type="test", task_handler=lambda x: {"x": x}, - exception_handler=lambda x, y, z: x), zeebe_adapter: ZeebeAdapter = None) -> Job: +def random_job(task: Task = task_builder.build_task(lambda x: {"x": x}, TaskConfig("test", lambda: None, 10000, 32, [], False, "", [], [])), + zeebe_adapter: ZeebeAdapter = None) -> Job: return Job(_type=task.type, key=randint(0, RANDOM_RANGE), worker=str(uuid4()), retries=randint(0, 10), workflow_instance_key=randint(0, RANDOM_RANGE), bpmn_process_id=str(uuid4()), workflow_definition_version=randint(0, 100), diff --git a/tests/unit/worker/task_handler_test.py b/tests/unit/worker/task_handler_test.py deleted file mode 100644 index e6ae28e7..00000000 --- a/tests/unit/worker/task_handler_test.py +++ /dev/null @@ -1,178 +0,0 @@ -from unittest.mock import patch, MagicMock -from uuid import uuid4 - -import pytest - -from pyzeebe.exceptions import NoVariableNameGiven, TaskNotFound, DuplicateTaskType -from pyzeebe.task.task import Task -from pyzeebe.worker.task_handler import default_exception_handler -from tests.unit.utils.random_utils import randint - - -def test_get_task(task_handler, task): - task_handler.tasks.append(task) - found_task = task_handler.get_task(task.type) - assert isinstance(found_task, Task) - assert found_task == task - - -def test_get_fake_task(task_handler): - with pytest.raises(TaskNotFound): - task_handler.get_task(str(uuid4())) - - -def test_get_task_index(task_handler, task): - task_handler.tasks.append(task) - index = task_handler._get_task_index(task.type) - assert isinstance(index, int) - assert task_handler.tasks[index] == task - - -def test_get_task_and_index(task_handler, task): - task_handler.tasks.append(task) - found_task, index = task_handler._get_task_and_index(task.type) - assert isinstance(index, int) - assert task_handler.tasks[index] == task - assert isinstance(found_task, Task) - assert found_task == task - - -def test_remove_task(task_handler, task): - task_handler.tasks.append(task) - assert task_handler.remove_task(task.type) is not None - assert task not in task_handler.tasks - - -def test_remove_task_from_many(task_handler, task): - task_handler.tasks.append(task) - - for i in range(0, randint(0, 100)): - task_handler.tasks.append(Task(str(uuid4()), lambda x: x, lambda x: x)) - assert task_handler.remove_task(task.type) is not None - assert task not in task_handler.tasks - - -def test_remove_fake_task(task_handler): - with pytest.raises(TaskNotFound): - task_handler.remove_task(str(uuid4())) - - -def test_add_dict_task(task_handler): - task_handler._dict_task = MagicMock() - - @task_handler.task(task_type=str(uuid4())) - def dict_task(): - return {} - - task_handler._dict_task.assert_called() - - -def test_add_non_dict_task(task_handler): - task_handler._non_dict_task = MagicMock() - - @task_handler.task(task_type=str(uuid4()), single_value=True, variable_name=str(uuid4())) - def non_dict_task(): - return True - - task_handler._non_dict_task.assert_called() - - -def test_add_non_dict_task_without_variable_name(task_handler): - with pytest.raises(NoVariableNameGiven): - @task_handler.task(task_type=str(uuid4()), single_value=True) - def non_dict_task(): - return True - - -def test_fn_to_dict(task_handler): - variable_name = str(uuid4()) - - def no_dict_fn(x): - return x + 1 - - dict_fn = task_handler._single_value_function_to_dict(fn=no_dict_fn, variable_name=variable_name) - - variable = randint(0, 1000) - assert dict_fn(variable) == {variable_name: variable + 1} - - -def test_default_exception_handler(job_without_adapter): - with patch("pyzeebe.worker.task_handler.logger.warning") as logging_mock: - with patch("pyzeebe.job.job.Job.set_failure_status") as failure_mock: - failure_mock.return_value = None - default_exception_handler(Exception(), job_without_adapter) - - failure_mock.assert_called() - logging_mock.assert_called() - - -def test_get_parameters_from_function_no_parameters(task_handler): - def no_parameters(): - pass - - assert task_handler._get_parameters_from_function(no_parameters) == [] - - -def test_get_parameters_from_function_one_positional(task_handler): - def one_pos_func(x): - pass - - assert task_handler._get_parameters_from_function(one_pos_func) == ["x"] - - -def test_get_parameters_from_function_multiple_positional(task_handler): - def mul_pos_func(x, y, z): - pass - - assert task_handler._get_parameters_from_function(mul_pos_func) == ["x", "y", "z"] - - -def test_get_parameters_from_function_one_keyword(task_handler): - def one_key_func(x=0): - pass - - assert task_handler._get_parameters_from_function(one_key_func) == ["x"] - - -def test_get_parameters_from_function_multiple_keywords(task_handler): - def mul_key_func(x=0, y=0, z=0): - pass - - assert task_handler._get_parameters_from_function(mul_key_func) == ["x", "y", "z"] - - -def test_get_parameters_from_function_positional_and_keyword(task_handler): - def pos_and_key_func(x, y=0): - pass - - assert task_handler._get_parameters_from_function(pos_and_key_func) == ["x", "y"] - - -def test_get_parameters_from_function_args(task_handler): - def args_func(*args): - pass - - assert task_handler._get_parameters_from_function(args_func) == [] - - -def test_get_parameters_from_function_kwargs(task_handler): - def kwargs_func(**kwargs): - pass - - assert task_handler._get_parameters_from_function(kwargs_func) == [] - - -def test_get_parameters_from_function_lambda(task_handler): - my_func = lambda x: x - - assert task_handler._get_parameters_from_function(my_func) == ["x"] - - -def test_check_is_task_duplicate_with_duplicate(task_handler, task): - task_handler.tasks.append(task) - with pytest.raises(DuplicateTaskType): - task_handler._is_task_duplicate(task.type) - - -def test_check_is_task_duplicate_no_duplicate(task_handler, task): - task_handler.tasks.append(task) diff --git a/tests/unit/worker/task_router_test.py b/tests/unit/worker/task_router_test.py index 7fee0e61..92d42971 100644 --- a/tests/unit/worker/task_router_test.py +++ b/tests/unit/worker/task_router_test.py @@ -1,126 +1,109 @@ -from random import randint from unittest.mock import patch from uuid import uuid4 +import pytest +from pyzeebe import TaskDecorator +from pyzeebe.errors import DuplicateTaskTypeError, TaskNotFoundError from pyzeebe.job.job import Job +from pyzeebe.task.task import Task +from pyzeebe.worker.task_router import (ZeebeTaskRouter, + default_exception_handler) +from tests.unit.utils.random_utils import randint -def decorator(job: Job) -> Job: - return job +def test_get_task(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) + found_task = router.get_task(task.type) -def test_add_task_through_decorator(router): - task_type = str(uuid4()) - timeout = randint(0, 10000) - max_jobs_to_activate = randint(0, 1000) + assert found_task == task - @router.task(task_type=task_type, timeout=timeout, max_jobs_to_activate=max_jobs_to_activate) - def example_test_task(x): - return {"x": x} - assert len(router.tasks) == 1 +def test_get_fake_task(router: ZeebeTaskRouter): + with pytest.raises(TaskNotFoundError): + router.get_task(str(uuid4())) - variable = str(uuid4()) - assert example_test_task(variable) == {"x": variable} - task = router.get_task(task_type) - assert task is not None +def test_get_task_index(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) - variable = str(uuid4()) - assert task.inner_function(variable) == {"x": variable} - assert task.variables_to_fetch == ["x"] - assert task.timeout == timeout - assert task.max_jobs_to_activate == max_jobs_to_activate + index = router._get_task_index(task.type) + assert router.tasks[index] == task -def test_router_before_decorator(router): - task_type = str(uuid4()) - router.before(decorator) - @router.task(task_type=task_type) - def task_fn(x): - return {"x": x} +def test_get_task_and_index(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) - task = router.get_task(task_type) - assert task is not None - assert len(task._before) == 1 - assert len(task._after) == 0 + found_task, index = router._get_task_and_index(task.type) + assert router.tasks[index] == task + assert found_task == task -def test_router_after_before_multiple(router): - task_type = str(uuid4()) - router.before(decorator) - @router.task(task_type=task_type, before=[decorator]) - def task_fn(x): - return {"x": x} +def test_remove_task(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) - task = router.get_task(task_type) - assert task is not None - assert len(task._before) == 2 - assert len(task._after) == 0 + router.remove_task(task.type) + assert task not in router.tasks -def test_router_after_decorator(router): - task_type = str(uuid4()) - router.after(decorator) - @router.task(task_type=task_type) - def task_fn(x): - return {"x": x} +def test_remove_task_from_many(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) - task = router.get_task(task_type) - assert task is not None - assert len(task._after) == 1 - assert len(task._before) == 0 + for _ in range(1, randint(0, 100)): + @router.task(str(uuid4())) + def dummy_function(): + pass + router.remove_task(task.type) -def test_router_after_decorator_multiple(router): - task_type = str(uuid4()) - router.after(decorator) + assert task not in router.tasks - @router.task(task_type=task_type, after=[decorator]) - def task_fn(x): - return {"x": x} - task = router.get_task(task_type) - assert task is not None - assert len(task._after) == 2 - assert len(task._before) == 0 +def test_remove_fake_task(router: ZeebeTaskRouter): + with pytest.raises(TaskNotFoundError): + router.remove_task(str(uuid4())) -def test_router_non_dict_task(router): - with patch("pyzeebe.worker.task_handler.ZeebeTaskHandler._single_value_function_to_dict") as single_value_mock: - task_type = str(uuid4()) - variable_name = str(uuid4()) +def test_check_is_task_duplicate_with_duplicate(router: ZeebeTaskRouter, task: Task): + router.tasks.append(task) + with pytest.raises(DuplicateTaskTypeError): + router._is_task_duplicate(task.type) - @router.task(task_type=task_type, single_value=True, variable_name=variable_name) - def task_fn(x): - return {"x": x} - single_value_mock.assert_called_with(variable_name=variable_name, fn=task_fn) - assert len(router.tasks) == 1 +def test_no_duplicate_task_type_error_is_raised(router: ZeebeTaskRouter, task: Task): + router._is_task_duplicate(task.type) -def test_router_dict_task(router): - task_type = str(uuid4()) +def test_add_before_decorator(router: ZeebeTaskRouter, decorator: TaskDecorator): + router.before(decorator) - @router.task(task_type=task_type) - def task_fn(x): - return {"x": x} + assert len(router._before) == 1 - assert len(router.tasks) == 1 +def test_add_after_decorator(router: ZeebeTaskRouter, decorator: TaskDecorator): + router.after(decorator) -def test_add_decorators_to_task(router, task): - router._add_decorators_to_task(task, [decorator], [decorator]) - assert len(task._before) == 1 - assert len(task._after) == 1 + assert len(router._after) == 1 -def test_add_decorators_to_task_with_router_decorators(router, task): - router.before(decorator) - router.after(decorator) - router._add_decorators_to_task(task, [decorator], [decorator]) - assert len(task._before) == 2 - assert len(task._after) == 2 +def test_add_before_decorator_through_constructor(decorator: TaskDecorator): + router = ZeebeTaskRouter(before=[decorator]) + + assert len(router._before) == 1 + + +def test_add_after_decorator_through_constructor(decorator: TaskDecorator): + router = ZeebeTaskRouter(after=[decorator]) + + assert len(router._after) == 1 + + +def test_default_exception_handler_logs_a_warning(mocked_job_with_adapter: Job): + with patch("pyzeebe.worker.task_router.logger.warning") as logging_mock: + default_exception_handler(Exception(), mocked_job_with_adapter) + + mocked_job_with_adapter.set_failure_status.assert_called() + logging_mock.assert_called() diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 9711418d..4af0cd69 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -1,178 +1,109 @@ -from random import randint +import time +from threading import Event as StopEvent +from typing import List from unittest.mock import patch, MagicMock from uuid import uuid4 -import time import pytest -from pyzeebe.exceptions import DuplicateTaskType, MaxConsecutiveTaskThreadError +from pyzeebe import TaskDecorator, ZeebeTaskRouter +from pyzeebe.errors import DuplicateTaskTypeError, MaxConsecutiveTaskThreadError from pyzeebe.job.job import Job +from pyzeebe.task.task import Task from pyzeebe.worker.worker import ZeebeWorker class TestAddTask: - def test_task_added(self, zeebe_worker, task): + def test_add_task(self, zeebe_worker: ZeebeWorker, task: Task): zeebe_worker._add_task(task) assert zeebe_worker.get_task(task.type) == task - def test_raises_on_duplicate(self, zeebe_worker, task): + def test_raises_on_duplicate(self, zeebe_worker: ZeebeWorker, task: Task): zeebe_worker._add_task(task) - with pytest.raises(DuplicateTaskType): + with pytest.raises(DuplicateTaskTypeError): zeebe_worker._add_task(task) - def test_only_one_task_added(self, zeebe_worker): + def test_only_one_task_added(self, zeebe_worker: ZeebeWorker): @zeebe_worker.task(str(uuid4())) - def _(): + def dummy_function(): pass assert len(zeebe_worker.tasks) == 1 - def test_task_type_saved(self, zeebe_worker, task): + def test_task_type_saved(self, zeebe_worker: ZeebeWorker, task: Task): zeebe_worker._add_task(task) assert zeebe_worker.get_task(task.type).type == task.type - def test_original_function_not_changed(self, zeebe_worker, task, job_from_task): - zeebe_worker._add_task(task) - - assert task.inner_function(**job_from_task.variables) == job_from_task.variables - - def test_task_handler_calls_original_function(self, zeebe_worker, task, job_from_task): - zeebe_worker._add_task(task) - - task.handler(job_from_task) - - task.inner_function.assert_called_once() - - def test_task_timeout_saved(self, zeebe_worker, task): - timeout = randint(0, 10000) - task.timeout = timeout - - zeebe_worker._add_task(task) - - assert zeebe_worker.get_task(task.type).timeout == timeout - - def test_task_max_jobs_saved(self, zeebe_worker, task): - max_jobs_to_activate = randint(0, 1000) - task.max_jobs_to_activate = max_jobs_to_activate - - zeebe_worker._add_task(task) - - assert zeebe_worker.get_task(task.type).max_jobs_to_activate == max_jobs_to_activate - - def test_variables_to_fetch_match_function_parameters(self, zeebe_worker, task_type): + def test_variables_to_fetch_match_function_parameters(self, zeebe_worker: ZeebeWorker, task_type: str): expected_variables_to_fetch = ["x"] @zeebe_worker.task(task_type) - def _(x): + def dummy_function(x): pass - assert zeebe_worker.get_task(task_type).variables_to_fetch == expected_variables_to_fetch - - def test_task_handler_is_callable(self, zeebe_worker, task): - zeebe_worker._add_task(task) - - assert callable(task.handler) - - def test_exception_handler_called(self, zeebe_worker, task, job_from_task): - task.inner_function.side_effect = Exception() - task.exception_handler = MagicMock() - zeebe_worker._add_task(task) - - task.handler(job_from_task) - - task.exception_handler.assert_called() + assert zeebe_worker.get_task( + task_type).config.variables_to_fetch == expected_variables_to_fetch class TestDecorator: - def test_add_before_decorator(self, zeebe_worker, decorator): + def test_add_before_decorator(self, zeebe_worker: ZeebeWorker, decorator: TaskDecorator): zeebe_worker.before(decorator) assert len(zeebe_worker._before) == 1 assert decorator in zeebe_worker._before - def test_add_after_decorator(self, zeebe_worker, decorator): + def test_add_after_decorator(self, zeebe_worker: ZeebeWorker, decorator: TaskDecorator): zeebe_worker.after(decorator) assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after - def test_add_constructor_before_decorator(self, decorator): + def test_add_constructor_before_decorator(self, decorator: TaskDecorator): zeebe_worker = ZeebeWorker(before=[decorator]) assert len(zeebe_worker._before) == 1 assert decorator in zeebe_worker._before - def test_add_constructor_after_decorator(self, decorator): + def test_add_constructor_after_decorator(self, decorator: TaskDecorator): zeebe_worker = ZeebeWorker(after=[decorator]) assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after - def test_create_before_decorator_runner(self, zeebe_worker, task, decorator, job_from_task): - task.before(decorator) - - decorators = zeebe_worker._create_before_decorator_runner(task) - - assert isinstance(decorators(job_from_task), Job) - - def test_before_task_decorator_called(self, zeebe_worker, task, decorator, job_from_task): - task.before(decorator) - zeebe_worker._add_task(task) - - task.handler(job_from_task) - - decorator.assert_called_with(job_from_task) - - def test_after_task_decorator_called(self, zeebe_worker, task, decorator, job_from_task): - task.after(decorator) - zeebe_worker._add_task(task) - - task.handler(job_from_task) - - decorator.assert_called_with(job_from_task) - - def test_decorator_failed(self, zeebe_worker, task, decorator, job_from_task): - decorator.side_effect = Exception() - zeebe_worker.before(decorator) - zeebe_worker.after(decorator) - zeebe_worker._add_task(task) - - assert isinstance(task.handler(job_from_task), Job) - assert decorator.call_count == 2 - class TestHandleJobs: @pytest.fixture(autouse=True) - def get_jobs_mock(self, zeebe_worker): + def get_jobs_mock(self, zeebe_worker: ZeebeWorker): zeebe_worker._get_jobs = MagicMock() return zeebe_worker._get_jobs @pytest.fixture(autouse=True) - def task_handler_mock(self, task): - task.handler = MagicMock(wraps=task.handler) + def job_handler_spy(self, task: Task): + task.job_handler = MagicMock(wraps=task.job_handler) - def test_handle_no_job(self, zeebe_worker, task, get_jobs_mock): + def test_handle_no_job(self, zeebe_worker: ZeebeWorker, task: Task, get_jobs_mock: MagicMock): get_jobs_mock.return_value = [] zeebe_worker._handle_jobs(task) - task.handler.assert_not_called() + task.job_handler.assert_not_called() - def test_handle_one_job(self, zeebe_worker, task, job_from_task, get_jobs_mock): + def test_handle_one_job(self, zeebe_worker: ZeebeWorker, task: Task, job_from_task: Job, get_jobs_mock: MagicMock): get_jobs_mock.return_value = [job_from_task] zeebe_worker._handle_jobs(task) - task.handler.assert_called_with(job_from_task) + task.job_handler.assert_called_with(job_from_task) - def test_handle_many_jobs(self, zeebe_worker, task, job_from_task, get_jobs_mock): + def test_handle_many_jobs(self, zeebe_worker: ZeebeWorker, task: Task, job_from_task: Job, + get_jobs_mock: MagicMock): get_jobs_mock.return_value = [job_from_task] * 10 zeebe_worker._handle_jobs(task) - assert task.handler.call_count == 10 + assert task.job_handler.call_count == 10 class TestWorkerThreads: - def test_work_thread_start_called(self, zeebe_worker, task): + def test_work_thread_start_called(self, zeebe_worker: ZeebeWorker, task: Task): with patch("pyzeebe.worker.worker.Thread") as thread_mock: thread_instance_mock = MagicMock() thread_mock.return_value = thread_instance_mock @@ -181,12 +112,13 @@ def test_work_thread_start_called(self, zeebe_worker, task): zeebe_worker.stop() thread_instance_mock.start.assert_called_once() - def test_stop_worker(self, zeebe_worker): + def test_stop_worker(self, zeebe_worker: ZeebeWorker): zeebe_worker.work() zeebe_worker.stop() def test_watch_task_threads_dont_restart_running_threads( - self, zeebe_worker, task, handle_task_mock, stop_event_mock, handle_not_alive_thread_spy, stop_after_test): + self, zeebe_worker: ZeebeWorker, task: Task, handle_task_mock: MagicMock, stop_event_mock: MagicMock, + handle_not_alive_thread_spy: MagicMock, stop_after_test: StopEvent): def fake_task_handler_never_return(*_args): while not stop_after_test.is_set(): time.sleep(0.05) @@ -202,7 +134,8 @@ def fake_task_handler_never_return(*_args): assert handle_not_alive_thread_spy.call_count == 0 def test_watch_task_threads_that_die_get_restarted_then_exit_after_too_many_errors( - self, zeebe_worker, task, handle_task_mock, stop_event_mock, handle_not_alive_thread_spy): + self, zeebe_worker: ZeebeWorker, task: Task, handle_task_mock: MagicMock, stop_event_mock: MagicMock, + handle_not_alive_thread_spy: MagicMock): def fake_task_handler_return_immediately(*_args): pass @@ -220,51 +153,53 @@ def fake_task_handler_return_immediately(*_args): class TestGetJobs: - def test_activate_jobs_called(self, zeebe_worker, task): + def test_activate_jobs_called(self, zeebe_worker: ZeebeWorker, task: Task): zeebe_worker.zeebe_adapter.activate_jobs = MagicMock() zeebe_worker._get_jobs(task) zeebe_worker.zeebe_adapter.activate_jobs.assert_called_with(task_type=task.type, worker=zeebe_worker.name, - timeout=task.timeout, - max_jobs_to_activate=task.max_jobs_to_activate, - variables_to_fetch=task.variables_to_fetch, + timeout=task.config.timeout_ms, + max_jobs_to_activate=task.config.max_jobs_to_activate, + variables_to_fetch=task.config.variables_to_fetch, request_timeout=zeebe_worker.request_timeout) class TestIncludeRouter: - def test_include_router_adds_task(self, zeebe_worker, router, task_type): + def test_include_router_adds_task(self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str): self.include_router_with_task(zeebe_worker, router, task_type) assert zeebe_worker.get_task(task_type) is not None - def test_include_multiple_routers(self, zeebe_worker, routers): + def test_include_multiple_routers(self, zeebe_worker: ZeebeWorker, routers: List[ZeebeTaskRouter]): for router in routers: self.include_router_with_task(zeebe_worker, router) assert len(zeebe_worker.tasks) == len(routers) - def test_router_before_decorator(self, zeebe_worker, router, decorator, job_without_adapter): + def test_router_before_decorator(self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, decorator: TaskDecorator, + mocked_job_with_adapter: Job): router.before(decorator) task = self.include_router_with_task(zeebe_worker, router) - task.handler(job_without_adapter) + task.job_handler(mocked_job_with_adapter) - assert decorator.call_count == 1 + decorator.assert_called_once() - def test_router_after_decorator(self, zeebe_worker, router, decorator, job_without_adapter): + def test_router_after_decorator(self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, decorator: TaskDecorator, + mocked_job_with_adapter: Job): router.after(decorator) task = self.include_router_with_task(zeebe_worker, router) - task.handler(job_without_adapter) + task.job_handler(mocked_job_with_adapter) - assert decorator.call_count == 1 + decorator.assert_called_once() @staticmethod - def include_router_with_task(zeebe_worker, router, task_type=None): + def include_router_with_task(zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None) -> Task: task_type = task_type or str(uuid4()) @router.task(task_type) - def _(x): - return dict(x=x) + def dummy_function(): + return {} zeebe_worker.include_router(router) return zeebe_worker.get_task(task_type)