[exporter] moved mergeBatchFunc and mergeBatchSplitFunc to request (open-telemetry#11459)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR turns `mergeBatchFunc` and `mergeBatchSplitFunc` into member functions of the request: merging and merge-splitting are now `Merge` and `MergeSplit` methods on the request types rather than standalone functions passed to the exporter.
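
The resulting request contract can be read off the `dummyRequest` test double further down in this diff. A minimal sketch of that shape (illustrative only, not the exact `exporterhelper.Request` definition):

```go
package example

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// Request sketches the contract a request type satisfies after this change.
type Request interface {
	// Export sends the request to the destination.
	Export(ctx context.Context) error
	// ItemsCount reports how many telemetry items the request carries.
	ItemsCount() int
	// Merge combines this request with another request of the same type.
	Merge(ctx context.Context, other Request) (Request, error)
	// MergeSplit merges this request with another one and splits the result
	// into batches that respect cfg.MaxSizeItems.
	MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, other Request) ([]Request, error)
}
```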

<!-- Issue number if applicable -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
sfc-gh-sili authored Oct 21, 2024
1 parent 5cd035b commit 2efeae4
Showing 22 changed files with 348 additions and 423 deletions.
27 changes: 27 additions & 0 deletions .chloggen/merge-function-as-requet-method.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Made mergeFunc and mergeSplitFunc required methods of exporter.Request

# One or more tracking issues or pull requests related to the change
issues: [10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  mergeFunc and mergeSplitFunc used to be part of the configuration passed to the exporter.
  They are now methods on the request.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
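
For exporter authors, the merge logic moves from a configuration option into the request type itself. A hedged before/after sketch, where `myRequest` and its string payload are hypothetical stand-ins for a real request implementation:

```go
package myexporter

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Before this PR, merge behavior was registered separately via the (now removed)
// WithBatchFuncs option. After this PR, the request type implements it directly.
type myRequest struct {
	items []string // placeholder payload
}

func (r *myRequest) Export(context.Context) error { return nil } // sending elided in this sketch
func (r *myRequest) ItemsCount() int              { return len(r.items) }

// Merge mirrors what the old standalone mergeFunc did: fold another request
// of the same concrete type into the receiver.
func (r *myRequest) Merge(_ context.Context, other exporterhelper.Request) (exporterhelper.Request, error) {
	o, ok := other.(*myRequest)
	if !ok {
		return nil, errors.New("invalid input type")
	}
	r.items = append(r.items, o.items...)
	return r, nil
}

// MergeSplit mirrors the old mergeSplitFunc: merge, then split by cfg.MaxSizeItems.
// The splitting step is elided here; a real implementation would chunk r.items.
func (r *myRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, other exporterhelper.Request) ([]exporterhelper.Request, error) {
	if other != nil {
		if _, err := r.Merge(ctx, other); err != nil {
			return nil, err
		}
	}
	_ = cfg.MaxSizeItems // the limit real code would split against
	return []exporterhelper.Request{r}, nil
}
```

With the methods on the request, enabling batching needs only `WithBatcher(cfg)`; the separate `WithBatchFuncs` option, removed in this PR, no longer exists.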
24 changes: 0 additions & 24 deletions exporter/exporterbatcher/batch_func.go

This file was deleted.

8 changes: 0 additions & 8 deletions exporter/exporterhelper/common.go
@@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
func WithBatcher(cfg exporterbatcher.Config) Option {
return internal.WithBatcher(cfg)
}

// WithBatchFuncs enables setting custom batch merge functions.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request],
msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return internal.WithBatchFuncs(mf, msf)
}
2 changes: 1 addition & 1 deletion exporter/exporterhelper/exporterhelperprofiles/go.mod
@@ -14,7 +14,6 @@ require (
go.opentelemetry.io/collector/exporter v0.111.0
go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0
go.opentelemetry.io/collector/exporter/exportertest v0.111.0
go.opentelemetry.io/collector/pdata v1.17.0
go.opentelemetry.io/collector/pdata/pprofile v0.111.0
go.opentelemetry.io/collector/pdata/testdata v0.111.0
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-20241021162523-3193106bf4b1
@@ -38,6 +37,7 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect
go.opentelemetry.io/collector/extension v0.111.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect
go.opentelemetry.io/collector/pdata v1.17.0 // indirect
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect
go.opentelemetry.io/collector/receiver v0.111.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect
1 change: 0 additions & 1 deletion exporter/exporterhelper/exporterhelperprofiles/profiles.go
@@ -88,7 +88,6 @@ func NewProfilesExporter(
}
profilesOpts := []exporterhelper.Option{
internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)),
internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles),
}
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...)
}
21 changes: 10 additions & 11 deletions exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go
@@ -12,29 +12,28 @@ import (
"go.opentelemetry.io/collector/pdata/pprofile"
)

// mergeProfiles merges two profiles requests into one.
func mergeProfiles(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) {
tr1, ok1 := r1.(*profilesRequest)
// Merge merges two profiles requests into one.
func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) {
tr2, ok2 := r2.(*profilesRequest)
if !ok1 || !ok2 {
if !ok2 {
return nil, errors.New("invalid input type")
}
tr2.pd.ResourceProfiles().MoveAndAppendTo(tr1.pd.ResourceProfiles())
return tr1, nil
tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return req, nil
}

// mergeSplitProfiles splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func mergeSplitProfiles(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 exporterhelper.Request, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
var (
res []exporterhelper.Request
destReq *profilesRequest
capacityLeft = cfg.MaxSizeItems
)
for _, req := range []exporterhelper.Request{r1, r2} {
if req == nil {
for _, r := range []exporterhelper.Request{req, r2} {
if r == nil {
continue
}
srcReq, ok := req.(*profilesRequest)
srcReq, ok := r.(*profilesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go
@@ -12,27 +12,25 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

func TestMergeProfiles(t *testing.T) {
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
res, err := mergeProfiles(context.Background(), pr1, pr2)
res, err := pr1.Merge(context.Background(), pr2)
require.NoError(t, err)
fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd)
assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount())
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := mergeProfiles(context.Background(), pr1, pr2)
_, err := pr2.Merge(context.Background(), pr1)
assert.Error(t, err)
}

@@ -51,13 +49,6 @@ func TestMergeSplitProfiles(t *testing.T) {
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
},
{
name: "both_requests_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: nil,
expected: []*profilesRequest{},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
@@ -66,17 +57,10 @@
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
},
{
name: "first_requests_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)},
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
},
{
name: "first_nil_second_empty",
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: nil,
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
},
{
@@ -93,8 +77,8 @@
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: nil,
pr2: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(4)},
@@ -133,7 +117,7 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := mergeSplitProfiles(context.Background(), tt.cfg, tt.pr1, tt.pr2)
res, err := tt.pr1.MergeSplit(context.Background(), tt.cfg, tt.pr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
Expand All @@ -145,9 +129,9 @@ func TestMergeSplitProfiles(t *testing.T) {
}

func TestMergeSplitProfilesInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r1 := &dummyRequest{}
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := mergeSplitProfiles(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2)
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
assert.Error(t, err)
}

@@ -160,15 +144,23 @@ func TestExtractProfiles(t *testing.T) {
}
}

type tracesRequest struct {
td ptrace.Traces
pusher consumer.ConsumeTracesFunc
// dummyRequest implements Request. It is for checking that merging two request types would fail
type dummyRequest struct {
}

func (req *dummyRequest) Export(_ context.Context) error {
return nil
}

func (req *dummyRequest) ItemsCount() int {
return 1
}

func (req *tracesRequest) Export(ctx context.Context) error {
return req.pusher(ctx, req.td)
func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
return nil, nil
}

func (req *tracesRequest) ItemsCount() int {
return req.td.SpanCount()
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
[]exporterhelper.Request, error) {
return nil, nil
}
18 changes: 1 addition & 17 deletions exporter/exporterhelper/internal/base_exporter.go
@@ -35,9 +35,6 @@ type BaseExporter struct {

Signal pipeline.Signal

BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request]

Marshaler exporterqueue.Marshaler[internal.Request]
Unmarshaler exporterqueue.Unmarshaler[internal.Request]

@@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}

@@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op
}
}

// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters.
// It must be provided as the first option when creating a new exporter helper.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option {
return func(o *BaseExporter) error {
o.BatchMergeFunc = mf
o.BatchMergeSplitfunc = msf
return nil
}
}

func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
if err != nil {
require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd)
24 changes: 13 additions & 11 deletions exporter/exporterhelper/internal/batch_sender.go
@@ -24,9 +24,7 @@ import (
// - concurrencyLimit is reached.
type BatchSender struct {
BaseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
cfg exporterbatcher.Config

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
@@ -46,14 +44,11 @@ type BatchSender struct {
}

// newBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender {
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
bs := &BatchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
mergeFunc: mf,
mergeSplitFunc: msf,
shutdownCh: nil,
shutdownCompleteCh: make(chan struct{}),
stopped: &atomic.Bool{},
@@ -156,10 +151,17 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
var reqs []internal.Request
var mergeSplitErr error
if bs.activeBatch.request == nil {
reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil)
} else {
reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
}

if mergeSplitErr != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
return mergeSplitErr
}

bs.activeRequests.Add(1)
@@ -201,7 +203,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)

if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
req, err = bs.activeBatch.request.Merge(ctx, req)
if err != nil {
bs.mu.Unlock()
return err