diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go
index 239dc2dd4fe..f9cb604ff77 100644
--- a/exporter/exporterbatcher/config.go
+++ b/exporter/exporterbatcher/config.go
@@ -31,6 +31,7 @@ type MinSizeConfig struct {
 	// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
 	// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
 	MinSizeItems int `mapstructure:"min_size_items"`
+	MinSizeBytes int `mapstructure:"min_size_bytes"`
 }
 
 // MaxSizeConfig defines the configuration for the maximum number of items in a batch.
@@ -41,18 +42,32 @@ type MaxSizeConfig struct {
 	// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
 	// Setting this value to zero disables the maximum size limit.
 	MaxSizeItems int `mapstructure:"max_size_items"`
+	MaxSizeBytes int `mapstructure:"max_size_bytes"`
 }
 
 func (c Config) Validate() error {
+	if (c.MinSizeBytes != 0 || c.MaxSizeBytes != 0) && (c.MinSizeItems != 0 || c.MaxSizeItems != 0) {
+		return errors.New("items-based and bytes-based size limits cannot be specified at the same time")
+	}
+
 	if c.MinSizeItems < 0 {
 		return errors.New("min_size_items must be greater than or equal to zero")
 	}
+	if c.MinSizeBytes < 0 {
+		return errors.New("min_size_bytes must be greater than or equal to zero")
+	}
 	if c.MaxSizeItems < 0 {
 		return errors.New("max_size_items must be greater than or equal to zero")
 	}
+	if c.MaxSizeBytes < 0 {
+		return errors.New("max_size_bytes must be greater than or equal to zero")
+	}
 	if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
 		return errors.New("max_size_items must be greater than or equal to min_size_items")
 	}
+	if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
+		return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
+	}
 	if c.FlushTimeout <= 0 {
 		return errors.New("timeout must be greater than zero")
 	}
diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go
index 55652dd7b4a..1fcc863b5f7 100644
--- a/exporter/exporterhelper/logs.go
+++ b/exporter/exporterhelper/logs.go
@@ -63,6 +63,10 @@ func (req *logsRequest) Export(ctx context.Context) error {
 	return req.pusher(ctx, req.ld)
 }
 
+func (req *logsRequest) BytesSize() int {
+	return req.ld.GetOrig().Size()
+}
+
 func (req *logsRequest) ItemsCount() int {
 	return req.ld.LogRecordCount()
 }
diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go
index 3e5b1330de9..2a5be98e368 100644
--- a/exporter/exporterhelper/logs_batch.go
+++ b/exporter/exporterhelper/logs_batch.go
@@ -25,9 +25,9 @@ func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
 // conforming with the MaxSizeConfig.
 func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
 	var (
 		res          []Request
 		destReq      *logsRequest
-		capacityLeft = cfg.MaxSizeItems
+		capacityLeft = cfg.MaxSizeBytes
 	)
 	for _, req := range []Request{req, r2} {
 		if req == nil {
@@ -37,22 +37,22 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
 		if !ok {
 			return nil, errors.New("invalid input type")
 		}
-		if srcReq.ld.LogRecordCount() <= capacityLeft {
+		if srcReq.ld.GetOrig().Size() <= capacityLeft {
 			if destReq == nil {
 				destReq = srcReq
 			} else {
 				srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
 			}
-			capacityLeft -= destReq.ld.LogRecordCount()
+			capacityLeft -= destReq.ld.GetOrig().Size()
 			continue
 		}
 		for {
 			extractedLogs := extractLogs(srcReq.ld, capacityLeft)
-			if extractedLogs.LogRecordCount() == 0 {
+			if extractedLogs.GetOrig().Size() == 0 {
 				break
 			}
-			capacityLeft -= extractedLogs.LogRecordCount()
+			capacityLeft -= extractedLogs.GetOrig().Size()
 			if destReq == nil {
 				destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
 			} else {
diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go
index 3023fa4df46..0fa57c7b8aa 100644
--- a/exporter/internal/queue/default_batcher.go
+++ b/exporter/internal/queue/default_batcher.go
@@ -28,6 +28,20 @@ func (qb *DefaultBatcher) resetTimer() {
 	}
 }
 
+func (qb *DefaultBatcher) maxSizeLimitExists() bool {
+	return qb.batchCfg.MaxSizeItems > 0 || qb.batchCfg.MaxSizeBytes > 0
+}
+
+func (qb *DefaultBatcher) reachedMinSizeThreshold(req internal.Request) bool {
+	if qb.batchCfg.MinSizeItems > 0 {
+		return req.ItemsCount() >= qb.batchCfg.MinSizeItems
+	}
+	if qb.batchCfg.MinSizeBytes > 0 {
+		return req.BytesSize() >= qb.batchCfg.MinSizeBytes
+	}
+	return true
+}
+
 // startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
 func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 	qb.stopWG.Add(1)
@@ -43,7 +57,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 
 			qb.currentBatchMu.Lock()
 
-			if qb.batchCfg.MaxSizeItems > 0 {
+			if qb.maxSizeLimitExists() {
 				var reqList []internal.Request
 				var mergeSplitErr error
 				if qb.currentBatch == nil || qb.currentBatch.req == nil {
@@ -60,7 +74,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 				}
 
 				// If there was a split, we flush everything immediately.
-				if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
+				if qb.reachedMinSizeThreshold(reqList[0]) || len(reqList) > 1 {
 					qb.currentBatch = nil
 					qb.currentBatchMu.Unlock()
 					for i := 0; i < len(reqList); i++ {
@@ -102,7 +116,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 				}
 			}
 
-			if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
+			if qb.reachedMinSizeThreshold(qb.currentBatch.req) {
 				batchToFlush := *qb.currentBatch
 				qb.currentBatch = nil
 				qb.currentBatchMu.Unlock()
diff --git a/exporter/internal/request.go b/exporter/internal/request.go
index bd24d982de6..7cf477e3df4 100644
--- a/exporter/internal/request.go
+++ b/exporter/internal/request.go
@@ -19,6 +19,8 @@ type Request interface {
 	// sent. For example, for OTLP exporter, this value represents the number of spans,
 	// metric data points or log records.
 	ItemsCount() int
+	// BytesSize returns the size of the serialized request in bytes.
+	BytesSize() int
 	// Merge is a function that merges this request with another one into a single request.
 	// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
 	// marked as not mutable.
diff --git a/pdata/plog/json.go b/pdata/plog/json.go
index 373985e7090..b435e188d50 100644
--- a/pdata/plog/json.go
+++ b/pdata/plog/json.go
@@ -40,7 +40,7 @@ func (*JSONUnmarshaler) UnmarshalLogs(buf []byte) (Logs, error) {
 	if iter.Error != nil {
 		return Logs{}, iter.Error
 	}
-	otlp.MigrateLogs(ld.getOrig().ResourceLogs)
+	otlp.MigrateLogs(ld.GetOrig().ResourceLogs)
 	return ld, nil
 }
 
diff --git a/pdata/plog/logs.go b/pdata/plog/logs.go
index 490526090f8..0b79e2ae655 100644
--- a/pdata/plog/logs.go
+++ b/pdata/plog/logs.go
@@ -17,7 +17,7 @@ func newLogs(orig *otlpcollectorlog.ExportLogsServiceRequest) Logs {
 	return Logs(internal.NewLogs(orig, &state))
 }
 
-func (ms Logs) getOrig() *otlpcollectorlog.ExportLogsServiceRequest {
+func (ms Logs) GetOrig() *otlpcollectorlog.ExportLogsServiceRequest {
 	return internal.GetOrigLogs(internal.Logs(ms))
 }
 
@@ -57,7 +57,7 @@ func (ms Logs) LogRecordCount() int {
 
 // ResourceLogs returns the ResourceLogsSlice associated with this Logs.
 func (ms Logs) ResourceLogs() ResourceLogsSlice {
-	return newResourceLogsSlice(&ms.getOrig().ResourceLogs, internal.GetLogsState(internal.Logs(ms)))
+	return newResourceLogsSlice(&ms.GetOrig().ResourceLogs, internal.GetLogsState(internal.Logs(ms)))
 }
 
 // MarkReadOnly marks the Logs as shared so that no further modifications can be done on it.
diff --git a/pdata/plog/logs_test.go b/pdata/plog/logs_test.go
index b47393a763b..ff01c0f917d 100644
--- a/pdata/plog/logs_test.go
+++ b/pdata/plog/logs_test.go
@@ -71,7 +71,7 @@ func TestToFromLogOtlp(t *testing.T) {
 	otlp := &otlpcollectorlog.ExportLogsServiceRequest{}
 	logs := newLogs(otlp)
 	assert.EqualValues(t, NewLogs(), logs)
-	assert.EqualValues(t, otlp, logs.getOrig())
+	assert.EqualValues(t, otlp, logs.GetOrig())
 }
 
 func TestResourceLogsWireCompatibility(t *testing.T) {
@@ -84,7 +84,7 @@ func TestResourceLogsWireCompatibility(t *testing.T) {
 	fillTestResourceLogsSlice(logs.ResourceLogs())
 
 	// Marshal its underlying ProtoBuf to wire.
-	wire1, err := gogoproto.Marshal(logs.getOrig())
+	wire1, err := gogoproto.Marshal(logs.GetOrig())
 	require.NoError(t, err)
 	assert.NotNil(t, wire1)
 
@@ -105,7 +105,7 @@ func TestResourceLogsWireCompatibility(t *testing.T) {
 
 	// Now compare that the original and final ProtoBuf messages are the same.
 	// This proves that goproto and gogoproto marshaling/unmarshaling are wire compatible.
-	assert.EqualValues(t, logs.getOrig(), &gogoprotoRS2)
+	assert.EqualValues(t, logs.GetOrig(), &gogoprotoRS2)
 }
 
 func TestLogsCopyTo(t *testing.T) {
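
Not part of the patch: the sketch below is a hypothetical usage example of the byte-based thresholds introduced above, assuming the exporterbatcher fields land as shown in the diff (MinSizeBytes and MaxSizeBytes promoted through the embedded MinSizeConfig and MaxSizeConfig). The concrete byte values are illustrative only.

package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

func main() {
	cfg := exporterbatcher.NewDefaultConfig()
	// The default config enables an items-based minimum; items-based and
	// bytes-based limits are mutually exclusive, so clear the items threshold.
	cfg.MinSizeItems = 0
	cfg.MinSizeBytes = 512 * 1024      // flush once ~512 KiB are buffered (illustrative value)
	cfg.MaxSizeBytes = 2 * 1024 * 1024 // split batches larger than ~2 MiB (illustrative value)
	cfg.FlushTimeout = 200 * time.Millisecond

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid batcher config:", err)
		return
	}
	fmt.Println("batcher config accepted")
}

With the mutual-exclusion check added in config.go, Validate would reject a config that mixes the two families, for example min_size_items together with max_size_bytes.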