Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up: replace input message with input message v2 #100

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions async_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
InputConfig,
InputMessage,
InputMessageInterface,
InputMessageV2,
KafkaInputConfig,
KafkaOutputConfig,
KafkaSASLAuth,
Expand Down Expand Up @@ -49,6 +48,5 @@
"KafkaSASLAuth",
"AsyncProcessor",
"CoreNATSOutputConfig",
"InputMessageV2",
"InputMessageInterface",
]
7 changes: 1 addition & 6 deletions async_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@

from async_processor.app import ProcessorApp
from async_processor.logger import logger
from async_processor.pydantic_v1 import ValidationError
from async_processor.types import (
InputMessage,
InputMessageInterface,
InputMessageV2,
OutputMessage,
WorkerConfig,
)
Expand All @@ -39,10 +37,7 @@ def input_deserializer(
f"Expected dict, got {type(input_message)}"
)
logger.debug(f"Deserializing input message: {input_message!r}")
try:
return InputMessage(**input_message)
except ValidationError:
return InputMessageV2(**input_message)
return InputMessage(**input_message)

def output_serializer(self, output_message: OutputMessage) -> bytes:
logger.debug(f"Serializing output message: {output_message!r}")
Expand Down
40 changes: 1 addition & 39 deletions async_processor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,6 @@ def get_request_method(self) -> Optional[str]:


class InputMessage(InputMessageInterface):
request_id: constr(min_length=1)
body: Any
published_at_epoch_ns: Optional[int] = None
stream_response: bool = False
request_headers: Optional[Dict[str, Union[str, List[str]]]] = None
request_url_path: Optional[str] = None
request_method: Optional[str] = None

class Config:
extra = Extra.forbid

def get_request_id(self) -> Optional[str]:
return self.request_id

def get_published_at_epoch_ns(self) -> Optional[int]:
return self.published_at_epoch_ns

# TODO: this method is only here for sidecar
# move this logic to sidecar module
def get_body(self) -> Any:
return self.body

def should_stream_response(self) -> bool:
return self.stream_response

def get_request_headers(self) -> Optional[Dict[str, Union[str, List[str]]]]:
return self.request_headers

def get_request_url_path(self) -> Optional[str]:
return self.request_url_path

def get_request_method(self) -> Optional[str]:
return self.request_method


# We cannot maintain two different types and should remove `InputMessage`
# after sometime
class InputMessageV2(InputMessageInterface):
tfy_request_id: Optional[constr(regex=r"^[a-zA-Z0-9\-]{1,36}$")] = None
tfy_published_at_epoch_ns: Optional[int] = None
tfy_stream_response: bool = False
Expand Down Expand Up @@ -131,7 +93,7 @@ class OutputMessage(BaseModel):
status: ProcessStatus
body: Optional[Any] = None
error: Optional[str] = None
input_message: Optional[Union[InputMessage, InputMessageV2]] = None
input_message: Optional[InputMessage] = None

# these are experimental fields
status_code: Optional[str] = None
Expand Down
12 changes: 4 additions & 8 deletions tests/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from async_processor import InputMessage, InputMessageV2
from async_processor import InputMessage
from async_processor.processor import BaseProcessor


def test_default_input_deserializer():
p = BaseProcessor()

assert p.input_deserializer(
serialized_input_message='{"request_id": "a", "body": "a"}'
) == InputMessage(request_id="a", body="a")

assert p.input_deserializer(
serialized_input_message='{"a": "b"}'
) == InputMessageV2(a="b")
assert p.input_deserializer(serialized_input_message='{"a": "b"}') == InputMessage(
a="b"
)
6 changes: 2 additions & 4 deletions tests/test_types.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from async_processor import InputMessage, InputMessageV2
from async_processor import InputMessage


def test_input_message_get_body():
assert InputMessage(request_id="a", body="b").get_body() == "b"

assert InputMessageV2(tfy_request_id="a", foo="bar", baz=[1, 2]).get_body() == {
assert InputMessage(tfy_request_id="a", foo="bar", baz=[1, 2]).get_body() == {
"foo": "bar",
"baz": [1, 2],
}
Loading