Commit 5686436

Add more details on logs and get global view of bytes.
mattdurham committed Jan 2, 2024
1 parent 266bc3f commit 5686436
Showing 7 changed files with 51 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cmd/benchmark/logs.go
@@ -34,8 +34,8 @@ func startLogsRun(run time.Duration) {
func startLogsAgent() *exec.Cmd {
cmd := exec.Command("./grafana-agent-flow", "run", "./logs.river", "--storage.path=./data/logs", "--server.http.listen-addr=127.0.0.1:12346")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
//cmd.Stdout = os.Stdout
//cmd.Stderr = os.Stderr

err := cmd.Start()
if err != nil {
3 changes: 1 addition & 2 deletions cmd/benchmark/logs.river
@@ -26,7 +26,7 @@ prometheus.relabel "mutator" {
rule {
source_labels = ["__name__"]
action = "keep"
regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count|loki_source_file_files_active_total|loki_write_encoded_bytes_total|loki_write_sent_bytes_total|loki_source_file_file_bytes_total)"
regex = "(agent_wal_storage_active_series|agent_resources_process_cpu_seconds_total|go_memstats_alloc_bytes|go_gc_duration_seconds_sum|go_gc_duration_seconds_count|loki_source_file_files_active_total|loki_write_encoded_bytes_total|loki_write_sent_bytes_total|loki_sum_source_file_read_bytes_total)"
}

forward_to = [prometheus.remote_write.agent_stats.receiver]
@@ -43,7 +43,6 @@ prometheus.remote_write "agent_stats" {
}
}


local.file_match "logs" {
path_targets = [
{__path__ = "./data/logs-gen/loki.test.logs.logs/*.log"},
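
Note on the rule change above: the keep regex is the allowlist that decides which benchmark series survive prometheus.relabel, so the new aggregate gauge loki_sum_source_file_read_bytes_total has to be listed here (replacing the per-file loki_source_file_file_bytes_total) before it can reach remote write. A rough standalone sketch of how such a keep rule matches metric names, assuming the usual fully anchored relabel regex semantics; the Go snippet and its metric list are illustrative only, not the agent's implementation:

// Rough sketch of a relabel "keep" check; the metric list is a subset of the
// allowlist in logs.river and the anchoring mirrors standard relabel semantics.
package main

import (
	"fmt"
	"regexp"
)

func main() {
	keep := regexp.MustCompile(`^(?:loki_write_sent_bytes_total|loki_sum_source_file_read_bytes_total)$`)

	for _, name := range []string{
		"loki_sum_source_file_read_bytes_total", // new aggregate gauge: kept
		"loki_source_file_file_bytes_total",     // no longer in the list: dropped
	} {
		fmt.Printf("%s keep=%v\n", name, keep.MatchString(name))
	}
}
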
14 changes: 9 additions & 5 deletions cmd/benchmark/logsgen.river
@@ -1,6 +1,10 @@
loki.test.logs "logs" {
number_of_files = 100
file_churn_percent = .25
file_refresh = "1m"
writes_per_cadence = 1000
}
number_of_files = 100
file_churn_percent = .25
file_refresh = "1m"
writes_per_cadence = 100

labels = {
"test" = "me",
}
}
10 changes: 7 additions & 3 deletions component/loki/source/file/decompresser.go
@@ -58,9 +58,10 @@ type decompressor struct {

decoder *encoding.Decoder

position int64
size int64
cfg DecompressionConfig
position int64
previousPosition int64
size int64
cfg DecompressionConfig
}

func newDecompressor(
@@ -267,6 +268,9 @@ func (d *decompressor) MarkPositionAndSize() error {

d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
d.metrics.totalReadBytes.Add(float64(d.position - d.previousPosition))

d.previousPosition = d.position
d.positions.Put(d.path, d.labels, d.position)

return nil
8 changes: 8 additions & 0 deletions component/loki/source/file/metrics.go
@@ -13,6 +13,7 @@ type metrics struct {

// File-specific metrics
readBytes *prometheus.GaugeVec
totalReadBytes prometheus.Gauge
totalBytes *prometheus.GaugeVec
readLines *prometheus.CounterVec
encodingFailures *prometheus.CounterVec
@@ -29,6 +30,12 @@ func newMetrics(reg prometheus.Registerer) *metrics {
Name: "loki_source_file_read_bytes_total",
Help: "Number of bytes read.",
}, []string{"path"})
g := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_sum_source_file_read_bytes_total",
Help: "Number of bytes read across all files.",
})
m.totalReadBytes = g

m.totalBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_source_file_file_bytes_total",
Help: "Number of bytes total.",
@@ -49,6 +56,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
if reg != nil {
reg.MustRegister(
m.readBytes,
m.totalReadBytes,
m.totalBytes,
m.readLines,
m.encodingFailures,
10 changes: 7 additions & 3 deletions component/loki/source/file/tailer.go
@@ -32,9 +32,10 @@ type tailer struct {
handler loki.EntryHandler
positions positions.Positions

path string
labels string
tail *tail.Tail
path string
labels string
tail *tail.Tail
previousPos int64

posAndSizeMtx sync.Mutex
stopOnce sync.Once
@@ -293,6 +294,9 @@ func (t *tailer) MarkPositionAndSize() error {
// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))

t.metrics.totalReadBytes.Add(float64(pos - t.previousPos))
t.previousPos = pos
t.positions.Put(t.path, t.labels, pos)

return nil
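
The tailer change above mirrors the decompressor one: the per-path readBytes gauge is still set to the absolute offset, while the new shared totalReadBytes gauge only receives the delta since the previous mark, so it accumulates bytes read across all files. A minimal standalone sketch of that pattern using the Prometheus client; the reader type and the calls in main are invented for illustration and are not the agent's actual wiring:

// Minimal sketch: each reader remembers its last reported position and pushes
// only the delta into one aggregate gauge shared by all files.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

type reader struct {
	path        string
	position    int64 // current byte offset in the file
	previousPos int64 // offset at the last metrics update
}

func (r *reader) markPosition(readBytes *prometheus.GaugeVec, totalReadBytes prometheus.Gauge) {
	readBytes.WithLabelValues(r.path).Set(float64(r.position)) // per-file absolute offset
	totalReadBytes.Add(float64(r.position - r.previousPos))    // global running total
	r.previousPos = r.position
}

func main() {
	reg := prometheus.NewRegistry()
	readBytes := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "loki_source_file_read_bytes_total",
		Help: "Number of bytes read.",
	}, []string{"path"})
	totalReadBytes := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "loki_sum_source_file_read_bytes_total",
		Help: "Number of bytes read across all files.",
	})
	reg.MustRegister(readBytes, totalReadBytes)

	a := &reader{path: "a.log"}
	b := &reader{path: "b.log"}

	a.position = 100
	a.markPosition(readBytes, totalReadBytes)
	b.position = 50
	b.markPosition(readBytes, totalReadBytes)
	a.position = 250
	a.markPosition(readBytes, totalReadBytes)

	// 100 + 50 + (250-100) = 300 bytes read in total.
	fmt.Println(testutil.ToFloat64(totalReadBytes))
}
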
27 changes: 17 additions & 10 deletions component/loki/test/logs/component.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/brianvoe/gofakeit/v6"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
)

@@ -67,7 +68,7 @@ func (c *Component) Run(ctx context.Context) error {
defer c.writeTicker.Stop()
defer c.churnTicker.Stop()
// Create the initial set of files.
c.churnFiles()
c.createFiles()
for {
select {
case <-ctx.Done():
@@ -104,7 +105,6 @@ func (c *Component) writeFiles() {
msgLen = c.args.MessageMinLength
} else {
msgLen = rand.Intn(c.args.MessageMaxLength-c.args.MessageMinLength) + c.args.MessageMinLength

}
attributes["msg"] = gofakeit.LetterN(uint(msgLen))
for k, v := range c.args.Labels {
@@ -117,34 +117,41 @@ func (c *Component) writeFiles() {
bb.Write(data)
bb.WriteString("\n")
}
fh, err := os.OpenFile(f, os.O_APPEND|os.O_WRONLY, 0644)
fh, err := os.OpenFile(f, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0644)
if err != nil {
level.Error(c.o.Logger).Log("msg", "error opening file", "file", f, "err", err)
_ = fh.Close()
continue
}
_, _ = fh.Write(bb.Bytes())
_, err = fh.Write(bb.Bytes())
if err != nil {
level.Error(c.o.Logger).Log("msg", "error writing file", "file", f, "err", err)
}
_ = fh.Close()
}
}

func (c *Component) churnFiles() {
c.mut.Lock()
defer c.mut.Unlock()

func (c *Component) createFiles() {
if c.args.NumberOfFiles > len(c.files) {
fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log")
c.files = append(c.files, fullpath)
_ = os.WriteFile(fullpath, []byte(""), 0644)
c.index++
} else if c.args.NumberOfFiles < len(c.files) {
c.files = c.files[:c.args.NumberOfFiles]
}
}

func (c *Component) churnFiles() {
c.mut.Lock()
defer c.mut.Unlock()

c.createFiles()

churn := int(float64(c.args.NumberOfFiles) * c.args.FileChurnPercent)
for i := 0; i < churn; i++ {
candidate := rand.Intn(len(c.files))
fullpath := filepath.Join(c.o.DataPath, strconv.Itoa(c.index)+".log")
c.files = append(c.files, fullpath)
_ = os.WriteFile(fullpath, []byte(""), 0644)
c.index++
c.files[candidate] = fullpath
}
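
The refactor above splits file creation out of churn: createFiles tops the active set up (or trims it down) to NumberOfFiles, one file per tick, and churnFiles then swaps a fraction of the active paths for freshly created files so tailers keep seeing files appear and disappear. A simplified, self-contained sketch of that behavior; the generator type, its methods, and the main loop are hypothetical, and the component's locking and error handling are omitted:

// Simplified sketch of the create/churn split; not the component's actual code.
package main

import (
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
)

type generator struct {
	dir   string
	index int      // monotonically increasing suffix for new file names
	files []string // files currently being written to
}

// createFiles grows the active set by one file per call until it reaches target,
// or trims it if the target shrank.
func (g *generator) createFiles(target int) {
	if target > len(g.files) {
		full := filepath.Join(g.dir, strconv.Itoa(g.index)+".log")
		g.files = append(g.files, full)
		_ = os.WriteFile(full, []byte(""), 0644)
		g.index++
	} else if target < len(g.files) {
		g.files = g.files[:target]
	}
}

// churnFiles replaces a fraction of the active set with freshly created files,
// so previously tailed paths stop receiving writes.
func (g *generator) churnFiles(target int, churnPercent float64) {
	g.createFiles(target)
	for i := 0; i < int(float64(target)*churnPercent); i++ {
		candidate := rand.Intn(len(g.files))
		full := filepath.Join(g.dir, strconv.Itoa(g.index)+".log")
		_ = os.WriteFile(full, []byte(""), 0644)
		g.index++
		g.files[candidate] = full
	}
}

func main() {
	dir, _ := os.MkdirTemp("", "churn")
	defer os.RemoveAll(dir)

	g := &generator{dir: dir}
	for tick := 0; tick < 10; tick++ {
		g.churnFiles(4, 0.25)
	}
	// The active set stays at the target size even though churn kept creating new files.
	fmt.Println("active files:", len(g.files), "total files ever created:", g.index)
}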
