diff --git a/.gitignore b/.gitignore index 4f5cb31e3c33..54a98fed0465 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,9 @@ /.eventcache vendor data-agent - +/cmd/benchmark/data/ +/cmd/benchmark/main +/cmd/benchmark/grafana-agent-flow /cmd/agent/agent /cmd/agentctl/agentctl /cmd/agent-operator/agent-operator @@ -24,4 +26,4 @@ cover*.out .uptodate node_modules -/docs/variables.mk.local \ No newline at end of file +/docs/variables.mk.local diff --git a/cmd/benchmark/README.md b/cmd/benchmark/README.md new file mode 100644 index 000000000000..564dce3c2194 --- /dev/null +++ b/cmd/benchmark/README.md @@ -0,0 +1,47 @@ +# Benchmark notes + +These are synthetic benchmarks meant to represent common workloads. These are not meant to be exhaustive or fine grained. +These will give a coarse idea of how the agent behaves in various situations. + +## Running the benchmarks + +Running `PROM_USERNAME="" PROM_PASSWORD="" ./benchmark.sh` will start the benchmark and run for 8 hours. The duration and type of tests +can be adjusted by editing the `benchmark.sh` file. This will start two Agents and the benchmark runner. Relevant CPU and memory metrics +will be sent to the endpoint described in `normal.river`. + +TODO: Add mixin for graph I am using + +## Adjusting the benchmark + +Each benchmark can be adjusted within `test.river`. These settings allow fine tuning to a specific scenario. Each `prometheus.test.metric` component +exposes a service discovery URL that is used to collect the targets. + +## Benchmark categories + +### prometheus.test.metrics "single" + +This roughly represents a single node exporter and is the simplest use case. Every `10m` 5% of the metrics are replaced driven by `churn_percent`. + +### prometheus.test.metrics "many" + +This roughly represents scraping many node_exporter instances in say a Kubernetes environment. + +### prometheus.test.metrics "large" + +This represents scraping 2 very large instances with 1,000,000 series. 
+ +### prometheus.test.metrics "churn" + +This represents a worst case scenario, 2 large instances with an extremely high churn rate. + +## Adjusting the tests + +`prometheus.relabel` is often a CPU bottleneck so adding additional rules allows you to test the impact of that. + +## Rules + +There are existing rules to only send to the prometheus remote write the specific metrics that matter. These are tagged with the `runtype` and the benchmark. For instance `normal-large`. + +The benchmark starts an endpoint to consume the metrics from `prometheus.test.metrics`, in half the tests it will return HTTP Status 200 and in the other half will return 500. + +TODO add optional pyroscope profiles diff --git a/cmd/benchmark/benchmark.sh b/cmd/benchmark/benchmark.sh new file mode 100755 index 000000000000..348eed20f714 --- /dev/null +++ b/cmd/benchmark/benchmark.sh @@ -0,0 +1,16 @@ +go build -o main + +# each test is run with the first argument being the name, the second whether the endpoint accepts metrics, the third for the duration and the last being the discovery +# endpoint. See test.river for details on each endpoint. 
+./main metrics churn true 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.churn/discovery" +./main metrics churn false 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.churn/discovery" + +./main metrics single true 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.single/discovery" +./main metrics single false 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.single/discovery" + +./main metrics many true 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.many/discovery" +./main metrics many false 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.many/discovery" + +./main metrics large true 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.large/discovery" +./main metrics large false 1h "http://127.0.0.1:9001/api/v0/component/prometheus.test.metrics.large/discovery" + diff --git a/cmd/benchmark/logs.go b/cmd/benchmark/logs.go new file mode 100644 index 000000000000..9b8d3c3e32c1 --- /dev/null +++ b/cmd/benchmark/logs.go @@ -0,0 +1,58 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "syscall" + "time" +) + +func startLogsRun(run time.Duration) { + allow = true + _ = os.MkdirAll("./data/", 0777) + _ = os.RemoveAll("./data/") + _ = os.Setenv("NAME", "logs") + gen := startLogsGenAgent() + old := startLogsAgent() + fmt.Println("starting logs agent") + defer func() { + _ = old.Process.Kill() + _ = old.Process.Release() + _ = old.Wait() + _ = syscall.Kill(-old.Process.Pid, syscall.SIGKILL) + _ = gen.Process.Kill() + _ = gen.Process.Release() + _ = gen.Wait() + _ = syscall.Kill(-gen.Process.Pid, syscall.SIGKILL) + _ = os.RemoveAll("./data/") + }() + + time.Sleep(run) +} + +func startLogsAgent() *exec.Cmd { + cmd := exec.Command("./grafana-agent-flow", "run", "./logs.river", "--storage.path=./data/logs", "--server.http.listen-addr=127.0.0.1:12346") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Stdout = os.Stdout + cmd.Stderr = 
os.Stderr + + err := cmd.Start() + if err != nil { + panic(err.Error()) + } + return cmd +} + +func startLogsGenAgent() *exec.Cmd { + cmd := exec.Command("./grafana-agent-flow", "run", "./logsgen.river", "--storage.path=./data/logs-gen", "--server.http.listen-addr=127.0.0.1:12349") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + err := cmd.Start() + if err != nil { + panic(err.Error()) + } + return cmd +} diff --git a/cmd/benchmark/logs.river b/cmd/benchmark/logs.river new file mode 100644 index 000000000000..55b8b891abbe --- /dev/null +++ b/cmd/benchmark/logs.river @@ -0,0 +1,62 @@ +logging { + level = "debug" +} + +prometheus.scrape "scraper" { + targets = concat([{"__address__" = "localhost:12346"}]) + forward_to = [prometheus.relabel.mutator.receiver] + scrape_interval = "60s" +} + +prometheus.relabel "mutator" { + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = "normal" + target_label = "runtype" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("NAME") + target_label = "test_name" + } + + rule { + source_labels = ["__name__"] + action = "keep" + regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count|loki_source_file_files_active_total|loki_write_encoded_bytes_total|loki_write_sent_bytes_total|loki_source_file_file_bytes_total)" + } + + forward_to = [prometheus.remote_write.agent_stats.receiver] +} + +prometheus.remote_write "agent_stats" { + endpoint { + url = "https://prometheus-us-central1.grafana.net/api/prom/push" + + basic_auth { + username = env("PROM_USERNAME") + password = env("PROM_PASSWORD") + } + } +} + + +local.file_match "logs" { + path_targets = [ + {__path__ = "./data/logs-gen/loki.test.logs.logs/*.log"}, + ] +} + +loki.source.file "tmpfiles" { + targets = local.file_match.logs.targets + forward_to = 
[loki.write.local.receiver] +} + +loki.write "local" { + endpoint { + url = "http://localhost:8888/post" + } +} diff --git a/cmd/benchmark/logs.sh b/cmd/benchmark/logs.sh new file mode 100755 index 000000000000..7e09ba52017a --- /dev/null +++ b/cmd/benchmark/logs.sh @@ -0,0 +1,5 @@ +go build -o main + +# the logs benchmark is run with the first argument being the benchmark type (logs) and the second the duration. +# See logs.river and logsgen.river for details on the agent configuration. +./main logs 1h diff --git a/cmd/benchmark/logsgen.river b/cmd/benchmark/logsgen.river new file mode 100644 index 000000000000..b66cbf99ff9c --- /dev/null +++ b/cmd/benchmark/logsgen.river @@ -0,0 +1,6 @@ +loki.test.logs "logs" { + number_of_files = 100 + file_churn_percent = .25 + file_refresh = "1m" + writes_per_cadence = 1000 +} \ No newline at end of file diff --git a/cmd/benchmark/main.go b/cmd/benchmark/main.go new file mode 100644 index 000000000000..4501f8593274 --- /dev/null +++ b/cmd/benchmark/main.go @@ -0,0 +1,145 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "strconv" + "syscall" + "time" + + "github.com/gorilla/mux" +) + +// main handles creating the benchmark. +func main() { + username := os.Getenv("PROM_USERNAME") + if username == "" { + panic("PROM_USERNAME env must be set") + } + password := os.Getenv("PROM_PASSWORD") + if password == "" { + panic("PROM_PASSWORD env must be set") + } + + // Start the HTTP server that can swallow requests. 
+ go httpServer() + // Build the agent + buildAgent() + + benchType := os.Args[1] + if benchType == "metrics" { + name := os.Args[2] + allowWal := os.Args[3] + duration := os.Args[4] + discovery := os.Args[5] + allowWalBool, _ := strconv.ParseBool(allowWal) + parsedDuration, _ := time.ParseDuration(duration) + fmt.Println(name, allowWalBool, parsedDuration, discovery) + + startMetricsRun(name, allowWalBool, parsedDuration, discovery) + } else if benchType == "logs" { + duration := os.Args[2] + parsedDuration, _ := time.ParseDuration(duration) + startLogsRun(parsedDuration) + } else { + panic("unknown benchmark type") + } +} + +func startMetricsRun(name string, allowWAL bool, run time.Duration, discovery string) { + _ = os.RemoveAll("./data/normal-data") + _ = os.RemoveAll("./data/test-data") + + allow = allowWAL + _ = os.Setenv("NAME", name) + _ = os.Setenv("ALLOW_WAL", strconv.FormatBool(allowWAL)) + _ = os.Setenv("DISCOVERY", discovery) + + metric := startMetricsAgent() + fmt.Println("starting metric agent") + defer func() { + _ = metric.Process.Kill() + _ = metric.Process.Release() + _ = metric.Wait() + _ = syscall.Kill(-metric.Process.Pid, syscall.SIGKILL) + _ = os.RemoveAll("./data/test-data") + }() + old := startNormalAgent() + fmt.Println("starting normal agent") + + defer func() { + _ = old.Process.Kill() + _ = old.Process.Release() + _ = old.Wait() + _ = syscall.Kill(-old.Process.Pid, syscall.SIGKILL) + _ = os.RemoveAll("./data/normal-data") + }() + time.Sleep(run) +} + +func buildAgent() { + cmd := exec.Command("go", "build", "../grafana-agent-flow") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + panic(err.Error()) + } +} + +func startNormalAgent() *exec.Cmd { + cmd := exec.Command("./grafana-agent-flow", "run", "./normal.river", "--storage.path=./data/normal-data", "--server.http.listen-addr=127.0.0.1:12346") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: 
true} + //cmd.Stdout = os.Stdout + //cmd.Stderr = os.Stderr + err := cmd.Start() + if err != nil { + panic(err.Error()) + } + return cmd +} + +func startMetricsAgent() *exec.Cmd { + cmd := exec.Command("./grafana-agent-flow", "run", "./test.river", "--storage.path=./data/test-data", "--server.http.listen-addr=127.0.0.1:9001") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + err := cmd.Start() + if err != nil { + panic(err.Error()) + } + return cmd +} + +var allow = false + +func httpServer() { + r := mux.NewRouter() + r.HandleFunc("/post", func(w http.ResponseWriter, r *http.Request) { + handlePost(w, r) + }) + r.HandleFunc("/allow", func(w http.ResponseWriter, r *http.Request) { + println("allowing") + allow = true + }) + r.HandleFunc("/block", func(w http.ResponseWriter, r *http.Request) { + println("blocking") + allow = false + }) + http.Handle("/", r) + println("Starting server") + err := http.ListenAndServe(":8888", nil) + if err != nil { + println(err) + } +} + +func handlePost(w http.ResponseWriter, _ *http.Request) { + if allow { + return + } else { + println("returning 500") + w.WriteHeader(500) + } +} diff --git a/cmd/benchmark/normal.river b/cmd/benchmark/normal.river new file mode 100644 index 000000000000..e1c7fbbb9513 --- /dev/null +++ b/cmd/benchmark/normal.river @@ -0,0 +1,149 @@ +logging { + level = "debug" +} + +discovery.http "disco" { + url = env("DISCOVERY") +} + +prometheus.scrape "scraper" { + targets = concat([{"__address__" = "localhost:12346"}]) + forward_to = [prometheus.relabel.mutator.receiver] + scrape_interval = "60s" +} + +logging { + level = "debug" +} + +discovery.http "disco" { + url = env("DISCOVERY") +} + +prometheus.scrape "scraper" { + targets = concat([{"__address__" = "localhost:12346"}]) + forward_to = [prometheus.relabel.mutator.receiver] + scrape_interval = "60s" +} + +prometheus.relabel "mutator" { + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = "normal" + target_label = "runtype" + } + 
+ rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("NAME") + target_label = "test_name" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("ALLOW_WAL") + target_label = "remote_write_enable" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("DISCOVERY") + target_label = "discovery" + } + + rule { + source_labels = ["__name__"] + action = "keep" + regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count)" + } + + forward_to = [prometheus.remote_write.agent_stats.receiver] +} + +prometheus.remote_write "agent_stats" { + endpoint { + url = "https://prometheus-us-central1.grafana.net/api/prom/push" + + basic_auth { + username = env("PROM_USERNAME") + password = env("PROM_PASSWORD") + } + } +} + +prometheus.scrape "data" { + targets = discovery.http.disco.targets + forward_to = [prometheus.remote_write.empty.receiver] + scrape_interval = "60s" +} + +prometheus.remote_write "empty" { + endpoint { + url = "http://localhost:8888/post" + } +} + +prometheus.relabel "mutator" { + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = "normal" + target_label = "runtype" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("NAME") + target_label = "test_name" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("ALLOW_WAL") + target_label = "remote_write_enable" + } + + rule { + source_labels = ["__name__"] + regex = "(.+)" + replacement = env("DISCOVERY") + target_label = "discovery" + } + + rule { + source_labels = ["__name__"] + action = "keep" + regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count)" + } + + forward_to = [prometheus.remote_write.agent_stats.receiver] +} + +prometheus.remote_write 
"agent_stats" { + endpoint { + url = "https://prometheus-us-central1.grafana.net/api/prom/push" + + basic_auth { + username = env("PROM_USERNAME") + password = env("PROM_PASSWORD") + } + } +} + +prometheus.scrape "data" { + targets = discovery.http.disco.targets + forward_to = [prometheus.remote_write.empty.receiver] + scrape_interval = "60s" +} + +prometheus.remote_write "empty" { + endpoint { + url = "http://localhost:8888/post" + } +} diff --git a/cmd/benchmark/test.river b/cmd/benchmark/test.river new file mode 100644 index 000000000000..ba146d60bdb0 --- /dev/null +++ b/cmd/benchmark/test.river @@ -0,0 +1,33 @@ +// This is meant to mimic handling a single node_exporter instance. +prometheus.test.metrics "single" { + number_of_instances = 1 + number_of_metrics = 2000 + number_of_labels = 5 + metrics_refresh = "10m" + churn_percent = 0.05 +} + +// This is meant to mimic handling many node_exporter instances. +prometheus.test.metrics "many" { + number_of_instances = 1000 + number_of_metrics = 2000 + number_of_labels = 5 + metrics_refresh = "10m" + churn_percent = 0.05 +} + +prometheus.test.metrics "large" { + number_of_instances = 2 + number_of_metrics = 1000000 + number_of_labels = 9 + metrics_refresh = "10m" + churn_percent = 0.05 +} + +prometheus.test.metrics "churn" { + number_of_instances = 2 + number_of_metrics = 200000 + number_of_labels = 12 + metrics_refresh = "10m" + churn_percent = 0.50 +} diff --git a/component/all/all.go b/component/all/all.go index 26acd30ce946..e41c7e21d939 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -52,6 +52,7 @@ import ( _ "github.com/grafana/agent/component/loki/source/podlogs" // Import loki.source.podlogs _ "github.com/grafana/agent/component/loki/source/syslog" // Import loki.source.syslog _ "github.com/grafana/agent/component/loki/source/windowsevent" // Import loki.source.windowsevent + _ "github.com/grafana/agent/component/loki/test/logs" // Import loki.test.logs _ 
"github.com/grafana/agent/component/loki/write" // Import loki.write _ "github.com/grafana/agent/component/mimir/rules/kubernetes" // Import mimir.rules.kubernetes _ "github.com/grafana/agent/component/module/file" // Import module.file diff --git a/component/loki/test/logs/component.go b/component/loki/test/logs/component.go new file mode 100644 index 000000000000..663a00328f13 --- /dev/null +++ b/component/loki/test/logs/component.go @@ -0,0 +1,205 @@ +package logs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/rand" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/brianvoe/gofakeit/v6" + "github.com/grafana/agent/component" +) + +func init() { + component.Register(component.Registration{ + Name: "loki.test.logs", + Args: Arguments{}, + Exports: Exports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewComponent(opts, args.(Arguments)) + }, + }) +} + +type Component struct { + mut sync.Mutex + o component.Options + index int + files []string + args Arguments + argsChan chan Arguments + writeTicker *time.Ticker + churnTicker *time.Ticker +} + +func NewComponent(o component.Options, c Arguments) (*Component, error) { + err := os.MkdirAll(o.DataPath, 0750) + if err != nil { + return nil, err + } + entries, _ := os.ReadDir(o.DataPath) + for _, e := range entries { + if e.IsDir() { + continue + } + _ = os.Remove(filepath.Join(o.DataPath, e.Name())) + } + comp := &Component{ + args: c, + index: 1, + files: make([]string, 0), + writeTicker: time.NewTicker(c.WriteCadence), + churnTicker: time.NewTicker(c.FileRefresh), + argsChan: make(chan Arguments), + o: o, + } + o.OnStateChange(Exports{Directory: o.DataPath}) + return comp, nil +} + +func (c *Component) Run(ctx context.Context) error { + defer c.writeTicker.Stop() + defer c.churnTicker.Stop() + // Create the initial set of files. 
+ c.churnFiles() + for { + select { + case <-ctx.Done(): + return nil + case <-c.writeTicker.C: + c.writeFiles() + case <-c.churnTicker.C: + c.churnFiles() + case args := <-c.argsChan: + c.args = args + c.writeTicker.Reset(c.args.WriteCadence) + c.churnTicker.Reset(c.args.FileRefresh) + } + } +} + +func (c *Component) Update(args component.Arguments) error { + c.argsChan <- args.(Arguments) + return nil +} + +func (c *Component) writeFiles() { + c.mut.Lock() + defer c.mut.Unlock() + + // TODO add error handling and figure out why some files are 0 bytes. + for _, f := range c.files { + bb := bytes.Buffer{} + for i := 0; i <= c.args.WritesPerCadence; i++ { + attributes := make(map[string]string) + attributes["ts"] = time.Now().Format(time.RFC3339) + msgLen := 0 + if c.args.MessageMaxLength == c.args.MessageMinLength { + msgLen = c.args.MessageMinLength + } else { + msgLen = rand.Intn(c.args.MessageMaxLength-c.args.MessageMinLength) + c.args.MessageMinLength + + } + attributes["msg"] = gofakeit.LetterN(uint(msgLen)) + for k, v := range c.args.Labels { + attributes[k] = v + } + data, err := json.Marshal(attributes) + if err != nil { + continue + } + bb.Write(data) + bb.WriteString("\n") + } + fh, err := os.OpenFile(f, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + continue + } + _, _ = fh.Write(bb.Bytes()) + _ = fh.Close() + } +} + +func (c *Component) churnFiles() { + c.mut.Lock() + defer c.mut.Unlock() + + if c.args.NumberOfFiles > len(c.files) { + fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log") + c.files = append(c.files, fullpath) + _ = os.WriteFile(fullpath, []byte(""), 0644) + c.index++ + } else if c.args.NumberOfFiles < len(c.files) { + c.files = c.files[:c.args.NumberOfFiles] + } + + churn := int(float64(c.args.NumberOfFiles) * c.args.FileChurnPercent) + for i := 0; i < churn; i++ { + candidate := rand.Intn(len(c.files)) + fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log") + c.files = append(c.files, fullpath) + _ = 
os.WriteFile(fullpath, []byte(""), 0644) + c.index++ + c.files[candidate] = fullpath + } +} + +type Arguments struct { + // WriteCadance is the interval at which it will write to a file. + WriteCadence time.Duration `river:"write_cadence,attr,optional"` + WritesPerCadence int `river:"writes_per_cadence,attr,optional"` + NumberOfFiles int `river:"number_of_files,attr,optional"` + Labels map[string]string `river:"labels,attr,optional"` + MessageMaxLength int `river:"message_max_length,attr,optional"` + MessageMinLength int `river:"message_min_length,attr,optional"` + FileChurnPercent float64 `river:"file_churn_percent,attr,optional"` + // FileRefresh is the interval at which it will stop writing to a number of files equal to churn percent and start new ones. + FileRefresh time.Duration `river:"file_refresh,attr,optional"` +} + +// SetToDefault implements river.Defaulter. +func (r *Arguments) SetToDefault() { + *r = DefaultArguments() +} + +func DefaultArguments() Arguments { + return Arguments{ + WriteCadence: 1 * time.Second, + NumberOfFiles: 1, + MessageMaxLength: 100, + MessageMinLength: 10, + FileChurnPercent: 0.1, + FileRefresh: 1 * time.Minute, + WritesPerCadence: 1, + } +} + +// Validate implements river.Validator. 
+func (r *Arguments) Validate() error { + if r.NumberOfFiles <= 0 { + return fmt.Errorf("number_of_files must be greater than 0") + } + if r.MessageMaxLength < r.MessageMinLength { + return fmt.Errorf("message_max_length must be greater than or equal to message_min_length") + } + if r.FileChurnPercent < 0 || r.FileChurnPercent > 1 { + return fmt.Errorf("file_churn_percent must be between 0 and 1") + } + if r.WriteCadence < 0 { + return fmt.Errorf("write_cadence must be greater than 0") + } + if r.FileRefresh < 0 { + return fmt.Errorf("file_refresh must be greater than 0") + } + return nil +} + +type Exports struct { + Directory string `river:"directory,attr"` +} diff --git a/component/loki/test/logs/component_test.go b/component/loki/test/logs/component_test.go new file mode 100644 index 000000000000..af3432080eaa --- /dev/null +++ b/component/loki/test/logs/component_test.go @@ -0,0 +1,55 @@ +package logs + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/stretchr/testify/require" +) + +func TestLogs(t *testing.T) { + dir := t.TempDir() + c, err := NewComponent( + component.Options{ + DataPath: dir, + }, + Arguments{ + WriteCadence: 1 * time.Second, + NumberOfFiles: 1, + MessageMaxLength: 10, + MessageMinLength: 10, + FileChurnPercent: 0, + FileRefresh: 1 * time.Minute, + }) + require.NoError(t, err) + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, 5*time.Second) + c.Run(ctx) + cncl() + files, err := os.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 1, len(files)) + fi, err := files[0].Info() + require.NoError(t, err) + + require.True(t, fi.Name() == "1.log") + require.True(t, fi.Size() > 0) + data, err := os.ReadFile(filepath.Join(dir, fi.Name())) + require.NoError(t, err) + lines := strings.Split(string(data), "\n") + for _, l := range lines { + if len(l) == 0 { + continue + } + entry := make(map[string]string) + err = 
json.Unmarshal([]byte(l), &entry) + require.NoError(t, err) + } + +} diff --git a/go.mod b/go.mod index 6c82f970e4b7..d77a26af60d1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/grafana/agent -go 1.21.0 +go 1.21 require ( cloud.google.com/go/pubsub v1.33.0 @@ -609,6 +609,7 @@ require ( require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab require ( + github.com/brianvoe/gofakeit/v6 v6.26.3 github.com/githubexporter/github-exporter v0.0.0-20231025122338-656e7dc33fe7 github.com/natefinch/atomic v1.0.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.87.0 diff --git a/go.sum b/go.sum index 648c487e8ed2..cecc8ba256c4 100644 --- a/go.sum +++ b/go.sum @@ -427,6 +427,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e h1:C1vYe728vM2FpXaICJuDRt5zgGyRdMmUGYnVfM7WcLY= github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e/go.mod h1:8NpZERGK+R9DGuZqqsKfnf2qI/rh7yBT8End29IvgNA= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bufbuild/connect-go v1.10.0 h1:QAJ3G9A1OYQW2Jbk3DeoJbkCxuKArrvZgDt47mjdTbg= github.com/bufbuild/connect-go v1.10.0/go.mod h1:CAIePUgkDR5pAFaylSMtNK45ANQjp9JvpluG20rhpV8= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=