Skip to content

Commit

Permalink
Add a close method to the Receiver interface
Browse files Browse the repository at this point in the history
Also implement the method in all classes implementing the `Receiver`
interface.

Signed-off-by: Sahas Subramanian <[email protected]>
  • Loading branch information
shsms committed Nov 29, 2024
1 parent 4bc8a3a commit b66e09b
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this receiver belongs to."""

self._closed: bool = False
"""Whether the receiver is closed."""

self._next: _T | type[_Empty] = _Empty

@override
Expand Down Expand Up @@ -436,6 +439,9 @@ def consume(self) -> _T:
):
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)

if self._next is _Empty and self._closed:
raise ReceiverStoppedError(self)

assert (
self._next is not _Empty
), "`consume()` must be preceded by a call to `ready()`"
Expand All @@ -446,6 +452,14 @@ def consume(self) -> _T:

return next_val

@override
def close(self) -> None:
"""Close this receiver.
After closing, the receiver will not be able to receive any more messages.
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
16 changes: 16 additions & 0 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ def __init__(
self._q: deque[_T] = deque(maxlen=limit)
"""The receiver's internal message queue."""

self._closed: bool = False
"""Whether the receiver is closed."""

def enqueue(self, message: _T, /) -> None:
"""Put a message into this receiver's queue.
Expand Down Expand Up @@ -474,6 +477,19 @@ def consume(self) -> _T:
assert self._q, "`consume()` must be preceded by a call to `ready()`"
return self._q.popleft()

@override
def close(self) -> None:
"""Close the receiver.
After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._closed = True
self._channel._receivers.pop( # pylint: disable=protected-access
hash(self), None
)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
15 changes: 15 additions & 0 deletions src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,21 @@ def consume(self) -> ReceiverMessageT_co:

return self._results.popleft()

@override
def close(self) -> None:
"""Close the receiver.
After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
for task in self._pending:
if not task.done() and task.get_loop().is_running():
task.cancel()
self._pending = set()
for recv in self._receivers.values():
recv.close()

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._receivers) > 3:
Expand Down
29 changes: 29 additions & 0 deletions src/frequenz/channels/_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co:
ReceiverError: If there is some problem with the receiver.
"""

@abstractmethod
def close(self) -> None:
"""Close the receiver.
After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""

def __aiter__(self) -> Self:
"""Get an async iterator over the received messages.
Expand Down Expand Up @@ -464,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
"""
return self._mapping_function(self._receiver.consume())

@override
def close(self) -> None:
"""Close the receiver.
After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._receiver.close()

def __str__(self) -> str:
"""Return a string representation of the mapper."""
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
Expand Down Expand Up @@ -553,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co:
self._next_message = _SENTINEL
return message

@override
def close(self) -> None:
"""Close the receiver.
After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._receiver.close()

def __str__(self) -> str:
"""Return a string representation of the filter."""
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ def consume(self) -> None:
self._is_set = False
self._event.clear()

@override
def close(self) -> None:
"""Close this receiver."""
self.stop()

def __str__(self) -> str:
"""Return a string representation of this event."""
return f"{type(self).__name__}({self._name!r})"
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ def consume(self) -> Event:
change, path_str = self._changes.pop()
return Event(type=EventType(change), path=pathlib.Path(path_str))

@override
def close(self) -> None:
"""Close this receiver."""
self._stop_event.set()

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._paths) > 3:
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,11 @@ def consume(self) -> timedelta:
self._current_drift = None
return drift

@override
def close(self) -> None:
"""Close the timer."""
self.stop()

def _now(self) -> int:
"""Return the current monotonic clock time in microseconds.
Expand Down

0 comments on commit b66e09b

Please sign in to comment.