From b66e09b08539f06647c363ff15e6c8b467b05442 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 29 Nov 2024 15:54:43 +0100 Subject: [PATCH] Add a `close` method to the `Receiver` interface Also implement the method in all classes implementing the `Receiver` interface. Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_anycast.py | 14 +++++++++++++ src/frequenz/channels/_broadcast.py | 16 +++++++++++++++ src/frequenz/channels/_merge.py | 15 ++++++++++++++ src/frequenz/channels/_receiver.py | 29 +++++++++++++++++++++++++++ src/frequenz/channels/event.py | 5 +++++ src/frequenz/channels/file_watcher.py | 5 +++++ src/frequenz/channels/timer.py | 5 +++++ 7 files changed, 89 insertions(+) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 066d5e9b..63f3e029 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -391,6 +391,9 @@ def __init__(self, channel: Anycast[_T], /) -> None: self._channel: Anycast[_T] = channel """The channel that this receiver belongs to.""" + self._closed: bool = False + """Whether the receiver is closed.""" + self._next: _T | type[_Empty] = _Empty @override @@ -436,6 +439,9 @@ def consume(self) -> _T: ): raise ReceiverStoppedError(self) from ChannelClosedError(self._channel) + if self._next is _Empty and self._closed: + raise ReceiverStoppedError(self) + assert ( self._next is not _Empty ), "`consume()` must be preceded by a call to `ready()`" @@ -446,6 +452,14 @@ def consume(self) -> _T: return next_val + @override + def close(self) -> None: + """Close this receiver. + + After closing, the receiver will not be able to receive any more messages. + """ + self._closed = True + def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 3956aff6..56421fb0 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -403,6 +403,9 @@ def __init__( self._q: deque[_T] = deque(maxlen=limit) """The receiver's internal message queue.""" + self._closed: bool = False + """Whether the receiver is closed.""" + def enqueue(self, message: _T, /) -> None: """Put a message into this receiver's queue. @@ -474,6 +477,19 @@ def consume(self) -> _T: assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() + @override + def close(self) -> None: + """Close the receiver. + + After calling this method, new messages will not be received. Once the + receiver's buffer is drained, trying to receive a message will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + """ + self._closed = True + self._channel._receivers.pop( # pylint: disable=protected-access + hash(self), None + ) + def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_merge.py b/src/frequenz/channels/_merge.py index 479e57ae..b1f38857 100644 --- a/src/frequenz/channels/_merge.py +++ b/src/frequenz/channels/_merge.py @@ -191,6 +191,21 @@ def consume(self) -> ReceiverMessageT_co: return self._results.popleft() + @override + def close(self) -> None: + """Close the receiver. + + After calling this method, new messages will not be received. Once the + receiver's buffer is drained, trying to receive a message will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + """ + for task in self._pending: + if not task.done() and task.get_loop().is_running(): + task.cancel() + self._pending = set() + for recv in self._receivers.values(): + recv.close() + def __str__(self) -> str: """Return a string representation of this receiver.""" if len(self._receivers) > 3: diff --git a/src/frequenz/channels/_receiver.py b/src/frequenz/channels/_receiver.py index e5715f6a..d5a81eaa 100644 --- a/src/frequenz/channels/_receiver.py +++ b/src/frequenz/channels/_receiver.py @@ -217,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co: ReceiverError: If there is some problem with the receiver. """ + @abstractmethod + def close(self) -> None: + """Close the receiver. + + After calling this method, new messages will not be received. Once the + receiver's buffer is drained, trying to receive a message will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + """ + def __aiter__(self) -> Self: """Get an async iterator over the received messages. @@ -464,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502 """ return self._mapping_function(self._receiver.consume()) + @override + def close(self) -> None: + """Close the receiver. + + After calling this method, new messages will not be received. Once the + receiver's buffer is drained, trying to receive a message will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + """ + self._receiver.close() + def __str__(self) -> str: """Return a string representation of the mapper.""" return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}" @@ -553,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co: self._next_message = _SENTINEL return message + @override + def close(self) -> None: + """Close the receiver. + + After calling this method, new messages will not be received. Once the + receiver's buffer is drained, trying to receive a message will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + """ + self._receiver.close() + def __str__(self) -> str: """Return a string representation of the filter.""" return f"{type(self).__name__}:{self._receiver}:{self._filter_function}" diff --git a/src/frequenz/channels/event.py b/src/frequenz/channels/event.py index 9391c694..5d1bd425 100644 --- a/src/frequenz/channels/event.py +++ b/src/frequenz/channels/event.py @@ -172,6 +172,11 @@ def consume(self) -> None: self._is_set = False self._event.clear() + @override + def close(self) -> None: + """Close this receiver.""" + self.stop() + def __str__(self) -> str: """Return a string representation of this event.""" return f"{type(self).__name__}({self._name!r})" diff --git a/src/frequenz/channels/file_watcher.py b/src/frequenz/channels/file_watcher.py index ac66e95e..e9ff4ca4 100644 --- a/src/frequenz/channels/file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -232,6 +232,11 @@ def consume(self) -> Event: change, path_str = self._changes.pop() return Event(type=EventType(change), path=pathlib.Path(path_str)) + @override + def close(self) -> None: + """Close this receiver.""" + self._stop_event.set() + def __str__(self) -> str: """Return a string representation of this receiver.""" if len(self._paths) > 3: diff --git a/src/frequenz/channels/timer.py b/src/frequenz/channels/timer.py index 24531c93..998430e9 100644 --- a/src/frequenz/channels/timer.py +++ b/src/frequenz/channels/timer.py @@ -745,6 +745,11 @@ def consume(self) -> timedelta: self._current_drift = None return drift + @override + def close(self) -> None: + """Close the timer.""" + self.stop() + def _now(self) -> int: """Return the current monotonic clock time in microseconds.