diff --git a/CHANGELOG.md b/CHANGELOG.md index 39f2a5c0..1217868a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- `pw.temporal.inactivity_detection` and `pw.temporal.utc_now` functions allowing for alerting and other time dependent usecases + ### Changed - `pw.Table.concat`, `pw.Table.with_id`, `pw.Table.with_id_from` no longer perform checks if ids are unique. It improves memory usage. - table operations that store values (like `pw.Table.join`, `pw.Table.update_cells`) no longer store columns that are not used downstream. diff --git a/python/pathway/stdlib/temporal/__init__.py b/python/pathway/stdlib/temporal/__init__.py index 18b53640..ebe93b69 100644 --- a/python/pathway/stdlib/temporal/__init__.py +++ b/python/pathway/stdlib/temporal/__init__.py @@ -40,6 +40,7 @@ common_behavior, exactly_once_behavior, ) +from .time_utils import inactivity_detection, utc_now __all__ = [ "AsofJoinResult", @@ -76,4 +77,6 @@ "CommonBehavior", "ExactlyOnceBehavior", "exactly_once_behavior", + "utc_now", + "inactivity_detection", ] diff --git a/python/pathway/stdlib/temporal/time_utils.py b/python/pathway/stdlib/temporal/time_utils.py new file mode 100644 index 00000000..0bee113f --- /dev/null +++ b/python/pathway/stdlib/temporal/time_utils.py @@ -0,0 +1,125 @@ +# Copyright © 2024 Pathway + +import datetime +import time +from functools import cache + +import pathway.internals as pw +from pathway import io + + +class TimestampSchema(pw.Schema): + timestamp_utc: pw.DateTimeUtc + + +class TimestampSubject(io.python.ConnectorSubject): + _refresh_rate: datetime.timedelta + + def __init__(self, refresh_rate: datetime.timedelta) -> None: + super().__init__() + self._refresh_rate = refresh_rate + + def run(self) -> None: + while True: + now_utc = datetime.datetime.now(datetime.timezone.utc) + self.next(timestamp_utc=now_utc) + self.commit() + time.sleep(self._refresh_rate.total_seconds()) + + +@cache +def utc_now(refresh_rate: datetime.timedelta = datetime.timedelta(seconds=60)): + """ + Provides a continuously updating stream of the current UTC time. + + This function generates a real-time feed of the current UTC timestamp, refreshing + at a specified interval. + + Args: + refresh_rate: The interval at which the current + UTC time is refreshed. Defaults to 60 seconds. + + Returns: + A table containing a stream of the current UTC timestamps, updated + according to the specified refresh rate. + """ + return io.python.read( + TimestampSubject(refresh_rate=refresh_rate), + schema=TimestampSchema, + ) + + +def inactivity_detection( + event_time_column: pw.ColumnReference, + allowed_inactivity_period: pw.Duration, + refresh_rate: pw.Duration = pw.Duration(seconds=1), + instance: pw.ColumnReference | None = None, +) -> tuple[pw.Table, pw.Table]: + """Detects periods of inactivity in a given table and identifies when activity resumes. + + This function monitors a stream of events defined by a timestamp column and detects + inactivity periods that exceed a specified threshold. Additionally, it identifies + the first event that resumes activity after each period of inactivity. + + Note: Assumes that the ingested timestamps (event_time_column) follow current UTC time + and that the latency of the system is negligible compared to the `allowed_inactivity_period`. + + Args: + event_time_column: A reference to the column containing + UTC timestamps of events in the monitored table. + allowed_inactivity_period: The maximum allowed period of + inactivity. If no events occur within this duration, an inactivity + period is flagged. + refresh_rate: The frequency at which the current time + is refreshed for inactivity detection. Defaults to 1 second. + instance: + The inactivity periods are computed separately per each `instance` value + + Returns: + Tuple of tables: + - **inactivities** (`pw.Table`): A table containing timestamps (`inactive_t`) + where periods of inactivity begin (i.e., the last timestamp before inactivity + was detected). + - **resumed_activities** (`pw.Table`): A table containing the earliest + timestamps (`resumed_t`) of resumed activity following each period + of inactivity. + """ + + events_t = event_time_column.table.select(t=event_time_column, instance=instance) + + now_t = utc_now(refresh_rate=refresh_rate) + latest_t = ( + events_t.groupby(pw.this.instance) + .reduce(pw.this.instance, latest_t=pw.reducers.max(pw.this.t)) + .filter( + pw.this.latest_t > datetime.datetime.now(datetime.timezone.utc) + ) # filter to avoid alerts during backfilling + ) + inactivities = ( + now_t.asof_now_join(latest_t) + .select(pw.left.timestamp_utc, pw.right.instance, pw.right.latest_t) + .filter(pw.this.latest_t + allowed_inactivity_period < pw.this.timestamp_utc) + .groupby( + pw.this.latest_t, pw.this.instance + ) # todo: memoryless alert deduplication + .reduce(pw.this.latest_t, pw.this.instance) + .select(instance=pw.this.instance, inactive_t=pw.this.latest_t) + ) + + latest_inactivity = inactivities.groupby(pw.this.instance).reduce( + pw.this.instance, inactive_t=pw.reducers.latest(pw.this.inactive_t) + ) + resumed_activities = ( + events_t.asof_now_join( + latest_inactivity, events_t.instance == latest_inactivity.instance + ) + .select(pw.left.t, pw.left.instance, pw.right.inactive_t) + .groupby( + pw.this.inactive_t, pw.this.instance + ) # todo: memoryless alert deduplication + .reduce(pw.this.instance, resumed_t=pw.reducers.min(pw.this.t)) + ) + if instance is None: + inactivities = inactivities.without(pw.this.instance) + resumed_activities = resumed_activities.without(pw.this.instance) + return inactivities, resumed_activities diff --git a/python/pathway/tests/temporal/test_time_utils.py b/python/pathway/tests/temporal/test_time_utils.py new file mode 100644 index 00000000..73cab51f --- /dev/null +++ b/python/pathway/tests/temporal/test_time_utils.py @@ -0,0 +1,140 @@ +# Copyright © 2024 Pathway + +from __future__ import annotations + +import datetime +from unittest.mock import patch + +import pathway as pw +from pathway.tests.utils import T, assert_stream_equality_wo_index + + +@patch("pathway.stdlib.temporal.time_utils.utc_now") +def test_inactivity_detection_instance(utc_now_mock): + now = datetime.datetime.now(datetime.timezone.utc) + now_ms = int((int(now.timestamp() * 1000) // 1000) * 1000) + 100000 + events = T( + f""" + | t | instance | __time__ + 1 | {now_ms} | A | {now_ms} + 2 | {now_ms+50} | A | {now_ms+50} + 3 | {now_ms+150} | A | {now_ms+150} + 4 | {now_ms+200} | A | {now_ms+200} + 5 | {now_ms+900} | A | {now_ms+900} + 6 | {now_ms+1000} | A | {now_ms+1000} + 7 | {now_ms} | B | {now_ms} + 8 | {now_ms+200} | B | {now_ms+200} + 9 | {now_ms+400} | B | {now_ms+400} + 10 | {now_ms+1000} | B | {now_ms+1000} + + + + """ + ).with_columns(t=pw.this.t.dt.utc_from_timestamp(unit="ms")) + + utc_now_mock.side_effect = lambda refresh_rate: pw.debug.table_from_rows( + pw.schema_from_types(t=int), + [ + (time_ms, time_ms, 1) + for time_ms in range( + now_ms, now_ms + 1400, int(refresh_rate.total_seconds() * 1000) + ) + ], + is_stream=True, + ).select(timestamp_utc=pw.this.t.dt.utc_from_timestamp(unit="ms")) + + inactivities, resumed_activities = pw.temporal.inactivity_detection( + events.t, + pw.Duration(milliseconds=300), + refresh_rate=pw.Duration(milliseconds=50), + instance=pw.this.instance, + ) + + expected_inactivities = T( + f""" + instance | inactive_t | __time__ | __diff__ + A | {now_ms+200} | {now_ms+550} | 1 + A | {now_ms+1000} | {now_ms+1350} | 1 + B | {now_ms+400} | {now_ms+750} | 1 + B | {now_ms+1000} | {now_ms+1350} | 1 + """ + ) + expected_resumes = T( + f""" + instance | resumed_t | __time__ | __diff__ + A | {now_ms+900} | {now_ms+900} | 1 + B | {now_ms+1000} | {now_ms+1000} | 1 + """ + ) + assert_stream_equality_wo_index( + ( + inactivities.with_columns( + inactive_t=pw.cast(int, pw.this.inactive_t.dt.timestamp(unit="ms")) + ), + resumed_activities.with_columns( + resumed_t=pw.cast(int, pw.this.resumed_t.dt.timestamp(unit="ms")) + ), + ), + (expected_inactivities, expected_resumes), + ) + + +@patch("pathway.stdlib.temporal.time_utils.utc_now") +def test_inactivity_detection(utc_now_mock): + now = datetime.datetime.now(datetime.timezone.utc) + now_ms = int((int(now.timestamp() * 1000) // 1000) * 1000) + 100000 + events = T( + f""" + | t | __time__ + 1 | {now_ms} | {now_ms} + 2 | {now_ms+50} | {now_ms+50} + 3 | {now_ms+150} | {now_ms+150} + 4 | {now_ms+200} | {now_ms+200} + 5 | {now_ms+900} | {now_ms+900} + 6 | {now_ms+1000} | {now_ms+1000} + + + """ + ).with_columns(t=pw.this.t.dt.utc_from_timestamp(unit="ms")) + + utc_now_mock.side_effect = lambda refresh_rate: pw.debug.table_from_rows( + pw.schema_from_types(t=int), + [ + (time_ms, time_ms, 1) + for time_ms in range( + now_ms, now_ms + 1400, int(refresh_rate.total_seconds() * 1000) + ) + ], + is_stream=True, + ).select(timestamp_utc=pw.this.t.dt.utc_from_timestamp(unit="ms")) + + inactivities, resumed_activities = pw.temporal.inactivity_detection( + events.t, + pw.Duration(milliseconds=300), + refresh_rate=pw.Duration(milliseconds=50), + ) + + expected_inactivities = T( + f""" + inactive_t | __time__ | __diff__ + {now_ms+200} | {now_ms+550} | 1 + {now_ms+1000} | {now_ms+1350} | 1 + """ + ) + expected_resumes = T( + f""" + resumed_t | __time__ | __diff__ + {now_ms+900} | {now_ms+900} | 1 + """ + ) + assert_stream_equality_wo_index( + ( + inactivities.with_columns( + inactive_t=pw.cast(int, pw.this.inactive_t.dt.timestamp(unit="ms")) + ), + resumed_activities.with_columns( + resumed_t=pw.cast(int, pw.this.resumed_t.dt.timestamp(unit="ms")) + ), + ), + (expected_inactivities, expected_resumes), + )