Skip to content

Commit

Permalink
Fix non-working panic recovery (missing defer) (#1128)
Browse files Browse the repository at this point in the history
The Go built-in function `recover()` only has an effect when it is called
from a deferred function (one registered with `defer`):

> Recover is only useful inside deferred functions
Source: (https://go.dev/blog/defer-panic-and-recover)

However, in many places it was called without `defer`, so panics were
not recovered. See the minimal example below:

https://go.dev/play/p/Qx5eLCadQnU

```go
func LogPanic() {
	if r := recover(); r != nil {
		fmt.Println("Recovered:", r)
	}
}

func main() {
	go func() {
		LogPanic() // recover() runs immediately, before any panic has happened,
		           // so there is nothing to recover and it returns nil;
		           // LogPanic is not invoked again when the panic below occurs
		panic("oh no") // NOT RECOVERED!
	}()
	time.Sleep(1 * time.Second)
}
```

Therefore, add the missing `defer` at every such call site.
  • Loading branch information
avelanarius authored Dec 19, 2024
1 parent 10f2000 commit 44cd2cf
Show file tree
Hide file tree
Showing 13 changed files with 18 additions and 12 deletions.
4 changes: 2 additions & 2 deletions quesma/ab_testing/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ func (r *InMemoryCollector) Start() {
logger.Info().Msg("Starting A/B Results Collector")

go func() {
recovery.LogAndHandlePanic(r.ctx, func(err error) {
defer recovery.LogAndHandlePanic(r.ctx, func(err error) {
r.cancelFunc()
})
r.receiveIncomingResults()
}()

go func() {
recovery.LogAndHandlePanic(r.ctx, func(err error) {
defer recovery.LogAndHandlePanic(r.ctx, func(err error) {
r.cancelFunc()
})
r.receiveHealthAndErrorsLoop()
Expand Down
2 changes: 1 addition & 1 deletion quesma/ab_testing/sender/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *SenderCoordinator) Start() {
c.sender.Start()

go func() {
recovery.LogAndHandlePanic(c.ctx, func(err error) {
defer recovery.LogAndHandlePanic(c.ctx, func(err error) {
c.cancelFunc()
})
c.receiveHealthStatusesLoop()
Expand Down
2 changes: 1 addition & 1 deletion quesma/ab_testing/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newSender(ctx context.Context) *sender {
func (f *sender) Start() {

go func() {
recovery.LogPanic()
defer recovery.LogPanic()

for {
select {
Expand Down
2 changes: 1 addition & 1 deletion quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (lm *LogManager) Start() {
forceReloadCh := lm.tableDiscovery.ForceReloadCh()

go func() {
recovery.LogPanic()
defer recovery.LogPanic()
for {
select {
case <-lm.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func read(ctx context.Context, rows *sql.Rows, selectFields []string, rowToScan
return nil, fmt.Errorf("clickhouse: iterating over rows failed: %v", rows.Err())
}
go func() {
recovery.LogPanicWithCtx(ctx)
defer recovery.LogPanicWithCtx(ctx)
err := rows.Close()
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("clickhouse: closing rows failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion quesma/elasticsearch/index_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (im *indexManagement) Start() {
im.ctx, im.cancel = context.WithCancel(context.Background())

go func() {
recovery.LogPanic()
defer recovery.LogPanic()
for {
select {
case <-im.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (ip *IngestProcessor) Start() {
forceReloadCh := ip.tableDiscovery.ForceReloadCh()

go func() {
recovery.LogPanic()
defer recovery.LogPanic()
for {
select {
case <-ip.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor2.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ip *IngestProcessor2) Start() {
forceReloadCh := ip.tableDiscovery.ForceReloadCh()

go func() {
recovery.LogPanic()
defer recovery.LogPanic()
for {
select {
case <-ip.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/async_search_storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *AsyncQueryTraceLoggerEvictor) TryFlushHangingAsyncQueryTrace(timeFun fu

func (e *AsyncQueryTraceLoggerEvictor) FlushHangingAsyncQueryTrace(timeFun func(time.Time) time.Duration) {
go func() {
recovery.LogPanic()
defer recovery.LogPanic()
for {
select {
case <-time.After(GCInterval):
Expand Down
6 changes: 6 additions & 0 deletions quesma/quesma/recovery/recovery_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ func commonRecovery(r any, panicLogger func() *zerolog.Event) {
panicLogger().Msgf("Panic recovered: %s\n%s", recoveredToError(r), string(debug.Stack()))
}

// LogPanic recovers from a panic and, if one was in progress, hands the
// recovered value to commonRecovery together with logger.Error.
//
// IMPORTANT: must be used with defer, because recover is only useful
// inside deferred functions:
//
//	defer recovery.LogPanic()
func LogPanic() {
	// recover must stay in this function's own frame; do not move it into a helper.
	recovered := recover()
	if recovered == nil {
		return
	}
	commonRecovery(recovered, logger.Error)
}

// IMPORTANT: must be used with defer:
// defer recovery.LogPanicWithCtx(ctx)
func LogPanicWithCtx(ctx context.Context) {
if r := recover(); r != nil {
commonRecovery(r, func() *zerolog.Event {
Expand All @@ -46,6 +50,8 @@ func LogPanicWithCtx(ctx context.Context) {
}
}

// IMPORTANT: must be used with defer:
// defer recovery.LogAndHandlePanic(ctx, cleanupHandler)
func LogAndHandlePanic(ctx context.Context, cleanupHandler func(err error)) {
if r := recover(); r != nil {
commonRecovery(r, func() *zerolog.Event {
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
select {
case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond):
go func() { // Async search takes longer. Return partial results and wait for
recovery.LogPanicWithCtx(ctx)
defer recovery.LogPanicWithCtx(ctx)
res := <-doneCh
responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res, true, opaqueId)
sendMainPlanResult(responseBody, err)
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search_ab_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (q *QueryRunner) executePlanElastic(ctx context.Context, plan *model.Execut

case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond):
go func() { // Async search takes longer. Return partial results and wait for
recovery.LogPanicWithCtx(ctx)
defer recovery.LogPanicWithCtx(ctx)
res := <-doneCh
responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId)
sendABResult(responseBody, err)
Expand Down

0 comments on commit 44cd2cf

Please sign in to comment.