Skip to content

Commit

Permalink
Internalize Prometheus remote write output
Browse files Browse the repository at this point in the history
Move the Prometheus remote write output from the external extension's
repository to the k6 core repository.
  • Loading branch information
codebien committed Jan 24, 2025
1 parent 7129195 commit 4de4431
Show file tree
Hide file tree
Showing 19 changed files with 3,610 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"go.k6.io/k6/internal/output/cloud"
"go.k6.io/k6/internal/output/influxdb"
"go.k6.io/k6/internal/output/json"
"go.k6.io/k6/internal/output/prometheusrw/remotewrite"
"go.k6.io/k6/lib"
"go.k6.io/k6/output"
"go.k6.io/k6/output/csv"

"github.com/grafana/xk6-dashboard/dashboard"
"github.com/grafana/xk6-output-opentelemetry/pkg/opentelemetry"
"github.com/grafana/xk6-output-prometheus-remote/pkg/remotewrite"
)

// builtinOutput marks the available builtin outputs.
Expand Down
142 changes: 142 additions & 0 deletions internal/output/prometheusrw/remote/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Package remote implements the Prometheus remote write protocol.
package remote

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"math"
"net/http"
"net/url"
"time"

"github.com/grafana/xk6-output-prometheus-remote/pkg/sigv4"

prompb "buf.build/gen/go/prometheus/prometheus/protocolbuffers/go"
"github.com/klauspost/compress/snappy"
"google.golang.org/protobuf/proto"
)

// HTTPConfig holds the config for the HTTP client.
type HTTPConfig struct {
Timeout time.Duration
TLSConfig *tls.Config
BasicAuth *BasicAuth
SigV4 *sigv4.Config
Headers http.Header
}

// BasicAuth holds the config for basic authentication.
type BasicAuth struct {
Username, Password string
}

// WriteClient is a client implementation of the Prometheus remote write protocol.
// It follows the specs defined by the official design document:
// https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM
type WriteClient struct {
hc *http.Client
url *url.URL
cfg *HTTPConfig
}

// NewWriteClient creates a new WriteClient.
func NewWriteClient(endpoint string, cfg *HTTPConfig) (*WriteClient, error) {
if cfg == nil {
cfg = &HTTPConfig{}
}
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
wc := &WriteClient{
hc: &http.Client{
Timeout: cfg.Timeout,
},
url: u,
cfg: cfg,
}
if cfg.TLSConfig != nil {
wc.hc.Transport = &http.Transport{
TLSClientConfig: cfg.TLSConfig,
}
}
if cfg.SigV4 != nil {
tripper, err := sigv4.NewRoundTripper(cfg.SigV4, wc.hc.Transport)
if err != nil {
return nil, err
}
wc.hc.Transport = tripper
}
return wc, nil
}

// Store sends a batch of samples to the HTTP endpoint,
// the request is the proto marshaled and encoded.
func (c *WriteClient) Store(ctx context.Context, series []*prompb.TimeSeries) error {
b, err := newWriteRequestBody(series)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(
ctx, http.MethodPost, c.url.String(), bytes.NewReader(b))
if err != nil {
return fmt.Errorf("create new HTTP request failed: %w", err)
}
if c.cfg.BasicAuth != nil {
req.SetBasicAuth(c.cfg.BasicAuth.Username, c.cfg.BasicAuth.Password)
}

if len(c.cfg.Headers) > 0 {
req.Header = c.cfg.Headers.Clone()
}

req.Header.Set("User-Agent", "k6-prometheus-rw-output")

// They are mostly defined by the specs
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

resp, err := c.hc.Do(req)
if err != nil {
return fmt.Errorf("HTTP POST request failed: %w", err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
panic(err)
}
}()

_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return err
}

return validateResponseStatus(resp.StatusCode)
}

func newWriteRequestBody(series []*prompb.TimeSeries) ([]byte, error) {
b, err := proto.Marshal(&prompb.WriteRequest{
Timeseries: series,
})
if err != nil {
return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err)
}
if snappy.MaxEncodedLen(len(b)) < 0 {
return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+
"size: %d, limit: %d", len(b), math.MaxUint32)
}
return snappy.Encode(nil, b), nil
}

func validateResponseStatus(code int) error {
if code >= http.StatusOK && code < 300 {
return nil
}

return fmt.Errorf("got status code: %d instead expected a 2xx successful status code", code)
}
229 changes: 229 additions & 0 deletions internal/output/prometheusrw/remote/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package remote

