Skip to content

Commit

Permalink
[receiver/k8sobjects] Add ability to exclude deleted updates for watc…
Browse files Browse the repository at this point in the history
…hed objects (#26042)

**Description:** 
Adds a new configuration option that allows excluding `DELETED` updates
for watched objects.

**Link to tracking Issue:** 
Closes
#24904

**Testing:** 
New unit tests and tested locally

**Documentation:**
Updated the readme
  • Loading branch information
TylerHelmuth authored Sep 5, 2023
1 parent 5e25c8b commit 930b421
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sobjects-filter-delete.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds option to exclude event types (MODIFIED, DELETED, etc) in watch mode.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26042]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions receiver/k8sobjectsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
- `label_selector`: select objects by label(s)
- `field_selector`: select objects by field(s)
- `interval`: the interval at which object is pulled, default 60 minutes. Only useful for `pull` mode.
- `exclude_watch_type`: allows excluding specific watch types. Valid values are `ADDED`, `MODIFIED`, `DELETED`, `BOOKMARK`, and `ERROR`. Only usable in `watch` mode.
- `resource_version` allows watch resources starting from a specific version (default = `1`). Only available for `watch` mode. If not specified, the receiver will do an initial list to get the resourceVersion before starting the watch. See [Efficient Detection of Change](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) for details on why this is necessary.
- `namespaces`: An array of `namespaces` to collect events from. (default = `all`)
- `group`: API group name. It is an optional config. When given resource object is present in multiple groups,
Expand Down
25 changes: 16 additions & 9 deletions receiver/k8sobjectsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"

Expand All @@ -32,15 +33,17 @@ var modeMap = map[mode]bool{
}

type K8sObjectsConfig struct {
Name string `mapstructure:"name"`
Group string `mapstructure:"group"`
Namespaces []string `mapstructure:"namespaces"`
Mode mode `mapstructure:"mode"`
LabelSelector string `mapstructure:"label_selector"`
FieldSelector string `mapstructure:"field_selector"`
Interval time.Duration `mapstructure:"interval"`
ResourceVersion string `mapstructure:"resource_version"`
gvr *schema.GroupVersionResource
Name string `mapstructure:"name"`
Group string `mapstructure:"group"`
Namespaces []string `mapstructure:"namespaces"`
Mode mode `mapstructure:"mode"`
LabelSelector string `mapstructure:"label_selector"`
FieldSelector string `mapstructure:"field_selector"`
Interval time.Duration `mapstructure:"interval"`
ResourceVersion string `mapstructure:"resource_version"`
ExcludeWatchType []apiWatch.EventType `mapstructure:"exclude_watch_type"`
exclude map[apiWatch.EventType]bool
gvr *schema.GroupVersionResource
}

type Config struct {
Expand Down Expand Up @@ -87,6 +90,10 @@ func (c *Config) Validate() error {
object.Interval = defaultPullInterval
}

if object.Mode == PullMode && len(object.ExcludeWatchType) != 0 {
return fmt.Errorf("the Exclude config can only be used with watch mode")
}

object.gvr = gvr
}
return nil
Expand Down
7 changes: 7 additions & 0 deletions receiver/k8sobjectsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"k8s.io/apimachinery/pkg/runtime/schema"
apiWatch "k8s.io/apimachinery/pkg/watch"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata"
Expand Down Expand Up @@ -50,6 +51,9 @@ func TestLoadConfig(t *testing.T) {
Namespaces: []string{"default"},
Group: "events.k8s.io",
ResourceVersion: "",
ExcludeWatchType: []apiWatch.EventType{
apiWatch.Deleted,
},
gvr: &schema.GroupVersionResource{
Group: "events.k8s.io",
Version: "v1",
Expand Down Expand Up @@ -130,6 +134,9 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "invalid_resource"),
},
{
id: component.NewIDWithName(metadata.Type, "exclude_deleted_with_pull"),
},
}

for _, tt := range tests {
Expand Down
10 changes: 10 additions & 0 deletions receiver/k8sobjectsreceiver/mock_dynamic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (c mockDynamicClient) createPods(objects ...*unstructured.Unstructured) {
}
}

