Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code to check for staleness in labelstore #5970

Merged
merged 6 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Main (unreleased)

- Fixes `otelcol.connector.servicegraph` store ttl default value from 2ms to 2s. (@rlankfo)

- Add staleness tracking to labelstore to reduce memory usage. (@mattdurham)

### Other changes

- Bump github.com/IBM/sarama from v1.41.2 to v1.42.1
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (fr *flowRun) Run(configPath string) error {
return fmt.Errorf("failed to create otel service")
}

labelService := labelstore.New(l)
labelService := labelstore.New(l, reg)

f := flow.New(flow.Options{
Logger: l,
Expand Down
12 changes: 8 additions & 4 deletions component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import (
"time"

"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"

"github.com/hashicorp/go-multierror"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"

"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -112,6 +110,12 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
var multiErr error
updated := false
for _, x := range a.children {
Expand Down
4 changes: 2 additions & 2 deletions component/prometheus/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
)

func TestRollback(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Rollback()
require.NoError(t, err)
}

func TestCommit(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Commit()
Expand Down
8 changes: 8 additions & 0 deletions component/prometheus/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -102,6 +103,13 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
}

if a.interceptor.onAppend != nil {
return a.interceptor.onAppend(ref, l, t, v, a.child)
}
Expand Down
5 changes: 3 additions & 2 deletions component/prometheus/operator/common/crdmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/agent/component/prometheus/operator"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand All @@ -33,7 +34,7 @@ func TestClearConfigsSameNsSamePrefix(t *testing.T) {
logger,
&operator.DefaultArguments,
KindServiceMonitor,
labelstore.New(logger),
labelstore.New(logger, prometheus.DefaultRegisterer),
)

m.discoveryManager = newMockDiscoveryManager()
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestClearConfigsProbe(t *testing.T) {
logger,
&operator.DefaultArguments,
KindProbe,
labelstore.New(logger),
labelstore.New(logger, prometheus.DefaultRegisterer),
)

m.discoveryManager = newMockDiscoveryManager()
Expand Down
4 changes: 2 additions & 2 deletions component/prometheus/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable {
return ref, nil
}

ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
return []storage.Appendable{agentprom.NewInterceptor(
nil,
ls,
Expand Down Expand Up @@ -385,7 +385,7 @@ func testOptions(t *testing.T) component.Options {
Logger: util.TestFlowLogger(t),
Registerer: prometheus.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
},
}
}
Expand Down
12 changes: 6 additions & 6 deletions component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func TestCache(t *testing.T) {
lc := labelstore.New(nil)
lc := labelstore.New(nil, prom.DefaultRegisterer)
relabeller := generateRelabel(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
Expand All @@ -50,7 +50,7 @@ func TestUpdateReset(t *testing.T) {
}

func TestNil(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, false)
return ref, nil
Expand All @@ -61,7 +61,7 @@ func TestNil(t *testing.T) {
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestLRUNaN(t *testing.T) {
}

func BenchmarkCache(b *testing.B) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(b, l.Has("new_label"))
return ref, nil
Expand Down Expand Up @@ -137,7 +137,7 @@ func BenchmarkCache(b *testing.B) {
}

func generateRelabel(t *testing.T) *Component {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, l.Has("new_label"))
return ref, nil
Expand All @@ -148,7 +148,7 @@ func generateRelabel(t *testing.T) *Component {
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
Expand Down
6 changes: 3 additions & 3 deletions component/prometheus/scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestForwardingToAppendable(t *testing.T) {
case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service %q does not exist", name)
}
Expand All @@ -114,7 +114,7 @@ func TestForwardingToAppendable(t *testing.T) {
// Update the component with a mock receiver; it should be passed along to the Appendable.
var receivedTs int64
var receivedSamples labels.Labels
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus_client.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
receivedTs = t
receivedSamples = l
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestCustomDialer(t *testing.T) {
case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil

default:
return nil, fmt.Errorf("service %q does not exist", name)
Expand Down
3 changes: 2 additions & 1 deletion converter/internal/test_common/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
cluster_service "github.com/grafana/agent/service/cluster"
http_service "github.com/grafana/agent/service/http"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -194,7 +195,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) {
// properly.
http_service.New(http_service.Options{}),
clusterService,
labelstore.New(nil),
labelstore.New(nil, prometheus.DefaultRegisterer),
},
})
err = f.LoadSource(cfg, nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/flow/componenttest/componenttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) (
GetServiceData: func(name string) (interface{}, error) {
switch name {
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("no service named %s defined", name)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/flow/module_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
http_service "github.com/grafana/agent/service/http"
"github.com/grafana/agent/service/labelstore"
otel_service "github.com/grafana/agent/service/otel"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

Expand Down Expand Up @@ -164,7 +165,7 @@ func testOptions(t *testing.T) flow.Options {
http_service.New(http_service.Options{}),
clusterService,
otelService,
labelstore.New(nil),
labelstore.New(nil, prometheus.DefaultRegisterer),
},
}
}
Expand Down
73 changes: 56 additions & 17 deletions service/labelstore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
agent_service "github.com/grafana/agent/service"
flow_service "github.com/grafana/agent/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
)

const ServiceName = "labelstore"

type service struct {
log log.Logger
mut sync.Mutex
globalRefID uint64
mappings map[string]*remoteWriteMapping
labelsHashToGlobal map[uint64]uint64
staleGlobals map[uint64]*staleMarker
log log.Logger
mut sync.Mutex
globalRefID uint64
mappings map[string]*remoteWriteMapping
labelsHashToGlobal map[uint64]uint64
staleGlobals map[uint64]*staleMarker
totalIDs *prometheus.Desc
idsInRemoteWrapping *prometheus.Desc
Comment on lines +25 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these be constants in the package?

lastStaleCheck prometheus.Gauge
}

type staleMarker struct {
globalID uint64
lastMarkedStale time.Time
Expand All @@ -32,17 +36,26 @@ type Arguments struct{}

var _ flow_service.Service = (*service)(nil)

func New(l log.Logger) *service {
func New(l log.Logger, r prometheus.Registerer) *service {
if l == nil {
l = log.NewNopLogger()
}
return &service{
log: l,
globalRefID: 0,
mappings: make(map[string]*remoteWriteMapping),
labelsHashToGlobal: make(map[uint64]uint64),
staleGlobals: make(map[uint64]*staleMarker),
s := &service{
log: l,
globalRefID: 0,
mappings: make(map[string]*remoteWriteMapping),
labelsHashToGlobal: make(map[uint64]uint64),
staleGlobals: make(map[uint64]*staleMarker),
totalIDs: prometheus.NewDesc("agent_labelstore_global_ids_count", "Total number of global ids.", nil, nil),
idsInRemoteWrapping: prometheus.NewDesc("agent_labelstore_remote_store_ids_count", "Total number of ids per remote write", []string{"remote_name"}, nil),
lastStaleCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "agent_labelstore_last_stale_check_timestamp",
Help: "Last time stale check was ran expressed in unix timestamp.",
}),
}
_ = r.Register(s.lastStaleCheck)
_ = r.Register(s)
return s
}

// Definition returns the Definition of the Service.
Expand All @@ -56,12 +69,33 @@ func (s *service) Definition() agent_service.Definition {
}
}

func (s *service) Describe(m chan<- *prometheus.Desc) {
m <- s.totalIDs
m <- s.idsInRemoteWrapping
}
func (s *service) Collect(m chan<- prometheus.Metric) {
s.mut.Lock()
defer s.mut.Unlock()

m <- prometheus.MustNewConstMetric(s.totalIDs, prometheus.GaugeValue, float64(len(s.labelsHashToGlobal)))
for name, rw := range s.mappings {
m <- prometheus.MustNewConstMetric(s.idsInRemoteWrapping, prometheus.GaugeValue, float64(len(rw.globalToLocal)), name)
}
}

// Run starts a Service. Run must block until the provided
// context is canceled. Returning an error should be treated
// as a fatal error for the Service.
func (s *service) Run(ctx context.Context, host agent_service.Host) error {
<-ctx.Done()
return nil
staleCheck := time.NewTicker(10 * time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need to do this now, but do you think there may be a need to configure this in the future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since staleness markers arent configurable and this was meant to be double the staleness marker length I dont think it will need to be configurable. Though that is famous last words.

for {
select {
case <-ctx.Done():
return nil
case <-staleCheck.C:
s.CheckAndRemoveStaleMarkers()
}
}
}

// Update updates a Service at runtime. Update is never
Expand All @@ -72,7 +106,7 @@ func (s *service) Run(ctx context.Context, host agent_service.Host) error {
//
// Update will be called once before Run, and may be called
// while Run is active.
func (s *service) Update(newConfig any) error {
func (s *service) Update(_ any) error {
return nil
}

Expand Down Expand Up @@ -190,6 +224,8 @@ func (s *service) CheckAndRemoveStaleMarkers() {
s.mut.Lock()
defer s.mut.Unlock()

s.lastStaleCheck.Set(float64(time.Now().Unix()))
level.Debug(s.log).Log("msg", "labelstore removing stale markers")
curr := time.Now()
idsToBeGCed := make([]*staleMarker, 0)
for _, stale := range s.staleGlobals {
Expand All @@ -199,6 +235,9 @@ func (s *service) CheckAndRemoveStaleMarkers() {
}
idsToBeGCed = append(idsToBeGCed, stale)
}

level.Debug(s.log).Log("msg", "number of ids to remove", "count", len(idsToBeGCed))

for _, marker := range idsToBeGCed {
delete(s.staleGlobals, marker.globalID)
delete(s.labelsHashToGlobal, marker.labelHash)
Expand Down
Loading