Skip to content

Commit

Permalink
feat(loki.source.awsfirehose): add authentication support (#6114)
Browse files Browse the repository at this point in the history
Add an optional `access_key` attribute to enforce authentication for AWS
Firehose requests.

Signed-off-by: Simon Berz <[email protected]>
Co-authored-by: mattdurham <[email protected]>
  • Loading branch information
sberz and mattdurham authored Jan 12, 2024
1 parent eae8c03 commit e23aa30
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Main (unreleased)
### Enhancements

- Add an option to the windows static mode installer for expanding environment vars in the yaml config. (@erikbaranowski)
- Add authentication support to `loki.source.awsfirehose` (@sberz)

- Sort kubelet endpoint to reduce pressure on K8s's API server and watcher endpoints. (@hainenber)

Expand Down
8 changes: 7 additions & 1 deletion component/loki/source/aws_firehose/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/aws_firehose/internal"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/river/rivertypes"
)

func init() {
Expand All @@ -32,6 +33,7 @@ func init() {

type Arguments struct {
Server *fnet.ServerConfig `river:",squash"`
AccessKey rivertypes.Secret `river:"access_key,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
Expand Down Expand Up @@ -131,6 +133,10 @@ func (c *Component) Update(args component.Arguments) error {
handlerNeedsUpdate = true
}

if c.args.AccessKey != newArgs.AccessKey {
handlerNeedsUpdate = true
}

// Since the handler is created ad-hoc for the server, and the handler depends on the relabels
// consider this as a cause for server restart as well. Much simpler than adding a lock on the
// handler and doing the relabel rules change on the fly
Expand Down Expand Up @@ -159,7 +165,7 @@ func (c *Component) Update(args component.Arguments) error {

if err = c.server.MountAndRun(func(router *mux.Router) {
// re-create handler when server is re-computed
handler := internal.NewHandler(c, c.logger, c.handlerMetrics, c.rbs, newArgs.UseIncomingTimestamp)
handler := internal.NewHandler(c, c.logger, c.handlerMetrics, c.rbs, newArgs.UseIncomingTimestamp, string(newArgs.AccessKey))
router.Path("/awsfirehose/api/v1/push").Methods("POST").Handler(handler)
}); err != nil {
return err
Expand Down
15 changes: 14 additions & 1 deletion component/loki/source/aws_firehose/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"compress/gzip"
"context"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -57,16 +58,18 @@ type Handler struct {
sender Sender
relabelRules []*relabel.Config
useIncomingTs bool
accessKey string
}

// NewHandler creates a new handler.
func NewHandler(sender Sender, logger log.Logger, metrics *Metrics, rbs []*relabel.Config, useIncomingTs bool) *Handler {
func NewHandler(sender Sender, logger log.Logger, metrics *Metrics, rbs []*relabel.Config, useIncomingTs bool, accessKey string) *Handler {
return &Handler{
metrics: metrics,
logger: logger,
sender: sender,
relabelRules: rbs,
useIncomingTs: useIncomingTs,
accessKey: accessKey,
}
}

Expand All @@ -76,6 +79,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
level.Info(h.logger).Log("msg", "handling request")

// authenticate request if the component has an access key configured
if len(h.accessKey) > 0 {
apiHeader := req.Header.Get("X-Amz-Firehose-Access-Key")

if subtle.ConstantTimeCompare([]byte(apiHeader), []byte(h.accessKey)) != 1 {
http.Error(w, "access key not provided or incorrect", http.StatusUnauthorized)
return
}
}

var bodyReader io.Reader = req.Body
// firehose allows the user to configure gzip content-encoding, in that case
// decompress in the reader during unmarshalling
Expand Down
68 changes: 67 additions & 1 deletion component/loki/source/aws_firehose/internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ func TestHandler(t *testing.T) {

testReceiver := &receiver{entries: make([]loki.Entry, 0)}
registry := prometheus.NewRegistry()
handler := NewHandler(testReceiver, logger, NewMetrics(registry), tc.Relabels, tc.UseIncomingTs)
accessKey := ""
handler := NewHandler(testReceiver, logger, NewMetrics(registry), tc.Relabels, tc.UseIncomingTs, accessKey)

bs := bytes.NewBuffer(nil)
var bodyReader io.Reader = strings.NewReader(tc.Body)
Expand Down Expand Up @@ -360,6 +361,71 @@ func TestHandler(t *testing.T) {
}
}

func TestHandlerAuth(t *testing.T) {
type testcase struct {
// AccessKey configures the key required by the handler to accept requests
AccessKey string

// ReqAccessKey configures the key sent in the request
ReqAccessKey string

// ExpectedCode is the expected HTTP status code
ExpectedCode int
}

tests := map[string]testcase{
"auth disabled": {
AccessKey: "",
ReqAccessKey: "",
ExpectedCode: 200,
},
"auth enabled, valid key": {
AccessKey: "fakekey",
ReqAccessKey: "fakekey",
ExpectedCode: 200,
},
"auth enabled, invalid key": {
AccessKey: "fakekey",
ReqAccessKey: "badkey",
ExpectedCode: 401,
},
"auth enabled, no key": {
AccessKey: "fakekey",
ReqAccessKey: "",
ExpectedCode: 401,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

testReceiver := &receiver{entries: make([]loki.Entry, 0)}
registry := prometheus.NewRegistry()
relabeling := []*relabel.Config{}
incommingTs := false
handler := NewHandler(testReceiver, logger, NewMetrics(registry), relabeling, incommingTs, tc.AccessKey)

body := strings.NewReader(readTestData(t, "testdata/direct_put.json"))
req, err := http.NewRequest("POST", "http://test", body)
req.Header.Set("X-Amz-Firehose-Request-Id", testRequestID)
req.Header.Set("X-Amz-Firehose-Source-Arn", testSourceARN)
req.Header.Set("X-Amz-Firehose-Protocol-Version", "1.0")
req.Header.Set("User-Agent", "Amazon Kinesis Data Firehose Agent/1.0")
if tc.ReqAccessKey != "" {
req.Header.Set("X-Amz-Firehose-Access-Key", tc.ReqAccessKey)
}
require.NoError(t, err)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

require.Equal(t, tc.ExpectedCode, recorder.Code)
})
}
}

const cwLambdaControlMessage = `CWL CONTROL MESSAGE: Checking health of destination Firehose.`

var cwLambdaLogMessages = []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ The component will start an HTTP server on the configured port and address with

`loki.source.awsfirehose` supports the following arguments:

| Name | Type | Description | Default | Required |
|--------------------------|----------------------|------------------------------------------------------------|---------|----------|
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| Name | Type | Description | Default | Required |
| ------------------------ | -------------------- | -------------------------------------------------------------- | ------- | -------- |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from the request. | `false` | no |
| `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no |
| `relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no |
| `access_key` | `secret` | If set, require AWS Firehose to provide a matching key. | `""` | no |

The `relabel_rules` field can make use of the `rules` export value from a
[`loki.relabel`][loki.relabel] component to apply one or more relabeling rules to log entries before they're forwarded
Expand Down

0 comments on commit e23aa30

Please sign in to comment.