Skip to content

Commit

Permalink
fix: require event time when dropping messages (#123)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Dec 5, 2023
1 parent 692071e commit 3e3b25f
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 18 deletions.
7 changes: 4 additions & 3 deletions examples/sourcetransform/event_time_filter/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
"""
This is a simple User Defined Function example which receives a message, applies the following
data transformation, and returns the message.
If the message event time is before year 2022, drop the message. If it's within year 2022, update
the tag to "within_year_2022" and update the message event time to Jan 1st 2022.
If the message event time is before year 2022, drop the message with event time unchanged.
If it's within year 2022, update the tag to "within_year_2022" and
update the message event time to Jan 1st 2022.
Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
message event time to Jan 1st 2023.
"""
Expand All @@ -23,7 +24,7 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:

if event_time < january_first_2022:
logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
messages.append(Message.to_drop())
messages.append(Message.to_drop(event_time))
elif event_time < january_first_2023:
logging.info(
"Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
Expand Down
7 changes: 0 additions & 7 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from datetime import datetime, timezone, timedelta

MAP_SOCK_PATH = "/var/run/numaflow/map.sock"
MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock"
REDUCE_SOCK_PATH = "/var/run/numaflow/reduce.sock"
Expand All @@ -19,8 +17,3 @@
STREAM_EOF = "EOF"
DELIMITER = ":"
DROP = "U+005C__DROP__"
# Watermark are at millisecond granularity, hence we use epoch(0) - 1
# to indicate watermark is not available.
# EVENT_TIME_FOR_DROP is used to indicate that the message is dropped,
# hence excluded from watermark calculation.
EVENT_TIME_FOR_DROP = datetime(1970, 1, 1, tzinfo=timezone.utc) - timedelta(milliseconds=1)
3 changes: 1 addition & 2 deletions pynumaflow/sourcetransformer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP, EVENT_TIME_FOR_DROP
from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP
from pynumaflow.sourcetransformer.multiproc_server import MultiProcSourceTransformer
from pynumaflow.sourcetransformer.server import SourceTransformer

Expand All @@ -7,7 +7,6 @@
"Messages",
"Datum",
"DROP",
"EVENT_TIME_FOR_DROP",
"SourceTransformer",
"MultiProcSourceTransformer",
]
6 changes: 3 additions & 3 deletions pynumaflow/sourcetransformer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TypeVar, Callable
from warnings import warn

from pynumaflow._constants import DROP, EVENT_TIME_FOR_DROP
from pynumaflow._constants import DROP

M = TypeVar("M", bound="Message")
Ms = TypeVar("Ms", bound="Messages")
Expand Down Expand Up @@ -43,8 +43,8 @@ def __init__(
self._value = value or b""

@classmethod
def to_drop(cls: type[M]) -> M:
return cls(b"", EVENT_TIME_FOR_DROP, None, [DROP])
def to_drop(cls: type[M], event_time: datetime) -> M:
return cls(b"", event_time, None, [DROP])

@property
def event_time(self) -> datetime:
Expand Down
6 changes: 3 additions & 3 deletions tests/sourcetransform/test_messages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
from datetime import datetime, timezone

from pynumaflow.sourcetransformer import Messages, Message, DROP, EVENT_TIME_FOR_DROP
from pynumaflow.sourcetransformer import Messages, Message, DROP


def mock_message_t():
Expand Down Expand Up @@ -35,9 +35,9 @@ def test_message_to_drop(self):
"Keys": [],
"Value": b"",
"Tags": [DROP],
"EventTime": EVENT_TIME_FOR_DROP,
"EventTime": mock_event_time(),
}
msgt = Message(b"", mock_event_time()).to_drop()
msgt = Message(b"", datetime(1, 1, 1, 0, 0)).to_drop(mock_event_time())
self.assertEqual(Message, type(msgt))
self.assertEqual(mock_obj["Keys"], msgt.keys)
self.assertEqual(mock_obj["Value"], msgt.value)
Expand Down

0 comments on commit 3e3b25f

Please sign in to comment.