diff --git a/inngest/_internal/server_lib/__init__.py b/inngest/_internal/server_lib/__init__.py index 7c46733d..a28b4279 100644 --- a/inngest/_internal/server_lib/__init__.py +++ b/inngest/_internal/server_lib/__init__.py @@ -14,7 +14,11 @@ SyncKind, ) from .event import Event -from .execution_request import ServerRequest +from .execution_request import ( + ServerRequest, + ServerRequestCtx, + ServerRequestCtxStack, +) from .inspection import AuthenticatedInspection, UnauthenticatedInspection from .registration import ( Batch, @@ -62,6 +66,8 @@ "Runtime", "ServerKind", "ServerRequest", + "ServerRequestCtx", + "ServerRequestCtxStack", "Step", "SyncKind", "Throttle", diff --git a/inngest/experimental/mocked/__init__.py b/inngest/experimental/mocked/__init__.py new file mode 100644 index 00000000..d9d252f7 --- /dev/null +++ b/inngest/experimental/mocked/__init__.py @@ -0,0 +1,18 @@ +""" +Simulate Inngest function execution without an Inngest server. + +NOT STABLE! This is an experimental feature and may change in the future. If +you'd like to depend on it, we recommend copying this directory into your source +code. +""" + +from .client import Inngest +from .consts import Status, Timeout +from .trigger import trigger + +__all__ = [ + "Inngest", + "Status", + "Timeout", + "trigger", +] diff --git a/inngest/experimental/mocked/client.py b/inngest/experimental/mocked/client.py new file mode 100644 index 00000000..bb4c67c9 --- /dev/null +++ b/inngest/experimental/mocked/client.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import typing + +import inngest +from inngest._internal import server_lib + + +class Inngest(inngest.Inngest): + """ + Mock Inngest client. + """ + + async def send( + self, + events: typing.Union[server_lib.Event, list[server_lib.Event]], + *, + skip_middleware: bool = False, + ) -> list[str]: + """ + Mocked event send method. + """ + + ids = [] + for event in events: + ids.append("00000000000000000000000000") + return ids + + def send_sync( + self, + events: typing.Union[server_lib.Event, list[server_lib.Event]], + *, + skip_middleware: bool = False, + ) -> list[str]: + """ + Mocked event send method. + """ + + ids = [] + for event in events: + ids.append("00000000000000000000000000") + return ids diff --git a/inngest/experimental/mocked/consts.py b/inngest/experimental/mocked/consts.py new file mode 100644 index 00000000..1ca08519 --- /dev/null +++ b/inngest/experimental/mocked/consts.py @@ -0,0 +1,13 @@ +import enum + + +class Status(enum.Enum): + """ + Function run status. + """ + + COMPLETED = "Completed" + FAILED = "Failed" + + +Timeout = object() diff --git a/inngest/experimental/mocked/errors.py b/inngest/experimental/mocked/errors.py new file mode 100644 index 00000000..18cfbcf2 --- /dev/null +++ b/inngest/experimental/mocked/errors.py @@ -0,0 +1,13 @@ +class UnstubbedStepError(Exception): + """ + Raised when a must-stub step is not stubbed + """ + + def __init__(self, step_id: str) -> None: + """ + Args: + ---- + step_id: Unmocked step ID. + """ + + super().__init__(f"step {step_id} is not stubbed") diff --git a/inngest/experimental/mocked/trigger.py b/inngest/experimental/mocked/trigger.py new file mode 100644 index 00000000..453704a3 --- /dev/null +++ b/inngest/experimental/mocked/trigger.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import asyncio +import dataclasses +import typing +import unittest.mock + +import inngest +from inngest._internal import ( + async_lib, + execution_lib, + middleware_lib, + server_lib, + step_lib, +) + +from .client import Inngest +from .consts import Status, Timeout +from .errors import UnstubbedStepError + + +def trigger( + fn: inngest.Function, + event: typing.Union[inngest.Event, list[inngest.Event]], + client: Inngest, + *, + step_stubs: typing.Optional[dict[str, object]] = None, +) -> _Result: + """ + Trigger a function. + + Args: + ---- + fn: Function to trigger. + event: Triggering event. + client: Mock Inngest client. + step_stubs: Static step stubs. Keys are step IDs and values are stubs. + """ + + if not isinstance(event, list): + event = [event] + elif len(event) == 0: + raise Exception("Must provide at least 1 event") + + if step_stubs is None: + step_stubs = {} + + stack: list[str] = [] + steps: dict[str, object] = {} + planned = set[str]() + attempt = 0 + + max_attempt = 4 + if fn._opts.retries is not None: + max_attempt = fn._opts.retries + + while True: + step_id: typing.Optional[str] = None + if len(planned) > 0: + step_id = planned.pop() + + logger = unittest.mock.Mock() + request = server_lib.ServerRequest( + ctx=server_lib.ServerRequestCtx( + attempt=attempt, + disable_immediate_execution=True, + run_id="test", + stack=server_lib.ServerRequestCtxStack(stack=stack), + ), + event=event[0], + events=event, + steps=steps, + use_api=False, + ) + middleware = middleware_lib.MiddlewareManager.from_client( + client, + {}, + ) + + ctx = execution_lib.Context( + attempt=request.ctx.attempt, + event=event[0], + events=event, + logger=logger, + run_id=request.ctx.run_id, + ) + + memos = step_lib.StepMemos.from_raw(steps) + + if fn.is_handler_async: + loop = async_lib.get_event_loop() + if loop is None: + loop = asyncio.new_event_loop() + + res = loop.run_until_complete( + fn.call( + client, + ctx, + fn.id, + middleware, + request, + memos, + step_id, + ) + ) + else: + res = fn.call_sync( + client, + ctx, + fn.id, + middleware, + request, + memos, + step_id, + ) + + if res.error: + if attempt >= max_attempt: + return _Result( + error=res.error, + output=None, + status=Status.FAILED, + ) + + attempt += 1 + continue + + if res.multi: + for step in res.multi: + if not step.step: + # Unreachable + continue + + if step.error: + if attempt >= max_attempt: + return _Result( + error=step.error, + output=None, + status=Status.FAILED, + ) + + attempt += 1 + continue + + if step.step.display_name in step_stubs: + stub = step_stubs[step.step.display_name] + if stub is Timeout: + stub = None + + steps[step.step.id] = stub + stack.append(step.step.id) + continue + + if step.step.op is server_lib.Opcode.PLANNED: + planned.add(step.step.id) + elif step.step.op is server_lib.Opcode.SLEEP: + steps[step.step.id] = None + stack.append(step.step.id) + elif step.step.op is server_lib.Opcode.STEP_RUN: + steps[step.step.id] = step.output + stack.append(step.step.id) + else: + raise UnstubbedStepError(step.step.display_name) + + continue + + return _Result( + error=None, + output=res.output, + status=Status.COMPLETED, + ) + + +@dataclasses.dataclass +class _Result: + error: typing.Optional[Exception] + output: object + status: Status diff --git a/inngest/experimental/mocked/trigger_test.py b/inngest/experimental/mocked/trigger_test.py new file mode 100644 index 00000000..7901945a --- /dev/null +++ b/inngest/experimental/mocked/trigger_test.py @@ -0,0 +1,331 @@ +import datetime +import typing +import unittest + +import pytest + +import inngest + +from .client import Inngest +from .consts import Status, Timeout +from .errors import UnstubbedStepError +from .trigger import trigger + +client = Inngest(app_id="test") + + +class TestTriggerAsync(unittest.TestCase): + def test_parallel(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + async def fn( + ctx: inngest.Context, + step: inngest.Step, + ) -> tuple[str, ...]: + return await step.parallel( + ( + lambda: step.run("a", lambda: "a"), + lambda: step.run("b", lambda: "b"), + ) + ) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == ("a", "b") + + +class TestTriggerSync(unittest.TestCase): + def test_no_steps(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> str: + return "hi" + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_two_steps(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> str: + step.run("a", lambda: None) + step.run("b", lambda: None) + return "hi" + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_client_send(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> list[str]: + return client.send_sync( + [ + inngest.Event(name="other-event"), + inngest.Event(name="other-event"), + ] + ) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == [ + "00000000000000000000000000", + "00000000000000000000000000", + ] + + def test_send_event(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> list[str]: + return step.send_event( + "a", + [ + inngest.Event(name="event-1"), + inngest.Event(name="event-2"), + ], + ) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == [ + "00000000000000000000000000", + "00000000000000000000000000", + ] + + def test_invoke(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> typing.Any: + return step.invoke_by_id( + "a", + app_id="foo", + function_id="bar", + ) + + res = trigger( + fn, + inngest.Event(name="test"), + client, + step_stubs={"a": "hi"}, + ) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_parallel(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> tuple[str, ...]: + return step.parallel( + ( + lambda: step.run("a", lambda: "a"), + lambda: step.run("b", lambda: "b"), + ) + ) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == ("a", "b") + + def test_sleep(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> str: + step.sleep("a", datetime.timedelta(seconds=1)) + return "hi" + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_wait_for_event(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> typing.Mapping[str, inngest.JSON]: + event = step.wait_for_event( + "a", + event="other-event", + timeout=datetime.timedelta(seconds=1), + ) + assert event is not None + return event.data + + res = trigger( + fn, + inngest.Event(name="test"), + client, + step_stubs={ + "a": inngest.Event(data={"foo": 1}, name="other-event") + }, + ) + assert res.status is Status.COMPLETED + assert res.output == {"foo": 1} + + def test_wait_for_event_timeout(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> None: + event = step.wait_for_event( + "a", + event="other-event", + timeout=datetime.timedelta(seconds=1), + ) + assert event is None + + res = trigger( + fn, + inngest.Event(name="test"), + client, + step_stubs={"a": Timeout}, + ) + assert res.status is Status.COMPLETED + + def test_wait_for_event_not_stubbed(self) -> None: + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> None: + step.wait_for_event( + "a", + event="other-event", + timeout=datetime.timedelta(seconds=1), + ) + + with pytest.raises(UnstubbedStepError): + trigger(fn, inngest.Event(name="test"), client) + + def test_retry_step(self) -> None: + counter = 0 + + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> str: + def a() -> str: + nonlocal counter + counter += 1 + if counter < 2: + raise Exception("oh no") + return "hi" + + return step.run("a", a) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_fail_step(self) -> None: + @client.create_function( + fn_id="test", + retries=0, + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> None: + def a() -> None: + raise Exception("oh no") + + step.run("a", a) + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.FAILED + assert res.output is None + assert isinstance(res.error, Exception) + assert str(res.error) == "oh no" + + def test_retry_fn(self) -> None: + counter = 0 + + @client.create_function( + fn_id="test", + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> str: + nonlocal counter + counter += 1 + if counter < 2: + raise Exception("oh no") + return "hi" + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.COMPLETED + assert res.output == "hi" + + def test_fail_fn(self) -> None: + @client.create_function( + fn_id="test", + retries=0, + trigger=inngest.TriggerEvent(event="test"), + ) + def fn( + ctx: inngest.Context, + step: inngest.StepSync, + ) -> None: + raise Exception("oh no") + + res = trigger(fn, inngest.Event(name="test"), client) + assert res.status is Status.FAILED + assert res.output is None + assert isinstance(res.error, Exception) + assert str(res.error) == "oh no" diff --git a/pyproject.toml b/pyproject.toml index bd1737e0..3e6f0190 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,12 +99,14 @@ extend-select = [ 'UP', ] extend-ignore = [ + 'D100', 'D200', 'D202', 'D203', 'D205', 'D212', 'D400', + 'D401', 'D415', 'S112',