From a7153e4bb940f4a1d65e01b173a24e236d15c043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BB=97=20Tr=E1=BB=8Dng=20H=E1=BA=A3i?= <41283691+hainenber@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:03:40 +0700 Subject: [PATCH] feat(rules/k8s): refactor common `rules.k8s` logic into `common/kubernetes` package (#6592) Signed-off-by: hainenber Co-authored-by: William Dumont --- .../{loki/rules => common}/kubernetes/diff.go | 38 ++--- .../rules => common}/kubernetes/diff_test.go | 32 ++-- internal/component/common/kubernetes/event.go | 61 +++++++ internal/component/common/kubernetes/rules.go | 34 ++++ .../component/common/kubernetes/rules_test.go | 13 ++ .../component/loki/rules/kubernetes/events.go | 90 ++-------- .../loki/rules/kubernetes/events_test.go | 7 +- .../component/loki/rules/kubernetes/rules.go | 32 +--- .../loki/rules/kubernetes/rules_test.go | 7 - .../component/loki/rules/kubernetes/types.go | 16 +- .../component/mimir/rules/kubernetes/diff.go | 113 ------------- .../mimir/rules/kubernetes/diff_test.go | 157 ------------------ .../mimir/rules/kubernetes/events.go | 90 ++-------- .../mimir/rules/kubernetes/events_test.go | 7 +- .../component/mimir/rules/kubernetes/rules.go | 32 +--- .../mimir/rules/kubernetes/rules_test.go | 7 - .../component/mimir/rules/kubernetes/types.go | 16 +- internal/flow/logging/logger_test.go | 2 +- 18 files changed, 210 insertions(+), 544 deletions(-) rename internal/component/{loki/rules => common}/kubernetes/diff.go (68%) rename internal/component/{loki/rules => common}/kubernetes/diff_test.go (83%) create mode 100644 internal/component/common/kubernetes/event.go create mode 100644 internal/component/common/kubernetes/rules.go create mode 100644 internal/component/common/kubernetes/rules_test.go delete mode 100644 internal/component/mimir/rules/kubernetes/diff.go delete mode 100644 internal/component/mimir/rules/kubernetes/diff_test.go diff --git a/internal/component/loki/rules/kubernetes/diff.go b/internal/component/common/kubernetes/diff.go similarity index 68% rename from internal/component/loki/rules/kubernetes/diff.go rename to internal/component/common/kubernetes/diff.go index 34c74ed62e37..5a1ab8d8d7ff 100644 --- a/internal/component/loki/rules/kubernetes/diff.go +++ b/internal/component/common/kubernetes/diff.go @@ -1,4 +1,4 @@ -package rules +package kubernetes import ( "bytes" @@ -7,27 +7,27 @@ import ( "gopkg.in/yaml.v3" // Used for prometheus rulefmt compatibility instead of gopkg.in/yaml.v2 ) -type ruleGroupDiffKind string +type RuleGroupDiffKind string const ( - ruleGroupDiffKindAdd ruleGroupDiffKind = "add" - ruleGroupDiffKindRemove ruleGroupDiffKind = "remove" - ruleGroupDiffKindUpdate ruleGroupDiffKind = "update" + RuleGroupDiffKindAdd RuleGroupDiffKind = "add" + RuleGroupDiffKindRemove RuleGroupDiffKind = "remove" + RuleGroupDiffKindUpdate RuleGroupDiffKind = "update" ) -type ruleGroupDiff struct { - Kind ruleGroupDiffKind +type RuleGroupDiff struct { + Kind RuleGroupDiffKind Actual rulefmt.RuleGroup Desired rulefmt.RuleGroup } -type ruleGroupsByNamespace map[string][]rulefmt.RuleGroup -type ruleGroupDiffsByNamespace map[string][]ruleGroupDiff +type RuleGroupsByNamespace map[string][]rulefmt.RuleGroup +type RuleGroupDiffsByNamespace map[string][]RuleGroupDiff -func diffRuleState(desired, actual ruleGroupsByNamespace) ruleGroupDiffsByNamespace { +func DiffRuleState(desired, actual RuleGroupsByNamespace) RuleGroupDiffsByNamespace { seenNamespaces := map[string]bool{} - diff := make(ruleGroupDiffsByNamespace) + diff := make(RuleGroupDiffsByNamespace) for namespace, desiredRuleGroups := range desired { seenNamespaces[namespace] = true @@ -55,8 +55,8 @@ func diffRuleState(desired, actual ruleGroupsByNamespace) ruleGroupDiffsByNamesp return diff } -func diffRuleNamespaceState(desired []rulefmt.RuleGroup, actual []rulefmt.RuleGroup) []ruleGroupDiff { - var diff []ruleGroupDiff +func diffRuleNamespaceState(desired []rulefmt.RuleGroup, actual []rulefmt.RuleGroup) []RuleGroupDiff { + var diff []RuleGroupDiff seenGroups := map[string]bool{} @@ -70,8 +70,8 @@ desiredGroups: continue desiredGroups } - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindUpdate, + diff = append(diff, RuleGroupDiff{ + Kind: RuleGroupDiffKindUpdate, Actual: actualRuleGroup, Desired: desiredRuleGroup, }) @@ -79,8 +79,8 @@ desiredGroups: } } - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindAdd, + diff = append(diff, RuleGroupDiff{ + Kind: RuleGroupDiffKindAdd, Desired: desiredRuleGroup, }) } @@ -90,8 +90,8 @@ desiredGroups: continue } - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindRemove, + diff = append(diff, RuleGroupDiff{ + Kind: RuleGroupDiffKindRemove, Actual: actualRuleGroup, }) } diff --git a/internal/component/loki/rules/kubernetes/diff_test.go b/internal/component/common/kubernetes/diff_test.go similarity index 83% rename from internal/component/loki/rules/kubernetes/diff_test.go rename to internal/component/common/kubernetes/diff_test.go index e52ae13288d7..7b22e963cfd3 100644 --- a/internal/component/loki/rules/kubernetes/diff_test.go +++ b/internal/component/common/kubernetes/diff_test.go @@ -1,4 +1,4 @@ -package rules +package kubernetes import ( "fmt" @@ -42,7 +42,7 @@ groups: name string desired map[string][]rulefmt.RuleGroup actual map[string][]rulefmt.RuleGroup - expected map[string][]ruleGroupDiff + expected map[string][]RuleGroupDiff } testCases := []testCase{ @@ -50,7 +50,7 @@ groups: name: "empty sets", desired: map[string][]rulefmt.RuleGroup{}, actual: map[string][]rulefmt.RuleGroup{}, - expected: map[string][]ruleGroupDiff{}, + expected: map[string][]RuleGroupDiff{}, }, { name: "add rule group", @@ -58,10 +58,10 @@ groups: managedNamespace: ruleGroupsA, }, actual: map[string][]rulefmt.RuleGroup{}, - expected: map[string][]ruleGroupDiff{ + expected: map[string][]RuleGroupDiff{ managedNamespace: { { - Kind: ruleGroupDiffKindAdd, + Kind: RuleGroupDiffKindAdd, Desired: ruleGroupsA[0], }, }, @@ -73,10 +73,10 @@ groups: actual: map[string][]rulefmt.RuleGroup{ managedNamespace: ruleGroupsA, }, - expected: map[string][]ruleGroupDiff{ + expected: map[string][]RuleGroupDiff{ managedNamespace: { { - Kind: ruleGroupDiffKindRemove, + Kind: RuleGroupDiffKindRemove, Actual: ruleGroupsA[0], }, }, @@ -90,10 +90,10 @@ groups: actual: map[string][]rulefmt.RuleGroup{ managedNamespace: ruleGroupsAModified, }, - expected: map[string][]ruleGroupDiff{ + expected: map[string][]RuleGroupDiff{ managedNamespace: { { - Kind: ruleGroupDiffKindUpdate, + Kind: RuleGroupDiffKindUpdate, Desired: ruleGroupsA[0], Actual: ruleGroupsAModified[0], }, @@ -108,28 +108,28 @@ groups: actual: map[string][]rulefmt.RuleGroup{ managedNamespace: ruleGroupsA, }, - expected: map[string][]ruleGroupDiff{}, + expected: map[string][]RuleGroupDiff{}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual := diffRuleState(tc.desired, tc.actual) + actual := DiffRuleState(tc.desired, tc.actual) requireEqualRuleDiffs(t, tc.expected, actual) }) } } -func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]ruleGroupDiff) { +func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]RuleGroupDiff) { require.Equal(t, len(expected), len(actual)) - var summarizeDiff = func(diff ruleGroupDiff) string { + var summarizeDiff = func(diff RuleGroupDiff) string { switch diff.Kind { - case ruleGroupDiffKindAdd: + case RuleGroupDiffKindAdd: return fmt.Sprintf("add: %s", diff.Desired.Name) - case ruleGroupDiffKindRemove: + case RuleGroupDiffKindRemove: return fmt.Sprintf("remove: %s", diff.Actual.Name) - case ruleGroupDiffKindUpdate: + case RuleGroupDiffKindUpdate: return fmt.Sprintf("update: %s", diff.Desired.Name) } panic("unreachable") diff --git a/internal/component/common/kubernetes/event.go b/internal/component/common/kubernetes/event.go new file mode 100644 index 000000000000..6850500582b6 --- /dev/null +++ b/internal/component/common/kubernetes/event.go @@ -0,0 +1,61 @@ +package kubernetes + +import ( + "github.com/go-kit/log" + "github.com/grafana/agent/internal/flow/logging/level" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// This type must be hashable, so it is kept simple. The indexer will maintain a +// cache of current state, so this is mostly used for logging. +type Event struct { + Typ EventType + ObjectKey string +} + +type EventType string + +const ( + EventTypeResourceChanged EventType = "resource-changed" +) + +type queuedEventHandler struct { + log log.Logger + queue workqueue.RateLimitingInterface +} + +func NewQueuedEventHandler(log log.Logger, queue workqueue.RateLimitingInterface) *queuedEventHandler { + return &queuedEventHandler{ + log: log, + queue: queue, + } +} + +// OnAdd implements the cache.ResourceEventHandler interface. +func (c *queuedEventHandler) OnAdd(obj interface{}, _ bool) { + c.publishEvent(obj) +} + +// OnUpdate implements the cache.ResourceEventHandler interface. +func (c *queuedEventHandler) OnUpdate(oldObj, newObj interface{}) { + c.publishEvent(newObj) +} + +// OnDelete implements the cache.ResourceEventHandler interface. +func (c *queuedEventHandler) OnDelete(obj interface{}) { + c.publishEvent(obj) +} + +func (c *queuedEventHandler) publishEvent(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + level.Error(c.log).Log("msg", "failed to get key for object", "err", err) + return + } + + c.queue.AddRateLimited(Event{ + Typ: EventTypeResourceChanged, + ObjectKey: key, + }) +} diff --git a/internal/component/common/kubernetes/rules.go b/internal/component/common/kubernetes/rules.go new file mode 100644 index 000000000000..c89d9742afe8 --- /dev/null +++ b/internal/component/common/kubernetes/rules.go @@ -0,0 +1,34 @@ +package kubernetes + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +type LabelSelector struct { + MatchLabels map[string]string `river:"match_labels,attr,optional"` + MatchExpressions []MatchExpression `river:"match_expression,block,optional"` +} + +type MatchExpression struct { + Key string `river:"key,attr"` + Operator string `river:"operator,attr"` + Values []string `river:"values,attr,optional"` +} + +func ConvertSelectorToListOptions(selector LabelSelector) (labels.Selector, error) { + matchExpressions := []metav1.LabelSelectorRequirement{} + + for _, me := range selector.MatchExpressions { + matchExpressions = append(matchExpressions, metav1.LabelSelectorRequirement{ + Key: me.Key, + Operator: metav1.LabelSelectorOperator(me.Operator), + Values: me.Values, + }) + } + + return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: selector.MatchLabels, + MatchExpressions: matchExpressions, + }) +} diff --git a/internal/component/common/kubernetes/rules_test.go b/internal/component/common/kubernetes/rules_test.go new file mode 100644 index 000000000000..3994ea36b65e --- /dev/null +++ b/internal/component/common/kubernetes/rules_test.go @@ -0,0 +1,13 @@ +package kubernetes + +import ( + "testing" + + "k8s.io/client-go/util/workqueue" +) + +func TestEventTypeIsHashable(t *testing.T) { + // This test is here to ensure that the EventType type is hashable according to the workqueue implementation + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + queue.AddRateLimited(Event{}) +} diff --git a/internal/component/loki/rules/kubernetes/events.go b/internal/component/loki/rules/kubernetes/events.go index f8f80da31fef..cde73f79cdfc 100644 --- a/internal/component/loki/rules/kubernetes/events.go +++ b/internal/component/loki/rules/kubernetes/events.go @@ -6,69 +6,15 @@ import ( "regexp" "time" - "github.com/go-kit/log" + "github.com/grafana/agent/internal/component/common/kubernetes" "github.com/grafana/agent/internal/flow/logging/level" "github.com/hashicorp/go-multierror" promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/prometheus/prometheus/model/rulefmt" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2 ) -// This type must be hashable, so it is kept simple. The indexer will maintain a -// cache of current state, so this is mostly used for logging. -type event struct { - typ eventType - objectKey string -} - -type eventType string - -const ( - eventTypeResourceChanged eventType = "resource-changed" - eventTypeSyncLoki eventType = "sync-loki" -) - -type queuedEventHandler struct { - log log.Logger - queue workqueue.RateLimitingInterface -} - -func newQueuedEventHandler(log log.Logger, queue workqueue.RateLimitingInterface) *queuedEventHandler { - return &queuedEventHandler{ - log: log, - queue: queue, - } -} - -// OnAdd implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnAdd(obj interface{}, _ bool) { - c.publishEvent(obj) -} - -// OnUpdate implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnUpdate(oldObj, newObj interface{}) { - c.publishEvent(newObj) -} - -// OnDelete implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnDelete(obj interface{}) { - c.publishEvent(obj) -} - -func (c *queuedEventHandler) publishEvent(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - level.Error(c.log).Log("msg", "failed to get key for object", "err", err) - return - } - - c.queue.AddRateLimited(event{ - typ: eventTypeResourceChanged, - objectKey: key, - }) -} +const eventTypeSyncLoki kubernetes.EventType = "sync-loki" func (c *Component) eventLoop(ctx context.Context) { for { @@ -78,14 +24,14 @@ func (c *Component) eventLoop(ctx context.Context) { return } - evt := eventInterface.(event) - c.metrics.eventsTotal.WithLabelValues(string(evt.typ)).Inc() + evt := eventInterface.(kubernetes.Event) + c.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc() err := c.processEvent(ctx, evt) if err != nil { retries := c.queue.NumRequeues(evt) if retries < 5 { - c.metrics.eventsRetried.WithLabelValues(string(evt.typ)).Inc() + c.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc() c.queue.AddRateLimited(evt) level.Error(c.log).Log( "msg", "failed to process event, will retry", @@ -94,7 +40,7 @@ func (c *Component) eventLoop(ctx context.Context) { ) continue } else { - c.metrics.eventsFailed.WithLabelValues(string(evt.typ)).Inc() + c.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc() level.Error(c.log).Log( "msg", "failed to process event, max retries exceeded", "retries", fmt.Sprintf("%d/5", retries), @@ -110,12 +56,12 @@ func (c *Component) eventLoop(ctx context.Context) { } } -func (c *Component) processEvent(ctx context.Context, e event) error { +func (c *Component) processEvent(ctx context.Context, e kubernetes.Event) error { defer c.queue.Done(e) - switch e.typ { - case eventTypeResourceChanged: - level.Info(c.log).Log("msg", "processing event", "type", e.typ, "key", e.objectKey) + switch e.Typ { + case kubernetes.EventTypeResourceChanged: + level.Info(c.log).Log("msg", "processing event", "type", e.Typ, "key", e.ObjectKey) case eventTypeSyncLoki: level.Debug(c.log).Log("msg", "syncing current state from ruler") err := c.syncLoki(ctx) @@ -123,7 +69,7 @@ func (c *Component) processEvent(ctx context.Context, e event) error { return err } default: - return fmt.Errorf("unknown event type: %s", e.typ) + return fmt.Errorf("unknown event type: %s", e.Typ) } return c.reconcileState(ctx) @@ -156,7 +102,7 @@ func (c *Component) reconcileState(ctx context.Context) error { return err } - diffs := diffRuleState(desiredState, c.currentState) + diffs := kubernetes.DiffRuleState(desiredState, c.currentState) var result error for ns, diff := range diffs { err = c.applyChanges(ctx, ns, diff) @@ -169,13 +115,13 @@ func (c *Component) reconcileState(ctx context.Context) error { return result } -func (c *Component) loadStateFromK8s() (ruleGroupsByNamespace, error) { +func (c *Component) loadStateFromK8s() (kubernetes.RuleGroupsByNamespace, error) { matchedNamespaces, err := c.namespaceLister.List(c.namespaceSelector) if err != nil { return nil, fmt.Errorf("failed to list namespaces: %w", err) } - desiredState := make(ruleGroupsByNamespace) + desiredState := make(kubernetes.RuleGroupsByNamespace) for _, ns := range matchedNamespaces { crdState, err := c.ruleLister.PrometheusRules(ns.Name).List(c.ruleSelector) if err != nil { @@ -213,26 +159,26 @@ func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.Ru return groups.Groups, nil } -func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []ruleGroupDiff) error { +func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error { if len(diffs) == 0 { return nil } for _, diff := range diffs { switch diff.Kind { - case ruleGroupDiffKindAdd: + case kubernetes.RuleGroupDiffKindAdd: err := c.lokiClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err } level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name) - case ruleGroupDiffKindRemove: + case kubernetes.RuleGroupDiffKindRemove: err := c.lokiClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name) if err != nil { return err } level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name) - case ruleGroupDiffKindUpdate: + case kubernetes.RuleGroupDiffKindUpdate: err := c.lokiClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err diff --git a/internal/component/loki/rules/kubernetes/events_test.go b/internal/component/loki/rules/kubernetes/events_test.go index e6ebf800d6f9..8c0b5f928e47 100644 --- a/internal/component/loki/rules/kubernetes/events_test.go +++ b/internal/component/loki/rules/kubernetes/events_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/internal/component/common/kubernetes" lokiClient "github.com/grafana/agent/internal/loki/client" v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1" @@ -135,7 +136,7 @@ func TestEventLoop(t *testing.T) { args: Arguments{LokiNameSpacePrefix: "agent"}, metrics: newMetrics(), } - eventHandler := newQueuedEventHandler(component.log, component.queue) + eventHandler := kubernetes.NewQueuedEventHandler(component.log, component.queue) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -153,7 +154,7 @@ func TestEventLoop(t *testing.T) { require.NoError(t, err) return len(rules) == 1 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(event{typ: eventTypeSyncLoki}) + component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncLoki}) // Update the rule in kubernetes rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{ @@ -170,7 +171,7 @@ func TestEventLoop(t *testing.T) { rules := allRules[lokiNamespaceForRuleCRD("agent", rule)][0].Rules return len(rules) == 2 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(event{typ: eventTypeSyncLoki}) + component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncLoki}) // Remove the rule from kubernetes ruleIndexer.Delete(rule) diff --git a/internal/component/loki/rules/kubernetes/rules.go b/internal/component/loki/rules/kubernetes/rules.go index d2cd5ca87df6..fe0cf7b0e8a8 100644 --- a/internal/component/loki/rules/kubernetes/rules.go +++ b/internal/component/loki/rules/kubernetes/rules.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/internal/component" + commonK8s "github.com/grafana/agent/internal/component/common/kubernetes" "github.com/grafana/agent/internal/featuregate" "github.com/grafana/agent/internal/flow/logging/level" lokiClient "github.com/grafana/agent/internal/loki/client" @@ -63,7 +64,7 @@ type Component struct { namespaceSelector labels.Selector ruleSelector labels.Selector - currentState ruleGroupsByNamespace + currentState commonK8s.RuleGroupsByNamespace metrics *metrics healthMut sync.RWMutex @@ -202,8 +203,8 @@ func (c *Component) Run(ctx context.Context) error { c.shutdown() return nil case <-c.ticker.C: - c.queue.Add(event{ - typ: eventTypeSyncLoki, + c.queue.Add(commonK8s.Event{ + Typ: eventTypeSyncLoki, }) } } @@ -274,12 +275,12 @@ func (c *Component) init() error { c.ticker.Reset(c.args.SyncInterval) - c.namespaceSelector, err = convertSelectorToListOptions(c.args.RuleNamespaceSelector) + c.namespaceSelector, err = commonK8s.ConvertSelectorToListOptions(c.args.RuleNamespaceSelector) if err != nil { return err } - c.ruleSelector, err = convertSelectorToListOptions(c.args.RuleSelector) + c.ruleSelector, err = commonK8s.ConvertSelectorToListOptions(c.args.RuleSelector) if err != nil { return err } @@ -287,23 +288,6 @@ func (c *Component) init() error { return nil } -func convertSelectorToListOptions(selector LabelSelector) (labels.Selector, error) { - matchExpressions := []metav1.LabelSelectorRequirement{} - - for _, me := range selector.MatchExpressions { - matchExpressions = append(matchExpressions, metav1.LabelSelectorRequirement{ - Key: me.Key, - Operator: metav1.LabelSelectorOperator(me.Operator), - Values: me.Values, - }) - } - - return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: selector.MatchLabels, - MatchExpressions: matchExpressions, - }) -} - func (c *Component) startNamespaceInformer() error { factory := informers.NewSharedInformerFactoryWithOptions( c.k8sClient, @@ -316,7 +300,7 @@ func (c *Component) startNamespaceInformer() error { namespaces := factory.Core().V1().Namespaces() c.namespaceLister = namespaces.Lister() c.namespaceInformer = namespaces.Informer() - _, err := c.namespaceInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue)) + _, err := c.namespaceInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) if err != nil { return err } @@ -338,7 +322,7 @@ func (c *Component) startRuleInformer() error { promRules := factory.Monitoring().V1().PrometheusRules() c.ruleLister = promRules.Lister() c.ruleInformer = promRules.Informer() - _, err := c.ruleInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue)) + _, err := c.ruleInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) if err != nil { return err } diff --git a/internal/component/loki/rules/kubernetes/rules_test.go b/internal/component/loki/rules/kubernetes/rules_test.go index 332c8942febe..74ccd4cbebc1 100644 --- a/internal/component/loki/rules/kubernetes/rules_test.go +++ b/internal/component/loki/rules/kubernetes/rules_test.go @@ -5,15 +5,8 @@ import ( "github.com/grafana/river" "github.com/stretchr/testify/require" - "k8s.io/client-go/util/workqueue" ) -func TestEventTypeIsHashable(t *testing.T) { - // This test is here to ensure that the EventType type is hashable according to the workqueue implementation - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - queue.AddRateLimited(event{}) -} - func TestRiverConfig(t *testing.T) { var exampleRiverConfig = ` address = "GRAFANA_CLOUD_METRICS_URL" diff --git a/internal/component/loki/rules/kubernetes/types.go b/internal/component/loki/rules/kubernetes/types.go index 0e9f0bfedc9e..b98db47196a2 100644 --- a/internal/component/loki/rules/kubernetes/types.go +++ b/internal/component/loki/rules/kubernetes/types.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/agent/internal/component/common/config" + "github.com/grafana/agent/internal/component/common/kubernetes" ) type Arguments struct { @@ -15,8 +16,8 @@ type Arguments struct { SyncInterval time.Duration `river:"sync_interval,attr,optional"` LokiNameSpacePrefix string `river:"loki_namespace_prefix,attr,optional"` - RuleSelector LabelSelector `river:"rule_selector,block,optional"` - RuleNamespaceSelector LabelSelector `river:"rule_namespace_selector,block,optional"` + RuleSelector kubernetes.LabelSelector `river:"rule_selector,block,optional"` + RuleNamespaceSelector kubernetes.LabelSelector `river:"rule_namespace_selector,block,optional"` } var DefaultArguments = Arguments{ @@ -42,14 +43,3 @@ func (args *Arguments) Validate() error { // We must explicitly Validate because HTTPClientConfig is squashed and it won't run otherwise return args.HTTPClientConfig.Validate() } - -type LabelSelector struct { - MatchLabels map[string]string `river:"match_labels,attr,optional"` - MatchExpressions []MatchExpression `river:"match_expression,block,optional"` -} - -type MatchExpression struct { - Key string `river:"key,attr"` - Operator string `river:"operator,attr"` - Values []string `river:"values,attr,optional"` -} diff --git a/internal/component/mimir/rules/kubernetes/diff.go b/internal/component/mimir/rules/kubernetes/diff.go deleted file mode 100644 index 34c74ed62e37..000000000000 --- a/internal/component/mimir/rules/kubernetes/diff.go +++ /dev/null @@ -1,113 +0,0 @@ -package rules - -import ( - "bytes" - - "github.com/prometheus/prometheus/model/rulefmt" - "gopkg.in/yaml.v3" // Used for prometheus rulefmt compatibility instead of gopkg.in/yaml.v2 -) - -type ruleGroupDiffKind string - -const ( - ruleGroupDiffKindAdd ruleGroupDiffKind = "add" - ruleGroupDiffKindRemove ruleGroupDiffKind = "remove" - ruleGroupDiffKindUpdate ruleGroupDiffKind = "update" -) - -type ruleGroupDiff struct { - Kind ruleGroupDiffKind - Actual rulefmt.RuleGroup - Desired rulefmt.RuleGroup -} - -type ruleGroupsByNamespace map[string][]rulefmt.RuleGroup -type ruleGroupDiffsByNamespace map[string][]ruleGroupDiff - -func diffRuleState(desired, actual ruleGroupsByNamespace) ruleGroupDiffsByNamespace { - seenNamespaces := map[string]bool{} - - diff := make(ruleGroupDiffsByNamespace) - - for namespace, desiredRuleGroups := range desired { - seenNamespaces[namespace] = true - - actualRuleGroups := actual[namespace] - subDiff := diffRuleNamespaceState(desiredRuleGroups, actualRuleGroups) - - if len(subDiff) == 0 { - continue - } - - diff[namespace] = subDiff - } - - for namespace, actualRuleGroups := range actual { - if seenNamespaces[namespace] { - continue - } - - subDiff := diffRuleNamespaceState(nil, actualRuleGroups) - - diff[namespace] = subDiff - } - - return diff -} - -func diffRuleNamespaceState(desired []rulefmt.RuleGroup, actual []rulefmt.RuleGroup) []ruleGroupDiff { - var diff []ruleGroupDiff - - seenGroups := map[string]bool{} - -desiredGroups: - for _, desiredRuleGroup := range desired { - seenGroups[desiredRuleGroup.Name] = true - - for _, actualRuleGroup := range actual { - if desiredRuleGroup.Name == actualRuleGroup.Name { - if equalRuleGroups(desiredRuleGroup, actualRuleGroup) { - continue desiredGroups - } - - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindUpdate, - Actual: actualRuleGroup, - Desired: desiredRuleGroup, - }) - continue desiredGroups - } - } - - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindAdd, - Desired: desiredRuleGroup, - }) - } - - for _, actualRuleGroup := range actual { - if seenGroups[actualRuleGroup.Name] { - continue - } - - diff = append(diff, ruleGroupDiff{ - Kind: ruleGroupDiffKindRemove, - Actual: actualRuleGroup, - }) - } - - return diff -} - -func equalRuleGroups(a, b rulefmt.RuleGroup) bool { - aBuf, err := yaml.Marshal(a) - if err != nil { - return false - } - bBuf, err := yaml.Marshal(b) - if err != nil { - return false - } - - return bytes.Equal(aBuf, bBuf) -} diff --git a/internal/component/mimir/rules/kubernetes/diff_test.go b/internal/component/mimir/rules/kubernetes/diff_test.go deleted file mode 100644 index e52ae13288d7..000000000000 --- a/internal/component/mimir/rules/kubernetes/diff_test.go +++ /dev/null @@ -1,157 +0,0 @@ -package rules - -import ( - "fmt" - "testing" - - "github.com/prometheus/prometheus/model/rulefmt" - "github.com/stretchr/testify/require" -) - -func parseRuleGroups(t *testing.T, buf []byte) []rulefmt.RuleGroup { - t.Helper() - - groups, errs := rulefmt.Parse(buf) - require.Empty(t, errs) - - return groups.Groups -} - -func TestDiffRuleState(t *testing.T) { - ruleGroupsA := parseRuleGroups(t, []byte(` -groups: -- name: rule-group-a - interval: 1m - rules: - - record: rule_a - expr: 1 -`)) - - ruleGroupsAModified := parseRuleGroups(t, []byte(` -groups: -- name: rule-group-a - interval: 1m - rules: - - record: rule_a - expr: 3 -`)) - - managedNamespace := "agent/namespace/name/12345678-1234-1234-1234-123456789012" - - type testCase struct { - name string - desired map[string][]rulefmt.RuleGroup - actual map[string][]rulefmt.RuleGroup - expected map[string][]ruleGroupDiff - } - - testCases := []testCase{ - { - name: "empty sets", - desired: map[string][]rulefmt.RuleGroup{}, - actual: map[string][]rulefmt.RuleGroup{}, - expected: map[string][]ruleGroupDiff{}, - }, - { - name: "add rule group", - desired: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsA, - }, - actual: map[string][]rulefmt.RuleGroup{}, - expected: map[string][]ruleGroupDiff{ - managedNamespace: { - { - Kind: ruleGroupDiffKindAdd, - Desired: ruleGroupsA[0], - }, - }, - }, - }, - { - name: "remove rule group", - desired: map[string][]rulefmt.RuleGroup{}, - actual: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsA, - }, - expected: map[string][]ruleGroupDiff{ - managedNamespace: { - { - Kind: ruleGroupDiffKindRemove, - Actual: ruleGroupsA[0], - }, - }, - }, - }, - { - name: "update rule group", - desired: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsA, - }, - actual: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsAModified, - }, - expected: map[string][]ruleGroupDiff{ - managedNamespace: { - { - Kind: ruleGroupDiffKindUpdate, - Desired: ruleGroupsA[0], - Actual: ruleGroupsAModified[0], - }, - }, - }, - }, - { - name: "unchanged rule groups", - desired: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsA, - }, - actual: map[string][]rulefmt.RuleGroup{ - managedNamespace: ruleGroupsA, - }, - expected: map[string][]ruleGroupDiff{}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - actual := diffRuleState(tc.desired, tc.actual) - requireEqualRuleDiffs(t, tc.expected, actual) - }) - } -} - -func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]ruleGroupDiff) { - require.Equal(t, len(expected), len(actual)) - - var summarizeDiff = func(diff ruleGroupDiff) string { - switch diff.Kind { - case ruleGroupDiffKindAdd: - return fmt.Sprintf("add: %s", diff.Desired.Name) - case ruleGroupDiffKindRemove: - return fmt.Sprintf("remove: %s", diff.Actual.Name) - case ruleGroupDiffKindUpdate: - return fmt.Sprintf("update: %s", diff.Desired.Name) - } - panic("unreachable") - } - - for namespace, expectedDiffs := range expected { - actualDiffs, ok := actual[namespace] - require.True(t, ok) - - require.Equal(t, len(expectedDiffs), len(actualDiffs)) - - for i, expectedDiff := range expectedDiffs { - actualDiff := actualDiffs[i] - - if expectedDiff.Kind != actualDiff.Kind || - !equalRuleGroups(expectedDiff.Desired, actualDiff.Desired) || - !equalRuleGroups(expectedDiff.Actual, actualDiff.Actual) { - - t.Logf("expected diff: %s", summarizeDiff(expectedDiff)) - t.Logf("actual diff: %s", summarizeDiff(actualDiff)) - t.Fail() - } - } - } -} diff --git a/internal/component/mimir/rules/kubernetes/events.go b/internal/component/mimir/rules/kubernetes/events.go index ed3ace052386..7752077d9730 100644 --- a/internal/component/mimir/rules/kubernetes/events.go +++ b/internal/component/mimir/rules/kubernetes/events.go @@ -6,69 +6,15 @@ import ( "regexp" "time" - "github.com/go-kit/log" + "github.com/grafana/agent/internal/component/common/kubernetes" "github.com/grafana/agent/internal/flow/logging/level" "github.com/hashicorp/go-multierror" promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/prometheus/prometheus/model/rulefmt" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2 ) -// This type must be hashable, so it is kept simple. The indexer will maintain a -// cache of current state, so this is mostly used for logging. -type event struct { - typ eventType - objectKey string -} - -type eventType string - -const ( - eventTypeResourceChanged eventType = "resource-changed" - eventTypeSyncMimir eventType = "sync-mimir" -) - -type queuedEventHandler struct { - log log.Logger - queue workqueue.RateLimitingInterface -} - -func newQueuedEventHandler(log log.Logger, queue workqueue.RateLimitingInterface) *queuedEventHandler { - return &queuedEventHandler{ - log: log, - queue: queue, - } -} - -// OnAdd implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnAdd(obj interface{}, _ bool) { - c.publishEvent(obj) -} - -// OnUpdate implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnUpdate(oldObj, newObj interface{}) { - c.publishEvent(newObj) -} - -// OnDelete implements the cache.ResourceEventHandler interface. -func (c *queuedEventHandler) OnDelete(obj interface{}) { - c.publishEvent(obj) -} - -func (c *queuedEventHandler) publishEvent(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - level.Error(c.log).Log("msg", "failed to get key for object", "err", err) - return - } - - c.queue.AddRateLimited(event{ - typ: eventTypeResourceChanged, - objectKey: key, - }) -} +const eventTypeSyncMimir kubernetes.EventType = "sync-mimir" func (c *Component) eventLoop(ctx context.Context) { for { @@ -78,14 +24,14 @@ func (c *Component) eventLoop(ctx context.Context) { return } - evt := eventInterface.(event) - c.metrics.eventsTotal.WithLabelValues(string(evt.typ)).Inc() + evt := eventInterface.(kubernetes.Event) + c.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc() err := c.processEvent(ctx, evt) if err != nil { retries := c.queue.NumRequeues(evt) if retries < 5 { - c.metrics.eventsRetried.WithLabelValues(string(evt.typ)).Inc() + c.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc() c.queue.AddRateLimited(evt) level.Error(c.log).Log( "msg", "failed to process event, will retry", @@ -94,7 +40,7 @@ func (c *Component) eventLoop(ctx context.Context) { ) continue } else { - c.metrics.eventsFailed.WithLabelValues(string(evt.typ)).Inc() + c.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc() level.Error(c.log).Log( "msg", "failed to process event, max retries exceeded", "retries", fmt.Sprintf("%d/5", retries), @@ -110,12 +56,12 @@ func (c *Component) eventLoop(ctx context.Context) { } } -func (c *Component) processEvent(ctx context.Context, e event) error { +func (c *Component) processEvent(ctx context.Context, e kubernetes.Event) error { defer c.queue.Done(e) - switch e.typ { - case eventTypeResourceChanged: - level.Info(c.log).Log("msg", "processing event", "type", e.typ, "key", e.objectKey) + switch e.Typ { + case kubernetes.EventTypeResourceChanged: + level.Info(c.log).Log("msg", "processing event", "type", e.Typ, "key", e.ObjectKey) case eventTypeSyncMimir: level.Debug(c.log).Log("msg", "syncing current state from ruler") err := c.syncMimir(ctx) @@ -123,7 +69,7 @@ func (c *Component) processEvent(ctx context.Context, e event) error { return err } default: - return fmt.Errorf("unknown event type: %s", e.typ) + return fmt.Errorf("unknown event type: %s", e.Typ) } return c.reconcileState(ctx) @@ -156,7 +102,7 @@ func (c *Component) reconcileState(ctx context.Context) error { return err } - diffs := diffRuleState(desiredState, c.currentState) + diffs := kubernetes.DiffRuleState(desiredState, c.currentState) var result error for ns, diff := range diffs { err = c.applyChanges(ctx, ns, diff) @@ -169,13 +115,13 @@ func (c *Component) reconcileState(ctx context.Context) error { return result } -func (c *Component) loadStateFromK8s() (ruleGroupsByNamespace, error) { +func (c *Component) loadStateFromK8s() (kubernetes.RuleGroupsByNamespace, error) { matchedNamespaces, err := c.namespaceLister.List(c.namespaceSelector) if err != nil { return nil, fmt.Errorf("failed to list namespaces: %w", err) } - desiredState := make(ruleGroupsByNamespace) + desiredState := make(kubernetes.RuleGroupsByNamespace) for _, ns := range matchedNamespaces { crdState, err := c.ruleLister.PrometheusRules(ns.Name).List(c.ruleSelector) if err != nil { @@ -211,26 +157,26 @@ func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.Ru return groups.Groups, nil } -func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []ruleGroupDiff) error { +func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error { if len(diffs) == 0 { return nil } for _, diff := range diffs { switch diff.Kind { - case ruleGroupDiffKindAdd: + case kubernetes.RuleGroupDiffKindAdd: err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err } level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name) - case ruleGroupDiffKindRemove: + case kubernetes.RuleGroupDiffKindRemove: err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name) if err != nil { return err } level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name) - case ruleGroupDiffKindUpdate: + case kubernetes.RuleGroupDiffKindUpdate: err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired) if err != nil { return err diff --git a/internal/component/mimir/rules/kubernetes/events_test.go b/internal/component/mimir/rules/kubernetes/events_test.go index 621f3383effb..e177e41bd13f 100644 --- a/internal/component/mimir/rules/kubernetes/events_test.go +++ b/internal/component/mimir/rules/kubernetes/events_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/internal/component/common/kubernetes" mimirClient "github.com/grafana/agent/internal/mimir/client" v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1" @@ -135,7 +136,7 @@ func TestEventLoop(t *testing.T) { args: Arguments{MimirNameSpacePrefix: "agent"}, metrics: newMetrics(), } - eventHandler := newQueuedEventHandler(component.log, component.queue) + eventHandler := kubernetes.NewQueuedEventHandler(component.log, component.queue) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -153,7 +154,7 @@ func TestEventLoop(t *testing.T) { require.NoError(t, err) return len(rules) == 1 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(event{typ: eventTypeSyncMimir}) + component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncMimir}) // Update the rule in kubernetes rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{ @@ -170,7 +171,7 @@ func TestEventLoop(t *testing.T) { rules := allRules[mimirNamespaceForRuleCRD("agent", rule)][0].Rules return len(rules) == 2 }, time.Second, 10*time.Millisecond) - component.queue.AddRateLimited(event{typ: eventTypeSyncMimir}) + component.queue.AddRateLimited(kubernetes.Event{Typ: eventTypeSyncMimir}) // Remove the rule from kubernetes ruleIndexer.Delete(rule) diff --git a/internal/component/mimir/rules/kubernetes/rules.go b/internal/component/mimir/rules/kubernetes/rules.go index db75be3dee7a..afd28c72fda3 100644 --- a/internal/component/mimir/rules/kubernetes/rules.go +++ b/internal/component/mimir/rules/kubernetes/rules.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/internal/component" + commonK8s "github.com/grafana/agent/internal/component/common/kubernetes" "github.com/grafana/agent/internal/featuregate" "github.com/grafana/agent/internal/flow/logging/level" mimirClient "github.com/grafana/agent/internal/mimir/client" @@ -63,7 +64,7 @@ type Component struct { namespaceSelector labels.Selector ruleSelector labels.Selector - currentState ruleGroupsByNamespace + currentState commonK8s.RuleGroupsByNamespace metrics *metrics healthMut sync.RWMutex @@ -202,8 +203,8 @@ func (c *Component) Run(ctx context.Context) error { c.shutdown() return nil case <-c.ticker.C: - c.queue.Add(event{ - typ: eventTypeSyncMimir, + c.queue.Add(commonK8s.Event{ + Typ: eventTypeSyncMimir, }) } } @@ -275,12 +276,12 @@ func (c *Component) init() error { c.ticker.Reset(c.args.SyncInterval) - c.namespaceSelector, err = convertSelectorToListOptions(c.args.RuleNamespaceSelector) + c.namespaceSelector, err = commonK8s.ConvertSelectorToListOptions(c.args.RuleNamespaceSelector) if err != nil { return err } - c.ruleSelector, err = convertSelectorToListOptions(c.args.RuleSelector) + c.ruleSelector, err = commonK8s.ConvertSelectorToListOptions(c.args.RuleSelector) if err != nil { return err } @@ -288,23 +289,6 @@ func (c *Component) init() error { return nil } -func convertSelectorToListOptions(selector LabelSelector) (labels.Selector, error) { - matchExpressions := []metav1.LabelSelectorRequirement{} - - for _, me := range selector.MatchExpressions { - matchExpressions = append(matchExpressions, metav1.LabelSelectorRequirement{ - Key: me.Key, - Operator: metav1.LabelSelectorOperator(me.Operator), - Values: me.Values, - }) - } - - return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: selector.MatchLabels, - MatchExpressions: matchExpressions, - }) -} - func (c *Component) startNamespaceInformer() error { factory := informers.NewSharedInformerFactoryWithOptions( c.k8sClient, @@ -317,7 +301,7 @@ func (c *Component) startNamespaceInformer() error { namespaces := factory.Core().V1().Namespaces() c.namespaceLister = namespaces.Lister() c.namespaceInformer = namespaces.Informer() - _, err := c.namespaceInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue)) + _, err := c.namespaceInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) if err != nil { return err } @@ -339,7 +323,7 @@ func (c *Component) startRuleInformer() error { promRules := factory.Monitoring().V1().PrometheusRules() c.ruleLister = promRules.Lister() c.ruleInformer = promRules.Informer() - _, err := c.ruleInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue)) + _, err := c.ruleInformer.AddEventHandler(commonK8s.NewQueuedEventHandler(c.log, c.queue)) if err != nil { return err } diff --git a/internal/component/mimir/rules/kubernetes/rules_test.go b/internal/component/mimir/rules/kubernetes/rules_test.go index 332c8942febe..74ccd4cbebc1 100644 --- a/internal/component/mimir/rules/kubernetes/rules_test.go +++ b/internal/component/mimir/rules/kubernetes/rules_test.go @@ -5,15 +5,8 @@ import ( "github.com/grafana/river" "github.com/stretchr/testify/require" - "k8s.io/client-go/util/workqueue" ) -func TestEventTypeIsHashable(t *testing.T) { - // This test is here to ensure that the EventType type is hashable according to the workqueue implementation - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - queue.AddRateLimited(event{}) -} - func TestRiverConfig(t *testing.T) { var exampleRiverConfig = ` address = "GRAFANA_CLOUD_METRICS_URL" diff --git a/internal/component/mimir/rules/kubernetes/types.go b/internal/component/mimir/rules/kubernetes/types.go index d59265f9c66d..564d6b4f0e67 100644 --- a/internal/component/mimir/rules/kubernetes/types.go +++ b/internal/component/mimir/rules/kubernetes/types.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/agent/internal/component/common/config" + "github.com/grafana/agent/internal/component/common/kubernetes" ) type Arguments struct { @@ -16,8 +17,8 @@ type Arguments struct { SyncInterval time.Duration `river:"sync_interval,attr,optional"` MimirNameSpacePrefix string `river:"mimir_namespace_prefix,attr,optional"` - RuleSelector LabelSelector `river:"rule_selector,block,optional"` - RuleNamespaceSelector LabelSelector `river:"rule_namespace_selector,block,optional"` + RuleSelector kubernetes.LabelSelector `river:"rule_selector,block,optional"` + RuleNamespaceSelector kubernetes.LabelSelector `river:"rule_namespace_selector,block,optional"` } var DefaultArguments = Arguments{ @@ -44,14 +45,3 @@ func (args *Arguments) Validate() error { // We must explicitly Validate because HTTPClientConfig is squashed and it won't run otherwise return args.HTTPClientConfig.Validate() } - -type LabelSelector struct { - MatchLabels map[string]string `river:"match_labels,attr,optional"` - MatchExpressions []MatchExpression `river:"match_expression,block,optional"` -} - -type MatchExpression struct { - Key string `river:"key,attr"` - Operator string `river:"operator,attr"` - Values []string `river:"values,attr,optional"` -} diff --git a/internal/flow/logging/logger_test.go b/internal/flow/logging/logger_test.go index 7b8fb15d76fd..79c84d630738 100644 --- a/internal/flow/logging/logger_test.go +++ b/internal/flow/logging/logger_test.go @@ -18,7 +18,7 @@ import ( ) /* Most recent performance results on M2 Macbook Air: -$ go test -count=1 -benchmem ./pkg/flow/logging -run ^$ -bench BenchmarkLogging_ +$ go test -count=1 -benchmem ./internal/flow/logging -run ^$ -bench BenchmarkLogging_ goos: darwin goarch: arm64 pkg: github.com/grafana/agent/internal/flow/logging