Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Receiver.close() method #348

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this receiver belongs to."""

self._closed: bool = False
"""Whether the receiver is closed."""

self._next: _T | type[_Empty] = _Empty

@override
Expand All @@ -409,6 +412,9 @@ async def ready(self) -> bool:
if self._next is not _Empty:
return True

if self._closed:
return False

# pylint: disable=protected-access
while len(self._channel._deque) == 0:
if self._channel._closed:
Expand Down Expand Up @@ -436,6 +442,9 @@ def consume(self) -> _T:
):
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)

if self._next is _Empty and self._closed:
raise ReceiverStoppedError(self)

assert (
self._next is not _Empty
), "`consume()` must be preceded by a call to `ready()`"
Expand All @@ -446,6 +455,14 @@ def consume(self) -> _T:

return next_val

@override
def close(self) -> None:
"""Close this receiver.

After closing, the receiver will not be able to receive any more messages.
"""
self._closed = True
llucax marked this conversation as resolved.
Show resolved Hide resolved

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
21 changes: 20 additions & 1 deletion src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ def __init__(
self._q: deque[_T] = deque(maxlen=limit)
"""The receiver's internal message queue."""

self._closed: bool = False
"""Whether the receiver is closed."""

def enqueue(self, message: _T, /) -> None:
"""Put a message into this receiver's queue.

Expand Down Expand Up @@ -466,7 +469,7 @@ async def ready(self) -> bool:
# consumed, then we return immediately.
# pylint: disable=protected-access
while len(self._q) == 0:
if self._channel._closed:
if self._channel._closed or self._closed:
return False
async with self._channel._recv_cv:
await self._channel._recv_cv.wait()
Expand All @@ -486,9 +489,25 @@ def consume(self) -> _T:
if not self._q and self._channel._closed: # pylint: disable=protected-access
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)

if self._closed:
raise ReceiverStoppedError(self)

assert self._q, "`consume()` must be preceded by a call to `ready()`"
return self._q.popleft()

@override
def close(self) -> None:
"""Close the receiver.

After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._closed = True
self._channel._receivers.pop( # pylint: disable=protected-access
hash(self), None
)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
15 changes: 15 additions & 0 deletions src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,21 @@ def consume(self) -> ReceiverMessageT_co:

return self._results.popleft()

@override
def close(self) -> None:
"""Close the receiver.

After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
for task in self._pending:
if not task.done() and task.get_loop().is_running():
llucax marked this conversation as resolved.
Show resolved Hide resolved
task.cancel()
self._pending = set()
for recv in self._receivers.values():
recv.close()

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._receivers) > 3:
Expand Down
29 changes: 29 additions & 0 deletions src/frequenz/channels/_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co:
ReceiverError: If there is some problem with the receiver.
"""

def close(self) -> None:
"""Close the receiver.

After calling this method, new messages will not be available from the receiver.
Once the receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
raise NotImplementedError("close() must be implemented by subclasses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't forget:


def __aiter__(self) -> Self:
"""Get an async iterator over the received messages.

Expand Down Expand Up @@ -464,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
"""
return self._mapping_function(self._receiver.consume())

@override
def close(self) -> None:
"""Close the receiver.

After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._receiver.close()

def __str__(self) -> str:
"""Return a string representation of the mapper."""
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
Expand Down Expand Up @@ -553,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co:
self._next_message = _SENTINEL
return message

@override
def close(self) -> None:
"""Close the receiver.

After calling this method, new messages will not be received. Once the
receiver's buffer is drained, trying to receive a message will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
"""
self._receiver.close()

def __str__(self) -> str:
"""Return a string representation of the filter."""
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ def consume(self) -> None:
self._is_set = False
self._event.clear()

@override
def close(self) -> None:
"""Close this receiver."""
self.stop()

def __str__(self) -> str:
"""Return a string representation of this event."""
return f"{type(self).__name__}({self._name!r})"
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ def consume(self) -> Event:
change, path_str = self._changes.pop()
return Event(type=EventType(change), path=pathlib.Path(path_str))

@override
def close(self) -> None:
"""Close this receiver."""
self._stop_event.set()
llucax marked this conversation as resolved.
Show resolved Hide resolved

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._paths) > 3:
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/channels/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,11 @@ def consume(self) -> timedelta:
self._current_drift = None
return drift

@override
def close(self) -> None:
"""Close the timer."""
self.stop()

def _now(self) -> int:
"""Return the current monotonic clock time in microseconds.

Expand Down