From c840e69bbbd185f0fc7f167c0fc1794fc7d10e57 Mon Sep 17 00:00:00 2001
From: Florian Bacher
Date: Thu, 7 Nov 2024 15:48:04 +0100
Subject: [PATCH] [pkg/stanza] make log emitter and entry conversion in
 adapter synchronous (#35669)

#### Description

This PR changes the `LogEmitter` to accept a synchronous consumer callback function for processing a batch of log entries, as an alternative to sending log entry batches to a channel. The components that use the `LogEmitter` (adapter and parser) have been adapted accordingly. In the case of the adapter, the log entries are now converted directly, rather than being sent over a channel to the converter and the converted results being received back over a different channel.

#### Link to tracking issue

Fixes #35453

#### Testing

I did some initial performance tests using the `TestLogLargeFiles` load test to see how this change affects performance. Below are the results:

**Before the change (i.e. with async log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 09:02:53 | Sent:17,769,795 logs (179,507/sec) | Received:17,755,188 items (179,346/sec)
=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 09:06:29 | Sent:42,857,755 logs (216,465/sec) | Received:42,851,987 items (216,424/sec)

Test                                          |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime |PASS  |    100s|    73.1|    78.4|        106|        118|  18249451|      18249451|
LogLargeFiles/filelog-largefiles-6GB-lifetime |PASS  |    200s|    87.5|    98.1|        110|        116|  44358460|      44358460|
```

**After the change (i.e. with sync log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 10:09:51 Agent RAM (RES): 139 MiB, CPU:71.7% | Sent:17,802,561 logs (179,836/sec) | Received:17,788,273 items (179,680/sec)
=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 10:05:15 Agent RAM (RES): 140 MiB, CPU:95.6% | Sent:42,912,030 logs (216,744/sec) | Received:42,904,306 items (216,689/sec)

Test                                          |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime |PASS  |    100s|    74.8|    78.9|        127|        139|  17984687|      17984687|
LogLargeFiles/filelog-largefiles-6GB-lifetime |PASS  |    200s|    89.3|   100.9|        134|        140|  43376210|      43376210|
```

These results indicate comparable throughput, but increased resource consumption, especially in terms of memory.

I also ran a test comparing the performance of the synchronous and asynchronous log emitter, using the same methodology as in #35454. The results below indicate an increase in the time it takes to read the generated log file (see #35454 for details on how the file is generated and how the test is executed). A usage sketch of the new callback-based API follows the log excerpts below.

- Async log emitter: ~8s
- Sync log emitter: ~12s
output-async.log:

```
=== Step 3: Thu Oct 10 08:54:23 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 2.209674e+06
=== Step 4: Thu Oct 10 08:54:25 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 5.428103e+06
=== Step 5: Thu Oct 10 08:54:26 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 7.337017e+06
=== Step 6: Thu Oct 10 08:54:27 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 9.258843e+06
=== Step 7: Thu Oct 10 08:54:29 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.3082428e+07
=== Step 8: Thu Oct 10 08:54:31 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.6519068e+07
```
output-sync.log:

```
=== Step 2: Thu Oct 10 08:51:27 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.580891e+06
=== Step 3: Thu Oct 10 08:51:28 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 3.01034e+06
=== Step 4: Thu Oct 10 08:51:29 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 4.434627e+06
=== Step 5: Thu Oct 10 08:51:31 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 7.416612e+06
=== Step 6: Thu Oct 10 08:51:34 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.0496072e+07
=== Step 7: Thu Oct 10 08:51:36 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.3523882e+07
=== Step 8: Thu Oct 10 08:51:37 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.4929707e+07
=== Step 9: Thu Oct 10 08:51:39 CEST 2024
otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""} 1.6519105e+07
```
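For illustration, here is a minimal sketch of the new callback-based wiring. The `NewLogEmitter` signature and the callback type are taken from the diff below; the standalone `main` harness and the `consumeEntries` function are hypothetical stand-ins for a component's consumer callback (the adapter's `receiver.consumeEntries` in this PR):

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/component/componenttest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// consumeEntries is called synchronously by the emitter whenever a batch is
// flushed: when the batch reaches the max batch size, when the flush interval
// elapses, or when the emitter is stopped.
func consumeEntries(_ context.Context, entries []*entry.Entry) {
	fmt.Printf("received a batch of %d entries\n", len(entries))
}

func main() {
	set := componenttest.NewNopTelemetrySettings()

	// The emitter no longer exposes an output channel; the consumer callback
	// is passed at construction time instead.
	emitter := helper.NewLogEmitter(set, consumeEntries)

	if err := emitter.Start(nil); err != nil {
		panic(err)
	}
	defer func() {
		// Stop flushes any remaining batched entries through the callback.
		_ = emitter.Stop()
	}()

	// Process appends the entry to the current batch and invokes the callback
	// inline once the batch is full.
	_ = emitter.Process(context.Background(), entry.New())
}
```

Compared to the old `OutChannel()` flow, backpressure from a slow consumer now propagates directly up the operator pipeline instead of being absorbed by channel hand-offs, which is consistent with the throughput/latency trade-off measured above.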
---------

Signed-off-by: Florian Bacher
Co-authored-by: Andrzej Stencel
---
 .chloggen/stanza-sync-log-emitter.yaml        |  27 ++++
 pkg/stanza/adapter/converter.go               |  54 +++----
 pkg/stanza/adapter/factory.go                 |  44 +++---
 pkg/stanza/adapter/integration_test.go        |  23 +--
 pkg/stanza/adapter/receiver.go                |  98 +++---------
 pkg/stanza/adapter/receiver_test.go           | 133 ++++++++++--------
 pkg/stanza/operator/helper/emitter.go         |  34 +----
 pkg/stanza/operator/helper/emitter_test.go    |  94 ++++++++-----
 .../operator/parser/container/config.go       |  22 +--
 .../operator/parser/container/parser.go       |  27 ++--
 processor/logstransformprocessor/processor.go |  95 +------------
 testbed/tests/log_test.go                     |   9 ++
 12 files changed, 283 insertions(+), 377 deletions(-)
 create mode 100644 .chloggen/stanza-sync-log-emitter.yaml

diff --git a/.chloggen/stanza-sync-log-emitter.yaml b/.chloggen/stanza-sync-log-emitter.yaml
new file mode 100644
index 000000000000..b8bc8cfbf954
--- /dev/null
+++ b/.chloggen/stanza-sync-log-emitter.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Synchronous handling of entries passed from the log emitter to the receiver adapter
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35453]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go
index 2f8c3540cf42..3ab508745bc3 100644
--- a/pkg/stanza/adapter/converter.go
+++ b/pkg/stanza/adapter/converter.go
@@ -155,44 +155,46 @@ func (c *Converter) workerLoop() {
 	defer c.wg.Done()
 
 	for entries := range c.workerChan {
+		// Send plogs directly to flushChan
+		c.flushChan <- ConvertEntries(entries)
+	}
+}
 
-		resourceHashToIdx := make(map[uint64]int)
-		scopeIdxByResource := make(map[uint64]map[string]int)
+func ConvertEntries(entries []*entry.Entry) plog.Logs {
+	resourceHashToIdx := make(map[uint64]int)
+	scopeIdxByResource := make(map[uint64]map[string]int)
 
-		pLogs := plog.NewLogs()
-		var sl plog.ScopeLogs
+	pLogs := plog.NewLogs()
+	var sl plog.ScopeLogs
 
-		for _, e := range entries {
-			resourceID := HashResource(e.Resource)
-			var rl plog.ResourceLogs
+	for _, e := range entries {
+		resourceID := HashResource(e.Resource)
+		var rl plog.ResourceLogs
 
-			resourceIdx, ok := resourceHashToIdx[resourceID]
-			if !ok {
-				resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
+		resourceIdx, ok := resourceHashToIdx[resourceID]
+		if !ok {
+			resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
 
-				rl = pLogs.ResourceLogs().AppendEmpty()
-				upsertToMap(e.Resource, rl.Resource().Attributes())
+			rl = pLogs.ResourceLogs().AppendEmpty()
+			upsertToMap(e.Resource, rl.Resource().Attributes())
 
-				scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
+			scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
+			sl = rl.ScopeLogs().AppendEmpty()
+			sl.Scope().SetName(e.ScopeName)
+		} else {
+			rl = pLogs.ResourceLogs().At(resourceIdx)
+			scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
+			if !ok {
+				scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
 				sl = rl.ScopeLogs().AppendEmpty()
 				sl.Scope().SetName(e.ScopeName)
 			} else {
-				rl = pLogs.ResourceLogs().At(resourceIdx)
-				scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
-				if !ok {
-					scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
-					sl = rl.ScopeLogs().AppendEmpty()
-					sl.Scope().SetName(e.ScopeName)
-				} else {
-					sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
-				}
+				sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
 			}
-			convertInto(e, sl.LogRecords().AppendEmpty())
 		}
-
-		// Send plogs directly to flushChan
-		c.flushChan <- pLogs
+		convertInto(e, sl.LogRecords().AppendEmpty())
 	}
+	return pLogs
 }
 
 func (c *Converter) flushLoop() {
diff --git a/pkg/stanza/adapter/factory.go b/pkg/stanza/adapter/factory.go
index 2f42a1480bb6..e4b8c83ecac2 100644
--- a/pkg/stanza/adapter/factory.go
+++ b/pkg/stanza/adapter/factory.go
@@ -46,6 +46,21 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
 		operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)
 
+		obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
+			ReceiverID:             params.ID,
+			ReceiverCreateSettings: params,
+		})
+		if err != nil {
+			return nil, err
+		}
+		rcv := &receiver{
+			set:       params.TelemetrySettings,
+			id:        params.ID,
+			consumer:  consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
+			obsrecv:   obsrecv,
+			storageID: baseCfg.StorageID,
+		}
+
 		var emitterOpts []helper.EmitterOption
 		if baseCfg.maxBatchSize > 0 {
 			emitterOpts = append(emitterOpts, helper.WithMaxBatchSize(baseCfg.maxBatchSize))
@@ -53,7 +68,8 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
 		if baseCfg.flushInterval > 0 {
 			emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
 		}
-		emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)
+
+		emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
 		pipe, err := pipeline.Config{
 			Operators:     operators,
 			DefaultOutput: emitter,
@@ -62,27 +78,9 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
 			return nil, err
 		}
 
-		var converterOpts []converterOption
-		if baseCfg.numWorkers > 0 {
-			converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
-		}
-		converter := NewConverter(params.TelemetrySettings, converterOpts...)
-		obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
-			ReceiverID:             params.ID,
-			ReceiverCreateSettings: params,
-		})
-		if err != nil {
-			return nil, err
-		}
-		return &receiver{
-			set:       params.TelemetrySettings,
-			id:        params.ID,
-			pipe:      pipe,
-			emitter:   emitter,
-			consumer:  consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
-			converter: converter,
-			obsrecv:   obsrecv,
-			storageID: baseCfg.StorageID,
-		}, nil
+		rcv.emitter = emitter
+		rcv.pipe = pipe
+
+		return rcv, nil
 	}
 }
diff --git a/pkg/stanza/adapter/integration_test.go b/pkg/stanza/adapter/integration_test.go
index a088a917c808..c75eeedd56d9 100644
--- a/pkg/stanza/adapter/integration_test.go
+++ b/pkg/stanza/adapter/integration_test.go
@@ -27,7 +27,7 @@ import (
 func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
 	set := componenttest.NewNopTelemetrySettings()
 	set.Logger = zap.NewNop()
-	emitter := helper.NewLogEmitter(set)
+
 	pipe, err := pipeline.Config{
 		Operators: []operator.Config{
 			{
@@ -48,15 +48,18 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
 		return nil, err
 	}
 
-	return &receiver{
-		set:       set,
-		id:        component.MustNewID("testReceiver"),
-		pipe:      pipe,
-		emitter:   emitter,
-		consumer:  nextConsumer,
-		converter: NewConverter(componenttest.NewNopTelemetrySettings()),
-		obsrecv:   obsrecv,
-	}, nil
+	rcv := &receiver{
+		set:      set,
+		id:       component.MustNewID("testReceiver"),
+		pipe:     pipe,
+		consumer: nextConsumer,
+		obsrecv:  obsrecv,
+	}
+
+	emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
+
+	rcv.emitter = emitter
+	return rcv, nil
 }
 
 // BenchmarkEmitterToConsumer serves as a benchmark for entries going from the emitter to consumer,
diff --git a/pkg/stanza/adapter/receiver.go b/pkg/stanza/adapter/receiver.go
index 61124e3bf3c7..5b7760992181 100644
--- a/pkg/stanza/adapter/receiver.go
+++ b/pkg/stanza/adapter/receiver.go
@@ -6,7 +6,6 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
@@ -16,22 +15,19 @@ import (
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
 )
 
 type receiver struct {
-	set        component.TelemetrySettings
-	id         component.ID
-	emitWg     sync.WaitGroup
-	consumeWg  sync.WaitGroup
-	cancel     context.CancelFunc
-
-	pipe      pipeline.Pipeline
-	emitter   *helper.LogEmitter
-	consumer  consumer.Logs
-	converter *Converter
-	obsrecv   *receiverhelper.ObsReport
+	set component.TelemetrySettings
+	id  component.ID
+
+	pipe     pipeline.Pipeline
+	emitter  *helper.LogEmitter
+	consumer consumer.Logs
+	obsrecv  *receiverhelper.ObsReport
 
 	storageID     *component.ID
 	storageClient storage.Client
@@ -42,8 +38,6 @@ var _ rcvr.Logs = (*receiver)(nil)
 
 // Start tells the receiver to start
 func (r *receiver) Start(ctx context.Context, host component.Host) error {
-	rctx, cancel := context.WithCancel(ctx)
-	r.cancel = cancel
 	r.set.Logger.Info("Starting stanza receiver")
 
 	if err := r.setStorageClient(ctx, host); err != nil {
@@ -54,86 +48,26 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {
 		return fmt.Errorf("start stanza: %w", err)
 	}
 
-	r.converter.Start()
-
-	// Below we're starting 2 loops:
-	// * one which reads all the logs produced by the emitter and then forwards
-	//   them to converter
-	// ...
-	r.emitWg.Add(1)
-	go r.emitterLoop()
-
-	// ...
-	// * second one which reads all the logs produced by the converter
-	//   (aggregated by Resource) and then calls consumer to consume them.
-	r.consumeWg.Add(1)
-	go r.consumerLoop(rctx)
-
-	// Those 2 loops are started in separate goroutines because batching in
-	// the emitter loop can cause a flush, caused by either reaching the max
-	// flush size or by the configurable ticker which would in turn cause
-	// a set of log entries to be available for reading in converter's out
-	// channel. In order to prevent backpressure, reading from the converter
-	// channel and batching are done in those 2 goroutines.
-
 	return nil
 }
 
-// emitterLoop reads the log entries produced by the emitter and batches them
-// in converter.
-func (r *receiver) emitterLoop() {
-	defer r.emitWg.Done()
-
-	// Don't create done channel on every iteration.
-	// emitter.OutChannel is closed on ctx.Done(), no need to handle ctx here
-	// instead we should drain and process the channel to let emitter cancel properly
-	for e := range r.emitter.OutChannel() {
-		if err := r.converter.Batch(e); err != nil {
-			r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
-		}
-	}
+func (r *receiver) consumeEntries(ctx context.Context, entries []*entry.Entry) {
+	obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
+	pLogs := ConvertEntries(entries)
+	logRecordCount := pLogs.LogRecordCount()
 
-	r.set.Logger.Debug("Emitter loop stopped")
-}
-
-// consumerLoop reads converter log entries and calls the consumer to consumer them.
-func (r *receiver) consumerLoop(ctx context.Context) {
-	defer r.consumeWg.Done()
-
-	// Don't create done channel on every iteration.
-	// converter.OutChannel is closed on Shutdown before context is cancelled.
-	// Drain the channel and process events before exiting
-	for pLogs := range r.converter.OutChannel() {
-		obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
-		logRecordCount := pLogs.LogRecordCount()
-
-		cErr := r.consumer.ConsumeLogs(ctx, pLogs)
-		if cErr != nil {
-			r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
-		}
-		r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
+	cErr := r.consumer.ConsumeLogs(ctx, pLogs)
+	if cErr != nil {
+		r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
 	}
-
-	r.set.Logger.Debug("Consumer loop stopped")
+	r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
 }
 
 // Shutdown is invoked during service shutdown
 func (r *receiver) Shutdown(ctx context.Context) error {
-	if r.cancel == nil {
-		return nil
-	}
-
 	r.set.Logger.Info("Stopping stanza receiver")
 	pipelineErr := r.pipe.Stop()
 
-	// wait for emitter to finish batching and let consumers catch up
-	r.emitWg.Wait()
-
-	r.converter.Stop()
-	r.cancel()
-	// wait for consumers to catch up
-	r.consumeWg.Wait()
-
 	if r.storageClient != nil {
 		clientErr := r.storageClient.Close(ctx)
 		return multierr.Combine(pipelineErr, clientErr)
diff --git a/pkg/stanza/adapter/receiver_test.go b/pkg/stanza/adapter/receiver_test.go
index a5349a479866..c46d0c5a376f 100644
--- a/pkg/stanza/adapter/receiver_test.go
+++ b/pkg/stanza/adapter/receiver_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/confmap/confmaptest"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -48,16 +49,12 @@ func TestStart(t *testing.T) {
 	require.NoError(t, err, "receiver start failed")
 
 	stanzaReceiver := logsReceiver.(*receiver)
-	logChan := stanzaReceiver.emitter.OutChannelForWrite()
-	logChan <- []*entry.Entry{entry.New()}
+
+	stanzaReceiver.consumeEntries(context.Background(), []*entry.Entry{entry.New()})
 
-	// Eventually because of asynchronuous nature of the receiver.
-	require.Eventually(t,
-		func() bool {
-			return mockConsumer.LogRecordCount() == 1
-		},
-		10*time.Second, 5*time.Millisecond, "one log entry expected",
-	)
+	require.Equal(t, 1, mockConsumer.LogRecordCount())
+
 	require.NoError(t, logsReceiver.Shutdown(context.Background()))
 }
@@ -87,8 +84,8 @@ func TestHandleConsume(t *testing.T) {
 	require.NoError(t, err, "receiver start failed")
 
 	stanzaReceiver := logsReceiver.(*receiver)
-	logChan := stanzaReceiver.emitter.OutChannelForWrite()
-	logChan <- []*entry.Entry{entry.New()}
+
+	stanzaReceiver.consumeEntries(context.Background(), []*entry.Entry{entry.New()})
 
 	// Eventually because of asynchronuous nature of the receiver.
 	require.Eventually(t,
@@ -113,8 +110,8 @@ func TestHandleConsumeRetry(t *testing.T) {
 	require.NoError(t, logsReceiver.Start(context.Background(), componenttest.NewNopHost()))
 
 	stanzaReceiver := logsReceiver.(*receiver)
-	logChan := stanzaReceiver.emitter.OutChannelForWrite()
-	logChan <- []*entry.Entry{entry.New()}
+
+	stanzaReceiver.consumeEntries(context.Background(), []*entry.Entry{entry.New()})
 
 	require.Eventually(t,
 		func() bool {
@@ -212,26 +209,12 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) {
 		Builder: inputBuilder,
 	}
 
-	set := componenttest.NewNopTelemetrySettings()
-	emitter := helper.NewLogEmitter(set)
-	defer func() {
-		require.NoError(b, emitter.Stop())
-	}()
-
-	pipe, err := pipeline.Config{
-		Operators:     []operator.Config{inputCfg},
-		DefaultOutput: emitter,
-	}.Build(set)
-	require.NoError(b, err)
-
 	storageClient := storagetest.NewInMemoryClient(
 		component.KindReceiver,
 		component.MustNewID("foolog"),
 		"test",
 	)
 
-	converter := NewConverter(set)
-
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(b, err)
 
@@ -241,15 +224,27 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) {
 		receivedLogs: atomic.Uint32{},
 	}
 	rcv := &receiver{
-		set:           set,
-		pipe:          pipe,
-		emitter:       emitter,
 		consumer:      mockConsumer,
-		converter:     converter,
 		obsrecv:       obsrecv,
 		storageClient: storageClient,
 	}
 
+	set := componenttest.NewNopTelemetrySettings()
+	emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
+	defer func() {
+		require.NoError(b, emitter.Stop())
+	}()
+
+	pipe, err := pipeline.Config{
+		Operators:     []operator.Config{inputCfg},
+		DefaultOutput: emitter,
+	}.Build(set)
+	require.NoError(b, err)
+
+	rcv.pipe = pipe
+	rcv.set = set
+	rcv.emitter = emitter
+
 	b.ResetTimer()
 
 	require.NoError(b, rcv.Start(context.Background(), nil))
@@ -264,20 +259,54 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) {
 }
 
 func BenchmarkReadLine(b *testing.B) {
+	receivedAllLogs := make(chan struct{})
 	filePath := filepath.Join(b.TempDir(), "bench.log")
 
 	pipelineYaml := fmt.Sprintf(`
-- type: file_input
+pipeline:
+  type: file_input
   include:
     - %s
   start_at: beginning`, filePath)
 
-	var operatorCfgs []operator.Config
-	require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))
+	confmapFilePath := filepath.Join(b.TempDir(), "conf.yaml")
+	require.NoError(b, os.WriteFile(confmapFilePath, []byte(pipelineYaml), 0600))
+
+	testConfMaps, err := confmaptest.LoadConf(confmapFilePath)
+	require.NoError(b, err)
+
+	conf, err := testConfMaps.Sub("pipeline")
+	require.NoError(b, err)
+	require.NotNil(b, conf)
+
+	operatorCfg := operator.Config{}
+	require.NoError(b, conf.Unmarshal(&operatorCfg))
+
+	operatorCfgs := []operator.Config{operatorCfg}
+
+	storageClient := storagetest.NewInMemoryClient(
+		component.KindReceiver,
+		component.MustNewID("foolog"),
+		"test",
+	)
+
+	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
+	require.NoError(b, err)
+
+	mockConsumer := &testConsumer{
+		receivedAllLogs: receivedAllLogs,
+		expectedLogs:    uint32(b.N),
+		receivedLogs:    atomic.Uint32{},
+	}
+	rcv := &receiver{
+		consumer:      mockConsumer,
+		obsrecv:       obsrecv,
+		storageClient: storageClient,
+	}
 
 	set := componenttest.NewNopTelemetrySettings()
-	emitter := helper.NewLogEmitter(set)
+	emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
 	defer func() {
 		require.NoError(b, emitter.Stop())
 	}()
@@ -288,6 +317,10 @@ func BenchmarkReadLine(b *testing.B) {
 	}.Build(set)
 	require.NoError(b, err)
 
+	rcv.pipe = pipe
+	rcv.set = set
+	rcv.emitter = emitter
+
 	// Populate the file that will be consumed
 	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0666)
 	require.NoError(b, err)
@@ -296,22 +329,13 @@ func BenchmarkReadLine(b *testing.B) {
 		require.NoError(b, err)
 	}
 
-	storageClient := storagetest.NewInMemoryClient(
-		component.KindReceiver,
-		component.MustNewID("foolog"),
-		"test",
-	)
-
 	// Run the actual benchmark
 	b.ResetTimer()
-	require.NoError(b, pipe.Start(storageClient))
-	logChan := emitter.OutChannel()
-	for i := 0; i < b.N; i++ {
-		entries := <-logChan
-		for _, e := range entries {
-			convert(e)
-		}
-	}
+	require.NoError(b, rcv.Start(context.Background(), nil))
+
+	<-receivedAllLogs
+
+	require.NoError(b, rcv.Shutdown(context.Background()))
 }
 
 func BenchmarkParseAndMap(b *testing.B) {
@@ -344,7 +368,11 @@ func BenchmarkParseAndMap(b *testing.B) {
 	require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))
 
 	set := componenttest.NewNopTelemetrySettings()
-	emitter := helper.NewLogEmitter(set)
+	emitter := helper.NewLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
+		for _, e := range entries {
+			convert(e)
+		}
+	})
 	defer func() {
 		require.NoError(b, emitter.Stop())
 	}()
@@ -372,13 +400,6 @@ func BenchmarkParseAndMap(b *testing.B) {
 	// Run the actual benchmark
 	b.ResetTimer()
 	require.NoError(b, pipe.Start(storageClient))
-	logChan := emitter.OutChannel()
-	for i := 0; i < b.N; i++ {
-		entries := <-logChan
-		for _, e := range entries {
-			convert(e)
-		}
-	}
 }
 
 const testInputOperatorTypeStr = "test_input"
diff --git a/pkg/stanza/operator/helper/emitter.go b/pkg/stanza/operator/helper/emitter.go
index 51f1aa772863..aa91b85c92be 100644
--- a/pkg/stanza/operator/helper/emitter.go
+++ b/pkg/stanza/operator/helper/emitter.go
@@ -14,10 +14,9 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
 
-// LogEmitter is a stanza operator that emits log entries to a channel
+// LogEmitter is a stanza operator that emits log entries to the consumer callback function `consumerFunc`
 type LogEmitter struct {
 	OutputOperator
-	logChan       chan []*entry.Entry
 	closeChan     chan struct{}
 	stopOnce      sync.Once
 	batchMux      sync.Mutex
@@ -25,6 +24,7 @@ type LogEmitter struct {
 	wg            sync.WaitGroup
 	maxBatchSize  uint
 	flushInterval time.Duration
+	consumerFunc  func(context.Context, []*entry.Entry)
 }
 
 var (
@@ -61,15 +61,15 @@ func (o flushIntervalOption) apply(e *LogEmitter) {
 }
 
 // NewLogEmitter creates a new receiver output
-func NewLogEmitter(set component.TelemetrySettings, opts ...EmitterOption) *LogEmitter {
+func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Context, []*entry.Entry), opts ...EmitterOption) *LogEmitter {
 	op, _ := NewOutputConfig("log_emitter", "log_emitter").Build(set)
 	e := &LogEmitter{
 		OutputOperator: op,
-		logChan:        make(chan []*entry.Entry),
 		closeChan:      make(chan struct{}),
 		maxBatchSize:   defaultMaxBatchSize,
 		batch:          make([]*entry.Entry, 0, defaultMaxBatchSize),
 		flushInterval:  defaultFlushInterval,
+		consumerFunc:   consumerFunc,
 	}
 	for _, opt := range opts {
 		opt.apply(e)
@@ -89,27 +89,15 @@ func (e *LogEmitter) Stop() error {
 	e.stopOnce.Do(func() {
 		close(e.closeChan)
 		e.wg.Wait()
-
-		close(e.logChan)
 	})
 
 	return nil
 }
 
-// OutChannel returns the channel on which entries will be sent to.
-func (e *LogEmitter) OutChannel() <-chan []*entry.Entry {
-	return e.logChan
-}
-
-// OutChannelForWrite returns the channel on which entries can be sent to.
-func (e *LogEmitter) OutChannelForWrite() chan []*entry.Entry {
-	return e.logChan
-}
-
 // Process will emit an entry to the output channel
 func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
 	if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {
-		e.flush(ctx, oldBatch)
+		e.consumerFunc(ctx, oldBatch)
 	}
 
 	return nil
@@ -142,26 +130,18 @@ func (e *LogEmitter) flusher() {
 		select {
 		case <-ticker.C:
 			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
-				e.flush(context.Background(), oldBatch)
+				e.consumerFunc(context.Background(), oldBatch)
 			}
 		case <-e.closeChan:
 			// flush currently batched entries
 			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
-				e.flush(context.Background(), oldBatch)
+				e.consumerFunc(context.Background(), oldBatch)
 			}
 			return
 		}
 	}
 }
 
-// flush flushes the provided batch to the log channel.
-func (e *LogEmitter) flush(ctx context.Context, batch []*entry.Entry) {
-	select {
-	case e.logChan <- batch:
-	case <-ctx.Done():
-	}
-}
-
 // makeNewBatch replaces the current batch on the log emitter with a new batch, returning the old one
 func (e *LogEmitter) makeNewBatch() []*entry.Entry {
 	e.batchMux.Lock()
diff --git a/pkg/stanza/operator/helper/emitter_test.go b/pkg/stanza/operator/helper/emitter_test.go
index f17e7f503b2d..927734dccf56 100644
--- a/pkg/stanza/operator/helper/emitter_test.go
+++ b/pkg/stanza/operator/helper/emitter_test.go
@@ -6,6 +6,7 @@ package helper
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -17,7 +18,16 @@ import (
 )
 
 func TestLogEmitter(t *testing.T) {
-	emitter := NewLogEmitter(componenttest.NewNopTelemetrySettings())
+	rwMtx := &sync.RWMutex{}
+	var receivedEntries []*entry.Entry
+	emitter := NewLogEmitter(
+		componenttest.NewNopTelemetrySettings(),
+		func(_ context.Context, entries []*entry.Entry) {
+			rwMtx.Lock()
+			defer rwMtx.Unlock()
+			receivedEntries = entries
+		},
+	)
 
 	require.NoError(t, emitter.Start(nil))
 
@@ -27,16 +37,14 @@ func TestLogEmitter(t *testing.T) {
 
 	in := entry.New()
 
-	go func() {
-		assert.NoError(t, emitter.Process(context.Background(), in))
-	}()
+	assert.NoError(t, emitter.Process(context.Background(), in))
 
-	select {
-	case out := <-emitter.logChan:
-		require.Equal(t, in, out[0])
-	case <-time.After(time.Second):
-		require.FailNow(t, "Timed out waiting for output")
-	}
+	require.Eventually(t, func() bool {
+		rwMtx.RLock()
+		defer rwMtx.RUnlock()
+		return receivedEntries != nil
+	}, time.Second, 10*time.Millisecond)
+
+	require.Equal(t, in, receivedEntries[0])
 }
 
 func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
@@ -44,7 +52,16 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
 		maxBatchSize = 100
 		timeout      = time.Second
 	)
-	emitter := NewLogEmitter(componenttest.NewNopTelemetrySettings())
+	rwMtx := &sync.RWMutex{}
+	var receivedEntries []*entry.Entry
+	emitter := NewLogEmitter(
+		componenttest.NewNopTelemetrySettings(),
+		func(_ context.Context, entries []*entry.Entry) {
+			rwMtx.Lock()
+			defer rwMtx.Unlock()
+			receivedEntries = entries
+		},
+	)
 
 	require.NoError(t, emitter.Start(nil))
 	defer func() {
@@ -53,21 +70,17 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
 
 	entries := complexEntries(maxBatchSize)
 
-	go func() {
-		ctx := context.Background()
-		for _, e := range entries {
-			assert.NoError(t, emitter.Process(ctx, e))
-		}
-	}()
-
-	timeoutChan := time.After(timeout)
-
-	select {
-	case recv := <-emitter.logChan:
-		require.Len(t, recv, maxBatchSize, "Length of received entries was not the same as max batch size!")
-	case <-timeoutChan:
-		require.FailNow(t, "Failed to receive log entries before timeout")
+	ctx := context.Background()
+	for _, e := range entries {
+		assert.NoError(t, emitter.Process(ctx, e))
 	}
+
+	require.Eventually(t, func() bool {
+		rwMtx.RLock()
+		defer rwMtx.RUnlock()
+		return receivedEntries != nil
+	}, timeout, 10*time.Millisecond)
+
+	require.Len(t, receivedEntries, maxBatchSize)
 }
 
@@ -75,7 +88,17 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) {
 	const (
 		flushInterval = 100 * time.Millisecond
 		timeout       = time.Second
 	)
-	emitter := NewLogEmitter(componenttest.NewNopTelemetrySettings())
+	rwMtx := &sync.RWMutex{}
+	var receivedEntries []*entry.Entry
+	emitter := NewLogEmitter(
+		componenttest.NewNopTelemetrySettings(),
+		func(_ context.Context, entries []*entry.Entry) {
+			rwMtx.Lock()
+			defer rwMtx.Unlock()
+			receivedEntries = entries
+		},
+	)
+
 	emitter.flushInterval = flushInterval
 
 	require.NoError(t, emitter.Start(nil))
	defer func() {
@@ -84,19 +107,16 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) {
 
 	entry := complexEntry()
 
-	go func() {
-		ctx := context.Background()
-		assert.NoError(t, emitter.Process(ctx, entry))
-	}()
+	ctx := context.Background()
+	assert.NoError(t, emitter.Process(ctx, entry))
 
-	timeoutChan := time.After(timeout)
+	require.Eventually(t, func() bool {
+		rwMtx.RLock()
+		defer rwMtx.RUnlock()
+		return receivedEntries != nil
+	}, timeout, 10*time.Millisecond)
 
-	select {
-	case recv := <-emitter.logChan:
-		require.Len(t, recv, 1, "Should have received one entry, got %d instead", len(recv))
-	case <-timeoutChan:
-		require.FailNow(t, "Failed to receive log entry before timeout")
-	}
+	require.Len(t, receivedEntries, 1)
 }
 
 func complexEntries(count int) []*entry.Entry {
diff --git a/pkg/stanza/operator/parser/container/config.go b/pkg/stanza/operator/parser/container/config.go
index b707883713f7..81e6f2339be9 100644
--- a/pkg/stanza/operator/parser/container/config.go
+++ b/pkg/stanza/operator/parser/container/config.go
@@ -67,14 +67,6 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
 		return nil, err
 	}
 
-	cLogEmitter := helper.NewLogEmitter(set)
-	recombineParser, err := createRecombine(set, c, cLogEmitter)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create internal recombine config: %w", err)
-	}
-
-	wg := sync.WaitGroup{}
-
 	if c.Format != "" {
 		switch c.Format {
 		case dockerFormat, crioFormat, containerdFormat:
@@ -95,14 +87,24 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
 		)
 	}
 
+	wg := sync.WaitGroup{}
+
 	p := &Parser{
 		ParserOperator:          parserOperator,
-		recombineParser:         recombineParser,
 		format:                  c.Format,
 		addMetadataFromFilepath: c.AddMetadataFromFilePath,
-		criLogEmitter:           cLogEmitter,
 		criConsumers:            &wg,
 	}
+
+	cLogEmitter := helper.NewLogEmitter(set, p.consumeEntries)
+	p.criLogEmitter = cLogEmitter
+	recombineParser, err := createRecombine(set, c, cLogEmitter)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create internal recombine config: %w", err)
+	}
+
+	p.recombineParser = recombineParser
+
 	return p, nil
 }
diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go
index b53c025869df..ae27fa778318 100644
--- a/pkg/stanza/operator/parser/container/parser.go
+++ b/pkg/stanza/operator/parser/container/parser.go
@@ -95,7 +95,6 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
 			p.Logger().Error("unable to start the internal recombine operator", zap.Error(err))
 			return
 		}
-		go p.criConsumer(ctx)
 		p.asyncConsumerStarted = true
 	})
@@ -141,22 +140,6 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
 	return nil
 }
 
-// criConsumer receives log entries from the criLogEmitter and
-// writes them to the output of the main parser
-func (p *Parser) criConsumer(ctx context.Context) {
-	entriesChan := p.criLogEmitter.OutChannel()
-	p.criConsumers.Add(1)
-	defer p.criConsumers.Done()
-	for entries := range entriesChan {
-		for _, e := range entries {
-			err := p.Write(ctx, e)
-			if err != nil {
-				p.Logger().Error("failed to write entry", zap.Error(err))
-			}
-		}
-	}
-}
-
 // Stop ensures that the internal recombineParser, the internal criLogEmitter and
 // the crioConsumer are stopped in the proper order without being affected by
 // any possible race conditions
@@ -165,7 +148,6 @@ func (p *Parser) Stop() error {
 		// nothing is started return
 		return nil
 	}
-
 	var stopErrs []error
 	err := p.recombineParser.Stop()
 	if err != nil {
@@ -305,6 +287,15 @@ func (p *Parser) extractk8sMetaFromFilePath(e *entry.Entry) error {
 	return nil
 }
 
+func (p *Parser) consumeEntries(ctx context.Context, entries []*entry.Entry) {
+	for _, e := range entries {
+		err := p.Write(ctx, e)
+		if err != nil {
+			p.Logger().Error("failed to write entry", zap.Error(err))
+		}
+	}
+}
+
 func moveField(e *entry.Entry, originalKey, mappedKey string) error {
 	val, exist := entry.NewAttributeField(originalKey).Delete(e)
 	if !exist {
diff --git a/processor/logstransformprocessor/processor.go b/processor/logstransformprocessor/processor.go
index 270ea0439488..09f3a16430c9 100644
--- a/processor/logstransformprocessor/processor.go
+++ b/processor/logstransformprocessor/processor.go
@@ -17,6 +17,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
@@ -31,7 +32,6 @@ type logsTransformProcessor struct {
 	pipe          *pipeline.DirectedPipeline
 	firstOperator operator.Operator
 	emitter       *helper.LogEmitter
-	converter     *adapter.Converter
 	fromConverter *adapter.FromPdataConverter
 	shutdownFns   []component.ShutdownFunc
 }
@@ -45,7 +45,7 @@ func newProcessor(config *Config, nextConsumer consumer.Logs, set component.Tele
 
 	baseCfg := p.config.BaseConfig
 
-	p.emitter = helper.NewLogEmitter(p.set)
+	p.emitter = helper.NewLogEmitter(p.set, p.consumeStanzaLogEntries)
 	pipe, err := pipeline.Config{
 		Operators:     baseCfg.Operators,
 		DefaultOutput: p.emitter,
@@ -79,8 +79,6 @@ func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
 }
 
 func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host) error {
-	// create all objects before starting them, since the loops (consumerLoop, converterLoop) depend on these converters not being nil.
-	ltp.converter = adapter.NewConverter(ltp.set)
 	wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
 	ltp.fromConverter = adapter.NewFromPdataConverter(ltp.set, wkrCount)
 
@@ -90,15 +88,10 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)
 	// fromConverter: converts logs to stanza format ->
 	// converterLoop: forwards converted logs to the stanza pipeline ->
 	// pipeline: performs user configured operations on the logs ->
-	// emitterLoop: forwards output stanza logs for conversion to OTLP ->
-	// converter: converts stanza logs to OTLP ->
-	// consumerLoop: sends the converted OTLP logs to the next consumer
+	// transformProcessor: receives []*entry.Entries, converts them to plog.Logs and sends the converted OTLP logs to the next consumer
 	//
 	// We should start these components in reverse order of the data flow, then stop them in order of the data flow,
 	// in order to allow for pipeline draining.
-	ltp.startConsumerLoop(ctx)
-	ltp.startConverter()
-	ltp.startEmitterLoop(ctx)
 	err := ltp.startPipeline()
 	if err != nil {
 		return err
@@ -151,41 +144,6 @@ func (ltp *logsTransformProcessor) startPipeline() error {
 	return nil
 }
 
-// startEmitterLoop starts the loop which reads all the logs modified by the pipeline and then forwards
-// them to converter
-func (ltp *logsTransformProcessor) startEmitterLoop(ctx context.Context) {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	go ltp.emitterLoop(ctx, wg)
-
-	ltp.shutdownFns = append(ltp.shutdownFns, func(_ context.Context) error {
-		wg.Wait()
-		return nil
-	})
-}
-
-func (ltp *logsTransformProcessor) startConverter() {
-	ltp.converter.Start()
-
-	ltp.shutdownFns = append(ltp.shutdownFns, func(_ context.Context) error {
-		ltp.converter.Stop()
-		return nil
-	})
-}
-
-// startConsumerLoop starts the loop which reads all the logs produced by the converter
-// (aggregated by Resource) and then places them on the next consumer
-func (ltp *logsTransformProcessor) startConsumerLoop(ctx context.Context) {
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	go ltp.consumerLoop(ctx, wg)
-
-	ltp.shutdownFns = append(ltp.shutdownFns, func(_ context.Context) error {
-		wg.Wait()
-		return nil
-	})
-}
-
 func (ltp *logsTransformProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error {
 	// Add the logs to the chain
 	return ltp.fromConverter.Batch(ld)
@@ -219,48 +177,9 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context, wg *sync.W
 	}
 }
 
-// emitterLoop reads the log entries produced by the emitter and batches them
-// in converter.
-func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context, wg *sync.WaitGroup) {
-	defer wg.Done()
-
-	for {
-		select {
-		case <-ctx.Done():
-			ltp.set.Logger.Debug("emitter loop stopped")
-			return
-		case e, ok := <-ltp.emitter.OutChannel():
-			if !ok {
-				ltp.set.Logger.Debug("emitter channel got closed")
-				return
-			}
-
-			if err := ltp.converter.Batch(e); err != nil {
-				ltp.set.Logger.Error("processor encountered an issue with the converter", zap.Error(err))
-			}
-		}
-	}
-}
-
-// consumerLoop reads converter log entries and calls the consumer to consumer them.
-func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context, wg *sync.WaitGroup) {
-	defer wg.Done()
-
-	for {
-		select {
-		case <-ctx.Done():
-			ltp.set.Logger.Debug("consumer loop stopped")
-			return
-
-		case pLogs, ok := <-ltp.converter.OutChannel():
-			if !ok {
-				ltp.set.Logger.Debug("converter channel got closed")
-				return
-			}
-
-			if err := ltp.consumer.ConsumeLogs(ctx, pLogs); err != nil {
-				ltp.set.Logger.Error("processor encountered an issue with next consumer", zap.Error(err))
-			}
-		}
+func (ltp *logsTransformProcessor) consumeStanzaLogEntries(ctx context.Context, entries []*entry.Entry) {
+	pLogs := adapter.ConvertEntries(entries)
+	if err := ltp.consumer.ConsumeLogs(ctx, pLogs); err != nil {
+		ltp.set.Logger.Error("processor encountered an issue with next consumer", zap.Error(err))
 	}
 }
diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go
index 288b3c117783..8b44f83f670a 100644
--- a/testbed/tests/log_test.go
+++ b/testbed/tests/log_test.go
@@ -250,6 +250,7 @@ func TestLogLargeFiles(t *testing.T) {
 		sender       testbed.DataSender
 		receiver     testbed.DataReceiver
 		loadOptions  testbed.LoadOptions
+		resourceSpec testbed.ResourceSpec
 		sleepSeconds int
 	}{
 		{
@@ -266,6 +267,10 @@ func TestLogLargeFiles(t *testing.T) {
 				ItemsPerBatch: 1,
 				Parallel:      100,
 			},
+			resourceSpec: testbed.ResourceSpec{
+				ExpectedMaxCPU: 80,
+				ExpectedMaxRAM: 150,
+			},
 			sleepSeconds: 100,
 		},
 		{
@@ -282,6 +287,10 @@ func TestLogLargeFiles(t *testing.T) {
 				ItemsPerBatch: 10,
 				Parallel:      10,
 			},
+			resourceSpec: testbed.ResourceSpec{
+				ExpectedMaxCPU: 100,
+				ExpectedMaxRAM: 150,
+			},
 			sleepSeconds: 200,
 		},
 	}