Skip to content

Commit

Permalink
Merge pull request cortexproject#6378 from SungJin1212/Add-OTLPWriteH…
Browse files Browse the repository at this point in the history
…andler-Benchmark

Add OTLP write handler benchmark
  • Loading branch information
CharlieTLe authored Dec 18, 2024
2 parents 79259ae + 3af142e commit 81321dc
Showing 1 changed file with 163 additions and 56 deletions.
219 changes: 163 additions & 56 deletions pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,133 +220,240 @@ func TestOTLPConvertToPromTS(t *testing.T) {
}
}

// for testing
type resetReader struct {
*bytes.Reader
body []byte
}

func newResetReader(body []byte) *resetReader {
return &resetReader{
Reader: bytes.NewReader(body),
body: body,
}
}

func (r *resetReader) Reset() {
r.Reader.Reset(r.body)
}

func (r *resetReader) Close() error {
return nil
}

func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, encodingType string) (*http.Request, error) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")

var body []byte
var err error
switch contentType {
case jsonContentType:
body, err = otlpRequest.MarshalJSON()
if err != nil {
return nil, err
}
case pbContentType:
body, err = otlpRequest.MarshalProto()
if err != nil {
return nil, err
}
}

if encodingType == "gzip" {
var gzipBody bytes.Buffer
gz := gzip.NewWriter(&gzipBody)
_, err = gz.Write(body)
if err != nil {
return nil, err
}
if err = gz.Close(); err != nil {
return nil, err
}
body = gzipBody.Bytes()
}

req, err := http.NewRequestWithContext(ctx, "", "", newResetReader(body))
if err != nil {
return nil, err
}

switch contentType {
case jsonContentType:
req.Header.Set("Content-Type", jsonContentType)
case pbContentType:
req.Header.Set("Content-Type", pbContentType)
}

if encodingType != "" {
req.Header.Set("Content-Encoding", encodingType)
}
req.ContentLength = int64(len(body))

return req, nil
}

func BenchmarkOTLPWriteHandler(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
require.NoError(b, err)

exportRequest := generateOTLPWriteRequest()
mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc)

b.Run("json with no compression", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("json with gzip", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("proto with no compression", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("proto with gzip", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
}

func TestOTLPWriteHandler(t *testing.T) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}

exportRequest := generateOTLPWriteRequest(t)
exportRequest := generateOTLPWriteRequest()

tests := []struct {
description string
maxRecvMsgSize int
format string
contentType string
expectedStatusCode int
expectedErrMsg string
gzipCompression bool
encodingType string
}{
{
description: "Test proto format write with no compression",
maxRecvMsgSize: 10000,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
},
{
description: "Test proto format write with gzip",
maxRecvMsgSize: 10000,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
},
{
description: "Test json format write with no compression",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
},
{
description: "Test json format write with gzip",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
},
{
description: "request too big than maxRecvMsgSize (proto) with no compression",
maxRecvMsgSize: 10,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
},
{
description: "request too big than maxRecvMsgSize (proto) with gzip",
maxRecvMsgSize: 10,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
encodingType: "gzip",
gzipCompression: true,
},
{
description: "request too big than maxRecvMsgSize (json) with no compression",
maxRecvMsgSize: 10,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
},
{
description: "request too big than maxRecvMsgSize (json) with gzip",
maxRecvMsgSize: 10,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
encodingType: "gzip",
gzipCompression: true,
},
{
description: "invalid encoding type: snappy",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
encodingType: "snappy",
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
var req *http.Request

compressionFunc := func(t *testing.T, body []byte) []byte {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := gz.Write(body)
require.NoError(t, err)
require.NoError(t, gz.Close())

return b.Bytes()
}

if test.format == pbContentType {
buf, err := exportRequest.MarshalProto()
require.NoError(t, err)

if test.gzipCompression {
buf = compressionFunc(t, buf)
}

req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf))
require.NoError(t, err)
req.Header.Set("Content-Type", pbContentType)
req.Header.Set("Content-Encoding", test.encodingType)
} else {
buf, err := exportRequest.MarshalJSON()
require.NoError(t, err)

if test.gzipCompression {
buf = compressionFunc(t, buf)
}

req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf))
require.NoError(t, err)
req.Header.Set("Content-Type", jsonContentType)
req.Header.Set("Content-Encoding", test.encodingType)
}
req, err := getOTLPHttpRequest(&exportRequest, test.contentType, test.encodingType)
require.NoError(t, err)

push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
Expand All @@ -368,7 +475,7 @@ func TestOTLPWriteHandler(t *testing.T) {
}
}

func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand Down

0 comments on commit 81321dc

Please sign in to comment.