diff --git a/CHANGELOG.md b/CHANGELOG.md index 43636a26ed2f..4519cef6f6d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Main (unreleased) ### Enhancements - Add an option to the windows static mode installer for expanding environment vars in the yaml config. (@erikbaranowski) +- Add authentication support to `loki.source.awsfirehose` (@sberz) - Sort kubelet endpoint to reduce pressure on K8s's API server and watcher endpoints. (@hainenber) diff --git a/component/loki/source/aws_firehose/component.go b/component/loki/source/aws_firehose/component.go index 639f8e6c7c16..12552b2f604e 100644 --- a/component/loki/source/aws_firehose/component.go +++ b/component/loki/source/aws_firehose/component.go @@ -17,6 +17,7 @@ import ( flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/loki/source/aws_firehose/internal" "github.com/grafana/agent/pkg/util" + "github.com/grafana/river/rivertypes" ) func init() { @@ -32,6 +33,7 @@ func init() { type Arguments struct { Server *fnet.ServerConfig `river:",squash"` + AccessKey rivertypes.Secret `river:"access_key,attr,optional"` UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"` ForwardTo []loki.LogsReceiver `river:"forward_to,attr"` RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"` @@ -131,6 +133,10 @@ func (c *Component) Update(args component.Arguments) error { handlerNeedsUpdate = true } + if c.args.AccessKey != newArgs.AccessKey { + handlerNeedsUpdate = true + } + // Since the handler is created ad-hoc for the server, and the handler depends on the relabels // consider this as a cause for server restart as well. Much simpler than adding a lock on the // handler and doing the relabel rules change on the fly @@ -159,7 +165,7 @@ func (c *Component) Update(args component.Arguments) error { if err = c.server.MountAndRun(func(router *mux.Router) { // re-create handler when server is re-computed - handler := internal.NewHandler(c, c.logger, c.handlerMetrics, c.rbs, newArgs.UseIncomingTimestamp) + handler := internal.NewHandler(c, c.logger, c.handlerMetrics, c.rbs, newArgs.UseIncomingTimestamp, string(newArgs.AccessKey)) router.Path("/awsfirehose/api/v1/push").Methods("POST").Handler(handler) }); err != nil { return err diff --git a/component/loki/source/aws_firehose/internal/handler.go b/component/loki/source/aws_firehose/internal/handler.go index 8313f1a63e7a..6a59ec0ee73b 100644 --- a/component/loki/source/aws_firehose/internal/handler.go +++ b/component/loki/source/aws_firehose/internal/handler.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/subtle" "encoding/base64" "encoding/json" "fmt" @@ -57,16 +58,18 @@ type Handler struct { sender Sender relabelRules []*relabel.Config useIncomingTs bool + accessKey string } // NewHandler creates a new handler. -func NewHandler(sender Sender, logger log.Logger, metrics *Metrics, rbs []*relabel.Config, useIncomingTs bool) *Handler { +func NewHandler(sender Sender, logger log.Logger, metrics *Metrics, rbs []*relabel.Config, useIncomingTs bool, accessKey string) *Handler { return &Handler{ metrics: metrics, logger: logger, sender: sender, relabelRules: rbs, useIncomingTs: useIncomingTs, + accessKey: accessKey, } } @@ -76,6 +79,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() level.Info(h.logger).Log("msg", "handling request") + // authenticate request if the component has an access key configured + if len(h.accessKey) > 0 { + apiHeader := req.Header.Get("X-Amz-Firehose-Access-Key") + + if subtle.ConstantTimeCompare([]byte(apiHeader), []byte(h.accessKey)) != 1 { + http.Error(w, "access key not provided or incorrect", http.StatusUnauthorized) + return + } + } + var bodyReader io.Reader = req.Body // firehose allows the user to configure gzip content-encoding, in that case // decompress in the reader during unmarshalling diff --git a/component/loki/source/aws_firehose/internal/handler_test.go b/component/loki/source/aws_firehose/internal/handler_test.go index 89a4ebecc0e2..c926dbc34bd6 100644 --- a/component/loki/source/aws_firehose/internal/handler_test.go +++ b/component/loki/source/aws_firehose/internal/handler_test.go @@ -313,7 +313,8 @@ func TestHandler(t *testing.T) { testReceiver := &receiver{entries: make([]loki.Entry, 0)} registry := prometheus.NewRegistry() - handler := NewHandler(testReceiver, logger, NewMetrics(registry), tc.Relabels, tc.UseIncomingTs) + accessKey := "" + handler := NewHandler(testReceiver, logger, NewMetrics(registry), tc.Relabels, tc.UseIncomingTs, accessKey) bs := bytes.NewBuffer(nil) var bodyReader io.Reader = strings.NewReader(tc.Body) @@ -360,6 +361,71 @@ func TestHandler(t *testing.T) { } } +func TestHandlerAuth(t *testing.T) { + type testcase struct { + // AccessKey configures the key required by the handler to accept requests + AccessKey string + + // ReqAccessKey configures the key sent in the request + ReqAccessKey string + + // ExpectedCode is the expected HTTP status code + ExpectedCode int + } + + tests := map[string]testcase{ + "auth disabled": { + AccessKey: "", + ReqAccessKey: "", + ExpectedCode: 200, + }, + "auth enabled, valid key": { + AccessKey: "fakekey", + ReqAccessKey: "fakekey", + ExpectedCode: 200, + }, + "auth enabled, invalid key": { + AccessKey: "fakekey", + ReqAccessKey: "badkey", + ExpectedCode: 401, + }, + "auth enabled, no key": { + AccessKey: "fakekey", + ReqAccessKey: "", + ExpectedCode: 401, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + testReceiver := &receiver{entries: make([]loki.Entry, 0)} + registry := prometheus.NewRegistry() + relabeling := []*relabel.Config{} + incommingTs := false + handler := NewHandler(testReceiver, logger, NewMetrics(registry), relabeling, incommingTs, tc.AccessKey) + + body := strings.NewReader(readTestData(t, "testdata/direct_put.json")) + req, err := http.NewRequest("POST", "http://test", body) + req.Header.Set("X-Amz-Firehose-Request-Id", testRequestID) + req.Header.Set("X-Amz-Firehose-Source-Arn", testSourceARN) + req.Header.Set("X-Amz-Firehose-Protocol-Version", "1.0") + req.Header.Set("User-Agent", "Amazon Kinesis Data Firehose Agent/1.0") + if tc.ReqAccessKey != "" { + req.Header.Set("X-Amz-Firehose-Access-Key", tc.ReqAccessKey) + } + require.NoError(t, err) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + require.Equal(t, tc.ExpectedCode, recorder.Code) + }) + } +} + const cwLambdaControlMessage = `CWL CONTROL MESSAGE: Checking health of destination Firehose.` var cwLambdaLogMessages = []string{ diff --git a/docs/sources/flow/reference/components/loki.source.awsfirehose.md b/docs/sources/flow/reference/components/loki.source.awsfirehose.md index 86bf634e395a..9b1d2c6d75c5 100644 --- a/docs/sources/flow/reference/components/loki.source.awsfirehose.md +++ b/docs/sources/flow/reference/components/loki.source.awsfirehose.md @@ -75,11 +75,12 @@ The component will start an HTTP server on the configured port and address with `loki.source.awsfirehose` supports the following arguments: -| Name | Type | Description | Default | Required | - |--------------------------|----------------------|------------------------------------------------------------|---------|----------| -| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes | +| Name | Type | Description | Default | Required | +| ------------------------ | -------------------- | -------------------------------------------------------------- | ------- | -------- | +| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes | | `use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from the request. | `false` | no | -| `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no | +| `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no | +| `access_key` | `secret` | If set, require AWS Firehose to provide a matching key. | `""` | no | The `relabel_rules` field can make use of the `rules` export value from a [`loki.relabel`][loki.relabel] component to apply one or more relabeling rules to log entries before they're forwarded