Skip to content

Commit

Permalink
[chore] BatchProcessor: Move fields from top level struct to child that uses it (open-telemetry#11502)
Browse files Browse the repository at this point in the history

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 21, 2024
1 parent 007f06b commit 5cd035b
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ type batchProcessor struct {
// with the appropriate signal.
batchFunc func() batch

// metadataKeys is the configured list of metadata keys. When
// empty, the `singleton` batcher is used. When non-empty,
// each distinct combination of metadata keys and values
// triggers a new batcher, counted in `goroutines`.
metadataKeys []string

// metadataLimit is the limiting size of the batchers map.
metadataLimit int

shutdownC chan struct{}
goroutines sync.WaitGroup

Expand Down Expand Up @@ -138,17 +129,16 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
timeout: cfg.Timeout,
batchFunc: batchFunc,
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
if len(mks) == 0 {
bp.batcher = &singleShardBatcher{
processor: bp,
single: nil, // created in start
}
} else {
bp.batcher = &multiShardBatcher{
processor: bp,
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
processor: bp,
}
}

Expand Down Expand Up @@ -310,6 +300,15 @@ func (sb *singleShardBatcher) currentMetadataCardinality() int {

// multiBatcher is used when metadataKeys is not empty.
type multiShardBatcher struct {
// metadataKeys is the configured list of metadata keys. When
// empty, the `singleton` batcher is used. When non-empty,
// each distinct combination of metadata keys and values
// triggers a new batcher, counted in `goroutines`.
metadataKeys []string

// metadataLimit is the limiting size of the batchers map.
metadataLimit int

processor *batchProcessor
batchers sync.Map

Expand All @@ -329,7 +328,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attribute.KeyValue
for _, k := range mb.processor.metadataKeys {
for _, k := range mb.metadataKeys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
Expand All @@ -346,7 +345,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
b, ok := mb.batchers.Load(aset)
if !ok {
mb.lock.Lock()
if mb.processor.metadataLimit != 0 && mb.size >= mb.processor.metadataLimit {
if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
mb.lock.Unlock()
return errTooManyBatchers
}
Expand Down

0 comments on commit 5cd035b

Please sign in to comment.