Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach Agent ID to remote-write requests #5999

Merged
merged 18 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Main (unreleased)

- Bump github.com/IBM/sarama from v1.41.2 to v1.42.1

- Attatch unique Agent ID header to remote-write requests. (@captncraig)

v0.38.1 (2023-11-30)
--------------------

Expand Down
2 changes: 2 additions & 0 deletions cmd/grafana-agent/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/logs"
"github.com/grafana/agent/pkg/metrics"
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewEntrypoint(logger *server.Logger, cfg *config.Config, reloader Reloader)
return nil, err
}

agentseed.Init("", logger)
ep.reporter, err = usagestats.NewReporter(logger)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/converter"
convert_diag "github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/pkg/boringcrypto"
"github.com/grafana/agent/pkg/config/instrumentation"
"github.com/grafana/agent/pkg/flow"
Expand Down Expand Up @@ -246,6 +247,7 @@ func (fr *flowRun) Run(configPath string) error {
}

labelService := labelstore.New(l, reg)
agentseed.Init(fr.storagePath, l)

f := flow.New(flow.Options{
Logger: l,
Expand Down
8 changes: 8 additions & 0 deletions component/loki/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/component/common/loki/client"
"github.com/grafana/agent/component/common/loki/limit"
"github.com/grafana/agent/component/common/loki/wal"
"github.com/grafana/agent/internal/agentseed"
)

func init() {
Expand Down Expand Up @@ -159,6 +160,13 @@ func (c *Component) Update(args component.Arguments) error {
}

cfgs := newArgs.convertClientConfigs()
uid := agentseed.Get().UID
for _, cfg := range cfgs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}
walCfg := wal.Config{
Enabled: newArgs.WAL.Enabled,
MaxSegmentAge: newArgs.WAL.MaxSegmentAge,
Expand Down
8 changes: 8 additions & 0 deletions component/prometheus/remotewrite/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/pkg/metrics/wal"
Expand Down Expand Up @@ -257,6 +258,13 @@ func (c *Component) Update(newConfig component.Arguments) error {
if err != nil {
return err
}
uid := agentseed.Get().UID
for _, cfg := range convertedConfig.RemoteWriteConfigs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}
err = c.remoteStore.ApplyConfig(convertedConfig)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion component/prometheus/remotewrite/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func convertConfigs(cfg Arguments) (*config.Config, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse remote_write url %q: %w", rw.URL, err)
}

rwConfigs = append(rwConfigs, &config.RemoteWriteConfig{
URL: &common.URL{URL: parsedURL},
RemoteTimeout: model.Duration(rw.RemoteTimeout),
Expand Down
6 changes: 6 additions & 0 deletions component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/bufbuild/connect-go"
"github.com/grafana/agent/component/pyroscope"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/oklog/run"
Expand Down Expand Up @@ -156,7 +157,12 @@ type fanOutClient struct {
// NewFanOut creates a new fan out client that will fan out to all endpoints.
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
uid := agentseed.Get().UID
for _, endpoint := range config.Endpoints {
if endpoint.Headers == nil {
endpoint.Headers = map[string]string{}
}
endpoint.Headers[agentseed.HeaderName] = uid
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
if err != nil {
return nil, err
Expand Down
149 changes: 149 additions & 0 deletions internal/agentseed/agentseed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package agentseed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to make this work for otelcol.exporter.* components?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear at present. I'd assume there's some mechanism for including metadata, even though it may not be over http. Needs more research.


import (
"encoding/json"
"errors"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/prometheus/common/version"
)

// AgentSeed identifies a unique agent
type AgentSeed struct {
UID string `json:"UID"`
CreatedAt time.Time `json:"created_at"`
Version string `json:"version"`
}

const HeaderName = "X-Agent-Id"
const filename = "agent_seed.json"

var savedSeed *AgentSeed
var once sync.Once

// Init should be called by an app entrypoint as soon as it can to configure where the unique seed will be stored.
// dir is the directory where we will read and store agent_seed.json
// If left empty it will default to $APPDATA or /tmp
// A unique agent seed will be generated when this method is first called, and reused for the lifetime of this agent.
func Init(dir string, l log.Logger) {
if l == nil {
l = log.NewNopLogger()
}
once.Do(func() {
loadOrGenerate(dir, l)
})
}

func loadOrGenerate(dir string, l log.Logger) {
var err error
var seed *AgentSeed
// list of paths in preference order.
// we will always write to the first path
paths := []string{}
if dir != "" {
paths = append(paths, filepath.Join(dir, filename))
}
paths = append(paths, legacyPath())
defer func() {
// as a fallback, gen and save a new uid
if seed == nil || seed.UID == "" {
seed = generateNew()
writeSeedFile(seed, paths[0], l)
}
// Finally save seed
savedSeed = seed
}()
for i, p := range paths {
if fileExists(p) {
if seed, err = readSeedFile(p, l); err == nil {
if i == 0 {
// we found it at the preferred path. Just return it
return
} else {
// it was at a backup path. write it to the preferred path.
writeSeedFile(seed, paths[0], l)
return
}
}
}
}
}

func generateNew() *AgentSeed {
return &AgentSeed{
UID: uuid.NewString(),
Version: version.Version,
CreatedAt: time.Now(),
}
}

// Get will return a unique agent seed for this agent.
// It will always return a valid seed, even if previous attempts to
// load or save the seed file have failed
func Get() *AgentSeed {
// Init should have been called before this. If not, call it now with defaults.
once.Do(func() {
loadOrGenerate("", log.NewNopLogger())
})
if savedSeed != nil {
return savedSeed
}
// we should never get here. But if somehow we do,
// still return a valid seed for this request only
return generateNew()
}

// readSeedFile reads the agent seed file
func readSeedFile(path string, logger log.Logger) (*AgentSeed, error) {
data, err := os.ReadFile(path)
if err != nil {
level.Error(logger).Log("msg", "Reading seed file", "err", err)
return nil, err
}
seed := &AgentSeed{}
err = json.Unmarshal(data, seed)
if err != nil {
level.Error(logger).Log("msg", "Decoding seed file", "err", err)
return nil, err
}

if seed.UID == "" {
level.Error(logger).Log("msg", "Seed file has empty uid")
}
return seed, nil
}

func legacyPath() string {
// windows
if runtime.GOOS == "windows" {
return filepath.Join(os.Getenv("APPDATA"), filename)
}
// linux/mac
return filepath.Join("/tmp", filename)
}

func fileExists(path string) bool {
_, err := os.Stat(path)
return !errors.Is(err, os.ErrNotExist)
}

// writeSeedFile writes the agent seed file
func writeSeedFile(seed *AgentSeed, path string, logger log.Logger) {
data, err := json.Marshal(*seed)
if err != nil {
level.Error(logger).Log("msg", "Encoding seed file", "err", err)
return
}
err = os.WriteFile(path, data, 0644)
if err != nil {
level.Error(logger).Log("msg", "Writing seed file", "err", err)
return
}
}
79 changes: 79 additions & 0 deletions internal/agentseed/agentseed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package agentseed

import (
"os"
"path/filepath"
"sync"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
os.Remove(legacyPath())
exitVal := m.Run()
os.Exit(exitVal)
}

func reset() {
os.Remove(legacyPath())
savedSeed = nil
once = sync.Once{}
}

func TestNoExistingFile(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
require.NoFileExists(t, f)
Init(dir, l)
require.FileExists(t, f)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
seed := Get()
require.Equal(t, seed.UID, loaded.UID)
}

func TestExistingFile(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
seed := generateNew()
writeSeedFile(seed, f, l)
Init(dir, l)
require.NotNil(t, savedSeed)
require.Equal(t, seed.UID, savedSeed.UID)
require.Equal(t, seed.UID, Get().UID)
}

func TestNoInitCalled(t *testing.T) {
t.Cleanup(reset)
l := log.NewNopLogger()
seed := Get()
require.NotNil(t, seed)
f := legacyPath()
require.FileExists(t, f)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
require.Equal(t, seed.UID, loaded.UID)
}

func TestLegacyExists(t *testing.T) {
t.Cleanup(reset)
dir := t.TempDir()
l := log.NewNopLogger()
f := filepath.Join(dir, filename)
seed := generateNew()
writeSeedFile(seed, legacyPath(), l)
Init(dir, l)
require.FileExists(t, f)
require.NotNil(t, savedSeed)
require.Equal(t, seed.UID, savedSeed.UID)
require.Equal(t, seed.UID, Get().UID)
loaded, err := readSeedFile(f, l)
require.NoError(t, err)
require.Equal(t, seed.UID, loaded.UID)
}
9 changes: 9 additions & 0 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/internal/agentseed"
"github.com/grafana/agent/internal/useragent"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/loki/clients/pkg/promtail"
Expand Down Expand Up @@ -183,6 +184,14 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
return nil
}

uid := agentseed.Get().UID
for _, cfg := range c.ClientConfigs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
}

clientMetrics := client.NewMetrics(i.reg)
cfg := DefaultConfig()
cfg.Global = config.GlobalConfig{
Expand Down
Loading