From b1dd4994d632648896d2784c4f63bf4d62dad06b Mon Sep 17 00:00:00 2001 From: Craig Peterson <192540+captncraig@users.noreply.github.com> Date: Tue, 2 Jan 2024 13:38:01 -0500 Subject: [PATCH] Attach Agent ID to remote-write requests (#5999) * pull out agentseed package for reading and storing uuid. Also put it in data dir for flow. * add uid header to prometheus.remote_write and loki.write * init func * cleaner api with fewer edge cases * add to pyroscope * compile * add to static remote write * add to static mode loki write * remove return from write. we never need it. * move loki write out of convert function * move out of prometheus convert function * static prom, get out of defaults function * static logs: take out of defaults function * constant for header. Work done in init with sync.once. Hardening * added some tests * maybe fix tests * testmain? * changelog --- CHANGELOG.md | 2 + cmd/grafana-agent/entrypoint.go | 2 + cmd/internal/flowmode/cmd_run.go | 2 + component/loki/write/write.go | 8 + .../prometheus/remotewrite/remote_write.go | 8 + component/prometheus/remotewrite/types.go | 1 - component/pyroscope/write/write.go | 6 + internal/agentseed/agentseed.go | 149 ++++++++++++++++++ internal/agentseed/agentseed_test.go | 79 ++++++++++ pkg/logs/logs.go | 9 ++ pkg/metrics/instance/instance.go | 11 +- pkg/usagestats/reporter.go | 75 +-------- pkg/usagestats/stats.go | 3 +- 13 files changed, 278 insertions(+), 77 deletions(-) create mode 100644 internal/agentseed/agentseed.go create mode 100644 internal/agentseed/agentseed_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1db88fa58fe6..3b83ee4d268c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,8 @@ Main (unreleased) - Bump github.com/IBM/sarama from v1.41.2 to v1.42.1 +- Attatch unique Agent ID header to remote-write requests. (@captncraig) + v0.38.1 (2023-11-30) -------------------- diff --git a/cmd/grafana-agent/entrypoint.go b/cmd/grafana-agent/entrypoint.go index c0b36845d330..172bbf8c58bb 100644 --- a/cmd/grafana-agent/entrypoint.go +++ b/cmd/grafana-agent/entrypoint.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gorilla/mux" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/pkg/config" "github.com/grafana/agent/pkg/logs" "github.com/grafana/agent/pkg/metrics" @@ -98,6 +99,7 @@ func NewEntrypoint(logger *server.Logger, cfg *config.Config, reloader Reloader) return nil, err } + agentseed.Init("", logger) ep.reporter, err = usagestats.NewReporter(logger) if err != nil { return nil, err diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index 4f319b61bb5a..c8618b928b85 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/converter" convert_diag "github.com/grafana/agent/converter/diag" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/pkg/boringcrypto" "github.com/grafana/agent/pkg/config/instrumentation" "github.com/grafana/agent/pkg/flow" @@ -248,6 +249,7 @@ func (fr *flowRun) Run(configPath string) error { } labelService := labelstore.New(l, reg) + agentseed.Init(fr.storagePath, l) f := flow.New(flow.Options{ Logger: l, diff --git a/component/loki/write/write.go b/component/loki/write/write.go index a31cb0745976..65fd04c6f692 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/component/common/loki/client" "github.com/grafana/agent/component/common/loki/limit" "github.com/grafana/agent/component/common/loki/wal" + "github.com/grafana/agent/internal/agentseed" ) func init() { @@ -159,6 +160,13 @@ func (c *Component) Update(args component.Arguments) error { } cfgs := newArgs.convertClientConfigs() + uid := agentseed.Get().UID + for _, cfg := range cfgs { + if cfg.Headers == nil { + cfg.Headers = map[string]string{} + } + cfg.Headers[agentseed.HeaderName] = uid + } walCfg := wal.Config{ Enabled: newArgs.WAL.Enabled, MaxSegmentAge: newArgs.WAL.MaxSegmentAge, diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index e337ff8f8cca..354e3248450b 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/component" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/internal/useragent" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/pkg/metrics/wal" @@ -257,6 +258,13 @@ func (c *Component) Update(newConfig component.Arguments) error { if err != nil { return err } + uid := agentseed.Get().UID + for _, cfg := range convertedConfig.RemoteWriteConfigs { + if cfg.Headers == nil { + cfg.Headers = map[string]string{} + } + cfg.Headers[agentseed.HeaderName] = uid + } err = c.remoteStore.ApplyConfig(convertedConfig) if err != nil { return err diff --git a/component/prometheus/remotewrite/types.go b/component/prometheus/remotewrite/types.go index 39ef6a55a191..473a2928e246 100644 --- a/component/prometheus/remotewrite/types.go +++ b/component/prometheus/remotewrite/types.go @@ -231,7 +231,6 @@ func convertConfigs(cfg Arguments) (*config.Config, error) { if err != nil { return nil, fmt.Errorf("cannot parse remote_write url %q: %w", rw.URL, err) } - rwConfigs = append(rwConfigs, &config.RemoteWriteConfig{ URL: &common.URL{URL: parsedURL}, RemoteTimeout: model.Duration(rw.RemoteTimeout), diff --git a/component/pyroscope/write/write.go b/component/pyroscope/write/write.go index 165a7f20a812..4c20797a611c 100644 --- a/component/pyroscope/write/write.go +++ b/component/pyroscope/write/write.go @@ -8,6 +8,7 @@ import ( "github.com/bufbuild/connect-go" "github.com/grafana/agent/component/pyroscope" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/internal/useragent" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/oklog/run" @@ -156,7 +157,12 @@ type fanOutClient struct { // NewFanOut creates a new fan out client that will fan out to all endpoints. func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) { clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) + uid := agentseed.Get().UID for _, endpoint := range config.Endpoints { + if endpoint.Headers == nil { + endpoint.Headers = map[string]string{} + } + endpoint.Headers[agentseed.HeaderName] = uid httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err diff --git a/internal/agentseed/agentseed.go b/internal/agentseed/agentseed.go new file mode 100644 index 000000000000..2405c732d9c5 --- /dev/null +++ b/internal/agentseed/agentseed.go @@ -0,0 +1,149 @@ +package agentseed + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/google/uuid" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/prometheus/common/version" +) + +// AgentSeed identifies a unique agent +type AgentSeed struct { + UID string `json:"UID"` + CreatedAt time.Time `json:"created_at"` + Version string `json:"version"` +} + +const HeaderName = "X-Agent-Id" +const filename = "agent_seed.json" + +var savedSeed *AgentSeed +var once sync.Once + +// Init should be called by an app entrypoint as soon as it can to configure where the unique seed will be stored. +// dir is the directory where we will read and store agent_seed.json +// If left empty it will default to $APPDATA or /tmp +// A unique agent seed will be generated when this method is first called, and reused for the lifetime of this agent. +func Init(dir string, l log.Logger) { + if l == nil { + l = log.NewNopLogger() + } + once.Do(func() { + loadOrGenerate(dir, l) + }) +} + +func loadOrGenerate(dir string, l log.Logger) { + var err error + var seed *AgentSeed + // list of paths in preference order. + // we will always write to the first path + paths := []string{} + if dir != "" { + paths = append(paths, filepath.Join(dir, filename)) + } + paths = append(paths, legacyPath()) + defer func() { + // as a fallback, gen and save a new uid + if seed == nil || seed.UID == "" { + seed = generateNew() + writeSeedFile(seed, paths[0], l) + } + // Finally save seed + savedSeed = seed + }() + for i, p := range paths { + if fileExists(p) { + if seed, err = readSeedFile(p, l); err == nil { + if i == 0 { + // we found it at the preferred path. Just return it + return + } else { + // it was at a backup path. write it to the preferred path. + writeSeedFile(seed, paths[0], l) + return + } + } + } + } +} + +func generateNew() *AgentSeed { + return &AgentSeed{ + UID: uuid.NewString(), + Version: version.Version, + CreatedAt: time.Now(), + } +} + +// Get will return a unique agent seed for this agent. +// It will always return a valid seed, even if previous attempts to +// load or save the seed file have failed +func Get() *AgentSeed { + // Init should have been called before this. If not, call it now with defaults. + once.Do(func() { + loadOrGenerate("", log.NewNopLogger()) + }) + if savedSeed != nil { + return savedSeed + } + // we should never get here. But if somehow we do, + // still return a valid seed for this request only + return generateNew() +} + +// readSeedFile reads the agent seed file +func readSeedFile(path string, logger log.Logger) (*AgentSeed, error) { + data, err := os.ReadFile(path) + if err != nil { + level.Error(logger).Log("msg", "Reading seed file", "err", err) + return nil, err + } + seed := &AgentSeed{} + err = json.Unmarshal(data, seed) + if err != nil { + level.Error(logger).Log("msg", "Decoding seed file", "err", err) + return nil, err + } + + if seed.UID == "" { + level.Error(logger).Log("msg", "Seed file has empty uid") + } + return seed, nil +} + +func legacyPath() string { + // windows + if runtime.GOOS == "windows" { + return filepath.Join(os.Getenv("APPDATA"), filename) + } + // linux/mac + return filepath.Join("/tmp", filename) +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return !errors.Is(err, os.ErrNotExist) +} + +// writeSeedFile writes the agent seed file +func writeSeedFile(seed *AgentSeed, path string, logger log.Logger) { + data, err := json.Marshal(*seed) + if err != nil { + level.Error(logger).Log("msg", "Encoding seed file", "err", err) + return + } + err = os.WriteFile(path, data, 0644) + if err != nil { + level.Error(logger).Log("msg", "Writing seed file", "err", err) + return + } +} diff --git a/internal/agentseed/agentseed_test.go b/internal/agentseed/agentseed_test.go new file mode 100644 index 000000000000..91650c59e7d1 --- /dev/null +++ b/internal/agentseed/agentseed_test.go @@ -0,0 +1,79 @@ +package agentseed + +import ( + "os" + "path/filepath" + "sync" + "testing" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + os.Remove(legacyPath()) + exitVal := m.Run() + os.Exit(exitVal) +} + +func reset() { + os.Remove(legacyPath()) + savedSeed = nil + once = sync.Once{} +} + +func TestNoExistingFile(t *testing.T) { + t.Cleanup(reset) + dir := t.TempDir() + l := log.NewNopLogger() + f := filepath.Join(dir, filename) + require.NoFileExists(t, f) + Init(dir, l) + require.FileExists(t, f) + loaded, err := readSeedFile(f, l) + require.NoError(t, err) + seed := Get() + require.Equal(t, seed.UID, loaded.UID) +} + +func TestExistingFile(t *testing.T) { + t.Cleanup(reset) + dir := t.TempDir() + l := log.NewNopLogger() + f := filepath.Join(dir, filename) + seed := generateNew() + writeSeedFile(seed, f, l) + Init(dir, l) + require.NotNil(t, savedSeed) + require.Equal(t, seed.UID, savedSeed.UID) + require.Equal(t, seed.UID, Get().UID) +} + +func TestNoInitCalled(t *testing.T) { + t.Cleanup(reset) + l := log.NewNopLogger() + seed := Get() + require.NotNil(t, seed) + f := legacyPath() + require.FileExists(t, f) + loaded, err := readSeedFile(f, l) + require.NoError(t, err) + require.Equal(t, seed.UID, loaded.UID) +} + +func TestLegacyExists(t *testing.T) { + t.Cleanup(reset) + dir := t.TempDir() + l := log.NewNopLogger() + f := filepath.Join(dir, filename) + seed := generateNew() + writeSeedFile(seed, legacyPath(), l) + Init(dir, l) + require.FileExists(t, f) + require.NotNil(t, savedSeed) + require.Equal(t, seed.UID, savedSeed.UID) + require.Equal(t, seed.UID, Get().UID) + loaded, err := readSeedFile(f, l) + require.NoError(t, err) + require.Equal(t, seed.UID, loaded.UID) +} diff --git a/pkg/logs/logs.go b/pkg/logs/logs.go index a821298be9fd..118c1a75bf58 100644 --- a/pkg/logs/logs.go +++ b/pkg/logs/logs.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/internal/useragent" "github.com/grafana/agent/pkg/util" "github.com/grafana/loki/clients/pkg/promtail" @@ -183,6 +184,14 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e return nil } + uid := agentseed.Get().UID + for _, cfg := range c.ClientConfigs { + if cfg.Headers == nil { + cfg.Headers = map[string]string{} + } + cfg.Headers[agentseed.HeaderName] = uid + } + clientMetrics := client.NewMetrics(i.reg) cfg := DefaultConfig() cfg.Global = config.GlobalConfig{ diff --git a/pkg/metrics/instance/instance.go b/pkg/metrics/instance/instance.go index 3d0e9fd47ca7..eae93936be2f 100644 --- a/pkg/metrics/instance/instance.go +++ b/pkg/metrics/instance/instance.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/internal/useragent" "github.com/grafana/agent/pkg/metrics/wal" "github.com/grafana/agent/pkg/util" @@ -157,7 +158,6 @@ func (c *Config) ApplyDefaults(global GlobalConfig) error { } rwNames := map[string]struct{}{} - // If the instance remote write is not filled in, then apply the prometheus write config if len(c.RemoteWrite) == 0 { c.RemoteWrite = c.global.RemoteWrite @@ -166,7 +166,6 @@ func (c *Config) ApplyDefaults(global GlobalConfig) error { if cfg == nil { return fmt.Errorf("empty or null remote write config section") } - // Typically Prometheus ignores empty names here, but we need to assign a // unique name to the config so we can pull metrics from it when running // an instance. @@ -183,7 +182,6 @@ func (c *Config) ApplyDefaults(global GlobalConfig) error { cfg.Name = c.Name + "-" + hash[:6] generatedName = true } - if _, exists := rwNames[cfg.Name]; exists { if generatedName { return fmt.Errorf("found two identical remote_write configs") @@ -419,6 +417,13 @@ func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer, cf // Set up the remote storage remoteLogger := log.With(i.logger, "component", "remote") i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, i.readyScrapeManager) + uid := agentseed.Get().UID + for _, rw := range cfg.RemoteWrite { + if rw.Headers == nil { + rw.Headers = map[string]string{} + } + rw.Headers[agentseed.HeaderName] = uid + } err = i.remoteStore.ApplyConfig(&config.Config{ GlobalConfig: cfg.global.Prometheus, RemoteWriteConfigs: cfg.RemoteWrite, diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index 2c1ac8fa43ba..a175e154b92a 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -2,20 +2,14 @@ package usagestats import ( "context" - "encoding/json" - "errors" "math" - "os" - "path/filepath" - "runtime" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/google/uuid" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/multierror" - "github.com/prometheus/common/version" ) var ( @@ -27,17 +21,10 @@ var ( type Reporter struct { logger log.Logger - agentSeed *AgentSeed + agentSeed *agentseed.AgentSeed lastReport time.Time } -// AgentSeed identifies a unique agent -type AgentSeed struct { - UID string `json:"UID"` - CreatedAt time.Time `json:"created_at"` - Version string `json:"version"` -} - // NewReporter creates a Reporter that will send periodically reports to grafana.com func NewReporter(logger log.Logger) (*Reporter, error) { r := &Reporter{ @@ -46,66 +33,10 @@ func NewReporter(logger log.Logger) (*Reporter, error) { return r, nil } -func (rep *Reporter) init(ctx context.Context) error { - path := agentSeedFileName() - - if fileExists(path) { - seed, err := rep.readSeedFile(path) - rep.agentSeed = seed - return err - } - rep.agentSeed = &AgentSeed{ - UID: uuid.NewString(), - Version: version.Version, - CreatedAt: time.Now(), - } - return rep.writeSeedFile(*rep.agentSeed, path) -} - -func fileExists(path string) bool { - _, err := os.Stat(path) - return !errors.Is(err, os.ErrNotExist) -} - -// readSeedFile reads the agent seed file -func (rep *Reporter) readSeedFile(path string) (*AgentSeed, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - seed := &AgentSeed{} - err = json.Unmarshal(data, seed) - if err != nil { - return nil, err - } - return seed, nil -} - -// writeSeedFile writes the agent seed file -func (rep *Reporter) writeSeedFile(seed AgentSeed, path string) error { - data, err := json.Marshal(seed) - if err != nil { - return err - } - return os.WriteFile(path, data, 0644) -} - -func agentSeedFileName() string { - if runtime.GOOS == "windows" { - return filepath.Join(os.Getenv("APPDATA"), "agent_seed.json") - } - // linux/mac - return "/tmp/agent_seed.json" -} - // Start inits the reporter seed and start sending report for every interval func (rep *Reporter) Start(ctx context.Context, metricsFunc func() map[string]interface{}) error { level.Info(rep.logger).Log("msg", "running usage stats reporter") - err := rep.init(ctx) - if err != nil { - level.Info(rep.logger).Log("msg", "failed to init seed", "err", err) - return err - } + rep.agentSeed = agentseed.Get() // check every minute if we should report. ticker := time.NewTicker(reportCheckInterval) diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go index d3418a5c4bde..004bc6e6d0e6 100644 --- a/pkg/usagestats/stats.go +++ b/pkg/usagestats/stats.go @@ -10,6 +10,7 @@ import ( "runtime" "time" + "github.com/grafana/agent/internal/agentseed" "github.com/grafana/agent/internal/useragent" "github.com/prometheus/common/version" ) @@ -31,7 +32,7 @@ type Report struct { DeployMode string `json:"deployMode"` } -func sendReport(ctx context.Context, seed *AgentSeed, interval time.Time, metrics map[string]interface{}) error { +func sendReport(ctx context.Context, seed *agentseed.AgentSeed, interval time.Time, metrics map[string]interface{}) error { report := Report{ UsageStatsID: seed.UID, CreatedAt: seed.CreatedAt,