diff --git a/examples/sourcetransform/event_time_filter/example.py b/examples/sourcetransform/event_time_filter/example.py index 24c8a9e3..e33604dc 100644 --- a/examples/sourcetransform/event_time_filter/example.py +++ b/examples/sourcetransform/event_time_filter/example.py @@ -6,8 +6,9 @@ """ This is a simple User Defined Function example which receives a message, applies the following data transformation, and returns the message. -If the message event time is before year 2022, drop the message. If it's within year 2022, update -the tag to "within_year_2022" and update the message event time to Jan 1st 2022. +If the message event time is before year 2022, drop the message with event time unchanged. +If it's within year 2022, update the tag to "within_year_2022" and +update the message event time to Jan 1st 2022. Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the message event time to Jan 1st 2023. """ @@ -23,7 +24,7 @@ def my_handler(keys: list[str], datum: Datum) -> Messages: if event_time < january_first_2022: logging.info("Got event time:%s, it is before 2022, so dropping", event_time) - messages.append(Message.to_drop()) + messages.append(Message.to_drop(event_time)) elif event_time < january_first_2023: logging.info( "Got event time:%s, it is within year 2022, so forwarding to within_year_2022", diff --git a/pynumaflow/_constants.py b/pynumaflow/_constants.py index fce8419a..253d0401 100644 --- a/pynumaflow/_constants.py +++ b/pynumaflow/_constants.py @@ -1,5 +1,3 @@ -from datetime import datetime, timezone, timedelta - MAP_SOCK_PATH = "/var/run/numaflow/map.sock" MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock" REDUCE_SOCK_PATH = "/var/run/numaflow/reduce.sock" @@ -19,8 +17,3 @@ STREAM_EOF = "EOF" DELIMITER = ":" DROP = "U+005C__DROP__" -# Watermark are at millisecond granularity, hence we use epoch(0) - 1 -# to indicate watermark is not available. -# EVENT_TIME_FOR_DROP is used to indicate that the message is dropped, -# hence excluded from watermark calculation. -EVENT_TIME_FOR_DROP = datetime(1970, 1, 1, tzinfo=timezone.utc) - timedelta(milliseconds=1) diff --git a/pynumaflow/sourcetransformer/__init__.py b/pynumaflow/sourcetransformer/__init__.py index 13bd5fc4..4708603d 100644 --- a/pynumaflow/sourcetransformer/__init__.py +++ b/pynumaflow/sourcetransformer/__init__.py @@ -1,4 +1,4 @@ -from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP, EVENT_TIME_FOR_DROP +from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP from pynumaflow.sourcetransformer.multiproc_server import MultiProcSourceTransformer from pynumaflow.sourcetransformer.server import SourceTransformer @@ -7,7 +7,6 @@ "Messages", "Datum", "DROP", - "EVENT_TIME_FOR_DROP", "SourceTransformer", "MultiProcSourceTransformer", ] diff --git a/pynumaflow/sourcetransformer/_dtypes.py b/pynumaflow/sourcetransformer/_dtypes.py index 24e8f79f..b3242cd2 100644 --- a/pynumaflow/sourcetransformer/_dtypes.py +++ b/pynumaflow/sourcetransformer/_dtypes.py @@ -4,7 +4,7 @@ from typing import TypeVar, Callable from warnings import warn -from pynumaflow._constants import DROP, EVENT_TIME_FOR_DROP +from pynumaflow._constants import DROP M = TypeVar("M", bound="Message") Ms = TypeVar("Ms", bound="Messages") @@ -43,8 +43,8 @@ def __init__( self._value = value or b"" @classmethod - def to_drop(cls: type[M]) -> M: - return cls(b"", EVENT_TIME_FOR_DROP, None, [DROP]) + def to_drop(cls: type[M], event_time: datetime) -> M: + return cls(b"", event_time, None, [DROP]) @property def event_time(self) -> datetime: diff --git a/tests/sourcetransform/test_messages.py b/tests/sourcetransform/test_messages.py index 34d7af08..8f2caf46 100644 --- a/tests/sourcetransform/test_messages.py +++ b/tests/sourcetransform/test_messages.py @@ -1,7 +1,7 @@ import unittest from datetime import datetime, timezone -from pynumaflow.sourcetransformer import Messages, Message, DROP, EVENT_TIME_FOR_DROP +from pynumaflow.sourcetransformer import Messages, Message, DROP def mock_message_t(): @@ -35,9 +35,9 @@ def test_message_to_drop(self): "Keys": [], "Value": b"", "Tags": [DROP], - "EventTime": EVENT_TIME_FOR_DROP, + "EventTime": mock_event_time(), } - msgt = Message(b"", mock_event_time()).to_drop() + msgt = Message(b"", datetime(1, 1, 1, 0, 0)).to_drop(mock_event_time()) self.assertEqual(Message, type(msgt)) self.assertEqual(mock_obj["Keys"], msgt.keys) self.assertEqual(mock_obj["Value"], msgt.value)