From 56864361ecf1d973b596963102d9ef474676a2d8 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 2 Jan 2024 10:02:15 -0500 Subject: [PATCH] Add more details on logs and get global view of bytes. --- cmd/benchmark/logs.go | 4 ++-- cmd/benchmark/logs.river | 3 +-- cmd/benchmark/logsgen.river | 14 +++++++---- component/loki/source/file/decompresser.go | 10 +++++--- component/loki/source/file/metrics.go | 8 +++++++ component/loki/source/file/tailer.go | 10 +++++--- component/loki/test/logs/component.go | 27 ++++++++++++++-------- 7 files changed, 51 insertions(+), 25 deletions(-) diff --git a/cmd/benchmark/logs.go b/cmd/benchmark/logs.go index 9b8d3c3e32c1..655578845841 100644 --- a/cmd/benchmark/logs.go +++ b/cmd/benchmark/logs.go @@ -34,8 +34,8 @@ func startLogsRun(run time.Duration) { func startLogsAgent() *exec.Cmd { cmd := exec.Command("./grafana-agent-flow", "run", "./logs.river", "--storage.path=./data/logs", "--server.http.listen-addr=127.0.0.1:12346") cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + //cmd.Stdout = os.Stdout + //cmd.Stderr = os.Stderr err := cmd.Start() if err != nil { diff --git a/cmd/benchmark/logs.river b/cmd/benchmark/logs.river index 55b8b891abbe..7708795aeebb 100644 --- a/cmd/benchmark/logs.river +++ b/cmd/benchmark/logs.river @@ -26,7 +26,7 @@ prometheus.relabel "mutator" { rule { source_labels = ["__name__"] action = "keep" - regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count|loki_source_file_files_active_total|loki_write_encoded_bytes_total|loki_write_sent_bytes_total|loki_source_file_file_bytes_total)" + regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count|loki_source_file_files_active_total|loki_write_encoded_bytes_total|loki_write_sent_bytes_total|loki_sum_source_file_read_bytes_total)" } forward_to = [prometheus.remote_write.agent_stats.receiver] @@ -43,7 +43,6 @@ prometheus.remote_write "agent_stats" { } } - local.file_match "logs" { path_targets = [ {__path__ = "./data/logs-gen/loki.test.logs.logs/*.log"}, diff --git a/cmd/benchmark/logsgen.river b/cmd/benchmark/logsgen.river index b66cbf99ff9c..b0fbfe11af85 100644 --- a/cmd/benchmark/logsgen.river +++ b/cmd/benchmark/logsgen.river @@ -1,6 +1,10 @@ loki.test.logs "logs" { - number_of_files = 100 - file_churn_percent = .25 - file_refresh = "1m" - writes_per_cadence = 1000 -} \ No newline at end of file + number_of_files = 100 + file_churn_percent = .25 + file_refresh = "1m" + writes_per_cadence = 100 + + labels = { + "test" = "me", + } +} diff --git a/component/loki/source/file/decompresser.go b/component/loki/source/file/decompresser.go index c70ec2b7b94b..19e44183073e 100644 --- a/component/loki/source/file/decompresser.go +++ b/component/loki/source/file/decompresser.go @@ -58,9 +58,10 @@ type decompressor struct { decoder *encoding.Decoder - position int64 - size int64 - cfg DecompressionConfig + position int64 + previousPosition int64 + size int64 + cfg DecompressionConfig } func newDecompressor( @@ -267,6 +268,9 @@ func (d *decompressor) MarkPositionAndSize() error { d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size)) d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position)) + d.metrics.totalReadBytes.Add(float64(d.position - d.previousPosition)) + + d.previousPosition = d.position d.positions.Put(d.path, d.labels, d.position) return nil diff --git a/component/loki/source/file/metrics.go b/component/loki/source/file/metrics.go index 4f8be70fb706..23bbeb3da6a6 100644 --- a/component/loki/source/file/metrics.go +++ b/component/loki/source/file/metrics.go @@ -13,6 +13,7 @@ type metrics struct { // File-specific metrics readBytes *prometheus.GaugeVec + totalReadBytes prometheus.Gauge totalBytes *prometheus.GaugeVec readLines *prometheus.CounterVec encodingFailures *prometheus.CounterVec @@ -29,6 +30,12 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "loki_source_file_read_bytes_total", Help: "Number of bytes read.", }, []string{"path"}) + g := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "loki_sum_source_file_read_bytes_total", + Help: "Number of bytes read across all files.", + }) + m.totalReadBytes = g + m.totalBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "loki_source_file_file_bytes_total", Help: "Number of bytes total.", @@ -49,6 +56,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { if reg != nil { reg.MustRegister( m.readBytes, + m.totalReadBytes, m.totalBytes, m.readLines, m.encodingFailures, diff --git a/component/loki/source/file/tailer.go b/component/loki/source/file/tailer.go index c05fcda166f6..7e062e853ff6 100644 --- a/component/loki/source/file/tailer.go +++ b/component/loki/source/file/tailer.go @@ -32,9 +32,10 @@ type tailer struct { handler loki.EntryHandler positions positions.Positions - path string - labels string - tail *tail.Tail + path string + labels string + tail *tail.Tail + previousPos int64 posAndSizeMtx sync.Mutex stopOnce sync.Once @@ -293,6 +294,9 @@ func (t *tailer) MarkPositionAndSize() error { // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) + + t.metrics.totalReadBytes.Add(float64(pos - t.previousPos)) + t.previousPos = pos t.positions.Put(t.path, t.labels, pos) return nil diff --git a/component/loki/test/logs/component.go b/component/loki/test/logs/component.go index 663a00328f13..13df8be7969e 100644 --- a/component/loki/test/logs/component.go +++ b/component/loki/test/logs/component.go @@ -13,6 +13,7 @@ import ( "time" "github.com/brianvoe/gofakeit/v6" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" ) @@ -67,7 +68,7 @@ func (c *Component) Run(ctx context.Context) error { defer c.writeTicker.Stop() defer c.churnTicker.Stop() // Create the initial set of files. - c.churnFiles() + c.createFiles() for { select { case <-ctx.Done(): @@ -104,7 +105,6 @@ func (c *Component) writeFiles() { msgLen = c.args.MessageMinLength } else { msgLen = rand.Intn(c.args.MessageMaxLength-c.args.MessageMinLength) + c.args.MessageMinLength - } attributes["msg"] = gofakeit.LetterN(uint(msgLen)) for k, v := range c.args.Labels { @@ -117,34 +117,41 @@ func (c *Component) writeFiles() { bb.Write(data) bb.WriteString("\n") } - fh, err := os.OpenFile(f, os.O_APPEND|os.O_WRONLY, 0644) + fh, err := os.OpenFile(f, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0644) if err != nil { + level.Error(c.o.Logger).Log("msg", "error opening file", "file", f, "err", err) + _ = fh.Close() continue } - _, _ = fh.Write(bb.Bytes()) + _, err = fh.Write(bb.Bytes()) + if err != nil { + level.Error(c.o.Logger).Log("msg", "error writing file", "file", f, "err", err) + } _ = fh.Close() } } -func (c *Component) churnFiles() { - c.mut.Lock() - defer c.mut.Unlock() - +func (c *Component) createFiles() { if c.args.NumberOfFiles > len(c.files) { fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log") c.files = append(c.files, fullpath) - _ = os.WriteFile(fullpath, []byte(""), 0644) c.index++ } else if c.args.NumberOfFiles < len(c.files) { c.files = c.files[:c.args.NumberOfFiles] } +} + +func (c *Component) churnFiles() { + c.mut.Lock() + defer c.mut.Unlock() + + c.createFiles() churn := int(float64(c.args.NumberOfFiles) * c.args.FileChurnPercent) for i := 0; i < churn; i++ { candidate := rand.Intn(len(c.files)) fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log") c.files = append(c.files, fullpath) - _ = os.WriteFile(fullpath, []byte(""), 0644) c.index++ c.files[candidate] = fullpath }