Improve composability of receivers #116

Open
2 tasks
llucax opened this issue Jun 9, 2023 · 7 comments
Labels
part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) priority:high Address this as soon as possible type:enhancement New feature or enhancement visible to users

@llucax
Contributor

llucax commented Jun 9, 2023

What's needed?

It would be nice for select() to be more flexible (add and remove receivers, have optional receivers) without losing its static-ness.

Proposed solution

Improve the composability of existing receivers and create a new one:

  • Add a channel or receiver that merges messages from multiple receivers (or repurpose Merger for this) and supports adding and removing receivers dynamically. When it has no receivers, it simply waits forever (it is never ready)
  • Maybe add a wrapper on the above that only accepts 0 or 1 receivers, to implement an optional receiver
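
The first bullet could be sketched with plain asyncio as follows. This is only an illustration of the idea, not the library's API: `DynamicMerger`, `add()`, `remove()` and `receive()` are hypothetical names, and `asyncio.Queue` objects stand in for receivers.

```python
from __future__ import annotations

import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class DynamicMerger(Generic[T]):
    """Merge messages from a changing set of queues (stand-ins for receivers)."""

    def __init__(self) -> None:
        self._merged: asyncio.Queue[T] = asyncio.Queue()
        self._pumps: dict[str, asyncio.Task[None]] = {}

    def add(self, name: str, source: asyncio.Queue[T]) -> None:
        """Start forwarding from `source`, replacing any source with that name."""
        self.remove(name)
        self._pumps[name] = asyncio.create_task(self._pump(source))

    def remove(self, name: str) -> None:
        """Stop forwarding from the source registered as `name`, if any."""
        task = self._pumps.pop(name, None)
        if task is not None:
            task.cancel()

    async def _pump(self, source: asyncio.Queue[T]) -> None:
        # Forward every message from one source into the merged queue.
        while True:
            self._merged.put_nowait(await source.get())

    async def receive(self) -> T:
        """Return the next merged message; waits while nothing is forwarded."""
        return await self._merged.get()


async def demo() -> list[str]:
    merger: DynamicMerger[str] = DynamicMerger()
    a: asyncio.Queue[str] = asyncio.Queue()
    b: asyncio.Queue[str] = asyncio.Queue()
    received: list[str] = []

    # With no sources, `receive()` never completes on its own.
    try:
        await asyncio.wait_for(merger.receive(), timeout=0.05)
    except asyncio.TimeoutError:
        received.append("idle")

    merger.add("a", a)
    a.put_nowait("a1")
    received.append(await merger.receive())

    merger.add("b", b)
    b.put_nowait("b1")
    received.append(await merger.receive())

    merger.remove("a")
    a.put_nowait("a2")  # Stays in `a`; it is no longer forwarded.
    b.put_nowait("b2")
    received.append(await merger.receive())

    merger.remove("b")
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch, messages that were already forwarded before `remove()` stay in the merged queue; a real implementation would have to decide whether to drop them.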

Use cases

I think many cases where users need to handle lots of tasks manually to cope with more dynamic situations, like the resampler, could be greatly simplified if they could be handled automatically by using a more flexible select.

Alternatives and workarounds

Break the select loop, use tasks manually.

Additional context

This would fix a few open issues / discussions:

@llucax llucax added type:enhancement New feature or enhancement visible to users part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) labels Jun 9, 2023
@llucax llucax added this to the v0.17.0 milestone Jun 9, 2023
@llucax
Contributor Author

llucax commented Jun 9, 2023

Maybe I should have made this a discussion first. If anyone sees any controversy in this, we can convert it to a discussion. If there are no big objections, we can turn the TODO list into issues as we plan to address them.

@shsms
Contributor

shsms commented Jun 9, 2023

I think many cases where users need to handle lots of tasks manually to cope with more dynamic situations, like the resampler, could be greatly simplified if they could be handled automatically by using a more flexible select.

I guess I'm just having trouble imagining what kind of possibilities would come out of these features. It sounds like they're to deal with scenarios where channels that we read from close for some reason. Is that accurate or are you thinking of other scenarios as well?

Is the resampler the only known use-case for these features? I'm not very familiar with it, so maybe that would be just a good first test: it would be nice to see how they would simplify the resampler. Would it help eliminate the two *helper layers?

So we just make a basic interface without implementation for the features proposed in this issue, then see how it affects the resampler, just to make sure we limit feature creep.

Or maybe even a simpler version if that sounds like too much work.

@llucax
Contributor Author

llucax commented Jun 9, 2023

It sounds like they're to deal with scenarios where channels that we read from close for some reason.

Well, continuing with the resampling actor as a use case, at some point it is very likely that we'll need to be able to unsubscribe (for example, when we are able to switch peak-shaving on and off). Maybe this is looking too far into the future, and we'll still somehow keep all subscriptions forever, but if we do need to unsubscribe, we'll need to close receivers.

Is that accurate or are you thinking of other scenarios as well?

I haven't thought very hard about particular scenarios; it's more a vague memory of use cases I saw in the past. But thinking about it again, I think handling subscriptions is a good example. Another scenario for stopping a receiver might be stopping a timer that you may want to re-enable later. There is also the more dynamic scenario where we start to compose actors and might need to handle new messages/channels when an actor is created, and remove them when it is gone.

Would it help eliminate the two *helper layers?

Yes, exactly. I think with this all that complexity should be gone.

So we just make a basic interface without implementation for the features proposed in this issue, then see how it affects the resampler, just to make sure we limit feature creep.

Or maybe even a simpler version if that sounds like too much work.

Yes, I don't think this is something to implement right now. I put it in v0.17.0 just because there isn't much else after select() is done (and because I was thinking about how we could extend it with features that have been in the air for a while). So I just had this idea, thought it was pretty nice that select() wouldn't even have to be touched, and wanted to write it down.

I agree that it would be good to test the interface in the field before going for it.

@llucax
Contributor Author

llucax commented Nov 9, 2023

Update after a new discussion we had around this:

  • We'll remove MergeNamed (Remove MergeNamed #236) and make a wrapper merge() function for Merge and make Merge private (Expose Merge as a merge() function #237), so updating the issue to reflect this.
  • We'll evaluate separately which construction is best for implementing dynamic/optional receivers for select. Note that merge() should still return a full Receiver, so it can also be used with select().

@llucax
Contributor Author

llucax commented Jan 8, 2025

Another use case for this is actors that process dispatches only conditionally but always need some other receiver (for example, for config updates). In this case it would be perfect if the dispatch updates receiver were optional, so we don't need to build two different select loops depending on whether dispatch is enabled.
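
To illustrate the single-loop idea without depending on the library, here is a minimal sketch using plain asyncio. The names `next_or_never()` and `run_actor()` are hypothetical, `asyncio.Queue` objects stand in for the config and dispatch receivers, and `asyncio.wait()` stands in for select(). A missing dispatch source is treated as one that never becomes ready, so the loop body is the same in both cases.

```python
from __future__ import annotations

import asyncio


async def next_or_never(queue: asyncio.Queue[str] | None) -> str:
    """Return the next message, or wait forever if there is no queue."""
    if queue is None:
        # Never completes on its own; only cancellation ends the wait.
        return await asyncio.get_running_loop().create_future()
    return await queue.get()


async def run_actor(
    config: asyncio.Queue[str],
    dispatches: asyncio.Queue[str] | None,
    max_messages: int,
) -> list[str]:
    """Handle messages from both sources with one loop (stops for the demo)."""
    handled: list[str] = []
    while len(handled) < max_messages:
        config_task = asyncio.ensure_future(config.get())
        dispatch_task = asyncio.ensure_future(next_or_never(dispatches))
        done, pending = await asyncio.wait(
            {config_task, dispatch_task}, return_when=asyncio.FIRST_COMPLETED
        )
        # Drop the waits that did not finish; their messages stay queued.
        for task in pending:
            task.cancel()
        if config_task in done:
            handled.append(f"config:{config_task.result()}")
        if dispatch_task in done:
            handled.append(f"dispatch:{dispatch_task.result()}")
    return handled


async def demo() -> tuple[list[str], list[str]]:
    config: asyncio.Queue[str] = asyncio.Queue()

    # Dispatch disabled: the same loop only ever sees config updates.
    config.put_nowait("cfg1")
    without_dispatch = await run_actor(config, None, max_messages=1)

    # Dispatch enabled: the same loop handles both sources.
    dispatches: asyncio.Queue[str] = asyncio.Queue()
    config.put_nowait("cfg2")
    dispatches.put_nowait("d1")
    with_dispatch = await run_actor(config, dispatches, max_messages=2)
    return without_dispatch, with_dispatch


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sketch creates two short-lived tasks per iteration, which is exactly the kind of manual bookkeeping an optional receiver would hide from the user.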

@llucax llucax added the priority:high Address this as soon as possible label Jan 8, 2025
@llucax
Contributor Author

llucax commented Jan 8, 2025

I increased the priority of this to high. I just stumbled upon too many use cases where this would be very useful, especially the optional receiver.

@llucax
Contributor Author

llucax commented Jan 10, 2025

I started to need this pretty badly, so I quickly coded an OptionalReceiver to use in some other project. Posting it here as it could serve as a basis for something we add here (untested):

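import asyncio

from typing_extensions import override

# Assumption: these names are importable from `frequenz.channels` (v1.x).
from frequenz.channels import Receiver, ReceiverMessageT_co, ReceiverStoppedError


class OptionalReceiver(Receiver[ReceiverMessageT_co]):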
    """An optional receiver that will wait indefinitely until an underlying receiver is set.

    This receiver is useful when the underlying receiver is not set initially, but will
    be set later. It will wait indefinitely until the underlying receiver is set and it
    is ready.
    """

    def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
        """Initialize this instance.

        Args:
            receiver: The underlying receiver, or `None` if there is no receiver.
        """
        self._receiver: Receiver[ReceiverMessageT_co] | None = receiver
        self._update_event = asyncio.Event()
        self._should_stop: bool = False

    @override
    async def ready(self) -> bool:
        """Wait until the receiver is ready with a message or an error.

        Once a call to `ready()` has finished, the message should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        while not self._should_stop:
            if self._receiver is None:
                await self._update_event.wait()
                self._update_event.clear()
            else:
                return await self._receiver.ready()
        return False

    @override
    def consume(self) -> ReceiverMessageT_co:  # noqa: DOC503 (raised indirectly)
        """Return the latest message from the underlying receiver once `ready()` is complete.

        `ready()` must be called before each call to `consume()`.

        Returns:
            The next message received.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
            ReceiverError: If there is some problem with the underlying receiver.
        """
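        # Once stopped, `ready()` returns `False` even if a receiver is still
        # set, so always report the stop here instead of delegating.
        if self._should_stop:
            raise ReceiverStoppedError(self)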
        assert (
            self._receiver is not None
        ), "`consume()` must be preceded by a call to `ready()`"
        return self._receiver.consume()

    def clear(self) -> Receiver[ReceiverMessageT_co] | None:
        """Clear the current underlying receiver.

        This receiver will never be ready until a new underlying receiver is set.

        Returns:
            The old receiver.
        """
        old_receiver = self._receiver
        self._receiver = None
        self._update_event.set()
        return old_receiver

    def set(
        self, receiver: Receiver[ReceiverMessageT_co]
    ) -> Receiver[ReceiverMessageT_co] | None:
        """Set a new underlying receiver.

        Args:
            receiver: The new receiver.

        Returns:
            The old receiver.
        """
        old_receiver = self._receiver
        self._receiver = receiver
        self._update_event.set()
        return old_receiver

    def stop(self) -> None:
        """Stop the receiver."""
        self._should_stop = True
        self._update_event.set()
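
One thing the draft above does not seem to cover: `ready()` awaits the underlying receiver directly, so a `set()`, `clear()` or `stop()` issued during that wait is only noticed once the old receiver becomes ready. A possible way around this is to race the underlying `ready()` against the update event. The sketch below shows only that race with plain asyncio; `ready_or_changed()` is a hypothetical helper, and whether cancelling a pending `ready()` call is safe for a given receiver is an assumption.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable


async def ready_or_changed(
    ready: Awaitable[bool], changed: asyncio.Event
) -> bool | None:
    """Return the result of `ready`, or `None` if `changed` fired first."""
    ready_task = asyncio.ensure_future(ready)
    changed_task = asyncio.ensure_future(changed.wait())
    tasks = {ready_task, changed_task}
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Also runs if the caller is cancelled, so no wait is left behind.
        for task in tasks:
            task.cancel()  # No-op for tasks that already finished.
    if ready_task in done:
        return ready_task.result()
    return None


async def demo() -> tuple[bool | None, bool | None]:
    changed = asyncio.Event()

    async def slow_ready() -> bool:
        # Stand-in for an underlying receiver with no message for a while.
        await asyncio.sleep(10)
        return True

    async def fast_ready() -> bool:
        # Stand-in for an underlying receiver that already has a message.
        return True

    # Simulate `set()`/`clear()` being called while waiting on the old receiver.
    asyncio.get_running_loop().call_later(0.01, changed.set)
    swapped = await ready_or_changed(slow_ready(), changed)

    changed.clear()
    ready = await ready_or_changed(fast_ready(), changed)
    return swapped, ready


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A `None` result would tell the `while` loop in `ready()` to re-check `self._receiver` and `self._should_stop` instead of returning.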
