Skip to content

Commit

Permalink
Adds snappy support for http_listener_v2 (influxdata#8966)
Browse files Browse the repository at this point in the history
  • Loading branch information
helenosheaa authored Mar 18, 2021
1 parent 1eb47e2 commit cc6c51c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 15 deletions.
53 changes: 38 additions & 15 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/golang/snappy"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
Expand Down Expand Up @@ -247,28 +248,50 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
}

func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
body := req.Body
encoding := req.Header.Get("Content-Encoding")

// Handle gzip request bodies
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(req.Body)
switch encoding {
case "gzip":
r, err := gzip.NewReader(req.Body)
if err != nil {
h.Log.Debug(err.Error())
badRequest(res)
return nil, false
}
defer body.Close()
}

body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
bytes, err := ioutil.ReadAll(body)
if err != nil {
tooLarge(res)
return nil, false
defer r.Close()
maxReader := http.MaxBytesReader(res, r, h.MaxBodySize.Size)
bytes, err := ioutil.ReadAll(maxReader)
if err != nil {
tooLarge(res)
return nil, false
}
return bytes, true
case "snappy":
defer req.Body.Close()
bytes, err := ioutil.ReadAll(req.Body)
if err != nil {
h.Log.Debug(err.Error())
badRequest(res)
return nil, false
}
// snappy block format is only supported by decode/encode not snappy reader/writer
bytes, err = snappy.Decode(nil, bytes)
if err != nil {
h.Log.Debug(err.Error())
badRequest(res)
return nil, false
}
return bytes, true
default:
defer req.Body.Close()
bytes, err := ioutil.ReadAll(req.Body)
if err != nil {
h.Log.Debug(err.Error())
badRequest(res)
return nil, false
}
return bytes, true
}

return bytes, true
}

func (h *HTTPListenerV2) collectQuery(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
Expand Down
39 changes: 39 additions & 0 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/golang/snappy"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -327,6 +328,44 @@ func TestWriteHTTPGzippedData(t *testing.T) {
}
}

// test that writing snappy data works
func TestWriteHTTPSnappyData(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()

testData := "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
encodedData := snappy.Encode(nil, []byte(testData))

req, err := http.NewRequest("POST", createURL(listener, "http", "/write", ""), bytes.NewBuffer(encodedData))
require.NoError(t, err)
req.Header.Set("Content-Encoding", "snappy")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Log("Test client request failed. Error: ", err)
}
err = resp.Body.Close()
if err != nil {
t.Log("Test client close failed. Error: ", err)
}
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)

hostTags := []string{"server01"}
acc.Wait(1)

for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}

// writes 25,000 metrics to the listener with 10 different writers
func TestWriteHTTPHighTraffic(t *testing.T) {
if runtime.GOOS == "darwin" {
Expand Down

0 comments on commit cc6c51c

Please sign in to comment.