From 2e95228cf6cc48ce16906b7ad8aa98369344f44a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BB=97=20Tr=E1=BB=8Dng=20H=E1=BA=A3i?= <41283691+hainenber@users.noreply.github.com> Date: Fri, 5 Apr 2024 02:43:30 +0700 Subject: [PATCH] feat(faro/receiver): propagate request metadata to downstream consumers (#6515) Signed-off-by: hainenber --- CHANGELOG.md | 4 +- .../reference/components/faro.receiver.md | 1 + internal/component/faro/receiver/arguments.go | 3 +- internal/component/faro/receiver/handler.go | 8 + .../faro/receiver/receiver_otelcol_test.go | 166 ++++++++++++++++++ .../component/faro/receiver/receiver_test.go | 17 +- 6 files changed, 192 insertions(+), 7 deletions(-) create mode 100644 internal/component/faro/receiver/receiver_otelcol_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d74e2c156e4b..ded6410f6b67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,8 @@ Main (unreleased) - Add automatic conversion for `legacy_positions_file` in component `loki.source.file`. (@mattdurham) +- Propagate request metadata for `faro.receiver` to downstream components. (@hainenber) + ### Features - A new `loki.rules.kubernetes` component that discovers `PrometheusRule` Kubernetes resources and loads them into a Loki Ruler instance. (@EStork09) @@ -75,7 +77,7 @@ Main (unreleased) whenever that argument is explicitly configured. This issue only affected a small subset of arguments across 15 components. (@erikbaranowski, @rfratto) -- Fix panic when fanning out to invalid receivers. (@hainenber) +- Fix panic when fanning out to invalid receivers. (@hainenber) - Fix a bug where a panic could occur when reloading custom components. (@wildum) diff --git a/docs/sources/flow/reference/components/faro.receiver.md b/docs/sources/flow/reference/components/faro.receiver.md index 36e37fa5fce3..7644f4035309 100644 --- a/docs/sources/flow/reference/components/faro.receiver.md +++ b/docs/sources/flow/reference/components/faro.receiver.md @@ -66,6 +66,7 @@ Name | Type | Description | Default | Required `cors_allowed_origins` | `list(string)` | Origins for which cross-origin requests are permitted. | `[]` | no `api_key` | `secret` | Optional API key to validate client requests with. | `""` | no `max_allowed_payload_size` | `string` | Maximum size (in bytes) for client requests. | `"5MiB"` | no +`include_metadata` | `boolean` | Propagate incoming connection metadata to downstream consumers. | `false` | no By default, telemetry data is only accepted from applications on the same local network as the browser. To accept telemetry data from a wider set of clients, diff --git a/internal/component/faro/receiver/arguments.go b/internal/component/faro/receiver/arguments.go index 0169f0e80e2c..1513fb624baf 100644 --- a/internal/component/faro/receiver/arguments.go +++ b/internal/component/faro/receiver/arguments.go @@ -36,7 +36,8 @@ type ServerArguments struct { APIKey rivertypes.Secret `river:"api_key,attr,optional"` MaxAllowedPayloadSize units.Base2Bytes `river:"max_allowed_payload_size,attr,optional"` - RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"` + RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"` + IncludeMetadata bool `river:"include_metadata,attr,optional"` } func (s *ServerArguments) SetToDefault() { diff --git a/internal/component/faro/receiver/handler.go b/internal/component/faro/receiver/handler.go index e6207e511a2d..b3b8c7fe4c51 100644 --- a/internal/component/faro/receiver/handler.go +++ b/internal/component/faro/receiver/handler.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/internal/flow/logging/level" "github.com/prometheus/client_golang/prometheus" "github.com/rs/cors" + "go.opentelemetry.io/collector/client" "golang.org/x/time/rate" ) @@ -80,6 +81,13 @@ func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { h.argsMut.RLock() defer h.argsMut.RUnlock() + // Propagate request headers as metadata + if h.args.IncludeMetadata { + cl := client.FromContext(req.Context()) + cl.Metadata = client.NewMetadata(req.Header.Clone()) + req = req.WithContext(client.NewContext(req.Context(), cl)) + } + if h.cors != nil { h.cors.ServeHTTP(rw, req, h.handleRequest) } else { diff --git a/internal/component/faro/receiver/receiver_otelcol_test.go b/internal/component/faro/receiver/receiver_otelcol_test.go new file mode 100644 index 000000000000..1747d6f325b2 --- /dev/null +++ b/internal/component/faro/receiver/receiver_otelcol_test.go @@ -0,0 +1,166 @@ +//go:build !race + +package receiver + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/grafana/agent/internal/component/otelcol" + "github.com/grafana/agent/internal/component/otelcol/auth" + "github.com/grafana/agent/internal/component/otelcol/auth/headers" + otlphttp "github.com/grafana/agent/internal/component/otelcol/exporter/otlphttp" + "github.com/grafana/agent/internal/flow/componenttest" + "github.com/grafana/agent/internal/util" + "github.com/phayes/freeport" + "github.com/stretchr/testify/require" +) + +func TestWithOtelcolConsumer(t *testing.T) { + ctx := componenttest.TestContext(t) + + faroReceiver, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "faro.receiver", + ) + require.NoError(t, err) + faroReceiverPort, err := freeport.GetFreePort() + require.NoError(t, err) + + otelcolAuthHeader, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "otelcol.auth.headers", + ) + require.NoError(t, err) + + otelcolExporter, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "otelcol.exporter.otlphttp", + ) + require.NoError(t, err) + + doneChan := make(chan struct{}) + finalOtelServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "TENANTID", r.Header.Get("X-Scope-OrgId")) + close(doneChan) + w.WriteHeader(http.StatusOK) + })) + defer finalOtelServer.Close() + + tenantId := "Tenant-Id" + go func() { + err := otelcolAuthHeader.Run(ctx, headers.Arguments{ + Headers: []headers.Header{ + { + Key: "X-Scope-OrgId", + FromContext: &tenantId, + Action: headers.ActionUpsert, + }, + }, + }) + require.NoError(t, err) + }() + + require.NoError(t, otelcolAuthHeader.WaitRunning(time.Second), "otelco.auth.headers never started") + require.NoError(t, otelcolAuthHeader.WaitExports(time.Second), "otelco.auth.headers never exported anything") + otelcolAuthHeaderExport, ok := otelcolAuthHeader.Exports().(auth.Exports) + require.True(t, ok) + + go func() { + err := otelcolExporter.Run(ctx, otlphttp.Arguments{ + Client: otlphttp.HTTPClientArguments(otelcol.HTTPClientArguments{ + Endpoint: finalOtelServer.URL, + Auth: &otelcolAuthHeaderExport.Handler, + TLS: otelcol.TLSClientArguments{ + Insecure: true, + InsecureSkipVerify: true, + }, + }), + Encoding: otlphttp.EncodingJSON, + }) + require.NoError(t, err) + }() + + require.NoError(t, otelcolExporter.WaitRunning(time.Second), "otelco.exporter.otlphttp never started") + require.NoError(t, otelcolExporter.WaitExports(time.Second), "otelco.exporter.otlphttp never exported anything") + otelcolExporterExport, ok := otelcolExporter.Exports().(otelcol.ConsumerExports) + require.True(t, ok) + + go func() { + err := faroReceiver.Run(ctx, Arguments{ + LogLabels: map[string]string{ + "foo": "bar", + }, + + Server: ServerArguments{ + Host: "127.0.0.1", + Port: faroReceiverPort, + IncludeMetadata: true, + }, + + Output: OutputArguments{ + Traces: []otelcol.Consumer{otelcolExporterExport.Input}, + }, + }) + require.NoError(t, err) + }() + + // Wait for the server to be running. + util.Eventually(t, func(t require.TestingT) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", faroReceiverPort)) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + }) + + // Send a sample payload to the server. + req, err := http.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/collect", faroReceiverPort), + strings.NewReader(`{ + "traces": { + "resourceSpans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }, + "logs": [{ + "message": "hello, world", + "level": "info", + "context": {"env": "dev"}, + "timestamp": "2021-01-01T00:00:00Z", + "trace": { + "trace_id": "0", + "span_id": "0" + } + }], + "exceptions": [], + "measurements": [], + "meta": {} + }`), + ) + require.NoError(t, err) + + req.Header.Add(tenantId, "TENANTID") + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusAccepted, resp.StatusCode) + select { + case <-doneChan: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for updates to finish") + } +} diff --git a/internal/component/faro/receiver/receiver_test.go b/internal/component/faro/receiver/receiver_test.go index 4e6e78e04e9d..8e4de91705f7 100644 --- a/internal/component/faro/receiver/receiver_test.go +++ b/internal/component/faro/receiver/receiver_test.go @@ -40,8 +40,9 @@ func Test(t *testing.T) { }, Server: ServerArguments{ - Host: "127.0.0.1", - Port: freePort, + Host: "127.0.0.1", + Port: freePort, + IncludeMetadata: true, }, Output: OutputArguments{ @@ -62,9 +63,9 @@ func Test(t *testing.T) { }) // Send a sample payload to the server. - resp, err := http.Post( + req, err := http.NewRequest( + "POST", fmt.Sprintf("http://localhost:%d/collect", freePort), - "application/json", strings.NewReader(`{ "traces": { "resourceSpans": [] @@ -85,6 +86,13 @@ func Test(t *testing.T) { }`), ) require.NoError(t, err) + + req.Header.Add("Tenant-Id", "TENANTID") + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) defer resp.Body.Close() require.Equal(t, http.StatusAccepted, resp.StatusCode) @@ -124,7 +132,6 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { case <-ctx.Done(): return case ent := <-lr.Chan(): - lr.entriesMut.Lock() lr.entries = append(lr.entries, loki.Entry{ Labels: ent.Labels,