import (
"context"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/grafana/xk6-output-prometheus-remote/pkg/stale"

prompb "buf.build/gen/go/prometheus/prometheus/protocolbuffers/go"
"github.com/golang/snappy"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func TestNewWriteClient(t *testing.T) {
t.Parallel()
t.Run("DefaultConfig", func(t *testing.T) {
t.Parallel()
wc, err := NewWriteClient("http://example.com/api/v1/write", nil)
require.NoError(t, err)
require.NotNil(t, wc)
assert.Equal(t, wc.cfg, &HTTPConfig{})
})

t.Run("CustomConfig", func(t *testing.T) {
t.Parallel()
hc := &HTTPConfig{Timeout: time.Second}
wc, err := NewWriteClient("http://example.com/api/v1/write", hc)
require.NoError(t, err)
require.NotNil(t, wc)
assert.Equal(t, wc.cfg, hc)
})

t.Run("InvalidURL", func(t *testing.T) {
t.Parallel()
wc, err := NewWriteClient("fake://bad url", nil)
require.Error(t, err)
assert.Nil(t, wc)
})
}

func TestClientStore(t *testing.T) {
t.Parallel()
h := func(rw http.ResponseWriter, r *http.Request) {
assert.Equal(t, r.Header.Get("Content-Encoding"), "snappy")
assert.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")
assert.Equal(t, r.Header.Get("User-Agent"), "k6-prometheus-rw-output")
assert.Equal(t, r.Header.Get("X-Prometheus-Remote-Write-Version"), "0.1.0")
assert.NotEmpty(t, r.Header.Get("Content-Length"))

b, err := io.ReadAll(r.Body)
assert.NoError(t, err)
assert.NotEmpty(t, len(b))

rw.WriteHeader(http.StatusNoContent)
}
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

u, err := url.Parse(ts.URL)
require.NoError(t, err)

c := &WriteClient{
hc: ts.Client(),
url: u,
cfg: &HTTPConfig{},
}
data := &prompb.TimeSeries{
Labels: []*prompb.Label{
{
Name: "label1",
Value: "label1-val",
},
},
Samples: []*prompb.Sample{
{
Value: 8.5,
Timestamp: time.Now().UnixMilli(),
},
},
}
err = c.Store(context.Background(), []*prompb.TimeSeries{data})
assert.NoError(t, err)
}

func TestClientStoreHTTPError(t *testing.T) {
t.Parallel()
h := func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "bad", http.StatusUnauthorized)
}
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

u, err := url.Parse(ts.URL)
require.NoError(t, err)

c := &WriteClient{
hc: ts.Client(),
url: u,
cfg: &HTTPConfig{},
}
assert.Error(t, c.Store(context.Background(), nil))
}

func TestClientStoreHTTPBasic(t *testing.T) {
t.Parallel()
h := func(_ http.ResponseWriter, r *http.Request) {
u, pwd, ok := r.BasicAuth()
require.True(t, ok)
assert.Equal(t, "usertest", u)
assert.Equal(t, "pwdtest", pwd)
}
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

u, err := url.Parse(ts.URL)
require.NoError(t, err)

c := &WriteClient{
hc: ts.Client(),
url: u,
cfg: &HTTPConfig{
BasicAuth: &BasicAuth{
Username: "usertest",
Password: "pwdtest",
},
},
}
assert.NoError(t, c.Store(context.Background(), nil))
}

func TestClientStoreHeaders(t *testing.T) {
t.Parallel()
h := func(_ http.ResponseWriter, r *http.Request) {
assert.Equal(t, r.Header.Get("X-Prometheus-Remote-Write-Version"), "0.1.0")
assert.Equal(t, r.Header.Get("X-MY-CUSTOM-HEADER"), "fake")
}
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

u, err := url.Parse(ts.URL)
require.NoError(t, err)

c := &WriteClient{
hc: ts.Client(),
url: u,
cfg: &HTTPConfig{
Headers: http.Header(map[string][]string{
"X-MY-CUSTOM-HEADER": {"fake"},
// If the same key, of a mandatory protocol's header
// is provided, it will be overwritten.
"X-Prometheus-Remote-Write-Version": {"fake"},
}),
},
}
assert.NoError(t, c.Store(context.Background(), nil))
}

func TestNewWriteRequestBody(t *testing.T) {
t.Parallel()
ts := []*prompb.TimeSeries{
{
Labels: []*prompb.Label{{Name: "label1", Value: "val1"}},
Samples: []*prompb.Sample{{Value: 10.1, Timestamp: time.Unix(1, 0).Unix()}},
},
}
b, err := newWriteRequestBody(ts)
require.NoError(t, err)
require.NotEmpty(t, string(b))
assert.Contains(t, string(b), `label1`)
}

func TestNewWriteRequestBodyWithStaleMarker(t *testing.T) {
t.Parallel()

timestamp := time.Date(2022, time.December, 15, 11, 41, 18, 123, time.UTC)

ts := []*prompb.TimeSeries{
{
Labels: []*prompb.Label{{Name: "label1", Value: "val1"}},
Samples: []*prompb.Sample{{
Value: stale.Marker,
Timestamp: timestamp.UnixMilli(),
}},
},
}
b, err := newWriteRequestBody(ts)
require.NoError(t, err)
require.NotEmpty(t, b)

sb, err := snappy.Decode(nil, b)
require.NoError(t, err)

var series prompb.WriteRequest
err = proto.Unmarshal(sb, &series)
require.NoError(t, err)
require.NotEmpty(t, series.Timeseries[0])
require.NotEmpty(t, series.Timeseries[0].Samples)

assert.True(t, math.IsNaN(series.Timeseries[0].Samples[0].Value))
assert.Equal(t, timestamp.UnixMilli(), series.Timeseries[0].Samples[0].Timestamp)
}

func TestValidateStatusCode(t *testing.T) {
t.Parallel()
tests := []struct {
status int
expErr bool
}{
{status: http.StatusOK, expErr: false}, // Mimir
{status: http.StatusNoContent, expErr: false}, // Prometheus
{status: http.StatusBadRequest, expErr: true},
}
for _, tt := range tests {
err := validateResponseStatus(tt.status)
if tt.expErr {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
}
}
Loading

0 comments on commit 4de4431

Please sign in to comment.