From c875d832012f8d34724b96b9d268020ad0148c8c Mon Sep 17 00:00:00 2001 From: zhanghonghaobj Date: Thu, 18 Apr 2024 11:40:59 +0800 Subject: [PATCH 1/8] feat (processor/k8sattributes): wait for synced when starting --- .chloggen/k8sattributes-block.yaml | 27 +++++++++++++++++ .../k8sattributesprocessor/client_test.go | 5 ++-- .../internal/kube/client.go | 30 +++++++++++++------ .../internal/kube/client_test.go | 2 +- .../internal/kube/fake_informer.go | 4 +-- .../internal/kube/kube.go | 2 +- processor/k8sattributesprocessor/processor.go | 6 +++- 7 files changed, 60 insertions(+), 16 deletions(-) create mode 100644 .chloggen/k8sattributes-block.yaml diff --git a/.chloggen/k8sattributes-block.yaml b/.chloggen/k8sattributes-block.yaml new file mode 100644 index 000000000000..f07469326c4d --- /dev/null +++ b/.chloggen/k8sattributes-block.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/k8sattributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Block when starting util the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 2a893e52790c..78903fbeeda1 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -70,10 +70,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) { } // Start is a noop for FakeClient. -func (f *fakeClient) Start() { +func (f *fakeClient) Start() error { if f.Informer != nil { - f.Informer.Run(f.StopCh) + go f.Informer.Run(f.StopCh) } + return nil } // Stop is a noop for FakeClient. diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 9624fb250b22..f45861043930 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" + "errors" "fmt" "regexp" "strings" @@ -189,50 +190,61 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr } // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes. 
-func (c *WatchClient) Start() { - _, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ +func (c *WatchClient) Start() error { + synced := make([]cache.InformerSynced, 0) + reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, DeleteFunc: c.handlePodDelete, }) if err != nil { - c.logger.Error("error adding event handler to pod informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.informer.Run(c.stopCh) - _, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNamespaceAdd, UpdateFunc: c.handleNamespaceUpdate, DeleteFunc: c.handleNamespaceDelete, }) if err != nil { - c.logger.Error("error adding event handler to namespace informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.namespaceInformer.Run(c.stopCh) if c.Rules.DeploymentName || c.Rules.DeploymentUID { - _, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleReplicaSetAdd, UpdateFunc: c.handleReplicaSetUpdate, DeleteFunc: c.handleReplicaSetDelete, }) if err != nil { - c.logger.Error("error adding event handler to replicaset informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.replicasetInformer.Run(c.stopCh) } if c.nodeInformer != nil { - _, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd, UpdateFunc: c.handleNodeUpdate, DeleteFunc: c.handleNodeDelete, }) if err != nil { - c.logger.Error("error adding event handler to node informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.nodeInformer.Run(c.stopCh) } + + if !cache.WaitForCacheSync(c.stopCh, synced...) { + return errors.New("failed to wait for caches to sync") + } + + return nil } // Stop signals the the k8s watcher/informer to stop watching for new events. 
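A note on the pattern adopted above: the wait collects one `HasSynced` per `AddEventHandler` registration rather than using each informer's own `HasSynced`, so readiness covers initial delivery to the handlers, not just population of the underlying store. `cache.WaitForCacheSync` itself returns `true` once every supplied function reports true, and `false` only if the channel it is given closes first — which is why, at this stage of the series, closing `stopCh` via `Stop()` is the only way out of an unsynced wait. A standalone sketch of that contract (illustrative only; the sync functions here are hand-rolled, not the processor's informers):

```go
// Standalone illustration of the cache.WaitForCacheSync contract relied on
// above: it polls the supplied sync functions (roughly every 100ms) and
// returns true once all report true, or false as soon as the stop channel
// closes.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Case 1: the "cache" syncs shortly after start, so the wait succeeds.
	start := time.Now()
	syncsQuickly := func() bool { return time.Since(start) > 50*time.Millisecond }
	fmt.Println(cache.WaitForCacheSync(make(chan struct{}), syncsQuickly)) // true

	// Case 2: the cache never syncs; closing the stop channel is the only
	// way to unblock the call, and it then reports failure.
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stopCh)
	}()
	fmt.Println(cache.WaitForCacheSync(stopCh, func() bool { return false })) // false
}
```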
diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 7f6b85d195c7..d4cfc6e33a4e 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -180,7 +180,7 @@ func TestClientStartStop(t *testing.T) { done := make(chan struct{}) assert.False(t, fctr.HasStopped()) go func() { - c.Start() + assert.NoError(t, c.Start()) close(done) }() c.Stop() diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go index fc5f479a6ddd..15d63c4e9c0c 100644 --- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go +++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go @@ -40,7 +40,7 @@ func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach } func (f *FakeInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *FakeInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { @@ -165,7 +165,7 @@ func (f *NoOpInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach } func (f *NoOpInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *NoOpInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 6145cb972235..fc4b775edd00 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -91,7 +91,7 @@ type Client interface { GetPod(PodIdentifier) (*Pod, bool) GetNamespace(string) (*Namespace, bool) GetNode(string) (*Node, bool) - Start() + Start() error Stop() } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 5d63cbbd100a..ff4d30a91b16 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -73,7 +73,11 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err } } if !kp.passthroughMode { - go kp.kc.Start() + err := kp.kc.Start() + if err != nil { + kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) + return nil + } } return nil } From fd9a89943c2a478a0e2476660962caa01fd35adc Mon Sep 17 00:00:00 2001 From: zhanghonghaobj Date: Fri, 7 Jun 2024 20:52:24 +0800 Subject: [PATCH 2/8] feat (processor/k8sattributes): add timeout for waiting. 
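The bounding mechanism this commit introduces can be read in isolation: a dedicated channel that a timer closes on expiry is handed to `cache.WaitForCacheSync` in place of the informer stop channel, so a sync that never completes degrades into a startup error instead of a hang. A minimal sketch of just that mechanism (the helper name is hypothetical; in the actual diff below the logic is inlined in `Start`):

```go
// Sketch of the bounded wait: time.AfterFunc closes timeoutCh when the
// timeout elapses, which forces cache.WaitForCacheSync to return false;
// stopping the timer avoids leaking it when the caches sync early.
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

func waitForCachesWithTimeout(timeout time.Duration, synced ...cache.InformerSynced) error {
	timeoutCh := make(chan struct{})
	t := time.AfterFunc(timeout, func() {
		close(timeoutCh)
	})
	defer t.Stop()

	if !cache.WaitForCacheSync(timeoutCh, synced...) {
		return errors.New("failed to wait for caches to sync")
	}
	return nil
}

func main() {
	// A sync function that never reports true times out after ~200ms.
	err := waitForCachesWithTimeout(200*time.Millisecond, func() bool { return false })
	fmt.Println(err) // failed to wait for caches to sync
}
```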
---
 .chloggen/k8sattributes-block.yaml             |  2 +-
 processor/k8sattributesprocessor/README.md     | 15 +++++
 .../k8sattributesprocessor/client_test.go      |  4 +-
 processor/k8sattributesprocessor/config.go     |  7 +++
 .../k8sattributesprocessor/config_test.go      |  5 ++
 processor/k8sattributesprocessor/factory.go    |  7 +++
 .../generated_component_test.go                | 44 +-------------
 .../internal/kube/client.go                    | 60 +++++++++++--------
 .../internal/kube/client_test.go               | 21 ++-----
 .../internal/kube/kube.go                      |  2 +-
 .../metadata/generated_resource_test.go        | 44 +++++++-------
 .../k8sattributesprocessor/metadata.yaml       |  1 +
 processor/k8sattributesprocessor/options.go    | 17 ++++++
 processor/k8sattributesprocessor/processor.go  | 35 ++++++-----
 .../k8sattributesprocessor/processor_test.go   |  2 +-
 15 files changed, 142 insertions(+), 124 deletions(-)

diff --git a/.chloggen/k8sattributes-block.yaml b/.chloggen/k8sattributes-block.yaml
index f07469326c4d..0bef1e56c018 100644
--- a/.chloggen/k8sattributes-block.yaml
+++ b/.chloggen/k8sattributes-block.yaml
@@ -10,7 +10,7 @@ component: processor/k8sattributes
 note: Block when starting util the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.
 
 # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
-issues: []
+issues: [32556]
 
 # (Optional) One or more lines of additional information to render under the primary note.
 # These lines will be padded with 2 spaces and then inserted directly into the document.
diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md
index cdb82bc759aa..0a39b2e2daeb 100644
--- a/processor/k8sattributesprocessor/README.md
+++ b/processor/k8sattributesprocessor/README.md
@@ -198,6 +198,21 @@ the processor associates the received trace to the pod, based on the connection
 }
 ```

+By default, the processor will be ready as soon as it starts, even no metadata has been fetched yet.
+If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.
+
+To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
+Then the processor will not be ready until the metadata has been synced.
+If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
+The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option.
+
+Example of setting the processor to wait for metadata to be synced before it is ready:
+
+```yaml
+wait_for_metadata: true
+wait_for_metadata_timeout: 10s
+```
+
 ## Extracting attributes from pod labels and annotations
 
 The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
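For context, the README snippet above sits under the processor's entry in a collector configuration. A fuller illustrative example (the receiver, exporter, and pipeline are placeholders; only the two `wait_for_*` keys come from this change):

```yaml
processors:
  k8sattributes:
    auth_type: serviceAccount
    wait_for_metadata: true
    wait_for_metadata_timeout: 30s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [k8sattributes]
      exporters: [debug]
```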
diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 78903fbeeda1..c69e0638301a 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -4,6 +4,8 @@ package k8sattributesprocessor import ( + "time" + "go.opentelemetry.io/collector/component" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) { } // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type -func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { +func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) { cs := fake.NewSimpleClientset() ls, fs := selectors() diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 27b49cef5d63..e5651a087bf4 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr import ( "fmt" "regexp" + "time" "go.opentelemetry.io/collector/featuregate" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -46,6 +47,12 @@ type Config struct { // Exclude section allows to define names of pod that should be // ignored while tagging. Exclude ExcludeConfig `mapstructure:"exclude"` + + // WaitForMetadata is a flag that determines if the processor should wait k8s metadata to be synced when starting. + WaitForMetadata bool `mapstructure:"wait_for_metadata"` + + // WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced. 
+ WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"` } func (cfg *Config) Validate() error { diff --git a/processor/k8sattributesprocessor/config_test.go b/processor/k8sattributesprocessor/config_test.go index 78826108016b..9510df99a3b7 100644 --- a/processor/k8sattributesprocessor/config_test.go +++ b/processor/k8sattributesprocessor/config_test.go @@ -6,6 +6,7 @@ package k8sattributesprocessor import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) { Extract: ExtractConfig{ Metadata: enabledAttributes(), }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -105,6 +107,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -127,6 +130,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -149,6 +153,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 2c3eb39b2914..56d27a2b5102 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -44,6 +45,7 @@ func createDefaultConfig() component.Config { Extract: ExtractConfig{ Metadata: enabledAttributes(), }, + WaitForMetadataTimeout: 10 * time.Second, } } @@ -202,5 +204,10 @@ func createProcessorOpts(cfg component.Config) []option { opts = append(opts, withExcludes(oCfg.Exclude)) + opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout)) + if oCfg.WaitForMetadata { + opts = append(opts, withWaitForMetadata(true)) + } + return opts } diff --git a/processor/k8sattributesprocessor/generated_component_test.go b/processor/k8sattributesprocessor/generated_component_test.go index 13918bbe7795..4371090d46c3 100644 --- a/processor/k8sattributesprocessor/generated_component_test.go +++ b/processor/k8sattributesprocessor/generated_component_test.go @@ -65,47 +65,9 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(&cfg)) - for _, tt := range tests { - t.Run(tt.name+"-shutdown", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) - require.NoError(t, err) - err = c.Shutdown(context.Background()) - require.NoError(t, err) - }) - t.Run(tt.name+"-lifecycle", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) - require.NoError(t, err) - host := componenttest.NewNopHost() - err = c.Start(context.Background(), host) - require.NoError(t, err) - require.NotPanics(t, func() { - switch tt.name { - case "logs": - e, ok := c.(processor.Logs) - require.True(t, ok) - logs := generateLifecycleTestLogs() - if !e.Capabilities().MutatesData { - logs.MarkReadOnly() - } - err = e.ConsumeLogs(context.Background(), logs) - case "metrics": - e, ok := c.(processor.Metrics) - require.True(t, ok) - metrics := generateLifecycleTestMetrics() - if !e.Capabilities().MutatesData { - metrics.MarkReadOnly() - } - err = e.ConsumeMetrics(context.Background(), metrics) - case "traces": - 
e, ok := c.(processor.Traces) - require.True(t, ok) - traces := generateLifecycleTestTraces() - if !e.Capabilities().MutatesData { - traces.MarkReadOnly() - } - err = e.ConsumeTraces(context.Background(), traces) - } - }) + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index f45861043930..bd3e351e3937 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -40,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister( // WatchClient is the main interface provided by this package to a kubernetes cluster. type WatchClient struct { - m sync.RWMutex - deleteMut sync.Mutex - logger *zap.Logger - kc kubernetes.Interface - informer cache.SharedInformer - namespaceInformer cache.SharedInformer - nodeInformer cache.SharedInformer - replicasetInformer cache.SharedInformer - replicasetRegex *regexp.Regexp - cronJobRegex *regexp.Regexp - deleteQueue []deleteRequest - stopCh chan struct{} + m sync.RWMutex + deleteMut sync.Mutex + logger *zap.Logger + kc kubernetes.Interface + informer cache.SharedInformer + namespaceInformer cache.SharedInformer + nodeInformer cache.SharedInformer + replicasetInformer cache.SharedInformer + replicasetRegex *regexp.Regexp + cronJobRegex *regexp.Regexp + deleteQueue []deleteRequest + stopCh chan struct{} + waitForMetadata bool + waitForMetadataTimeout time.Duration // A map containing Pod related data, used to associate them with resources. // Key can be either an IP address or Pod UID @@ -85,21 +87,23 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`) var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`) // New initializes a new k8s Client. 
-func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) { +func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet, waitForMetadata bool, waitForMetadataTimeout time.Duration) (Client, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set) if err != nil { return nil, err } c := &WatchClient{ - logger: set.Logger, - Rules: rules, - Filters: filters, - Associations: associations, - Exclude: exclude, - replicasetRegex: rRegex, - cronJobRegex: cronJobRegex, - stopCh: make(chan struct{}), - telemetryBuilder: telemetryBuilder, + logger: set.Logger, + Rules: rules, + Filters: filters, + Associations: associations, + Exclude: exclude, + replicasetRegex: rRegex, + cronJobRegex: cronJobRegex, + stopCh: make(chan struct{}), + telemetryBuilder: telemetryBuilder, + waitForMetadata: waitForMetadata, + waitForMetadataTimeout: waitForMetadataTimeout, } go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod) @@ -240,10 +244,16 @@ func (c *WatchClient) Start() error { go c.nodeInformer.Run(c.stopCh) } - if !cache.WaitForCacheSync(c.stopCh, synced...) { - return errors.New("failed to wait for caches to sync") + if c.waitForMetadata { + timeoutCh := make(chan struct{}) + t := time.AfterFunc(c.waitForMetadataTimeout, func() { + close(timeoutCh) + }) + defer t.Stop() + if !cache.WaitForCacheSync(timeoutCh, synced...) 
{ + return errors.New("failed to wait for caches to sync") + } } - return nil } diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index d4cfc6e33a4e..60593e57a4a1 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -143,29 +143,18 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) { } func TestDefaultClientset(t *testing.T) { - c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil) + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil, false, 10*time.Second) assert.Error(t, err) assert.Equal(t, "invalid authType for kubernetes: ", err.Error()) assert.Nil(t, c) - c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil) + c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil, false, 10*time.Second) assert.NoError(t, err) assert.NotNil(t, c) } func TestBadFilters(t *testing.T) { - c, err := New( - componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, - ExtractionRules{}, - Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, - []Association{}, - Excludes{}, - newFakeAPIClientset, - NewFakeInformer, - NewFakeNamespaceInformer, - NewFakeReplicaSetInformer, - ) + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, Excludes{}, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second) assert.Error(t, err) assert.Nil(t, c) } @@ -201,7 +190,7 @@ func TestConstructorErrors(t *testing.T) { gotAPIConfig = c return nil, fmt.Errorf("error creating k8s client") } - c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil) + c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil, false, 10*time.Second) assert.Nil(t, c) assert.Error(t, err) assert.Equal(t, "error creating k8s client", err.Error()) @@ -1923,7 +1912,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o }, }, } - c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer) + c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second) require.NoError(t, err) return c.(*WatchClient), logs } diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index fc4b775edd00..9faeee2452dd 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -96,7 +96,7 @@ type Client interface { } // 
ClientProvider defines a func type that returns a new Client. -type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error) +type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet, bool, time.Duration) (Client, error) // APIClientsetProvider defines a func type that initializes and return a new kubernetes // Clientset object. diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go index 774643d4677f..02486fcc796a 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go @@ -9,9 +9,9 @@ import ( ) func TestResourceBuilder(t *testing.T) { - for _, tt := range []string{"default", "all_set", "none_set"} { - t.Run(tt, func(t *testing.T) { - cfg := loadResourceAttributesConfig(t, tt) + for _, test := range []string{"default", "all_set", "none_set"} { + t.Run(test, func(t *testing.T) { + cfg := loadResourceAttributesConfig(t, test) rb := NewResourceBuilder(cfg) rb.SetContainerID("container.id-val") rb.SetContainerImageName("container.image.name-val") @@ -42,7 +42,7 @@ func TestResourceBuilder(t *testing.T) { res := rb.Emit() assert.Equal(t, 0, rb.Emit().Attributes().Len()) // Second call should return empty Resource - switch tt { + switch test { case "default": assert.Equal(t, 8, res.Attributes().Len()) case "all_set": @@ -51,11 +51,11 @@ func TestResourceBuilder(t *testing.T) { assert.Equal(t, 0, res.Attributes().Len()) return default: - assert.Failf(t, "unexpected test case: %s", tt) + assert.Failf(t, "unexpected test case: %s", test) } val, ok := res.Attributes().Get("container.id") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "container.id-val", val.Str()) } @@ -65,7 +65,7 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "container.image.name-val", val.Str()) } val, ok = res.Attributes().Get("container.image.repo_digests") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, []any{"container.image.repo_digests-item1", "container.image.repo_digests-item2"}, val.Slice().AsRaw()) } @@ -75,27 +75,27 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "container.image.tag-val", val.Str()) } val, ok = res.Attributes().Get("k8s.cluster.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.cluster.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.container.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.container.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.cronjob.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.cronjob.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.daemonset.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.daemonset.name-val", val.Str()) } val, ok = 
res.Attributes().Get("k8s.daemonset.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.daemonset.uid-val", val.Str()) } @@ -105,17 +105,17 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.deployment.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.deployment.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.deployment.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.job.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.job.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.job.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.job.uid-val", val.Str()) } @@ -130,17 +130,17 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.node.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.node.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.node.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.pod.hostname") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.pod.hostname-val", val.Str()) } val, ok = res.Attributes().Get("k8s.pod.ip") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.pod.ip-val", val.Str()) } @@ -160,22 +160,22 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.pod.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.replicaset.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.replicaset.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.replicaset.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.replicaset.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.statefulset.name") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.statefulset.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.statefulset.uid") - assert.Equal(t, tt == "all_set", ok) + assert.Equal(t, test == "all_set", ok) if ok { assert.EqualValues(t, "k8s.statefulset.uid-val", val.Str()) } diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index a388cfcb3dfd..edfb8bbc414d 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -114,6 +114,7 @@ resource_attributes: tests: config: + skip_lifecycle: true goleak: skip: true diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index ec39cdb9b827..4ccccb0d4638 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "regexp" + "time" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "k8s.io/apimachinery/pkg/selection" @@ -381,3 +382,19 @@ func withExcludes(podExclude ExcludeConfig) option { return nil } } + +// withWaitForMetadata allows specifying whether to wait for pod metadata to be synced. 
+func withWaitForMetadata(wait bool) option { + return func(p *kubernetesprocessor) error { + p.waitForMetadata = wait + return nil + } +} + +// withWaitForMetadataTimeout allows specifying the timeout for waiting for pod metadata to be synced. +func withWaitForMetadataTimeout(timeout time.Duration) option { + return func(p *kubernetesprocessor) error { + p.waitForMetadataTimeout = timeout + return nil + } +} diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index ff4d30a91b16..98499f5e5473 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strconv" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -27,17 +28,19 @@ const ( ) type kubernetesprocessor struct { - cfg component.Config - options []option - telemetrySettings component.TelemetrySettings - logger *zap.Logger - apiConfig k8sconfig.APIConfig - kc kube.Client - passthroughMode bool - rules kube.ExtractionRules - filters kube.Filters - podAssociations []kube.Association - podIgnore kube.Excludes + cfg component.Config + options []option + telemetrySettings component.TelemetrySettings + logger *zap.Logger + apiConfig k8sconfig.APIConfig + kc kube.Client + passthroughMode bool + rules kube.ExtractionRules + filters kube.Filters + podAssociations []kube.Association + podIgnore kube.Excludes + waitForMetadata bool + waitForMetadataTimeout time.Duration } func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error { @@ -45,7 +48,7 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k kubeClient = kube.New } if !kp.passthroughMode { - kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil) + kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil, kp.waitForMetadata, kp.waitForMetadataTimeout) if err != nil { return err } @@ -60,7 +63,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err for _, opt := range allOptions { if err := opt(kp); err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - return nil + return err } } @@ -69,14 +72,14 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) if err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - return nil + return err } } if !kp.passthroughMode { err := kp.kc.Start() if err != nil { - kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) - return nil + componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) + return err } } return nil diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index b8cbf1ec66b0..790c612549b2 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -266,7 +266,7 @@ func TestNewProcessor(t *testing.T) { } func TestProcessorBadClientProvider(t *testing.T) { - clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ 
kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { + clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) { return nil, fmt.Errorf("bad client error") } From 6b983f5225cefaa6599c16a5794423e72c740d73 Mon Sep 17 00:00:00 2001 From: zhanghonghaobj Date: Tue, 15 Oct 2024 21:03:49 +0800 Subject: [PATCH 3/8] feat: regenerate --- .../generated_component_test.go | 6 +-- .../metadata/generated_resource_test.go | 44 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/processor/k8sattributesprocessor/generated_component_test.go b/processor/k8sattributesprocessor/generated_component_test.go index 4371090d46c3..38c00260ae43 100644 --- a/processor/k8sattributesprocessor/generated_component_test.go +++ b/processor/k8sattributesprocessor/generated_component_test.go @@ -65,9 +65,9 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(&cfg)) - for _, test := range tests { - t.Run(test.name+"-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go index 02486fcc796a..774643d4677f 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go @@ -9,9 +9,9 @@ import ( ) func TestResourceBuilder(t *testing.T) { - for _, test := range []string{"default", "all_set", "none_set"} { - t.Run(test, func(t *testing.T) { - cfg := loadResourceAttributesConfig(t, test) + for _, tt := range []string{"default", "all_set", "none_set"} { + t.Run(tt, func(t *testing.T) { + cfg := loadResourceAttributesConfig(t, tt) rb := NewResourceBuilder(cfg) rb.SetContainerID("container.id-val") rb.SetContainerImageName("container.image.name-val") @@ -42,7 +42,7 @@ func TestResourceBuilder(t *testing.T) { res := rb.Emit() assert.Equal(t, 0, rb.Emit().Attributes().Len()) // Second call should return empty Resource - switch test { + switch tt { case "default": assert.Equal(t, 8, res.Attributes().Len()) case "all_set": @@ -51,11 +51,11 @@ func TestResourceBuilder(t *testing.T) { assert.Equal(t, 0, res.Attributes().Len()) return default: - assert.Failf(t, "unexpected test case: %s", test) + assert.Failf(t, "unexpected test case: %s", tt) } val, ok := res.Attributes().Get("container.id") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "container.id-val", val.Str()) } @@ -65,7 +65,7 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "container.image.name-val", val.Str()) } val, ok = res.Attributes().Get("container.image.repo_digests") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, []any{"container.image.repo_digests-item1", 
"container.image.repo_digests-item2"}, val.Slice().AsRaw()) } @@ -75,27 +75,27 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "container.image.tag-val", val.Str()) } val, ok = res.Attributes().Get("k8s.cluster.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.cluster.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.container.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.container.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.cronjob.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.cronjob.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.daemonset.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.daemonset.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.daemonset.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.daemonset.uid-val", val.Str()) } @@ -105,17 +105,17 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.deployment.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.deployment.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.deployment.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.job.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.job.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.job.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.job.uid-val", val.Str()) } @@ -130,17 +130,17 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.node.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.node.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.node.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.pod.hostname") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.pod.hostname-val", val.Str()) } val, ok = res.Attributes().Get("k8s.pod.ip") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.pod.ip-val", val.Str()) } @@ -160,22 +160,22 @@ func TestResourceBuilder(t *testing.T) { assert.EqualValues(t, "k8s.pod.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.replicaset.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.replicaset.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.replicaset.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.replicaset.uid-val", val.Str()) } val, ok = res.Attributes().Get("k8s.statefulset.name") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.statefulset.name-val", val.Str()) } val, ok = res.Attributes().Get("k8s.statefulset.uid") - assert.Equal(t, test == "all_set", ok) + assert.Equal(t, tt == "all_set", ok) if ok { assert.EqualValues(t, "k8s.statefulset.uid-val", val.Str()) } From 
53327aa90a6362c2e8ab202bcd3616f2dd4ce7b5 Mon Sep 17 00:00:00 2001 From: zhanghonghaobj Date: Tue, 29 Oct 2024 20:39:22 +0800 Subject: [PATCH 4/8] test: add unit tests: wait for metadata --- .../internal/kube/client_test.go | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 60593e57a4a1..8ba21de46993 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -18,6 +18,8 @@ import ( apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -1920,3 +1922,56 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o func newTestClient(t *testing.T) (*WatchClient, *observer.ObservedLogs) { return newTestClientWithRulesAndFilters(t, Filters{}) } + +type neverSyncedFakeClient struct { + cache.SharedInformer +} + +type neverSyncedResourceEventHandlerRegistration struct { + cache.ResourceEventHandlerRegistration +} + +func (n *neverSyncedResourceEventHandlerRegistration) HasSynced() bool { + return false +} + +func (n *neverSyncedFakeClient) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + delegate, err := n.SharedInformer.AddEventHandler(handler) + if err != nil { + return nil, err + } + return &neverSyncedResourceEventHandlerRegistration{ResourceEventHandlerRegistration: delegate}, nil +} + +func TestWaitForMetadata(t *testing.T) { + testCases := []struct { + name string + informerProvider InformerProvider + err bool + }{{ + name: "no wait", + informerProvider: NewFakeInformer, + err: false, + }, { + name: "wait but never synced", + informerProvider: func(client kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector) cache.SharedInformer { + return &neverSyncedFakeClient{NewFakeInformer(client, namespace, labelSelector, fieldSelector)} + }, + err: true, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, tc.informerProvider, nil, nil, true, 1*time.Second) + require.NoError(t, err) + + err = c.Start() + if tc.err { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } + +} From d3c33a3ce65ddcb117e804c5cd8cfa1cb9c1502f Mon Sep 17 00:00:00 2001 From: Howard Cheung Date: Tue, 5 Nov 2024 19:53:12 +0800 Subject: [PATCH 5/8] docs (processor/k8sattributes): add docs and fix typos. Co-authored-by: Christos Markou --- .chloggen/k8sattributes-block.yaml | 2 +- processor/k8sattributesprocessor/README.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.chloggen/k8sattributes-block.yaml b/.chloggen/k8sattributes-block.yaml index 0bef1e56c018..2b05153af746 100644 --- a/.chloggen/k8sattributes-block.yaml +++ b/.chloggen/k8sattributes-block.yaml @@ -7,7 +7,7 @@ change_type: bug_fix component: processor/k8sattributes # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
-note: Block when starting util the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.
+note: Block when starting until the metadata has been synced, to fix that some data couldn't be associated with metadata when the agent had just started.
 
 # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
 issues: [32556]
diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md
index 0a39b2e2daeb..c55e0863e77a 100644
--- a/processor/k8sattributesprocessor/README.md
+++ b/processor/k8sattributesprocessor/README.md
@@ -198,11 +198,11 @@ the processor associates the received trace to the pod, based on the connection
 }
 ```

-By default, the processor will be ready as soon as it starts, even no metadata has been fetched yet.
+By default, the processor will be ready as soon as it starts, even if no metadata has been fetched yet.
 If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.
 
 To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
-Then the processor will not be ready until the metadata has been synced.
+Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector tool will be blocked. If the metadata cannot be synced, the Collector tool will ultimately fail to start.
 If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
 The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option.
 
From 6bc7014c662a30fed6d7564bb71896ddc9132b8b Mon Sep 17 00:00:00 2001
From: zhanghonghaobj
Date: Tue, 5 Nov 2024 20:03:30 +0800
Subject: [PATCH 6/8] refactor (processor/k8sattributes): split declaration to make diffs easier.

---
 .../internal/kube/client.go | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go
index bd3e351e3937..bf8746b3e8ef 100644
--- a/processor/k8sattributesprocessor/internal/kube/client.go
+++ b/processor/k8sattributesprocessor/internal/kube/client.go
@@ -87,7 +87,20 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
 var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)
 
 // New initializes a new k8s Client.
-func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet, waitForMetadata bool, waitForMetadataTimeout time.Duration) (Client, error) {
+func New(
+	set component.TelemetrySettings,
+	apiCfg k8sconfig.APIConfig,
+	rules ExtractionRules,
+	filters Filters,
+	associations []Association,
+	exclude Excludes,
+	newClientSet APIClientsetProvider,
+	newInformer InformerProvider,
+	newNamespaceInformer InformerProviderNamespace,
+	newReplicaSetInformer InformerProviderReplicaSet,
+	waitForMetadata bool,
+	waitForMetadataTimeout time.Duration,
+) (Client, error) {
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
 	if err != nil {
 		return nil, err
From 7ed0364c0127e1ef728600fd919a4554e2d85bb1 Mon Sep 17 00:00:00 2001
From: Howard Cheung
Date: Tue, 5 Nov 2024 22:09:58 +0800
Subject: [PATCH 7/8] docs (processor/k8sattributes): update README.

Co-authored-by: Christos Markou
---
 processor/k8sattributesprocessor/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md
index c55e0863e77a..12a3e4d32f8a 100644
--- a/processor/k8sattributesprocessor/README.md
+++ b/processor/k8sattributesprocessor/README.md
@@ -202,7 +202,7 @@ By default, the processor will be ready as soon as it starts, even if no metadat
 If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.
 
 To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
-Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector tool will be blocked. If the metadata cannot be synced, the Collector tool will ultimately fail to start.
+Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector will be blocked. If the metadata cannot be synced, the Collector will ultimately fail to start.
 If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
 The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option.
 
From 25c4e12d0b029630fb08f06cb344828fe11eaf68 Mon Sep 17 00:00:00 2001
From: zhanghonghaobj
Date: Thu, 14 Nov 2024 14:13:22 +0800
Subject: [PATCH 8/8] fix (processor/k8sattributes): lint error

---
 processor/k8sattributesprocessor/internal/kube/client_test.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go
index 8ba21de46993..f5e9cd03bfdc 100644
--- a/processor/k8sattributesprocessor/internal/kube/client_test.go
+++ b/processor/k8sattributesprocessor/internal/kube/client_test.go
@@ -1973,5 +1973,4 @@ func TestWaitForMetadata(t *testing.T) {
 		}
 	})
 }
-
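Taken together, the series leaves processor start-up with a single failure policy: a sync failure (including a `wait_for_metadata_timeout` expiry) is reported as a fatal component status and also returned, so the collector fails fast rather than running with unenriched telemetry. A condensed, self-contained sketch of that final flow (the client interface is stubbed to keep the sketch standalone; fields not relevant to start-up are omitted):

```go
// Condensed view of the start-up flow after all eight patches: Start
// propagates a cache-sync failure both as a fatal status event and as a
// returned error, so an unreachable API server surfaces at collector start.
package main

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componentstatus"
)

// kubeClient stands in for the kube.Client interface from the diffs above.
type kubeClient interface {
	Start() error // blocks up to wait_for_metadata_timeout when wait_for_metadata is set
	Stop()
}

type kubernetesprocessor struct {
	kc              kubeClient
	passthroughMode bool
}

func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) error {
	if !kp.passthroughMode {
		if err := kp.kc.Start(); err != nil {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
			return err
		}
	}
	return nil
}

func main() {}
```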