Skip to content

Commit

Permalink
[receiver/k8sobjects] Add logic to properly handle 410 responses (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#26098)

**Description:** 
Updates the receiver to properly handle 410 response code. The
expectations for what clients should do when a 410 is received can be
found here:
https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses.

I originally implemented this feature directly in `startWatch`,
rebuilding the watcher and res channel within the for loop, but I grew
concerned about making sure everything stopped correctly. I took a look
at the retry watcher's implementation and reused its concepts for this
implementation. If it is overcomplicated we can go back to my original
idea.

**Link to tracking Issue:** <Issue number if applicable>

Closes
open-telemetry#24903

**Testing:** <Describe what testing was performed and which tests were
added.>

Tested locally. Unit tests proved to be extremely challenging since I
couldn't figure out how to get the mock to produce a 410. We really need
e2e tests
(open-telemetry#18395).

---------

Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
TylerHelmuth and evan-bradley authored Sep 11, 2023
1 parent ca8c1ad commit a4999eb
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 11 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sobject-handle-410.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds logic to properly handle 410 response codes when watching. This improves the reliability of the receiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26098]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion receiver/k8sobjectsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ Use the below commands to create a `ClusterRole` with required permissions and a
Following config will work for collecting pods and events only. You need to add
appropriate rule for collecting other objects.

When using watch mode without specifying a `resource_version` you must also specify `list` verb so that the receiver has permission to do its initial list.
When using watch mode you must also specify `list` verb so that the receiver has permission to do its initial list if no
`resource_version` was supplied or a list to recover from [410 Gone scenarios](https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses).

```bash
<<EOF | kubectl apply -f -
Expand Down
50 changes: 40 additions & 10 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sobjectsreceiver // import "github.com/open-telemetry/opentelemetry-co
import (
"context"
"fmt"
"net/http"
"sync"
"time"

Expand All @@ -14,7 +15,9 @@ import (
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -148,36 +151,64 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
}

func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) {

stopperChan := make(chan struct{})
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()

resourceVersion, err := getResourceVersion(ctx, config, resource)
if err != nil {
kr.setting.Logger.Error("could not retrieve an initial resourceVersion", zap.String("resource", config.gvr.String()), zap.Error(err))
return
}
watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
options.FieldSelector = config.FieldSelector
options.LabelSelector = config.LabelSelector
return resource.Watch(ctx, options)
}

cancelCtx, cancel := context.WithCancel(ctx)
cfgCopy := *config
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
if err != nil {
kr.setting.Logger.Error("could not retrieve a resourceVersion", zap.String("resource", cfgCopy.gvr.String()), zap.Error(err))
cancel()
return
}

done := kr.doWatch(newCtx, &cfgCopy, resourceVersion, watchFunc, stopperChan)
if done {
cancel()
return
}

// need to restart with a fresh resource version
cfgCopy.ResourceVersion = ""
}, 0)
}

// doWatch returns true when watching is done, false when watching should be restarted.
func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsConfig, resourceVersion string, watchFunc func(options metav1.ListOptions) (apiWatch.Interface, error), stopperChan chan struct{}) bool {
watcher, err := watch.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
kr.setting.Logger.Error("error in watching object", zap.String("resource", config.gvr.String()), zap.Error(err))
return
return true
}

defer watcher.Stop()
res := watcher.ResultChan()
for {
select {
case data, ok := <-res:
if data.Type == apiWatch.Error {
errObject := apierrors.FromObject(data.Object)
// nolint:errorlint
if errObject.(*apierrors.StatusError).ErrStatus.Code == http.StatusGone {
kr.setting.Logger.Info("received a 410, grabbing new resource version", zap.Any("data", data))
// we received a 410 so we need to restart
return false
}
}

if !ok {
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
return
return true
}

if config.exclude[data.Type] {
Expand All @@ -195,10 +226,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
}
case <-stopperChan:
watcher.Stop()
return
return true
}
}

}

func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) {
Expand Down

0 comments on commit a4999eb

Please sign in to comment.