Skip to content

Commit

Permalink
Don't exit when the context is cancelled (#391)
Browse files Browse the repository at this point in the history
* Added a check for deadlocked adapters

* Don't exit when the context is cancelled

If we exit at that point, we shut down the goroutine that receives on the `errChan`, which leaves deadlocked goroutines hanging around forever

* Fixed cancellation signals
  • Loading branch information
dylanratcliffe authored Dec 7, 2024
1 parent 83e47ed commit 229985d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
40 changes: 33 additions & 7 deletions enginerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
// Since we need to wait for only the processing of this query's executions, we need a separate WaitGroup here
// Overall MaxParallelExecutions evaluation is handled by e.executionPool
wg := sync.WaitGroup{}
expandedMutex := sync.RWMutex{}
expandedMutex.RLock()
for q, adapters := range expanded {
wg.Add(1)
// localize values for the closure below
Expand All @@ -239,7 +241,16 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
)
p.Go(func() {
defer LogRecoverToReturn(ctx, "ExecuteQuery inner")
defer wg.Done()
defer func() {
// Delete our query from the map so that we can track which
// ones are still running
expandedMutex.Lock()
defer expandedMutex.Unlock()
delete(expanded, localQ)

// Mark the work as done
wg.Done()
}()
defer func() {
if localQ.GetMethod() == sdp.QueryMethod_LIST {
listExecutionPoolCount.Add(-1)
Expand Down Expand Up @@ -285,6 +296,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
})
}()
}
expandedMutex.RUnlock()

waitGroupDone := make(chan struct{})
go func() {
Expand All @@ -310,17 +322,31 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
return
case <-time.After(longRunningAdaptersTimeout):
// If we're here, then the wait group didn't finish in time
log.WithContext(ctx).WithFields(log.Fields{
"ovm.query.uuid": query.GetUUID(),
"ovm.query.type": query.GetType(),
"ovm.query.scope": query.GetScope(),
"ovm.query.method": query.GetMethod().String(),
}).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout)
expandedMutex.RLock()
for q, adapters := range expanded {
adapterNames := make([]string, len(adapters))
for i, a := range adapters {
adapterNames[i] = a.Name()
}
log.WithContext(ctx).WithFields(log.Fields{
"ovm.query.uuid": q.ParseUuid().String(),
"ovm.query.type": q.GetType(),
"ovm.query.scope": q.GetScope(),
"ovm.query.method": q.GetMethod().String(),
"ovm.query.adapters": adapterNames,
}).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout)
}
expandedMutex.RUnlock()
}
}
}()
}

// If the context is cancelled, return that error
if ctx.Err() != nil {
return ctx.Err()
}

// If all failed then return first error
if numAdaptersInt := numAdapters.Load(); numErrs == int(numAdaptersInt) {
return AllAdaptersFailedError{
Expand Down
3 changes: 0 additions & 3 deletions querytracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryE
} else {
errs = nil
}
case <-ctx.Done():
// If the context is closed, return an error
return sdpItems, sdpErrs, ctx.Err()
}

if items == nil && errs == nil {
Expand Down

0 comments on commit 229985d

Please sign in to comment.