add Concurrency entity for worker #1405
base: master
Conversation
Changes look safe as long as the underlying semaphore library is not buggy; it will be the critical component determining the concurrency controls of the client SDK. I recommend deep-diving into it to understand the implementation, checking for potential deadlock cases, and writing comprehensive unit/concurrency tests for the wrapper `permit` implementation.
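To illustrate the kind of test being asked for, here is a minimal sketch of a concurrency test written directly against marusama/semaphore/v2 rather than the PR's `permit` wrapper (the package, test name, and counters are invented for the sketch): many goroutines each acquire and release a permit, and the test checks that the observed concurrency never exceeds the limit.

```go
package worker

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/marusama/semaphore/v2"
)

// TestSemaphoreNeverExceedsLimit hammers the semaphore from many goroutines
// and verifies the observed concurrency never exceeds the configured limit.
func TestSemaphoreNeverExceedsLimit(t *testing.T) {
	const limit = 3
	sem := semaphore.New(limit)

	var inFlight, maxSeen int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := sem.Acquire(context.Background(), 1); err != nil {
				t.Error(err)
				return
			}
			cur := atomic.AddInt64(&inFlight, 1)
			// record the highest concurrency we ever observed
			for {
				prev := atomic.LoadInt64(&maxSeen)
				if cur <= prev || atomic.CompareAndSwapInt64(&maxSeen, prev, cur) {
					break
				}
			}
			atomic.AddInt64(&inFlight, -1)
			sem.Release(1)
		}()
	}
	wg.Wait()

	if maxSeen > limit {
		t.Fatalf("observed %d concurrent holders, want at most %d", maxSeen, limit)
	}
}
```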
internal/worker/concurrency.go
```diff
@@ -74,7 +77,7 @@ func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
 		}
 		select { // try to send to channel, but don't block if listener is gone
 		case ch <- struct{}{}:
-		default:
+		case <-time.After(10 * time.Millisecond): // wait time is needed to avoid race condition of channel sending
```
Found a race condition in the channel send. Adding a wait time would be more reliable.
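For context, a standalone sketch (not from the PR) of the failure mode being described: with a plain `default:` branch, a sender that reaches the select before the receiver is parked on the channel silently drops the signal, and the receiver then blocks.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})

	go func() {
		select {
		case ch <- struct{}{}: // only succeeds if a receiver is already parked on ch
			fmt.Println("signal delivered")
		default:
			fmt.Println("signal dropped") // the likely outcome in this demo
		}
	}()

	// The receiver shows up "a moment too late".
	time.Sleep(10 * time.Millisecond)

	select {
	case <-ch:
		fmt.Println("received")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("receiver never got the signal")
	}
}
```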
hmm :\ unfortunately this means that if processing is delayed by 10ms it will block the chan-reader forever. that's not too unlikely with big CPU spikes, and definitely not impossible.
tbh I think that might rule this impl out entirely. though I think it's possible to build an `AcquireChan(...) (<-chan struct{}, cancel func())` that doesn't have this issue, and that might be worth doing.
or we might have to embrace the atomic-like behavior around this and add retries to `(*baseWorker).runPoller` / anything using `AcquireChan`. that wouldn't be a fatal constraint afaict, though it's not ideal.
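For what it's worth, a rough sketch of the cancel-func shape mentioned above, assuming a `permit` type that wraps a marusama/semaphore/v2 semaphore (the names and wiring here are illustrative, not the PR's code): the returned cancel func both stops the acquiring goroutine and returns a permit that was acquired but never read.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/marusama/semaphore/v2"
)

// permit is a stand-in for the PR's wrapper type.
type permit struct {
	sem semaphore.Semaphore
}

// AcquireChan returns a channel that yields once a permit is acquired, plus a
// cancel func. Callers must call cancel when they stop reading from the
// channel; a permit received from the channel must still be released.
func (p *permit) AcquireChan(ctx context.Context) (<-chan struct{}, func()) {
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan struct{})
	go func() {
		if err := p.sem.Acquire(ctx, 1); err != nil {
			return // canceled before a permit became available
		}
		select {
		case ch <- struct{}{}: // reader now owns the permit
		case <-ctx.Done():
			p.sem.Release(1) // reader gave up; hand the permit back
		}
	}()
	return ch, cancel
}

func main() {
	p := &permit{sem: semaphore.New(1)}

	ch, cancel := p.AcquireChan(context.Background())
	defer cancel() // guarantees the goroutine exits even if we stop reading

	select {
	case <-ch:
		fmt.Println("got permit")
		p.sem.Release(1)
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for a permit")
	}
}
```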
if we expect the chan-reader to always consume from the returned `ch` unless ctx is canceled, then we can replace this goroutine implementation with:

```go
defer wg.Done()
if err := p.sem.Acquire(ctx, 1); err != nil {
	return // assuming Acquire only returns err if ctx.Done
}
select {
case ch <- struct{}{}:
case <-ctx.Done():
	p.sem.Release(1)
}
```
Fixed through Taylan's suggestion. It should be safe now.
Reading through marusama/semaphore/v2:
So... I think we're probably fine. For pollers we won't run into any of these in practice. In the meantime we should probably get some integer overflow checks added to it, and a new release. There's no reason to allow those to occur.
I think I'm gonna have to block this for https://github.com/cadence-workflow/cadence-go-client/pull/1405/files#r1859368368 since that's potentially fatal.
Overall though:
- library looks fine (more details above)
- changes look reasonable
- four-layer-deep `bw.concurrency.TaskPermit.AcquireChan` accesses are a bit dubious in principle, but I think they make sense here. the added structure/layers seem useful.
- wrapper as a whole seems good to have, though do we have any plans to do non-`1` values? I'd get rid of `count` if not, but I suppose that depends on the "resource". (but see the library comment for risks there, e.g. we definitely cannot use it for "bytes" in many cases)
- I have not checked the tests in detail but the high-level approach seems useful. might need some fine-tuning, but it's reasonably "meaningful" and that's a very solid start.
I'm... not entirely sure what to do to resolve the core "resizable semaphore but we also need chans" issue tbh. We might end up needing / badly-wanting chans, so that may be important. I suspect there's some other library, but I haven't hunted for one or thought too much on what it'd need to be correct 🤔
Double-checked: we don't do non-one values in acquire anywhere. I'll remove it in the next PR. Some of the `pollerAutoScaler` interface methods are redundant and can thus be removed.
```go
// AcquireChan returns a permit ready channel. Similar to Acquire, but non-blocking.
// Remember to call Release(1) to release the permit after usage.
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
	ch := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := p.sem.Acquire(ctx, 1); err != nil {
			return
		}
		select { // try to send to channel, but don't block if listener is gone
		case ch <- struct{}{}:
		case <-ctx.Done():
			p.sem.Release(1)
		}
	}()
	return ch
}
```
hmm. this is possible to use correctly, and doesn't leak goroutines beyond `ctx.Done()`, which is good (and possibly good enough), but it still leaves the chan permanently blocked in some cases.
that's correct as long as the `AcquireChan(...)` caller also listens to the same / a derived `ctx.Done()` and stops using the chan if that occurs, and ideally also cancels the context when it stops reading in all other branches.
`func (bw *baseWorker) runPoller()` does this currently, because `bw.shutdownCh` closes when `bw.limiterContext` is canceled and there effectively is no timeout, but it feels kinda error-prone 🤔
and it also feels like there might be an alternative that isn't as risky... though I'm not yet sure what that might be.
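To make that contract concrete, a hedged sketch of the caller side (hypothetical names, not the actual `runPoller` code): the loop selects on the permit channel and on the same ctx it passed to `AcquireChan`, and exits only via `ctx.Done()`, which is also what unblocks the acquiring goroutine. The `Release` signature on the wrapper is assumed here.

```go
package worker

import (
	"context"
	"sync"
)

// taskPermit captures just the methods this sketch needs; in the PR these
// live on the permit wrapper (Release's signature here is assumed).
type taskPermit interface {
	AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{}
	Release(count int)
}

// pollLoop sketches the usage contract described above: select on the permit
// channel and on the same ctx passed to AcquireChan, and exit only via
// ctx.Done().
func pollLoop(ctx context.Context, p taskPermit, wg *sync.WaitGroup, poll func()) {
	for {
		permitCh := p.AcquireChan(ctx, wg)
		select {
		case <-permitCh:
			poll()       // one unit of work while holding the permit
			p.Release(1) // hand the permit back
		case <-ctx.Done():
			// Because this is the same ctx, the AcquireChan goroutine also
			// returns (releasing its permit if it had one), so nothing leaks.
			return
		}
	}
}
```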
From trying to build some alternatives, and thinking about it some more, two thoughts:
1: "Permanently block if the context is timed out" is, I think, the correct choice, because the alternative is to close the chan, and that can easily lead to infinite loops + might be interpreted as "no limit". So you'd also need a "detect if closed" check everywhere all the time, rather than just adding the `case` that is needed to know when to stop. So 👍 that's fine here.
2: With resizing being allowed, I'm growing convinced that either "one-use chan + goroutine + release func if not read, per AcquireChan call" or "a background maintenance goroutine" is unavoidable. Limit and count changes have to be synchronized with reads, so reads can't be buffered, but we can't tell if a reader is gone forever or just delayed, so we can't pair releases up with acquires synchronously; we need some kind of buffer somewhere else.
So... this might work, though the "correct use requires passing a context that you also wait on in the same select, and also cancel when you stop reading" detail is still striking me as a moderate footgun.
E.g. the core `runPoller` loop is only safe right now because there is both no timeout and no other branch in the select - if we add a branch, it'll leak goroutines ~forever every time it takes that other branch.
But that is something we could document thoroughly and probably not run into. And using marusama/semaphore/v2 does make it a much simpler implementation than doing it by hand.
Mind if I sit on this over the long weekend, and maybe we can grab others / discuss an alternative I made that's a bit more misuse-resistant? With some careful docs I think this is acceptable and looks functionally correct, but I'm not entirely sure it's worth keeping...
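As a small aside on point 1, a standalone demo (not from the PR) of why closing the chan would be the worse failure mode: a receive from a closed channel succeeds immediately with the zero value, so a permit loop would spin as if permits were unlimited unless every reader also checked the `ok` flag.

```go
package main

import "fmt"

func main() {
	ch := make(chan struct{})
	close(ch)

	for i := 0; i < 3; i++ {
		_, ok := <-ch // succeeds instantly every time; ok is false
		fmt.Println("receive", i, "succeeded immediately, ok =", ok)
	}
}
```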
What changed?
Why?
Add a dynamic params component towards worker auto-configuration.
How did you test it?
- unit test
- integration test
Potential risks