Skip to content

Commit

Permalink
Unify create_function and create_function_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
amh4r committed Nov 1, 2023
1 parent 697ddca commit 2c5cab8
Show file tree
Hide file tree
Showing 39 changed files with 438 additions and 501 deletions.
3 changes: 1 addition & 2 deletions examples/functions/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
batch_events=inngest.Batch(
max_size=2,
timeout=datetime.timedelta(minutes=1),
),
fn_id="batch",
name="Batch",
trigger=inngest.TriggerEvent(event="app/batch"),
)
def fn_sync(
Expand Down
2 changes: 1 addition & 1 deletion examples/functions/cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
cancel=[inngest.Cancel(event="app/cancel.cancel")],
fn_id="cancel",
trigger=inngest.TriggerEvent(event="app/cancel"),
Expand Down
2 changes: 1 addition & 1 deletion examples/functions/debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
debounce=inngest.Debounce(period=datetime.timedelta(seconds=5)),
fn_id="debounce",
trigger=inngest.TriggerEvent(event="app/debounce"),
Expand Down
7 changes: 3 additions & 4 deletions examples/functions/duplicate_step_name.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import inngest


@inngest.create_function_sync(
fn_id="duplicate_step_name",
name="Duplicate step name",
trigger=inngest.TriggerEvent(event="app/duplicate_step_name"),
@inngest.create_function(
fn_id="duplicate_step_name_sync",
trigger=inngest.TriggerEvent(event="app/duplicate_step_name_sync"),
)
def fn_sync(*, step: inngest.StepSync, **_kwargs: object) -> str:
for _ in range(3):
Expand Down
3 changes: 1 addition & 2 deletions examples/functions/error_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ class MyError(Exception):
pass


@inngest.create_function_sync(
@inngest.create_function(
fn_id="error_step",
name="Error step",
retries=0,
trigger=inngest.TriggerEvent(event="app/error_step"),
)
Expand Down
3 changes: 1 addition & 2 deletions examples/functions/no_steps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
fn_id="no_steps",
name="No steps",
trigger=inngest.TriggerEvent(event="app/no_steps"),
)
def fn_sync(**_kwargs: object) -> int:
Expand Down
2 changes: 1 addition & 1 deletion examples/functions/on_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def _on_failure(
print("on_failure called")


@inngest.create_function_sync(
@inngest.create_function(
fn_id="on_failure",
on_failure=_on_failure,
retries=0,
Expand Down
4 changes: 1 addition & 3 deletions examples/functions/print_event.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
fn_id="print_event",
name="Print event",
trigger=inngest.TriggerEvent(event="app/print_event"),
)
def fn_sync(
Expand All @@ -24,7 +23,6 @@ def _print_user() -> dict[str, object]:

@inngest.create_function(
fn_id="print_event_async",
name="Print event (async)",
trigger=inngest.TriggerEvent(event="app/print_event_async"),
)
async def fn(
Expand Down
3 changes: 1 addition & 2 deletions examples/functions/send_event.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
fn_id="send_event",
name="Send event",
trigger=inngest.TriggerEvent(event="app/send_event"),
)
def fn_sync(*, step: inngest.StepSync, **_kwargs: object) -> None:
Expand Down
3 changes: 1 addition & 2 deletions examples/functions/two_steps_and_sleep.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
fn_id="two_steps_and_sleep",
name="Two steps and sleep",
trigger=inngest.TriggerEvent(event="app/two_steps_and_sleep"),
)
def fn_sync(*, step: inngest.StepSync, **_kwargs: object) -> str:
Expand Down
3 changes: 1 addition & 2 deletions examples/functions/wait_for_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import inngest


@inngest.create_function_sync(
@inngest.create_function(
fn_id="wait_for_event",
name="wait_for_event",
trigger=inngest.TriggerEvent(event="app/wait_for_event"),
)
def fn_sync(*, step: inngest.StepSync, **_kwargs: object) -> None:
Expand Down
13 changes: 1 addition & 12 deletions inngest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
from ._internal.client_lib import Inngest
from ._internal.errors import NonRetriableError
from ._internal.event_lib import Event
from ._internal.function import (
Function,
FunctionOpts,
FunctionOptsSync,
FunctionSync,
create_function,
create_function_sync,
)
from ._internal.function import Function, create_function
from ._internal.function_config import (
Batch,
Cancel,
Expand All @@ -26,9 +19,6 @@
"Debounce",
"Event",
"Function",
"FunctionOpts",
"FunctionOptsSync",
"FunctionSync",
"Inngest",
"NonRetriableError",
"RateLimit",
Expand All @@ -38,5 +28,4 @@
"TriggerCron",
"TriggerEvent",
"create_function",
"create_function_sync",
]
49 changes: 29 additions & 20 deletions inngest/_internal/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def from_error(
class CommHandler:
_base_url: str
_client: client_lib.Inngest
_fns: dict[str, function.Function | function.FunctionSync]
_fns: dict[str, function.Function]
_framework: const.Framework
_is_production: bool
_logger: logging.Logger
Expand All @@ -87,7 +87,7 @@ def __init__(
base_url: str | None = None,
client: client_lib.Inngest,
framework: const.Framework,
functions: list[function.Function] | list[function.FunctionSync],
functions: list[function.Function],
logger: logging.Logger,
signing_key: str | None = None,
) -> None:
Expand Down Expand Up @@ -190,12 +190,6 @@ async def call_function(
case result.Err(err):
return CommResponse.from_error(err, self._framework)

if not isinstance(fn, function.Function):
return CommResponse.from_error(
errors.MismatchedSync(f"function {fn_id} is not asynchronous"),
self._framework,
)

return self._create_response(await fn.call(call, self._client, fn_id))

def call_function_sync(
Expand All @@ -211,23 +205,24 @@ def call_function_sync(

validation_res = req_sig.validate(self._signing_key)
if result.is_err(validation_res):
return CommResponse.from_error(
validation_res.err_value, self._framework
)
err = validation_res.err_value
extra = {}
if isinstance(err, errors.InternalError):
extra["code"] = err.code
self._logger.error(err, extra=extra)
return CommResponse.from_error(err, self._framework)

match self._get_function(fn_id):
case result.Ok(fn):
pass
case result.Err(err):
extra = {}
if isinstance(err, errors.InternalError):
extra["code"] = err.code
self._logger.error(err, extra=extra)
return CommResponse.from_error(err, self._framework)

if not isinstance(fn, function.FunctionSync):
return CommResponse.from_error(
errors.MismatchedSync(f"function {fn_id} is not asynchronous"),
self._framework,
)

return self._create_response(fn.call(call, self._client, fn_id))
return self._create_response(fn.call_sync(call, self._client, fn_id))

def _create_response(
self,
Expand All @@ -252,7 +247,17 @@ def _create_response(
comm_res.body = transforms.prep_body(out)
comm_res.status_code = 206
elif isinstance(call_res, execution.CallError):
comm_res.body = transforms.prep_body(call_res.model_dump())
match call_res.to_dict():
case result.Ok(d):
body = transforms.prep_body(d)
case result.Err(err):
return CommResponse.from_error(err, self._framework)

self._logger.error(
call_res.message,
extra={"is_internal": call_res.is_internal},
)
comm_res.body = body
comm_res.status_code = 500

if call_res.is_retriable is False:
Expand All @@ -264,7 +269,7 @@ def _create_response(

def _get_function(
self, fn_id: str
) -> result.Result[function.Function | function.FunctionSync, Exception]:
) -> result.Result[function.Function, Exception]:
# Look for the function ID in the list of user functions, but also
# look for it in the list of on_failure functions.
for _fn in self._fns.values():
Expand Down Expand Up @@ -370,6 +375,8 @@ async def register(
case result.Ok(_):
pass
case result.Err(err):
print(err)
self._logger.error(err)
return CommResponse.from_error(err, self._framework)

async with httpx.AsyncClient() as client:
Expand All @@ -379,6 +386,8 @@ async def register(
await client.send(req)
)
case result.Err(err):
print(err)
self._logger.error(err)
return CommResponse.from_error(err, self._framework)

return res
Expand Down
4 changes: 2 additions & 2 deletions inngest/_internal/comm_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_full_config(self) -> None:
fully-specified config.
"""

@inngest.create_function_sync(
@inngest.create_function(
batch_events=inngest.Batch(
max_size=2, timeout=datetime.timedelta(minutes=1)
),
Expand Down Expand Up @@ -63,7 +63,7 @@ def fn(**_kwargs: object) -> int:
assert False, f"Unexpected error: {err}"

def test_no_functions(self) -> None:
functions: list[inngest.FunctionSync] = []
functions: list[inngest.Function] = []

handler = comm.CommHandler(
base_url="http://foo.bar",
Expand Down
10 changes: 10 additions & 0 deletions inngest/_internal/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ def __init__(self, message: str | None = None) -> None:
)


class UnknownError(InternalError):
status_code: int = http.HTTPStatus.INTERNAL_SERVER_ERROR

def __init__(self, message: str | None = None) -> None:
super().__init__(
code=const.ErrorCode.UNKNOWN,
message=message,
)


class UnserializableOutput(InternalError):
status_code: int = http.HTTPStatus.INTERNAL_SERVER_ERROR

Expand Down
Loading

0 comments on commit 2c5cab8

Please sign in to comment.