Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Receiver.close() method #348

Open
wants to merge 5 commits into
base: v1.x.x
Choose a base branch
from

Conversation

shsms
Copy link
Contributor

@shsms shsms commented Nov 29, 2024

This method would allow individual receivers to be closed, without affecting the underlying channel, if there is one.

@shsms shsms requested a review from a team as a code owner November 29, 2024 14:57
@github-actions github-actions bot added part:channels Affects channels implementation part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:utilities Affects the utility receivers (`Timer`, `Event`, `FileWatcher`) part:experimental Affects the experimental package labels Nov 29, 2024
@shsms
Copy link
Contributor Author

shsms commented Nov 29, 2024

Still need to add tests.

Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but the CI is failing, and this reminds me that it would be good to add tests at least for the receivers that actually do something when close() is called.

And oh, yes! As usual names, names, names 😆

TL;DR: All good the name actually, just some other comments about future directions.

I was going to suggest to rename close() to stop(), for once to match the ReceiverStoppedError but also to match the upcoming Service interface.

But I see close actually wait for the current buffer to be consumed, my idea for stop() would be that stuff gets just dropped.

Maybe to make it play better with Service we can keep close() as you implemented it, as a "graceful" shutdown, and when we introduce Service we can use cancel() to drop the current buffers. Then maybe stop() can take an option to decide if close() or cancel() is called (stop is usually implemented as self.cancel(); await self).

src/frequenz/channels/_merge.py Show resolved Hide resolved
@shsms
Copy link
Contributor Author

shsms commented Dec 2, 2024

LGTM but the CI is failing, and this reminds me that it would be good to add tests at least for the receivers that actually do something when close() is called.

Yes, I'm already working on tests in this same branch, just made an early PR because the code is ready for review.

Maybe to make it play better with Service we can keep close() as you implemented it, as a "graceful" shutdown, and when we introduce Service we can use cancel() to drop the current buffers. Then maybe stop() can take an option to decide if close() or cancel() is called (stop is usually implemented as self.cancel(); await self).

I think stop() doesn't make sense for receivers, because it is not a task. Idiomatically, we should have stop() only for things that look like there must be an activity there. So it is great for Actor, etc. But for a stream, I like close() better.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

I think stop() doesn't make sense for receivers, because it is not a task. Idiomatically, we should have stop() only for things that look like there must be an activity there. So it is great for Actor, etc. But for a stream, I like close() better.

Mmm, receiver don't need to have task, but many do (like Merger, Timer and FileWatcher). If we don't add a task-like interface, it means we can't stop these receivers using the Receiver interface, which is the only way to access them if we apply a filter or map to them for example.

But maybe there are other solutions to this. We can discuss this when the time comes.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

Please let me know when this is ready for a final review :)

@shsms
Copy link
Contributor Author

shsms commented Dec 3, 2024

Mmm, receiver don't need to have task, but many do (like Merger, Timer and FileWatcher). If we don't add a task-like interface, it means we can't stop these receivers using the Receiver interface, which is the only way to access them if we apply a filter or map to them for example.

That doesn't sound like an issue, because people still call close() on the receiver, and if the underlying thing is a FileWatcher for example, the close() would just call the stop(), which is what this PR is doing already.

And calling close() on Map and Filter would call close() of the underlying receiver. This PR has that too.

@llucax
Copy link
Contributor

llucax commented Dec 3, 2024

That doesn't sound like an issue, because people still call close() on the receiver, and if the underlying thing is a FileWatcher for example, the close() would just call the stop(), which is what this PR is doing already.

But stop() is async, and it is important to be able to wait until the thing actually stopped to avoid those annoying warnings in test we were talking about 😆

If we make close() async, then fine, but then we are ACKing that receivers can have background running tasks, and then why shouldn't they follow the Service interface?

shsms added 5 commits January 6, 2025 14:23
Classes that implement the `Sender` or `Receiver` interfaces
currently need to override the `send` or the `ready` and `consume`
methods respectively.

As we add more methods, to these interfaces, it becomes hard to track
which methods are there for internal use and which ones are there to
implement the interface.

Using the `override` decorators helps with that.

Signed-off-by: Sahas Subramanian <[email protected]>
Also implement the method in all classes implementing the `Receiver`
interface.

Signed-off-by: Sahas Subramanian <[email protected]>
This makes it consistent with the channel close method, which is
async.  It also makes it easy to fully stop any pending tasks, as
is the case with `merge`.

Signed-off-by: Sahas Subramanian <[email protected]>
Signed-off-by: Sahas Subramanian <[email protected]>
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests labels Jan 7, 2025
@shsms
Copy link
Contributor Author

shsms commented Jan 7, 2025

This is ready, the only big change is that Receiver.close() is now async.

@shsms shsms requested a review from llucax January 7, 2025 10:39
@shsms shsms modified the milestones: v1.5.0, v1.6.0 Jan 8, 2025
@llucax
Copy link
Contributor

llucax commented Jan 9, 2025

Last push to have a (Background)Service-compatible interface.

If we make close() async, then fine, but then we are ACKing that receivers can have background running tasks, and then why shouldn't they follow the Service interface?

@llucax
Copy link
Contributor

llucax commented Jan 9, 2025

BTW, the method can't be abstract, otherwise this is a breaking change. We need to provide a default empty implementation I guess. We can create an issue to make the method abstract for 2.0.

Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review everything, because I think the review will change a lot depending on what we do with close vs stop() (stop() being close()/cancel() + __await__).

cancel() and __await__ are actually the 2 primitives used by (Background)Service too, based on Task, and the more I look at it, the more I think receivers should follow this interface too.

@override
async def close(self) -> None:
"""Close this receiver."""
self._stop_event.set()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably do a await self._awatch.aclose() here to make sure all the threads from the awatch are stopped too. Maybe even mark the receiver somehow as stopped so ready() doesn ´t try to use self._awatch again? Not sure what would happen otherwise, what kind of exception would be raised.


After closing, the receiver will not be able to receive any more messages.
"""
self._closed = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we should also break the ready() if it is waiting for the condition variable. I guess this would mean adding a an event and then async.wait() on the first one to complete in ready()?

I guess this is actually the difference between closing and stopping. Close is basically analogous to Task.cancel(), it is like it schedules the stopping. And it is actually probably a good idea to have it separated, but we are still missing the await then, making stop() async but then not actually awaiting for anything seems weird to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:docs Affects the documentation part:experimental Affects the experimental package part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests part:utilities Affects the utility receivers (`Timer`, `Event`, `FileWatcher`)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants