diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fbef786b1b1..14cbfa110606 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,8 @@ Main (unreleased) - Grafana Agent Operator: `config-reloader` container no longer runs as root. (@rootmout) +- `loki.source.kubernetes_events` now supports clustering. (@captncraig) + ### Bugfixes - Fixed an issue where `loki.process` validation for stage `metric.counter` was diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index e409f6aebcdc..732911cf05b8 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -18,6 +18,8 @@ import ( "github.com/grafana/agent/component/common/loki/positions" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/pkg/runner" + "github.com/grafana/agent/service/cluster" + "github.com/grafana/ckit/shard" "github.com/oklog/run" "k8s.io/client-go/rest" ) @@ -27,9 +29,9 @@ const informerSyncTimeout = 10 * time.Second func init() { component.Register(component.Registration{ - Name: "loki.source.kubernetes_events", - Args: Arguments{}, - + Name: "loki.source.kubernetes_events", + Args: Arguments{}, + NeedsServices: []string{cluster.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -47,6 +49,8 @@ type Arguments struct { // Client settings to connect to Kubernetes. Client kubernetes.ClientArguments `river:"client,block,optional"` + + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` } // DefaultArguments holds default settings for loki.source.kubernetes_events. @@ -91,6 +95,8 @@ type Component struct { tasksMut sync.RWMutex tasks []eventControllerTask + cluster cluster.Cluster + receiversMut sync.RWMutex receivers []loki.LogsReceiver } @@ -113,6 +119,11 @@ func New(o component.Options, args Arguments) (*Component, error) { if err != nil { return nil, err } + data, err := o.GetServiceData(cluster.ServiceName) + if err != nil { + return nil, fmt.Errorf("failed to get information about cluster: %w", err) + } + clusterData := data.(cluster.Cluster) c := &Component{ log: o.Logger, @@ -122,6 +133,7 @@ func New(o component.Options, args Arguments) (*Component, error) { runner: runner.New(func(t eventControllerTask) runner.Worker { return newEventController(t) }), + cluster: clusterData, newTasksCh: make(chan struct{}, 1), } if err := c.Update(args); err != nil { @@ -150,7 +162,24 @@ func (c *Component) Run(ctx context.Context) error { c.tasksMut.RLock() tasks := c.tasks c.tasksMut.RUnlock() - + if c.args.Clustering.Enabled { + // distribute event tasks by namespace. + // in the default case, there will only be one namespace ("") for "all namespaces". + // That is fine, it just means only one node will send events. + newTasks := make([]eventControllerTask, 0, len(tasks)) + for _, t := range tasks { + peers, err := c.cluster.Lookup(shard.StringKey(t.Namespace), 1, shard.OpReadWrite) + if err != nil { + // This should never happen, but in any case we fall + // back to owning the target ourselves. + newTasks = append(newTasks, t) + } + if peers[0].Self { + newTasks = append(newTasks, t) + } + } + tasks = newTasks + } if err := c.runner.ApplyTasks(ctx, tasks); err != nil { level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err) } @@ -234,6 +263,22 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements component.ClusterComponent. +func (c *Component) NotifyClusterChange() { + c.mut.Lock() + defer c.mut.Unlock() + + if !c.args.Clustering.Enabled { + return // no-op + } + + // Schedule a reload so namespaces get redistributed + select { + case c.newTasksCh <- struct{}{}: + default: + } +} + // getNamespaces gets a list of namespaces to watch from the arguments. If the // list of namespaces is empty, returns a slice to watch all namespaces. func getNamespaces(args Arguments) []string { diff --git a/docs/sources/flow/reference/components/loki.source.kubernetes_events.md b/docs/sources/flow/reference/components/loki.source.kubernetes_events.md index 68168544d994..4f7b9841d145 100644 --- a/docs/sources/flow/reference/components/loki.source.kubernetes_events.md +++ b/docs/sources/flow/reference/components/loki.source.kubernetes_events.md @@ -80,6 +80,7 @@ client > authorization | [authorization][] | Configure generic authorization to client > oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no client > oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no client > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no +clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no The `>` symbol indicates deeper levels of nesting. For example, `client > basic_auth` refers to a `basic_auth` block defined @@ -90,6 +91,7 @@ inside a `client` block. [authorization]: #authorization-block [oauth2]: #oauth2-block [tls_config]: #tls_config-block +[clustering]: #clustering-beta ### client block @@ -133,6 +135,25 @@ Name | Type | Description | Default | Required {{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="" >}} +### clustering (beta) + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`enabled` | `bool` | Distribute event collection with other cluster nodes. | | yes + +When the agent is [using clustering][], and `enabled` is set to true, then this +`loki.source.kubernetes_events` component instance opts-in to participating in the +cluster to distribute the load of event collection between all cluster nodes. If namespaces +are specified in the arguments, each namespace's events will be watched by a single +member of the cluster. Otherwise, a single node will be chosen to handle all events in order +to avoid duplication. + +If the agent is _not_ running in clustered mode, then the block is a no-op and +`loki.source.kubernetes_events` collects logs from every namespace in its +arguments. + +[using clustering]: {{< relref "../../concepts/clustering.md" >}} + ## Exported fields `loki.source.kubernetes_events` does not export any fields.