Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (processor/k8sattributes): wait for synced when starting k8sattributes processor. #32622

Merged
merged 8 commits into from
Nov 17, 2024
27 changes: 27 additions & 0 deletions .chloggen/k8sattributes-block.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Block when starting until the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32556]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 15 additions & 0 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ the processor associates the received trace to the pod, based on the connection
}
```

By default, the processor will be ready as soon as it starts, even if no metadata has been fetched yet.
If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.

To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector will be blocked. If the metadata cannot be synced, the Collector will ultimately fail to start.
If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
The timeout defaults to 10s and can be configured with the `metadata_sync_timeout` option.

example for setting the processor to wait for metadata to be synced before it is ready:

```yaml
wait_for_metadata: true
wait_for_metadata_timeout: 10s
```

## Extracting attributes from pod labels and annotations

The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
Expand Down
9 changes: 6 additions & 3 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package k8sattributesprocessor

import (
"time"

"go.opentelemetry.io/collector/component"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
Expand Down Expand Up @@ -70,10 +72,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
}

// Start is a noop for FakeClient.
func (f *fakeClient) Start() {
func (f *fakeClient) Start() error {
if f.Informer != nil {
f.Informer.Run(f.StopCh)
go f.Informer.Run(f.StopCh)
}
return nil
}

// Stop is a noop for FakeClient.
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
import (
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -46,6 +47,12 @@ type Config struct {
// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`

TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
// WaitForMetadata is a flag that determines if the processor should wait k8s metadata to be synced when starting.
WaitForMetadata bool `mapstructure:"wait_for_metadata"`

// WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
}

func (cfg *Config) Validate() error {
Expand Down
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down Expand Up @@ -105,6 +107,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -127,6 +130,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -149,6 +153,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -44,6 +45,7 @@ func createDefaultConfig() component.Config {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
}
}

Expand Down Expand Up @@ -202,5 +204,10 @@ func createProcessorOpts(cfg component.Config) []option {

opts = append(opts, withExcludes(oCfg.Exclude))

opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout))
if oCfg.WaitForMetadata {
opts = append(opts, withWaitForMetadata(true))
}

return opts
}
38 changes: 0 additions & 38 deletions processor/k8sattributesprocessor/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 66 additions & 31 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
Expand Down Expand Up @@ -39,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister(

// WatchClient is the main interface provided by this package to a kubernetes cluster.
type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
waitForMetadata bool
waitForMetadataTimeout time.Duration

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand Down Expand Up @@ -84,21 +87,36 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
func New(
set component.TelemetrySettings,
apiCfg k8sconfig.APIConfig,
rules ExtractionRules,
filters Filters,
associations []Association,
exclude Excludes,
newClientSet APIClientsetProvider,
newInformer InformerProvider,
newNamespaceInformer InformerProviderNamespace,
newReplicaSetInformer InformerProviderReplicaSet,
waitForMetadata bool,
waitForMetadataTimeout time.Duration,
) (Client, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
}
c := &WatchClient{
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
waitForMetadata: waitForMetadata,
waitForMetadataTimeout: waitForMetadataTimeout,
}
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

Expand Down Expand Up @@ -189,50 +207,67 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr
}

// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() {
_, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
func (c *WatchClient) Start() error {
synced := make([]cache.InformerSynced, 0)
reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
DeleteFunc: c.handlePodDelete,
})
if err != nil {
c.logger.Error("error adding event handler to pod informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.informer.Run(c.stopCh)

_, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
})
if err != nil {
c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.namespaceInformer.Run(c.stopCh)

if c.Rules.DeploymentName || c.Rules.DeploymentUID {
_, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleReplicaSetAdd,
UpdateFunc: c.handleReplicaSetUpdate,
DeleteFunc: c.handleReplicaSetDelete,
})
if err != nil {
c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.replicasetInformer.Run(c.stopCh)
}

if c.nodeInformer != nil {
_, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNodeAdd,
UpdateFunc: c.handleNodeUpdate,
DeleteFunc: c.handleNodeDelete,
})
if err != nil {
c.logger.Error("error adding event handler to node informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.nodeInformer.Run(c.stopCh)
}

if c.waitForMetadata {
timeoutCh := make(chan struct{})
t := time.AfterFunc(c.waitForMetadataTimeout, func() {
close(timeoutCh)
})
defer t.Stop()
if !cache.WaitForCacheSync(timeoutCh, synced...) {
return errors.New("failed to wait for caches to sync")
}
}
return nil
}

// Stop signals the the k8s watcher/informer to stop watching for new events.
Expand Down
Loading