Skip to content

Commit

Permalink
feat(loki/src/k8s): not restart tailers in loki.source.kubernetes c…
Browse files Browse the repository at this point in the history
…omponent by above-average time deltas if K8s version is > 1.29.0 (#6263)

Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber authored Apr 18, 2024
1 parent 0a9bdad commit a34ca07
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Main (unreleased)
- Increased the alert interval and renamed the `ClusterSplitBrain` alert to `ClusterNodeCountMismatch` in the Grafana
Agent Mixin to better match the alert conditions. (@thampiotr)

- Do not restart tailers in the `loki.source.kubernetes` component due to above-average time deltas if the K8s version is >= 1.29.1. (@hainenber)

- Add conversion from static to flow mode for `loki.source.windowsevent` via `legacy_bookmark_path`. (@mattdurham)

- Add ability to convert static mode positions file to `loki.source.file` compatible via `legacy_positions_file` argument. (@mattdurham)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ require (
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beevik/ntp v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/blang/semver/v4 v4.0.0
github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
Expand Down
81 changes: 49 additions & 32 deletions internal/component/loki/source/kubernetes/kubetail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/grafana/agent/internal/component/common/loki"
"github.com/grafana/agent/internal/flow/logging/level"
Expand Down Expand Up @@ -170,6 +171,15 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
return err
}

k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
if err != nil {
return err
}
k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion)
if err != nil {
return err
}

// Create a new rolling average calculator to determine the average delta
// time between log entries.
//
Expand All @@ -180,41 +190,48 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
// The computed average will never be less than the minimum of 2s.
calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime)

go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 will fail to detect rolled log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 (treated as < v1.29.0 by the check below) will fail to detect rotated log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
if k8sComparableServerVersion.LT(semver.Version{Major: 1, Minor: 29, Patch: 0}) {
go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
return
}
}
}
}
}()
}()
} else {
go func() {
<-ctx.Done()
_ = stream.Close()
}()
}

level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)

Expand Down

0 comments on commit a34ca07

Please sign in to comment.