Skip to content

Commit

Permalink
Attach Agent ID to remote-write requests (#5999)
Browse files Browse the repository at this point in the history
* pull out agentseed package for reading and storing uuid. Also put it in data dir for flow.

* add uid header to prometheus.remote_write and loki.write

* init func

* cleaner api with fewer edge cases

* add to pyroscope

* compile

* add to static remote write

* add to static mode loki write

* remove return from write. we never need it.

* move loki write out of convert function

* move out of prometheus convert function

* static prom, get out of defaults function

* static logs: take out of defaults function

* constant for header. Work done in init with sync.once. Hardening

* added some tests

* maybe fix tests

* testmain?

* changelog
  • Loading branch information
captncraig authored Jan 2, 2024
1 parent 1cbeff4 commit b1dd499
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 77 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ Main (unreleased)

- Bump github.com/IBM/sarama from v1.41.2 to v1.42.1

- Attatch unique Agent ID header to remote-write requests. (@captncraig)

v0.38.1 (2023-11-30)
--------------------

Expand Down
2 changes: 2 additions & 0 deletions cmd/grafana-agent/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/logs"
"github.com/grafana/agent/pkg/metrics"
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewEntrypoint(logger *server.Logger, cfg *config.Config, reloader Reloader)
return nil, err
}

agentseed.Init("", logger)
ep.reporter, err = usagestats.NewReporter(logger)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/converter"
convert_diag "github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/pkg/boringcrypto"
"github.com/grafana/agent/pkg/config/instrumentation"
"github.com/grafana/agent/pkg/flow"
Expand Down Expand Up @@ -248,6 +249,7 @@ func (fr *flowRun) Run(configPath string) error {
}

labelService := labelstore.New(l, reg)
agentseed.Init(fr.storagePath, l)

