diff --git a/quesma/ab_testing/collector/collector.go b/quesma/ab_testing/collector/collector.go index 989e1d734..da6f66b0f 100644 --- a/quesma/ab_testing/collector/collector.go +++ b/quesma/ab_testing/collector/collector.go @@ -111,14 +111,14 @@ func (r *InMemoryCollector) Start() { logger.Info().Msg("Starting A/B Results Collector") go func() { - recovery.LogAndHandlePanic(r.ctx, func(err error) { + defer recovery.LogAndHandlePanic(r.ctx, func(err error) { r.cancelFunc() }) r.receiveIncomingResults() }() go func() { - recovery.LogAndHandlePanic(r.ctx, func(err error) { + defer recovery.LogAndHandlePanic(r.ctx, func(err error) { r.cancelFunc() }) r.receiveHealthAndErrorsLoop() diff --git a/quesma/ab_testing/sender/coordinator.go b/quesma/ab_testing/sender/coordinator.go index d4aead8ac..5776f9f1e 100644 --- a/quesma/ab_testing/sender/coordinator.go +++ b/quesma/ab_testing/sender/coordinator.go @@ -124,7 +124,7 @@ func (c *SenderCoordinator) Start() { c.sender.Start() go func() { - recovery.LogAndHandlePanic(c.ctx, func(err error) { + defer recovery.LogAndHandlePanic(c.ctx, func(err error) { c.cancelFunc() }) c.receiveHealthStatusesLoop() diff --git a/quesma/ab_testing/sender/sender.go b/quesma/ab_testing/sender/sender.go index 585fa3cd0..bfd468a8a 100644 --- a/quesma/ab_testing/sender/sender.go +++ b/quesma/ab_testing/sender/sender.go @@ -36,7 +36,7 @@ func newSender(ctx context.Context) *sender { func (f *sender) Start() { go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 5f0948def..f01a9dcdf 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -84,7 +84,7 @@ func (lm *LogManager) Start() { forceReloadCh := lm.tableDiscovery.ForceReloadCh() go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { case <-lm.ctx.Done(): diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go index 1dfe80742..a63458085 100644 --- a/quesma/clickhouse/quesma_communicator.go +++ b/quesma/clickhouse/quesma_communicator.go @@ -235,7 +235,7 @@ func read(ctx context.Context, rows *sql.Rows, selectFields []string, rowToScan return nil, fmt.Errorf("clickhouse: iterating over rows failed: %v", rows.Err()) } go func() { - recovery.LogPanicWithCtx(ctx) + defer recovery.LogPanicWithCtx(ctx) err := rows.Close() if err != nil { logger.ErrorWithCtx(ctx).Msgf("clickhouse: closing rows failed: %v", err) diff --git a/quesma/elasticsearch/index_management.go b/quesma/elasticsearch/index_management.go index f4db50427..71a56055b 100644 --- a/quesma/elasticsearch/index_management.go +++ b/quesma/elasticsearch/index_management.go @@ -100,7 +100,7 @@ func (im *indexManagement) Start() { im.ctx, im.cancel = context.WithCancel(context.Background()) go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { case <-im.ctx.Done(): diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index bb16a6fed..0949bfc36 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -99,7 +99,7 @@ func (ip *IngestProcessor) Start() { forceReloadCh := ip.tableDiscovery.ForceReloadCh() go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { case <-ip.ctx.Done(): diff --git a/quesma/ingest/processor2.go b/quesma/ingest/processor2.go index 72d5009b9..8f6c6dc98 100644 --- a/quesma/ingest/processor2.go +++ b/quesma/ingest/processor2.go @@ -65,7 +65,7 @@ func (ip *IngestProcessor2) Start() { forceReloadCh := ip.tableDiscovery.ForceReloadCh() go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { case <-ip.ctx.Done(): diff --git a/quesma/quesma/async_search_storage/in_memory.go b/quesma/quesma/async_search_storage/in_memory.go index d758c9977..129c73f95 100644 --- a/quesma/quesma/async_search_storage/in_memory.go +++ b/quesma/quesma/async_search_storage/in_memory.go @@ -167,7 +167,7 @@ func (e *AsyncQueryTraceLoggerEvictor) TryFlushHangingAsyncQueryTrace(timeFun fu func (e *AsyncQueryTraceLoggerEvictor) FlushHangingAsyncQueryTrace(timeFun func(time.Time) time.Duration) { go func() { - recovery.LogPanic() + defer recovery.LogPanic() for { select { case <-time.After(GCInterval): diff --git a/quesma/quesma/recovery/recovery_strategies.go b/quesma/quesma/recovery/recovery_strategies.go index 955d2ea31..22f29f92d 100644 --- a/quesma/quesma/recovery/recovery_strategies.go +++ b/quesma/quesma/recovery/recovery_strategies.go @@ -32,12 +32,16 @@ func commonRecovery(r any, panicLogger func() *zerolog.Event) { panicLogger().Msgf("Panic recovered: %s\n%s", recoveredToError(r), string(debug.Stack())) } +// IMPORTANT: must be used with defer: +// defer recovery.LogPanic() func LogPanic() { if r := recover(); r != nil { commonRecovery(r, logger.Error) } } +// IMPORTANT: must be used with defer: +// defer recovery.LogPanicWithCtx(ctx) func LogPanicWithCtx(ctx context.Context) { if r := recover(); r != nil { commonRecovery(r, func() *zerolog.Event { @@ -46,6 +50,8 @@ func LogPanicWithCtx(ctx context.Context) { } } +// IMPORTANT: must be used with defer: +// defer recovery.LogAndHandlePanic(ctx, cleanupHandler) func LogAndHandlePanic(ctx context.Context, cleanupHandler func(err error)) { if r := recover(); r != nil { commonRecovery(r, func() *zerolog.Event { diff --git a/quesma/quesma/recovery/revocery_strategies_test.go b/quesma/quesma/recovery/recovery_strategies_test.go similarity index 100% rename from quesma/quesma/recovery/revocery_strategies_test.go rename to quesma/quesma/recovery/recovery_strategies_test.go diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 4d0ed74a3..5bfce7364 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -268,7 +268,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan select { case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond): go func() { // Async search takes longer. Return partial results and wait for - recovery.LogPanicWithCtx(ctx) + defer recovery.LogPanicWithCtx(ctx) res := <-doneCh responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res, true, opaqueId) sendMainPlanResult(responseBody, err) diff --git a/quesma/quesma/search_ab_testing.go b/quesma/quesma/search_ab_testing.go index 7a886baf9..e64d7a7ca 100644 --- a/quesma/quesma/search_ab_testing.go +++ b/quesma/quesma/search_ab_testing.go @@ -222,7 +222,7 @@ func (q *QueryRunner) executePlanElastic(ctx context.Context, plan *model.Execut case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond): go func() { // Async search takes longer. Return partial results and wait for - recovery.LogPanicWithCtx(ctx) + defer recovery.LogPanicWithCtx(ctx) res := <-doneCh responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId) sendABResult(responseBody, err)