Expose LISTEN pubsub functionality #129
base: master
Are you sure you want to change the base?
Conversation
We already have a robust LISTEN/NOTIFY implementation internally. It would potentially be useful for a variety of user-space coordination activities. Expose it by wrapping our internal notifier package in a minimal public API. Ensure that we validate topic names in the process.
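A minimal sketch of what such a wrapper's surface could look like. The names (`ListenSubscription`, `Unlisten`, the `dispatcher` standing in for the internal notifier package) are illustrative assumptions, not the final API:

```go
package main

import (
	"fmt"
	"sync"
)

// NotifyFunc receives payloads published to a topic.
type NotifyFunc func(topic, payload string)

// ListenSubscription lets a caller stop receiving notifications.
type ListenSubscription struct {
	unlisten func()
}

func (s *ListenSubscription) Unlisten() { s.unlisten() }

// dispatcher is an in-memory stand-in for the internal LISTEN/NOTIFY notifier.
type dispatcher struct {
	mu   sync.Mutex
	subs map[string]map[int]NotifyFunc
	next int
}

func newDispatcher() *dispatcher {
	return &dispatcher{subs: make(map[string]map[int]NotifyFunc)}
}

// Listen registers notifyFunc for a topic; it panics on a nil func, matching
// the doc comment in the diff below.
func (d *dispatcher) Listen(topic string, fn NotifyFunc) (*ListenSubscription, error) {
	if fn == nil {
		panic("notifyFunc must not be nil")
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.subs[topic] == nil {
		d.subs[topic] = make(map[int]NotifyFunc)
	}
	id := d.next
	d.next++
	d.subs[topic][id] = fn
	return &ListenSubscription{unlisten: func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		delete(d.subs[topic], id)
	}}, nil
}

// Notify delivers a payload to every subscriber of the topic.
func (d *dispatcher) Notify(topic, payload string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, fn := range d.subs[topic] {
		fn(topic, payload)
	}
}

func main() {
	d := newDispatcher()
	sub, _ := d.Listen("jobs", func(topic, payload string) {
		fmt.Println("got:", payload)
	})
	d.Notify("jobs", "hello")
	sub.Unlisten()
	d.Notify("jobs", "dropped") // no subscribers remain for this topic
}
```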
// TODO(bgentry): is this the wrong context? Should the notifier actually
// use the `workCtx` so that it doesn't get shut down before existing jobs
// have finished / had their contexts cancelled? This would preserve the
// ability to cancel an individual job's context during the initial
// shutdown phase.
c.notifier.Run(fetchNewWorkCtx)
c.wg.Done()
I noticed this while I was in here and wanted to ask about it. If we shut down the notifier at the same time as we stop fetching new work, the notifier is no longer available for ongoing jobs (including, potentially, for the individualized cancellation of ongoing jobs). That seems wrong. Perhaps we should use the `workCtx` instead? That will require a bit of refactoring for the `Stop` flow because we'd want to call `workCancel` as soon as the producers finish, but before waiting on the notifier. Currently, the notifier is bundled together into a waitgroup with the producers.
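The ordering being proposed can be sketched with plain contexts and waitgroups. This is a self-contained toy, not River's actual stop flow; the goroutines stand in for the producers and the notifier:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// orderedShutdown sketches the stop sequence discussed above: producers run
// under fetchCtx and stop first; the notifier runs under the longer-lived
// workCtx so it remains available to ongoing jobs. It returns the order in
// which the phases complete.
func orderedShutdown() []string {
	fetchCtx, fetchCancel := context.WithCancel(context.Background())
	workCtx, workCancel := context.WithCancel(context.Background())

	events := make(chan string, 2)

	var producers sync.WaitGroup
	producers.Add(1)
	go func() {
		<-fetchCtx.Done() // producers stop fetching when fetchCtx is canceled
		events <- "producers stopped"
		producers.Done()
	}()

	var notifier sync.WaitGroup
	notifier.Add(1)
	go func() {
		<-workCtx.Done() // notifier outlives producers, serving ongoing jobs
		events <- "notifier stopped"
		notifier.Done()
	}()

	fetchCancel()    // 1. stop fetching new work
	producers.Wait() // 2. wait for the producers to drain
	workCancel()     // 3. only then shut down the notifier
	notifier.Wait()
	close(events)

	var order []string
	for e := range events {
		order = append(order, e)
	}
	return order
}

func main() {
	fmt.Println(orderedShutdown())
}
```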
Not much to add, but +1 — seems wrong as is. I'm a little scared to add another stop step since it's already somewhat difficult to understand the progression of each step as is. IMO, might not be a bad idea to try and refactor stopping at some point so that each step is easily visible in sequence (i.e. most of the logic lives in one place and you can easily scroll through each phase).
// It returns an error if the specified topic is not a valid Postgres
// identifier. Panics if notifyFunc is nil.
func (c *Client[TTx]) Listen(topic string, notifyFunc NotifyFunc) (*ListenSubscription, error) {
	// Validate that the topic is a valid Postgres identifier:
@brandur should we also forbid the use of `river_`-prefixed topics? Or do you think there's no real risk in allowing it? I've gone back and forth, but atm I'm not concerned.
Hmm, doesn't hurt to start with constraints first and relax them later if it seems necessary.
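The constraints discussed in this thread could be sketched as follows. The helper name and the `river_` reserved-prefix rule are hypothetical here, not necessarily what the PR implements; the 63-byte cap matches Postgres's unquoted-identifier length limit (`NAMEDATALEN` - 1):

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// postgresIdentifierRE matches unquoted Postgres identifiers: a letter or
// underscore followed by letters, digits, underscores, or dollar signs.
var postgresIdentifierRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_$]*$`)

// validateTopic checks the identifier syntax, the 63-byte length limit, and
// (per the discussion above) rejects topics reserved for internal use.
func validateTopic(topic string) error {
	if len(topic) == 0 || len(topic) > 63 {
		return fmt.Errorf("topic must be between 1 and 63 characters: %q", topic)
	}
	if !postgresIdentifierRE.MatchString(topic) {
		return fmt.Errorf("topic is not a valid Postgres identifier: %q", topic)
	}
	if strings.HasPrefix(topic, "river_") {
		return fmt.Errorf("topics prefixed with river_ are reserved: %q", topic)
	}
	return nil
}

func main() {
	for _, topic := range []string{"my_topic", "1bad", "river_internal"} {
		fmt.Println(topic, "->", validateTopic(topic))
	}
}
```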
@bgentry I'd be a little reluctant to expose this on the client since it doesn't have much to do with River's core job working functionality. If a user wants to listen/notify on a channel, wouldn't it be better to push them towards using their own pgx primitives instead of going through River?
If this is for use with some of our other planned features, I wonder if we should try to put this in the driver API which is a little out of the way, and which we've marked as clearly subject to change. Thoughts?
Will adding this feature allow us to be notified of the completion of a job? I want to send a realtime update of a job's status to the frontend and to other parts of the backend. I think this should already be possible, right? Does this add something else? EDIT: https://riverqueue.com/docs/subscriptions — I completely missed this, apologies.
@geekodour Yeah, exactly. The subscriber interface is the way to go for that.
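For readers landing here: the subscriptions docs linked above describe a channel-based event feed for job completions. The following is a simplified, self-contained sketch of that pattern — the types and the `Subscribe` shape here are illustrative, not River's actual API:

```go
package main

import "fmt"

// EventKind distinguishes the kinds of job events a caller can observe.
type EventKind string

const EventKindJobCompleted EventKind = "job_completed"

// Event carries information about a single job state change.
type Event struct {
	Kind  EventKind
	JobID int64
}

// Subscriber fans job events out to any number of subscribed channels.
type Subscriber struct {
	chans []chan *Event
}

// Subscribe returns a buffered channel of events plus a cancel func.
func (s *Subscriber) Subscribe() (<-chan *Event, func()) {
	ch := make(chan *Event, 16)
	s.chans = append(s.chans, ch)
	return ch, func() { /* channel removal elided in this sketch */ }
}

// publish delivers an event to every subscriber, dropping it for any
// subscriber whose buffer is full rather than blocking job work.
func (s *Subscriber) publish(e *Event) {
	for _, ch := range s.chans {
		select {
		case ch <- e:
		default:
		}
	}
}

func main() {
	var s Subscriber
	events, cancel := s.Subscribe()
	defer cancel()

	s.publish(&Event{Kind: EventKindJobCompleted, JobID: 42})
	e := <-events
	fmt.Println(e.Kind, e.JobID)
}
```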
TODO: add test coverage