simple clustering support for loki.source.kubernetes_events #5725

Closed
wants to merge 3 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -98,6 +98,8 @@ Main (unreleased)
- Grafana Agent Operator: `config-reloader` container no longer runs as root.
(@rootmout)

- `loki.source.kubernetes_events` now supports clustering. (@captncraig)

### Bugfixes

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
53 changes: 49 additions & 4 deletions component/loki/source/kubernetes_events/kubernetes_events.go
@@ -18,6 +18,8 @@ import (
"github.com/grafana/agent/component/common/loki/positions"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/pkg/runner"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/oklog/run"
"k8s.io/client-go/rest"
)
@@ -27,9 +29,9 @@ const informerSyncTimeout = 10 * time.Second

func init() {
component.Register(component.Registration{
Name: "loki.source.kubernetes_events",
Args: Arguments{},

Name: "loki.source.kubernetes_events",
Args: Arguments{},
NeedsServices: []string{cluster.ServiceName},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
@@ -47,6 +49,8 @@ type Arguments struct {

// Client settings to connect to Kubernetes.
Client kubernetes.ClientArguments `river:"client,block,optional"`

Clustering cluster.ComponentBlock `river:"clustering,block,optional"`
}

// DefaultArguments holds default settings for loki.source.kubernetes_events.
@@ -91,6 +95,8 @@ type Component struct {
tasksMut sync.RWMutex
tasks []eventControllerTask

cluster cluster.Cluster

receiversMut sync.RWMutex
receivers []loki.LogsReceiver
}
@@ -113,6 +119,11 @@ func New(o component.Options, args Arguments) (*Component, error) {
if err != nil {
return nil, err
}
data, err := o.GetServiceData(cluster.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about cluster: %w", err)
}
clusterData := data.(cluster.Cluster)

c := &Component{
log: o.Logger,
@@ -122,6 +133,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
runner: runner.New(func(t eventControllerTask) runner.Worker {
return newEventController(t)
}),
cluster: clusterData,
newTasksCh: make(chan struct{}, 1),
}
if err := c.Update(args); err != nil {
@@ -150,7 +162,24 @@ func (c *Component) Run(ctx context.Context) error {
c.tasksMut.RLock()
tasks := c.tasks
c.tasksMut.RUnlock()

if c.args.Clustering.Enabled {
// distribute event tasks by namespace.
// in the default case, there will only be one namespace ("") for "all namespaces".
// That is fine, it just means only one node will send events.
newTasks := make([]eventControllerTask, 0, len(tasks))
for _, t := range tasks {
peers, err := c.cluster.Lookup(shard.StringKey(t.Namespace), 1, shard.OpReadWrite)
if err != nil || len(peers) == 0 {
// This should never happen, but in any case we fall
// back to owning the namespace ourselves.
newTasks = append(newTasks, t)
continue // skip the ownership check: peers is not usable here
}
if peers[0].Self {
newTasks = append(newTasks, t)
}
}
tasks = newTasks
}
if err := c.runner.ApplyTasks(ctx, tasks); err != nil {
level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
}
@@ -234,6 +263,22 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusterComponent.
func (c *Component) NotifyClusterChange() {
c.mut.Lock()
defer c.mut.Unlock()

if !c.args.Clustering.Enabled {
return // no-op
}

// Schedule a reload so namespaces get redistributed
select {
case c.newTasksCh <- struct{}{}:
default:
}
}
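
The non-blocking send in `NotifyClusterChange` coalesces notifications: with a channel of capacity 1, a signal that is already pending absorbs any further ones, so the caller never blocks. The following is a minimal standalone sketch of that pattern, not code from this PR; the helper name `notify` is hypothetical.

```go
package main

import "fmt"

// notify is a hypothetical helper that performs a non-blocking send on ch.
// It reports whether a new signal was queued; false means a signal was
// already pending and this notification was coalesced into it.
func notify(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan struct{}, 1) // capacity 1: at most one pending reload

	fmt.Println(notify(ch)) // true: signal queued
	fmt.Println(notify(ch)) // false: a reload is already pending
	<-ch                    // the consumer picks up the pending signal
	fmt.Println(notify(ch)) // true: a new signal can be queued again
}
```

Because the consumer recomputes the full task list on every signal, dropping redundant signals loses no information.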

// getNamespaces gets a list of namespaces to watch from the arguments. If the
// list of namespaces is empty, returns a slice to watch all namespaces.
func getNamespaces(args Arguments) []string {
@@ -80,6 +80,7 @@ client > authorization | [authorization][] | Configure generic authorization to
client > oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no
client > oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no
client > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no
clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no

The `>` symbol indicates deeper levels of nesting. For example, `client >
basic_auth` refers to a `basic_auth` block defined
@@ -90,6 +91,7 @@ inside a `client` block.
[authorization]: #authorization-block
[oauth2]: #oauth2-block
[tls_config]: #tls_config-block
[clustering]: #clustering-beta

### client block

@@ -133,6 +135,25 @@ Name | Type | Description | Default | Required

{{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="<AGENT VERSION>" >}}

### clustering (beta)

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`enabled` | `bool` | Distribute event collection with other cluster nodes. | | yes

When the agent is [using clustering][] and `enabled` is set to `true`, this
`loki.source.kubernetes_events` component instance opts in to the cluster and
shares the load of event collection with the other cluster nodes. If namespaces
are specified in the arguments, each namespace's events are watched by exactly one
member of the cluster. Otherwise, a single node is chosen to handle all events,
which avoids duplication.
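
The per-namespace ownership described above can be illustrated with a small standalone Go sketch. It is not the agent's implementation: the component delegates the lookup to the cluster service, whereas this sketch substitutes rendezvous hashing over FNV-1a, and the names `ownerOf`, `ownedNamespaces`, and the `agent-N` peer names are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownerOf picks exactly one peer for key using rendezvous hashing:
// every peer is scored against the key and the highest score wins.
// This is an illustrative stand-in for the cluster's own lookup.
func ownerOf(key string, peers []string) string {
	var best string
	var bestScore uint64
	for i, p := range peers {
		h := fnv.New64a()
		h.Write([]byte(p))
		h.Write([]byte{0}) // separator between peer name and key
		h.Write([]byte(key))
		score := h.Sum64()
		if i == 0 || score > bestScore || (score == bestScore && p < best) {
			best, bestScore = p, score
		}
	}
	return best
}

// ownedNamespaces returns the namespaces that self is responsible for.
func ownedNamespaces(self string, peers, namespaces []string) []string {
	var owned []string
	for _, ns := range namespaces {
		if ownerOf(ns, peers) == self {
			owned = append(owned, ns)
		}
	}
	return owned
}

func main() {
	peers := []string{"agent-0", "agent-1", "agent-2"}
	// The empty string stands for "all namespaces", as in the component.
	namespaces := []string{"default", "kube-system", "monitoring", ""}
	for _, p := range peers {
		fmt.Printf("%s watches %q\n", p, ownedNamespaces(p, peers, namespaces))
	}
}
```

Every node runs the same computation over the same peer list, so each namespace ends up with exactly one watcher without any coordination beyond agreeing on cluster membership. With a single `""` namespace there is only one key, which is why only one node collects events in the default case.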

If the agent is _not_ running in clustered mode, the block is a no-op and
`loki.source.kubernetes_events` collects events from every namespace in its
arguments.

[using clustering]: {{< relref "../../concepts/clustering.md" >}}

## Exported fields

`loki.source.kubernetes_events` does not export any fields.