diff --git a/async_processor/__init__.py b/async_processor/__init__.py index feea5ad..80688a1 100644 --- a/async_processor/__init__.py +++ b/async_processor/__init__.py @@ -10,7 +10,6 @@ InputConfig, InputMessage, InputMessageInterface, - InputMessageV2, KafkaInputConfig, KafkaOutputConfig, KafkaSASLAuth, @@ -49,6 +48,5 @@ "KafkaSASLAuth", "AsyncProcessor", "CoreNATSOutputConfig", - "InputMessageV2", "InputMessageInterface", ] diff --git a/async_processor/processor.py b/async_processor/processor.py index 7c71aa4..d0a1694 100644 --- a/async_processor/processor.py +++ b/async_processor/processor.py @@ -8,11 +8,9 @@ from async_processor.app import ProcessorApp from async_processor.logger import logger -from async_processor.pydantic_v1 import ValidationError from async_processor.types import ( InputMessage, InputMessageInterface, - InputMessageV2, OutputMessage, WorkerConfig, ) @@ -39,10 +37,7 @@ def input_deserializer( f"Expected dict, got {type(input_message)}" ) logger.debug(f"Deserializing input message: {input_message!r}") - try: - return InputMessage(**input_message) - except ValidationError: - return InputMessageV2(**input_message) + return InputMessage(**input_message) def output_serializer(self, output_message: OutputMessage) -> bytes: logger.debug(f"Serializing output message: {output_message!r}") diff --git a/async_processor/types.py b/async_processor/types.py index 011ba67..72fbf7c 100644 --- a/async_processor/types.py +++ b/async_processor/types.py @@ -48,44 +48,6 @@ def get_request_method(self) -> Optional[str]: class InputMessage(InputMessageInterface): - request_id: constr(min_length=1) - body: Any - published_at_epoch_ns: Optional[int] = None - stream_response: bool = False - request_headers: Optional[Dict[str, Union[str, List[str]]]] = None - request_url_path: Optional[str] = None - request_method: Optional[str] = None - - class Config: - extra = Extra.forbid - - def get_request_id(self) -> Optional[str]: - return self.request_id - - def get_published_at_epoch_ns(self) -> Optional[int]: - return self.published_at_epoch_ns - - # TODO: this method is only here for sidecar - # move this logic to sidecar module - def get_body(self) -> Any: - return self.body - - def should_stream_response(self) -> bool: - return self.stream_response - - def get_request_headers(self) -> Optional[Dict[str, Union[str, List[str]]]]: - return self.request_headers - - def get_request_url_path(self) -> Optional[str]: - return self.request_url_path - - def get_request_method(self) -> Optional[str]: - return self.request_method - - -# We cannot maintain two different types and should remove `InputMessage` -# after sometime -class InputMessageV2(InputMessageInterface): tfy_request_id: Optional[constr(regex=r"^[a-zA-Z0-9\-]{1,36}$")] = None tfy_published_at_epoch_ns: Optional[int] = None tfy_stream_response: bool = False @@ -131,7 +93,7 @@ class OutputMessage(BaseModel): status: ProcessStatus body: Optional[Any] = None error: Optional[str] = None - input_message: Optional[Union[InputMessage, InputMessageV2]] = None + input_message: Optional[InputMessage] = None # these are experimental fields status_code: Optional[str] = None diff --git a/tests/test_processor.py b/tests/test_processor.py index f37bd07..e853d20 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -1,14 +1,10 @@ -from async_processor import InputMessage, InputMessageV2 +from async_processor import InputMessage from async_processor.processor import BaseProcessor def test_default_input_deserializer(): p = BaseProcessor() - assert p.input_deserializer( - serialized_input_message='{"request_id": "a", "body": "a"}' - ) == InputMessage(request_id="a", body="a") - - assert p.input_deserializer( - serialized_input_message='{"a": "b"}' - ) == InputMessageV2(a="b") + assert p.input_deserializer(serialized_input_message='{"a": "b"}') == InputMessage( + a="b" + ) diff --git a/tests/test_types.py b/tests/test_types.py index 44f4bf5..0f2667c 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,10 +1,8 @@ -from async_processor import InputMessage, InputMessageV2 +from async_processor import InputMessage def test_input_message_get_body(): - assert InputMessage(request_id="a", body="b").get_body() == "b" - - assert InputMessageV2(tfy_request_id="a", foo="bar", baz=[1, 2]).get_body() == { + assert InputMessage(tfy_request_id="a", foo="bar", baz=[1, 2]).get_body() == { "foo": "bar", "baz": [1, 2], }