Skip to content

Commit

Permalink
Feature: Adding support for lz4 compression
Browse files Browse the repository at this point in the history
Having trialed out the compression algorithm in a differnt project, the
performance impact has greatly reduced the amount of resource overhead
with similar compression ratios when compared to flate or gzip.
  • Loading branch information
MovieStoreGuy committed Sep 11, 2024
1 parent 0e22dc3 commit 5ea30fc
Show file tree
Hide file tree
Showing 24 changed files with 190 additions and 27 deletions.
25 changes: 25 additions & 0 deletions .chloggen/msg_feature-include-lz4-compression.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: "confighttp"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding support for lz4 compression into the project

# One or more tracking issues or pull requests related to the change
issues: [9128]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.20.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
typeEmpty Type = ""
)
Expand All @@ -31,6 +32,7 @@ func (ct *Type) UnmarshalText(in []byte) error {
typ == TypeDeflate ||
typ == TypeSnappy ||
typ == TypeZstd ||
typ == TypeLz4 ||
typ == typeNone ||
typ == typeEmpty {
*ct = typ
Expand Down
6 changes: 6 additions & 0 deletions config/configcompression/compressiontype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func TestUnmarshalText(t *testing.T) {
compressionName: []byte("none"),
shouldError: false,
},
{
name: "ValidLz4",
compressionName: []byte("lz4"),
isCompressed: true,
shouldError: false,
},
{
name: "Invalid",
compressionName: []byte("ggip"),
Expand Down
4 changes: 2 additions & 2 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ README](../configtls/README.md).
- [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- [`timeout`](https://golang.org/pkg/net/http/#Client)
- [`write_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, and `deflate`.
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, `deflate`, and `lz4`.
- look at the documentation for the server-side of the communication.
- `none` will be treated as uncompressed, and any other inputs will cause an error.
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
Expand Down Expand Up @@ -79,7 +79,7 @@ will not be enabled.
not set, browsers use a default of 5 seconds.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `20971520` (20MiB)
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"]
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)
- `request_params`: a list of query parameter names to add to the auth context, along with the HTTP headers
Expand Down
37 changes: 16 additions & 21 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"

"go.opentelemetry.io/collector/config/configcompression"
)
Expand All @@ -31,11 +32,7 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
return gzip.NewReader(body)
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
Expand All @@ -52,23 +49,16 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
return zlib.NewReader(body)
},
//nolint:unparam // Require to have error return so it can be part of map
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
return io.NopCloser(snappy.NewReader(body)), nil
},
//nolint:unparam // Require to have error return so it can be part of map
"lz4": func(body io.ReadCloser) (io.ReadCloser, error) {
return io.NopCloser(lz4.NewReader(body)), nil

},
}

Expand Down Expand Up @@ -123,7 +113,6 @@ type decompressor struct {
// httpContentDecompressor offloads the task of handling compressed HTTP requests
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), enableDecoders []string, decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
Expand Down Expand Up @@ -160,6 +149,12 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if newBody != nil {
defer func(orig io.ReadCloser) {
// Ensure the original body is correctly consumed and closed
_, _ = io.Copy(io.Discard, orig)
_ = orig.Close()
}(r.Body)

defer newBody.Close()
// "Content-Encoding" header is removed to avoid decompressing twice
// in case the next handler(s) have implemented a similar mechanism.
Expand Down
102 changes: 101 additions & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -31,6 +32,7 @@ func TestHTTPClientCompression(t *testing.T) {
compressedDeflateBody := compressZlib(t, testBody)
compressedSnappyBody := compressSnappy(t, testBody)
compressedZstdBody := compressZstd(t, testBody)
compressedLz4Body := compressLz4(t, testBody)

tests := []struct {
name string
Expand Down Expand Up @@ -80,6 +82,12 @@ func TestHTTPClientCompression(t *testing.T) {
reqBody: compressedZstdBody.Bytes(),
shouldError: false,
},
{
name: "ValidLz4",
encoding: configcompression.TypeLz4,
reqBody: compressedLz4Body.Bytes(),
shouldError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -197,6 +205,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressSnappy(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidLz4",
encoding: "lz4",
reqBody: compressLz4(t, testBody),
respCode: http.StatusOK,
},
{
name: "InvalidDeflate",
encoding: "deflate",
Expand Down Expand Up @@ -230,7 +244,14 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
encoding: "snappy",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input\n",
respBody: "snappy: corrupt input",
},
{
name: "InvalidLz4",
encoding: "lz4",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "lz4: bad magic number",
},
{
name: "UnsupportedCompression",
Expand Down Expand Up @@ -374,6 +395,76 @@ func TestOverrideCompressionList(t *testing.T) {
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
}

func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
encoding string
compress func(tb testing.TB, payload []byte) *bytes.Buffer
}{
// None encoding is ignored since it does not
// enforce the max body size if content encoding header is not set
{
name: "gzip",
encoding: "gzip",
compress: compressGzip,
},
{
name: "zstd",
encoding: "zstd",
compress: compressZstd,
},
{
name: "zlib",
encoding: "zlib",
compress: compressZlib,
},
{
name: "snappy",
encoding: "snappy",
compress: compressSnappy,
},
{
name: "lz4",
encoding: "lz4",
compress: compressLz4,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

h := httpContentDecompressor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n, err := io.Copy(io.Discard, r.Body)
assert.Equal(t, int64(1024), n, "Must have only read the limited value of bytes")
assert.EqualError(t, err, "http: request body too large")
w.WriteHeader(http.StatusBadRequest)
}),
1024,
defaultErrorHandler,
defaultCompressionAlgorithms,
availableDecoders,
)

payload := tc.compress(t, make([]byte, 2*1024)) // 2KB uncompressed payload
assert.NotEmpty(t, payload.Bytes(), "Must have data available")

req := httptest.NewRequest(http.MethodPost, "/", payload)
req.Header.Set("Content-Encoding", tc.encoding)

resp := httptest.NewRecorder()

h.ServeHTTP(resp, req)

assert.Equal(t, http.StatusBadRequest, resp.Code, "Must match the expected code")
assert.Empty(t, resp.Body.String(), "Must match the returned string")
assert.Empty(t, payload.Bytes(), "Must have consumed original payload")
})
}
}

func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
Expand Down Expand Up @@ -409,3 +500,12 @@ func compressZstd(t testing.TB, body []byte) *bytes.Buffer {
require.NoError(t, zw.Close())
return &buf
}

func compressLz4(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
lz := lz4.NewWriter(&buf)
_, err := lz.Write(body)
require.NoError(tb, err)
require.NoError(tb, lz.Close())
return &buf
}
14 changes: 12 additions & 2 deletions config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"bytes"
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"

"go.opentelemetry.io/collector/config/configcompression"
)
Expand All @@ -32,6 +33,13 @@ var (
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
_ writeCloserReset = (*lz4.Writer)(nil)
lz4Pool = &compressor{pool: sync.Pool{New: func() any {
lz := lz4.NewWriter(nil)
// Setting concurrency to 1 to disable async decoding by goroutines. This will reduce the overall memory footprint and pool
_ = lz.Apply(lz4.ConcurrencyOption(1))
return lz
}}}
)

type compressor struct {
Expand All @@ -50,8 +58,10 @@ func newCompressor(compressionType configcompression.Type) (*compressor, error)
return zStdPool, nil
case configcompression.TypeZlib, configcompression.TypeDeflate:
return zLibPool, nil
case configcompression.TypeLz4:
return lz4Pool, nil
}
return nil, errors.New("unsupported compression type, ")
return nil, fmt.Errorf("unsupported compression type %q", compressionType)

Check warning on line 64 in config/confighttp/compressor.go

View check run for this annotation

Codecov / codecov/patch

config/confighttp/compressor.go#L64

Added line #L64 was not covered by tests
}

func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error {
Expand Down
2 changes: 1 addition & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

const headerContentEncoding = "Content-Encoding"
const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate"}
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"}

// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
Expand Down
1 change: 1 addition & 0 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
github.com/golang/snappy v0.0.4
github.com/klauspost/compress v1.17.9
github.com/pierrec/lz4/v4 v4.1.21
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.109.0
Expand Down
2 changes: 2 additions & 0 deletions config/confighttp/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions exporter/otlphttpexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.20.3 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5ea30fc

Please sign in to comment.