Skip to content

Commit

Permalink
Improving memory performance when it comes to snappy (open-telemetry#…
Browse files Browse the repository at this point in the history
…11177)

#### Description
Moving snappy to lazy read from the original payload instead
decompressing all of the buffer into memory.

This is something I noticed while trying to introduce support for lz4
compression, moved into its own PR at the suggestion of @atoulme.

#### Testing

All of the tests are already present so no additional tests were needed.
  • Loading branch information
MovieStoreGuy authored and HongChenTW committed Dec 19, 2024
1 parent ad82d2d commit 3f2e6a4
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 11 deletions.
25 changes: 25 additions & 0 deletions .chloggen/msg_fix-snappy-lazy-read.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Snappy compression to lazy read for memory efficiency

# One or more tracking issues or pull requests related to the change
issues: [11177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
23 changes: 23 additions & 0 deletions config/confighttp/compress_readcloser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import "io"

// compressReadCloser couples the original compressed reader
// and the compression reader to ensure that the original body
// is correctly closed to ensure resources are freed.
type compressReadCloser struct {
io.Reader
orig io.ReadCloser
}

var (
_ io.Reader = (*compressReadCloser)(nil)
_ io.Closer = (*compressReadCloser)(nil)
)

func (crc *compressReadCloser) Close() error {
return crc.orig.Close()
}
77 changes: 77 additions & 0 deletions config/confighttp/compress_readcloser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package confighttp

import (
"bytes"
"errors"
"io"
"testing"
"testing/iotest"

"github.com/stretchr/testify/require"
)

type errorReadCloser struct {
io.Reader
err error
}

func (erc errorReadCloser) Close() error {
return erc.err
}

func TestCompressReadCloser(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
wrapper func(r io.Reader) io.ReadCloser
content []byte
errVal string
}{
{
name: "non mutating wrapper",
wrapper: func(r io.Reader) io.ReadCloser {
return errorReadCloser{
Reader: r,
err: nil,
}
},
content: []byte("hello world"),
errVal: "",
},
{
name: "failed reader",
wrapper: func(r io.Reader) io.ReadCloser {
return errorReadCloser{
Reader: r,
err: errors.New("failed to close reader"),
}
},
errVal: "failed to close reader",
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

orig := bytes.NewBuffer([]byte("hello world"))

crc := &compressReadCloser{
Reader: orig,
orig: tc.wrapper(orig),
}

require.NoError(t, iotest.TestReader(crc, orig.Bytes()), "Must be able to read original content")

err := crc.Close()
if tc.errVal != "" {
require.EqualError(t, err, tc.errVal, "Must match the expected error message")
} else {
require.NoError(t, err, "Must not error when closing reader")
}
})
}
}
16 changes: 6 additions & 10 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
}
return zr, nil
},
//nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
// Lazy Reading content to improve memory efficiency
return &compressReadCloser{
Reader: snappy.NewReader(body),
orig: body,
}, nil
},
}

Expand Down
2 changes: 1 addition & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
encoding: "snappy",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input\n",
respBody: "snappy: corrupt input",
},
{
name: "UnsupportedCompression",
Expand Down

0 comments on commit 3f2e6a4

Please sign in to comment.