f := flow.New(flow.Options{
Logger: l,
Expand Down
8 changes: 8 additions & 0 deletions component/loki/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/component/common/loki/client"
"github.com/grafana/agent/component/common/loki/limit"
"github.com/grafana/agent/component/common/loki/wal"
"github.com/grafana/agent/internal/agentseed"
)

func init() {
Expand Down Expand Up @@ -159,6 +160,13 @@ func (c *Component) Update(args component.Arguments) error {
}

cfgs := newArgs.convertClientConfigs()
uid := agentseed.Get().UID
for _, cfg := range cfgs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}
walCfg := wal.Config{
Enabled: newArgs.WAL.Enabled,
MaxSegmentAge: newArgs.WAL.MaxSegmentAge,
Expand Down
8 changes: 8 additions & 0 deletions component/prometheus/remotewrite/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/pkg/metrics/wal"
Expand Down Expand Up @@ -257,6 +258,13 @@ func (c *Component) Update(newConfig component.Arguments) error {
if err != nil {
return err
}
uid := agentseed.Get().UID
for _, cfg := range convertedConfig.RemoteWriteConfigs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}
err = c.remoteStore.ApplyConfig(convertedConfig)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion component/prometheus/remotewrite/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func convertConfigs(cfg Arguments) (*config.Config, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse remote_write url %q: %w", rw.URL, err)
}

rwConfigs = append(rwConfigs, &config.RemoteWriteConfig{
URL: &common.URL{URL: parsedURL},
RemoteTimeout: model.Duration(rw.RemoteTimeout),
Expand Down
6 changes: 6 additions & 0 deletions component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/bufbuild/connect-go"
"github.com/grafana/agent/component/pyroscope"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/oklog/run"
Expand Down Expand Up @@ -156,7 +157,12 @@ type fanOutClient struct {
// NewFanOut creates a new fan out client that will fan out to all endpoints.
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
uid := agentseed.Get().UID
for _, endpoint := range config.Endpoints {
if endpoint.Headers == nil {
endpoint.Headers = map[string]string{}
}
endpoint.Headers[agentseed.HeaderName] = uid
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
if err != nil {
return nil, err
Expand Down
149 changes: 149 additions & 0 deletions internal/agentseed/agentseed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package agentseed

import (
"encoding/json"
"errors"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/prometheus/common/version"
)

// AgentSeed identifies a unique agent
type AgentSeed struct {
UID string `json:"UID"`
CreatedAt time.Time `json:"created_at"`
Version string `json:"version"`
}

const HeaderName = "X-Agent-Id"
const filename = "agent_seed.json"

var savedSeed *AgentSeed
var once sync.Once

// Init should be called by an app entrypoint as soon as it can to configure where the unique seed will be stored.
// dir is the directory where we will read and store agent_seed.json
// If left empty it will default to $APPDATA or /tmp
// A unique agent seed will be generated when this method is first called, and reused for the lifetime of this agent.
func Init(dir string, l log.Logger) {
if l == nil {
l = log.NewNopLogger()
}
once.Do(func() {
loadOrGenerate(dir, l)
})
}

func loadOrGenerate(dir string, l log.Logger) {
var err error
var seed *AgentSeed
// list of paths in preference order.
// we will always write to the first path
paths := []string{}
if dir != "" {
paths = append(paths, filepath.Join(dir, filename))
}
paths = append(paths, legacyPath())
defer func() {
// as a fallback, gen and save a new uid
if seed == nil || seed.UID == "" {
seed = generateNew()
writeSeedFile(seed, paths[0], l)
}
// Finally save seed
savedSeed = seed
}()
for i, p := range paths {
if fileExists(p) {
if seed, err = readSeedFile(p, l); err == nil {
if i == 0 {
// we found it at the preferred path. Just return it
return
} else {
// it was at a backup path. write it to the preferred path.
writeSeedFile(seed, paths[0], l)
return
}
}
}
}
}

func generateNew() *AgentSeed {
return &AgentSeed{
UID: uuid.NewString(),
Version: version.Version,
CreatedAt: time.Now(),
}
}

// Get will return a unique agent seed for this agent.
// It will always return a valid seed, even if previous attempts to
// load or save the seed file have failed
func Get() *AgentSeed {
// Init should have been called before this. If not, call it now with defaults.
once.Do(func() {
loadOrGenerate("", log.NewNopLogger())
})
if savedSeed != nil {
return savedSeed
}
// we should never get here. But if somehow we do,
// still return a valid seed for this request only
return generateNew()
}

// readSeedFile reads the agent seed file
func readSeedFile(path string, logger log.Logger) (*AgentSeed, error) {
data, err := os.ReadFile(path)
if err != nil {
level.Error(logger).Log("msg", "Reading seed file", "err", err)
return nil, err
}
seed := &AgentSeed{}
err = json.Unmarshal(data, seed)
if err != nil {
level.Error(logger).Log("msg", "Decoding seed file", "err", err)
return nil, err
}

if seed.UID == "" {
level.Error(logger).Log("msg", "Seed file has empty uid")
}
return seed, nil
}

func legacyPath() string {
// windows
if runtime.GOOS == "windows" {
return filepath.Join(os.Getenv("APPDATA"), filename)
}
// linux/mac
return filepath.Join("/tmp", filename)
}

func fileExists(path string) bool {
_, err := os.Stat(path)
return !errors.Is(err, os.ErrNotExist)
}

// writeSeedFile writes the agent seed file
func writeSeedFile(seed *AgentSeed, path string, logger log.Logger) {
data, err := json.Marshal(*seed)
if err != nil {
level.Error(logger).Log("msg", "Encoding seed file", "err", err)
return
}
err = os.WriteFile(path, data, 0644)
if err != nil {
level.Error(logger).Log("msg", "Writing seed file", "err", err)
return
}
}
79 changes: 79 additions & 0 deletions internal/agentseed/agentseed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package agentseed

import (
"os"
"path/filepath"
"sync"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
os.Remove(legacyPath())
exitVal := m.Run()
os.Exit(exitVal)
}

func reset() {
os.Remove(legacyPath())
savedSeed = nil
once = sync.Once{}
}

func TestNoExistingFile(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
require.NoFileExists(t, f)
Init(dir, l)
require.FileExists(t, f)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
seed := Get()
require.Equal(t, seed.UID, loaded.UID)
}

func TestExistingFile(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
seed := generateNew()
writeSeedFile(seed, f, l)
Init(dir, l)
require.NotNil(t, savedSeed)
require.Equal(t, seed.UID, savedSeed.UID)
require.Equal(t, seed.UID, Get().UID)
}

func TestNoInitCalled(t *testing.T) {
t.Cleanup(reset)
l := log.NewNopLogger()
seed := Get()
require.NotNil(t, seed)
f := legacyPath()
require.FileExists(t, f)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
require.Equal(t, seed.UID, loaded.UID)
}

func TestLegacyExists(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
seed := generateNew()
writeSeedFile(seed, legacyPath(), l)
Init(dir, l)
require.FileExists(t, f)
require.NotNil(t, savedSeed)
require.Equal(t, seed.UID, savedSeed.UID)
require.Equal(t, seed.UID, Get().UID)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
require.Equal(t, seed.UID, loaded.UID)
}
9 changes: 9 additions & 0 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/loki/clients/pkg/promtail"
Expand Down Expand Up @@ -183,6 +184,14 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
return nil
}

uid := agentseed.Get().UID
for _, cfg := range c.ClientConfigs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}

clientMetrics := client.NewMetrics(i.reg)
cfg := DefaultConfig()
cfg.Global = config.GlobalConfig{
Expand Down
Loading

0 comments on commit b1dd499

Please sign in to comment.