Skip to content

Commit

Permalink
Merge branch 'remove-requests'
Browse files Browse the repository at this point in the history
  • Loading branch information
amh4r committed Oct 30, 2023
2 parents 8451970 + 5b17226 commit 88320b1
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 185 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ make install
## Start Example Servers

```sh
# Fast API
(cd examples/fast_api && make dev)

# Flask
(cd examples/flask && make dev)

Expand Down
2 changes: 1 addition & 1 deletion constraints.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Constrain to the lowest version for each range in pyproject.toml. This ensures
# that our code won't rely on features in newer versions of dependencies. These
# constraints won't affect installed versions by consumers.
httpx==0.24.0
pydantic==2.1.1
requests==2.27.0
3 changes: 0 additions & 3 deletions examples/fast_api/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,3 @@ check-venv:

dev: check-venv
@sh ./scripts/start.sh

prod: check-venv
@sh ./scripts/start.sh
7 changes: 0 additions & 7 deletions examples/fast_api/README.md

This file was deleted.

7 changes: 0 additions & 7 deletions examples/flask/README.md

This file was deleted.

10 changes: 0 additions & 10 deletions examples/flask/pyproject.toml

This file was deleted.

7 changes: 0 additions & 7 deletions examples/tornado/README.md

This file was deleted.

10 changes: 0 additions & 10 deletions examples/tornado/pyproject.toml

This file was deleted.

28 changes: 0 additions & 28 deletions foo.py

This file was deleted.

18 changes: 9 additions & 9 deletions inngest/_internal/client_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ def __init__(
) -> None:
self.app_id = app_id
self.base_url = base_url
self._httpx_client = httpx.AsyncClient()
self._httpx_client_sync = httpx.Client()
self.is_production = is_production or env.is_prod()
self.logger = logger or logging.getLogger(__name__)

Expand Down Expand Up @@ -62,7 +60,7 @@ def _build_send_request(
d["ts"] = int(time.time() * 1000)
body.append(d)

return self._httpx_client.build_request(
return httpx.Client().build_request(
"POST",
url,
headers=headers,
Expand All @@ -77,9 +75,10 @@ async def send(
if not isinstance(events, list):
events = [events]

res = await self._httpx_client.send(
self._build_send_request(events),
)
async with httpx.AsyncClient() as client:
res = await client.send(
self._build_send_request(events),
)

return _extract_ids(res.json())

Expand All @@ -90,9 +89,10 @@ def send_sync(
if not isinstance(events, list):
events = [events]

res = self._httpx_client_sync.send(
self._build_send_request(events),
)
with httpx.Client() as client:
res = client.send(
self._build_send_request(events),
)

return _extract_ids(res.json())

Expand Down
150 changes: 57 additions & 93 deletions inngest/_internal/comm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import typing
import urllib.parse

import requests
import httpx

from inngest._internal import (
client_lib,
Expand Down Expand Up @@ -53,6 +53,21 @@ def body(self, body: object) -> None:
else:
self.headers[const.HeaderKey.CONTENT_TYPE.value] = "text/plain"

@classmethod
def from_internal_error(
cls,
err: errors.InternalError,
framework: str,
) -> CommResponse:
return cls(
body={
"code": str(err),
"message": str(err),
},
headers=net.create_headers(framework=framework),
status_code=err.status_code,
)


class CommHandlerBase(typing.Generic[FunctionT]):
_base_url: str
Expand Down Expand Up @@ -100,6 +115,42 @@ def __init__(
const.EnvKey.SIGNING_KEY.value
)

def _build_registration_request(
self,
app_url: str,
) -> httpx.Request:
registration_url = urllib.parse.urljoin(
self._base_url,
"/fn/register",
)

body = transforms.prep_body(
registration.RegisterRequest(
app_name=self._client.app_id,
deploy_type=registration.DeployType.PING,
framework=self._framework,
functions=self.get_function_configs(app_url),
sdk=f"{const.LANGUAGE}:v{const.VERSION}",
url=app_url,
# TODO: Do this for real.
v="0.1",
).to_dict()
)

headers = net.create_headers(framework=self._framework)
if self._signing_key:
headers[
"Authorization"
] = f"Bearer {transforms.hash_signing_key(self._signing_key)}"

return httpx.Client().build_request(
"POST",
registration_url,
headers=headers,
json=body,
timeout=30,
)

def _create_response(
self,
call_res: list[execution.CallResponse] | str | execution.CallError,
Expand Down Expand Up @@ -156,27 +207,9 @@ def get_function_configs(
raise errors.InvalidConfig("no functions found")
return configs

def _convert_error_to_response(
self,
err: errors.InternalError,
) -> CommResponse:
body = {
"code": str(err),
"message": str(err),
}
self._logger.error(
"function call failed",
extra=body,
)
return CommResponse(
body=body,
headers=net.create_headers(framework=self._framework),
status_code=err.status_code,
)

def _parse_registration_response(
self,
server_res: requests.Response,
server_res: httpx.Response,
) -> CommResponse:
comm_res = CommResponse(
headers=net.create_headers(framework=self._framework)
Expand Down Expand Up @@ -219,77 +252,8 @@ def _parse_registration_response(
comm_res.body = body
return comm_res

def register(
self,
*,
app_url: str,
is_from_dev_server: bool,
) -> CommResponse:
"""
Handles a registration call.
"""

try:
if is_from_dev_server and self._is_production:
self._logger.error(
"Dev Server registration not allowed in production mode"
)

return CommResponse(
body={
"code": const.ErrorCode.DEV_SERVER_REGISTRATION_NOT_ALLOWED.value,
"message": "dev server not allowed",
},
headers={},
status_code=400,
)

registration_url = urllib.parse.urljoin(
self._base_url,
"/fn/register",
)

body = transforms.prep_body(
registration.RegisterRequest(
app_name=self._client.app_id,
deploy_type=registration.DeployType.PING,
framework=self._framework,
functions=self.get_function_configs(app_url),
sdk=f"{const.LANGUAGE}:v{const.VERSION}",
url=app_url,
# TODO: Do this for real.
v="0.1",
).to_dict()
)

headers = net.create_headers(framework=self._framework)
if self._signing_key:
headers[
"Authorization"
] = f"Bearer {transforms.hash_signing_key(self._signing_key)}"

res = net.requests_session.post(
registration_url,
json=body,
headers=headers,
timeout=30,
)

return self._parse_registration_response(res)
except errors.InternalError as err:
self._logger.error(
"registration failed",
extra={
"error_code": err.code,
"error_message": str(err),
},
)

return CommResponse(
body={
"code": err.code,
"message": str(err),
},
headers=net.create_headers(framework=self._framework),
status_code=err.status_code,
def _validate_registration(self, is_from_dev_server: bool) -> None:
if is_from_dev_server and self._is_production:
raise errors.DevServerRegistrationNotAllowed(
"Dev Server registration not allowed in production mode"
)
26 changes: 25 additions & 1 deletion inngest/_internal/comm/comm_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import httpx

from inngest._internal import errors, execution, function, net

from . import base
Expand All @@ -24,4 +26,26 @@ async def call_function(
await fn.call(call, self._client, fn_id)
)
except errors.InternalError as err:
return self._convert_error_to_response(err)
return base.CommResponse.from_internal_error(err, self._framework)

async def register(
self,
*,
app_url: str,
is_from_dev_server: bool,
) -> base.CommResponse:
"""
Handles a registration call.
"""

try:
self._validate_registration(is_from_dev_server)

async with httpx.AsyncClient() as client:
res = await client.send(
self._build_registration_request(app_url),
)

return self._parse_registration_response(res)
except errors.InternalError as err:
return base.CommResponse.from_internal_error(err, self._framework)
Loading

0 comments on commit 88320b1

Please sign in to comment.