From 15201f1e5967fd1bc1e51c118178ab109170fe07 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 22 Mar 2024 09:55:17 -0700 Subject: [PATCH] Prevent starting unnecessary goroutines (#9817) Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/9739 Replaces https://github.com/open-telemetry/opentelemetry-collector/pull/9814 Signed-off-by: Bogdan Drutu --- .../fix-batch-processor-goroutine-leak.yaml | 25 +++++++++++++++++++ processor/batchprocessor/batch_processor.go | 13 +++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 .chloggen/fix-batch-processor-goroutine-leak.yaml diff --git a/.chloggen/fix-batch-processor-goroutine-leak.yaml b/.chloggen/fix-batch-processor-goroutine-leak.yaml new file mode 100644 index 00000000000..c5ce6662b97 --- /dev/null +++ b/.chloggen/fix-batch-processor-goroutine-leak.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor/batch + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Prevent starting unnecessary goroutines. + +# One or more tracking issues or pull requests related to the change +issues: [9739] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 4853eef524a..60c6fa10ef6 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -129,7 +129,9 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func metadataLimit: int(cfg.MetadataCardinalityLimit), } if len(bp.metadataKeys) == 0 { - bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} + s := bp.newShard(nil) + s.start() + bp.batcher = &singleShardBatcher{batcher: s} } else { bp.batcher = &multiShardBatcher{ batchProcessor: bp, @@ -156,8 +158,6 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard { exportCtx: exportCtx, batch: bp.batchFunc(), } - b.processor.goroutines.Add(1) - go b.start() return b } @@ -180,6 +180,11 @@ func (bp *batchProcessor) Shutdown(context.Context) error { } func (b *shard) start() { + b.processor.goroutines.Add(1) + go b.startLoop() +} + +func (b *shard) startLoop() { defer b.processor.goroutines.Done() // timerCh ensures we only block when there is a @@ -320,6 +325,8 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { var loaded bool b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md)) if !loaded { + // Start the goroutine only if we added the object to the map, otherwise is already started. + b.(*shard).start() mb.size++ } mb.lock.Unlock()