func (c mockDynamicClient) deletePods(objects ...*unstructured.Unstructured) {
pods := c.client.Resource(schema.GroupVersionResource{
Version: "v1",
Resource: "pods",
})
for _, pod := range objects {
_ = pods.Namespace(pod.GetNamespace()).Delete(context.Background(), pod.GetName(), v1.DeleteOptions{})
}
}

func generatePod(name, namespace string, labels map[string]interface{}, resourceVersion string) *unstructured.Unstructured {
pod := unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down
13 changes: 13 additions & 0 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func newReceiver(params receiver.CreateSettings, config *Config, consumer consum
return nil, err
}

for _, object := range config.Objects {
object.exclude = make(map[apiWatch.EventType]bool)
for _, item := range object.ExcludeWatchType {
object.exclude[item] = true
}
}

return &k8sobjectsreceiver{
client: client,
setting: params,
Expand Down Expand Up @@ -172,6 +179,12 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
kr.setting.Logger.Warn("Watch channel closed unexpectedly", zap.String("resource", config.gvr.String()))
return
}

if config.exclude[data.Type] {
kr.setting.Logger.Debug("dropping excluded data", zap.String("type", string(data.Type)))
continue
}

logs, err := watchObjectsToLogData(&data, time.Now(), config)
if err != nil {
kr.setting.Logger.Error("error converting objects to log data", zap.Error(err))
Expand Down
67 changes: 67 additions & 0 deletions receiver/k8sobjectsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
apiWatch "k8s.io/apimachinery/pkg/watch"
)

func TestNewReceiver(t *testing.T) {
Expand Down Expand Up @@ -136,5 +137,71 @@ func TestWatchObject(t *testing.T) {
assert.Len(t, consumer.Logs(), 2)
assert.Equal(t, 2, consumer.Count())

mockClient.deletePods(
generatePod("pod2", "default", map[string]interface{}{
"environment": "test",
}, "2"),
)
time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 3)
assert.Equal(t, 3, consumer.Count())

assert.NoError(t, r.Shutdown(ctx))
}

func TestExludeDeletedTrue(t *testing.T) {
t.Parallel()

mockClient := newMockDynamicClient()

mockClient.createPods(
generatePod("pod1", "default", map[string]interface{}{
"environment": "production",
}, "1"),
)

rCfg := createDefaultConfig().(*Config)
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
rCfg.makeDiscoveryClient = getMockDiscoveryClient

rCfg.Objects = []*K8sObjectsConfig{
{
Name: "pods",
Mode: WatchMode,
Namespaces: []string{"default"},
ExcludeWatchType: []apiWatch.EventType{
apiWatch.Deleted,
},
},
}

err := rCfg.Validate()
require.NoError(t, err)

consumer := newMockLogConsumer()
r, err := newReceiver(
receivertest.NewNopCreateSettings(),
rCfg,
consumer,
)

ctx := context.Background()
require.NoError(t, err)
require.NotNil(t, r)
require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))

time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 0)
assert.Equal(t, 0, consumer.Count())

mockClient.deletePods(
generatePod("pod1", "default", map[string]interface{}{
"environment": "test",
}, "1"),
)
time.Sleep(time.Millisecond * 100)
assert.Len(t, consumer.Logs(), 0)
assert.Equal(t, 0, consumer.Count())

assert.NoError(t, r.Shutdown(ctx))
}
6 changes: 6 additions & 0 deletions receiver/k8sobjectsreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ k8sobjects:
mode: watch
group: events.k8s.io
namespaces: [default]
exclude_watch_type: [DELETED]
k8sobjects/pull_with_resource:
objects:
- name: pods
Expand All @@ -30,3 +31,8 @@ k8sobjects/invalid_resource:
objects:
- name: fake_resource
mode: watch
k8sobjects/exclude_deleted_with_pull:
objects:
- name: events
mode: pull
exclude_watch_type: [DELETED]

0 comments on commit 930b421

Please sign in to comment.