From 70b15f730d799995a518ad5db70159a0a1ef1c93 Mon Sep 17 00:00:00 2001 From: morvencao Date: Thu, 10 Oct 2024 08:50:46 +0000 Subject: [PATCH] add cloudevents metrics. Signed-off-by: morvencao --- pkg/cloudevents/generic/agentclient.go | 1 + pkg/cloudevents/generic/agentclient_test.go | 8 +- pkg/cloudevents/generic/baseclient.go | 17 ++ pkg/cloudevents/generic/metrics_collector.go | 161 +++++++++++++++--- .../generic/metrics_collector_test.go | 161 ++++++++++++++++-- .../generic/options/fake/fakeoptions.go | 7 +- pkg/cloudevents/generic/sourceclient.go | 1 + .../work/agent/client/manifestwork.go | 84 ++++++--- .../work/source/client/manifestwork.go | 98 ++++++++--- 9 files changed, 458 insertions(+), 80 deletions(-) diff --git a/pkg/cloudevents/generic/agentclient.go b/pkg/cloudevents/generic/agentclient.go index 459b8af8..a8425507 100644 --- a/pkg/cloudevents/generic/agentclient.go +++ b/pkg/cloudevents/generic/agentclient.go @@ -43,6 +43,7 @@ func NewCloudEventAgentClient[T ResourceObject]( codecs ...Codec[T], ) (*CloudEventAgentClient[T], error) { baseClient := &baseClient{ + clientID: agentOptions.AgentID, cloudEventsOptions: agentOptions.CloudEventsOptions, cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit), reconnectedChan: make(chan struct{}), diff --git a/pkg/cloudevents/generic/agentclient_test.go b/pkg/cloudevents/generic/agentclient_test.go index fc50a456..da296171 100644 --- a/pkg/cloudevents/generic/agentclient_test.go +++ b/pkg/cloudevents/generic/agentclient_test.go @@ -60,7 +60,7 @@ func TestAgentResync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) lister := newMockResourceLister(c.resources...) 
- agent, err := NewCloudEventAgentClient[*mockResource](ctx, fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName), lister, statusHash, newMockResourceCodec()) + agent, err := NewCloudEventAgentClient[*mockResource](ctx, fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName), lister, statusHash, newMockResourceCodec()) require.NoError(t, err) // start a cloudevents receiver client go to receive the event @@ -127,7 +127,7 @@ func TestAgentPublish(t *testing.T) { t.Run(c.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName) + agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName) lister := newMockResourceLister() agent, err := NewCloudEventAgentClient[*mockResource](context.TODO(), agentOptions, lister, statusHash, newMockResourceCodec()) require.Nil(t, err) @@ -281,7 +281,7 @@ func TestStatusResyncResponse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName) + agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName) lister := newMockResourceLister(c.resources...) agent, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, lister, statusHash, newMockResourceCodec()) require.NoError(t, err) @@ -463,7 +463,7 @@ func TestReceiveResourceSpec(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName) + agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName) lister := newMockResourceLister(c.resources...) 
agent, err := NewCloudEventAgentClient[*mockResource](context.TODO(), agentOptions, lister, statusHash, newMockResourceCodec()) require.NoError(t, err) diff --git a/pkg/cloudevents/generic/baseclient.go b/pkg/cloudevents/generic/baseclient.go index ba08e133..8d3e69b4 100644 --- a/pkg/cloudevents/generic/baseclient.go +++ b/pkg/cloudevents/generic/baseclient.go @@ -15,6 +15,7 @@ import ( "k8s.io/utils/clock" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) const ( @@ -35,6 +36,7 @@ type receiveFn func(ctx context.Context, evt cloudevents.Event) type baseClient struct { sync.RWMutex + clientID string // the client id is used to identify the client, either a source or an agent ID cloudEventsOptions options.CloudEventsOptions cloudEventsProtocol options.CloudEventsProtocol cloudEventsClient cloudevents.Client @@ -68,6 +70,7 @@ func (c *baseClient) connect(ctx context.Context) error { } // the cloudevents network connection is back, mark the client ready and send the receiver restart signal klog.V(4).Infof("the cloudevents client is reconnected") + increaseClientReconnectedCounter(c.clientID) c.setClientReady(true) c.sendReceiverSignal(restartReceiverSignal) c.sendReconnectedSignal() @@ -130,6 +133,14 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { return fmt.Errorf("failed to send event %s, %v", evt.Context, result) } + clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string) + + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err == nil { + // only increase the sent counter for the known event types + increaseCloudEventsSentCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String()) + } + return nil } @@ -155,6 +166,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { go func() { if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) { 
klog.V(4).Infof("Received event: %s", evt) + clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string) + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err == nil { + // only increase the received counter for the known event types + increaseCloudEventsReceivedCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String()) + } receive(receiverCtx, evt) }); err != nil { runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err)) diff --git a/pkg/cloudevents/generic/metrics_collector.go b/pkg/cloudevents/generic/metrics_collector.go index bdb24cbd..75790a89 100644 --- a/pkg/cloudevents/generic/metrics_collector.go +++ b/pkg/cloudevents/generic/metrics_collector.go @@ -7,26 +7,74 @@ import ( ) // Subsystem used to define the metrics: -const metricsSubsystem = "resources" +const ( + cloudeventsMetricsSubsystem = "cloudevents" + resourcesMetricsSubsystem = "resources" + manifestworkMetricsSubsystem = "manifestworks" +) // Names of the labels added to metrics: const ( - metricsSourceLabel = "source" - metricsClusterLabel = "cluster" - metrucsDataTypeLabel = "type" + metricsSourceLabel = "source" + metricsClusterLabel = "cluster" + metricsDataTypeLabel = "type" + metricsClientIDLabel = "client_id" + metricsWorkActionLabel = "action" + metricsWorkCodeLabel = "code" ) -// metricsLabels - Array of labels added to metrics: -var metricsLabels = []string{ +// cloudeventsMetricsLabels - Array of labels added to cloudevents metrics: +var cloudeventsMetricsLabels = []string{ metricsSourceLabel, // source metricsClusterLabel, // cluster - metrucsDataTypeLabel, // resource type + metricsDataTypeLabel, // data type, e.g. 
manifests, manifestbundles +} + +// cloudeventsClientMetricsLabels - Array of labels added to cloudevents client metrics: +var cloudeventsClientMetricsLabels = []string{ + metricsClientIDLabel, // client_id +} + +// workMetricsLabels - Array of labels added to manifestwork metrics: +var workMetricsLabels = []string{ + metricsWorkActionLabel, // action + metricsWorkCodeLabel, // code } // Names of the metrics: const ( + receivedCounterMetric = "received_total" + sentCounterMetric = "sent_total" specResyncDurationMetric = "spec_resync_duration_seconds" statusResyncDurationMetric = "status_resync_duration_seconds" + clientReconnectedCounter = "client_reconnected_total" + workProcessedCounter = "processed_total" +) + +// The cloudevents received counter metric is a counter with a base metric name of 'received_total' +// and a help string of 'The total number of received CloudEvents.' +// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests would result in the following metrics: +// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2 +var cloudeventsReceivedCounterMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: cloudeventsMetricsSubsystem, + Name: receivedCounterMetric, + Help: "The total number of received CloudEvents.", + }, + cloudeventsMetricsLabels, +) + +// The cloudevents sent counter metric is a counter with a base metric name of 'sent_total' +// and a help string of 'The total number of sent CloudEvents.' 
+// For example, 2 CloudEvents sent from agent on cluster1 to source1 with data type manifestbundles would result in the following metrics: +// cloudevents_sent_total{source="cluster1-work-agent",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2 +var cloudeventsSentCounterMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: cloudeventsMetricsSubsystem, + Name: sentCounterMetric, + Help: "The total number of sent CloudEvents.", + }, + cloudeventsMetricsLabels, ) // The resource spec resync duration metric is a histogram with a base metric name of 'resource_spec_resync_duration_second' @@ -47,7 +95,7 @@ const ( // resource_spec_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2 var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Subsystem: metricsSubsystem, + Subsystem: resourcesMetricsSubsystem, Name: specResyncDurationMetric, Help: "The duration of the resource spec resync in seconds.", Buckets: []float64{ @@ -60,7 +108,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec( 30.0, }, }, - metricsLabels, + cloudeventsMetricsLabels, ) // The resource status resync duration metric is a histogram with a base metric name of 'resource_status_resync_duration_second' @@ -81,7 +129,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec( // resource_status_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2 var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Subsystem: metricsSubsystem, + Subsystem: resourcesMetricsSubsystem, Name: statusResyncDurationMetric, Help: "The duration of the resource status resync in seconds.", Buckets: []float64{ @@ -94,25 +142,79 @@ var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec( 30.0, }, }, - 
metricsLabels, + cloudeventsMetricsLabels, +) + +// The cloudevents client reconnected counter metric is a counter with a base metric name of 'client_reconnected_total' +// and a help string of 'The total number of reconnects for the CloudEvents client.' +// For example, 2 reconnects for the CloudEvents client with client_id=client1 would result in the following metrics: +// client_reconnected_total{client_id="client1"} 2 +var clientReconnectedCounterMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: cloudeventsMetricsSubsystem, + Name: clientReconnectedCounter, + Help: "The total number of reconnects for the CloudEvents client.", + }, + cloudeventsClientMetricsLabels, +) + +var workProcessedCounterMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: manifestworkMetricsSubsystem, + Name: workProcessedCounter, + Help: "The total number of processed manifestworks.", + }, + workMetricsLabels, ) // Register the metrics: -func RegisterResourceResyncMetrics() { - prometheus.MustRegister(resourceSpecResyncDurationMetric) - prometheus.MustRegister(resourceStatusResyncDurationMetric) +func RegisterCloudEventsMetrics(register prometheus.Registerer) { + register.MustRegister(cloudeventsReceivedCounterMetric) + register.MustRegister(cloudeventsSentCounterMetric) + register.MustRegister(resourceSpecResyncDurationMetric) + register.MustRegister(resourceStatusResyncDurationMetric) + register.MustRegister(clientReconnectedCounterMetric) + register.MustRegister(workProcessedCounterMetric) } // Unregister the metrics: -func UnregisterResourceResyncMetrics() { - prometheus.Unregister(resourceStatusResyncDurationMetric) - prometheus.Unregister(resourceStatusResyncDurationMetric) +func UnregisterCloudEventsMetrics(register prometheus.Registerer) { + register.Unregister(cloudeventsReceivedCounterMetric) + register.Unregister(cloudeventsSentCounterMetric) + register.Unregister(resourceStatusResyncDurationMetric) + 
register.Unregister(resourceSpecResyncDurationMetric) + register.Unregister(clientReconnectedCounterMetric) + register.Unregister(workProcessedCounterMetric) } -// ResetResourceResyncMetricsCollectors resets all collectors -func ResetResourceResyncMetricsCollectors() { +// ResetCloudEventsMetrics resets all collectors +func ResetCloudEventsMetrics() { + cloudeventsReceivedCounterMetric.Reset() + cloudeventsSentCounterMetric.Reset() resourceSpecResyncDurationMetric.Reset() resourceStatusResyncDurationMetric.Reset() + clientReconnectedCounterMetric.Reset() + workProcessedCounterMetric.Reset() +} + +// increaseCloudEventsReceivedCounter increases the cloudevents received counter metric: +func increaseCloudEventsReceivedCounter(source, cluster, dataType string) { + labels := prometheus.Labels{ + metricsSourceLabel: source, + metricsClusterLabel: cluster, + metricsDataTypeLabel: dataType, + } + cloudeventsReceivedCounterMetric.With(labels).Inc() +} + +// increaseCloudEventsSentCounter increases the cloudevents sent counter metric: +func increaseCloudEventsSentCounter(source, cluster, dataType string) { + labels := prometheus.Labels{ + metricsSourceLabel: source, + metricsClusterLabel: cluster, + metricsDataTypeLabel: dataType, + } + cloudeventsSentCounterMetric.With(labels).Inc() } // updateResourceSpecResyncDurationMetric updates the resource spec resync duration metric: @@ -120,7 +222,7 @@ func updateResourceSpecResyncDurationMetric(source, cluster, dataType string, st labels := prometheus.Labels{ metricsSourceLabel: source, metricsClusterLabel: cluster, - metrucsDataTypeLabel: dataType, + metricsDataTypeLabel: dataType, } duration := time.Since(startTime) resourceSpecResyncDurationMetric.With(labels).Observe(duration.Seconds()) @@ -131,8 +233,25 @@ func updateResourceStatusResyncDurationMetric(source, cluster, dataType string, labels := prometheus.Labels{ metricsSourceLabel: source, metricsClusterLabel: cluster, - metrucsDataTypeLabel: dataType, + metricsDataTypeLabel: 
dataType, } duration := time.Since(startTime) resourceStatusResyncDurationMetric.With(labels).Observe(duration.Seconds()) } + +// increaseClientReconnectedCounter increases the client reconnected counter metric: +func increaseClientReconnectedCounter(clientID string) { + labels := prometheus.Labels{ + metricsClientIDLabel: clientID, + } + clientReconnectedCounterMetric.With(labels).Inc() +} + +// IncreaseWorkProcessedCounter increases the work processed counter metric: +func IncreaseWorkProcessedCounter(action, code string) { + labels := prometheus.Labels{ + metricsWorkActionLabel: action, + metricsWorkCodeLabel: code, + } + workProcessedCounterMetric.With(labels).Inc() +} diff --git a/pkg/cloudevents/generic/metrics_collector_test.go b/pkg/cloudevents/generic/metrics_collector_test.go index f16fcd3f..7b9429b0 100644 --- a/pkg/cloudevents/generic/metrics_collector_test.go +++ b/pkg/cloudevents/generic/metrics_collector_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + kubetypes "k8s.io/apimachinery/pkg/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/fake" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -23,42 +24,180 @@ const ( testStatusResync testResyncType = "status" ) +func TestCloudEventsMetrics(t *testing.T) { + cases := []struct { + name string + clusterName string + sourceID string + resources []*mockResource + dataType types.CloudEventsDataType + }{ + { + name: "receive single resource", + clusterName: "cluster1", + sourceID: "source1", + resources: []*mockResource{ + {Namespace: "cluster1", UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"}, + }, + dataType: mockEventDataType, + }, + { + name: "receive multiple resources", + clusterName: "cluster1", + sourceID: "source1", + resources: []*mockResource{ + 
{Namespace: "cluster1", UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"}, + {Namespace: "cluster1", UID: kubetypes.UID("test2"), ResourceVersion: "3", Status: "test2"}, + }, + dataType: mockEventDataType, + }, + } + for _, c := range cases { + // reset metrics + ResetCloudEventsMetrics() + // run test + t.Run(c.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + sendReceiver := gochan.New() + + // initialize source client + sourceOptions := fake.NewSourceOptions(sendReceiver, c.sourceID) + lister := newMockResourceLister([]*mockResource{}...) + source, err := NewCloudEventSourceClient[*mockResource](ctx, sourceOptions, lister, statusHash, newMockResourceCodec()) + require.NoError(t, err) + + // initialize agent client + agentOptions := fake.NewAgentOptions(sendReceiver, nil, c.clusterName, testAgentName) + agentLister := newMockResourceLister([]*mockResource{}...) + agent, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, agentLister, statusHash, newMockResourceCodec()) + require.NoError(t, err) + + // start agent subscription + agent.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) {}) + + eventType := types.CloudEventsType{ + CloudEventsDataType: c.dataType, + SubResource: types.SubResourceSpec, + Action: "test_create_request", + } + + // publish resources to agent + for _, resource := range c.resources { + err = source.Publish(ctx, eventType, resource) + require.NoError(t, err) + } + + // wait 1 second for agent receive the resources + time.Sleep(time.Second) + + // ensure metrics are updated + sentTotal := cloudeventsSentCounterMetric.WithLabelValues(c.sourceID, c.clusterName, c.dataType.String()) + require.Equal(t, len(c.resources), int(toFloat64Counter(sentTotal))) + receivedTotal := cloudeventsReceivedCounterMetric.WithLabelValues(c.sourceID, c.clusterName, c.dataType.String()) + require.Equal(t, len(c.resources), int(toFloat64Counter(receivedTotal))) + + cancel() + }) + } 
+} + +func TestReconnectMetrics(t *testing.T) { + // reset metrics + ResetCloudEventsMetrics() + ctx, cancel := context.WithCancel(context.Background()) + + originalDelayFn := DelayFn + // override DelayFn to avoid waiting for backoff + DelayFn = func() time.Duration { return 0 } + defer func() { + // reset DelayFn + DelayFn = originalDelayFn + }() + + errChan := make(chan error) + agentOptions := fake.NewAgentOptions(gochan.New(), errChan, "cluster1", testAgentName) + agentLister := newMockResourceLister([]*mockResource{}...) + _, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, agentLister, statusHash, newMockResourceCodec()) + require.NoError(t, err) + + // mimic agent disconnection by sending an error + errChan <- fmt.Errorf("test error") + // sleep second to wait for the agent to reconnect + time.Sleep(time.Second) + + reconnectTotal := clientReconnectedCounterMetric.WithLabelValues(testAgentName) + require.Equal(t, 1.0, toFloat64Counter(reconnectTotal)) + + cancel() +} + +// toFloat64Counter returns the count of a counter metric +func toFloat64Counter(c prometheus.Counter) float64 { + var ( + m prometheus.Metric + mCount int + mChan = make(chan prometheus.Metric) + done = make(chan struct{}) + ) + + go func() { + for m = range mChan { + mCount++ + } + close(done) + }() + + c.Collect(mChan) + close(mChan) + <-done + + if mCount != 1 { + panic(fmt.Errorf("collected %d metrics instead of exactly 1", mCount)) + } + + pb := &dto.Metric{} + if err := m.Write(pb); err != nil { + panic(fmt.Errorf("metric write failed, err=%v", err)) + } + + if pb.Counter != nil { + return pb.Counter.GetValue() + } + panic(fmt.Errorf("collected a non-counter metric: %s", pb)) +} + func TestResyncMetrics(t *testing.T) { cases := []struct { name string - rescType testResyncType + resyncType testResyncType clusterName string sourceID string dataType types.CloudEventsDataType }{ { name: "resync spec", - rescType: testSpecResync, + resyncType: testSpecResync, clusterName: 
"cluster1", sourceID: "source1", dataType: mockEventDataType, }, { name: "resync status", - rescType: testStatusResync, + resyncType: testStatusResync, clusterName: "cluster1", sourceID: "source1", dataType: mockEventDataType, }, } - // register metrics - RegisterResourceResyncMetrics() - // unregister metrics - defer UnregisterResourceResyncMetrics() for _, c := range cases { // reset metrics - ResetResourceResyncMetricsCollectors() + ResetCloudEventsMetrics() // run test t.Run(c.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - if c.rescType == testSpecResync { + if c.resyncType == testSpecResync { sourceOptions := fake.NewSourceOptions(gochan.New(), c.sourceID) lister := newMockResourceLister([]*mockResource{}...) source, err := NewCloudEventSourceClient[*mockResource](ctx, sourceOptions, lister, statusHash, newMockResourceCodec()) @@ -89,8 +228,8 @@ func TestResyncMetrics(t *testing.T) { require.Less(t, sum, 1.0) } - if c.rescType == testStatusResync { - agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName) + if c.resyncType == testStatusResync { + agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName) lister := newMockResourceLister([]*mockResource{}...) 
agent, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, lister, statusHash, newMockResourceCodec()) require.NoError(t, err) diff --git a/pkg/cloudevents/generic/options/fake/fakeoptions.go b/pkg/cloudevents/generic/options/fake/fakeoptions.go index b74e4ec5..f61b7fac 100644 --- a/pkg/cloudevents/generic/options/fake/fakeoptions.go +++ b/pkg/cloudevents/generic/options/fake/fakeoptions.go @@ -10,11 +10,12 @@ import ( type CloudEventsFakeOptions struct { protocol options.CloudEventsProtocol + errChan chan error } -func NewAgentOptions(protocol options.CloudEventsProtocol, clusterName, agentID string) *options.CloudEventsAgentOptions { +func NewAgentOptions(protocol options.CloudEventsProtocol, errChan chan error, clusterName, agentID string) *options.CloudEventsAgentOptions { return &options.CloudEventsAgentOptions{ - CloudEventsOptions: &CloudEventsFakeOptions{protocol: protocol}, + CloudEventsOptions: &CloudEventsFakeOptions{protocol: protocol, errChan: errChan}, AgentID: agentID, ClusterName: clusterName, } @@ -36,5 +37,5 @@ func (o *CloudEventsFakeOptions) Protocol(ctx context.Context) (options.CloudEve } func (o *CloudEventsFakeOptions) ErrorChan() <-chan error { - return nil + return o.errChan } diff --git a/pkg/cloudevents/generic/sourceclient.go b/pkg/cloudevents/generic/sourceclient.go index cd45043e..11760b5c 100644 --- a/pkg/cloudevents/generic/sourceclient.go +++ b/pkg/cloudevents/generic/sourceclient.go @@ -43,6 +43,7 @@ func NewCloudEventSourceClient[T ResourceObject]( codecs ...Codec[T], ) (*CloudEventSourceClient[T], error) { baseClient := &baseClient{ + clientID: sourceOptions.SourceID, cloudEventsOptions: sourceOptions.CloudEventsOptions, cloudEventsRateLimiter: NewRateLimiter(sourceOptions.EventRateLimit), reconnectedChan: make(chan struct{}), diff --git a/pkg/cloudevents/work/agent/client/manifestwork.go b/pkg/cloudevents/work/agent/client/manifestwork.go index 645974d5..c250e45d 100644 --- 
a/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/pkg/cloudevents/work/agent/client/manifestwork.go @@ -78,12 +78,17 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met klog.V(4).Infof("getting manifestwork %s/%s", c.namespace, name) work, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("get", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if !exists { - return nil, errors.NewNotFound(common.ManifestWorkGR, name) + returnErr := errors.NewNotFound(common.ManifestWorkGR, name) + generic.IncreaseWorkProcessedCounter("get", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("get", metav1.StatusSuccess) return work, nil } @@ -91,9 +96,12 @@ func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOpti klog.V(4).Infof("list manifestworks from cluster %s", c.namespace) works, err := c.watcherStore.List(c.namespace, opts) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("list", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess) return works, nil } @@ -101,8 +109,12 @@ func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOpt klog.V(4).Infof("watch manifestworks from cluster %s", c.namespace) watcher, err := c.watcherStore.GetWatcher(c.namespace, opts) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("watch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + + generic.IncreaseWorkProcessedCounter("watch", metav1.StatusSuccess) return watcher, nil } @@ -110,20 +122,28 @@ func (c *ManifestWorkAgentClient) 
Patch(ctx context.Context, name string, pt kub klog.V(4).Infof("patching manifestwork %s/%s", c.namespace, name) lastWork, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if !exists { - return nil, errors.NewNotFound(common.ManifestWorkGR, name) + returnErr := errors.NewNotFound(common.ManifestWorkGR, name) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } patchedWork, err := utils.Patch(pt, lastWork, data) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } eventDataType, err := types.ParseCloudEventsDataType(patchedWork.Annotations[common.CloudEventsDataTypeAnnotationKey]) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } eventType := types.CloudEventsType{ @@ -135,7 +155,9 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub statusUpdated, err := isStatusUpdate(subresources) if err != nil { - return nil, errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, err.Error(), 0, false) + returnErr := errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, err.Error(), 0, false) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if statusUpdated { @@ -147,7 +169,9 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub // publish the status update event to source, source will 
check the resource version // and reject the update if it's status update is outdated. if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - return nil, workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // Fetch the latest work from the store and verify the resource version to avoid updating the store @@ -156,28 +180,42 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub // this update operation and one from the agent informer after receiving the event from the source. latestWork, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if !exists { - return nil, errors.NewNotFound(common.ManifestWorkGR, name) + returnErr := errors.NewNotFound(common.ManifestWorkGR, name) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } lastResourceVersion, err := strconv.ParseInt(latestWork.GetResourceVersion(), 10, 64) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } newResourceVersion, err := strconv.ParseInt(newWork.GetResourceVersion(), 10, 64) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // ensure the resource version of the work is not outdated if newResourceVersion < lastResourceVersion { // It's safe to return a conflict error here, 
even if the status update event // has already been sent. The source may reject the update due to an outdated resource version. - return nil, errors.NewConflict(common.ManifestWorkGR, name, fmt.Errorf("the resource version of the work is outdated")) + returnErr := errors.NewConflict(common.ManifestWorkGR, name, fmt.Errorf("the resource version of the work is outdated")) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if err := c.watcherStore.Update(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + + generic.IncreaseWorkProcessedCounter("patch", metav1.StatusSuccess) return newWork, nil } @@ -193,20 +231,28 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub eventType.Action = common.DeleteRequestAction if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - return nil, workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if err := c.watcherStore.Delete(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess) return newWork, nil } if err := c.watcherStore.Update(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("patch", metav1.StatusSuccess) return 
newWork, nil } diff --git a/pkg/cloudevents/work/source/client/manifestwork.go b/pkg/cloudevents/work/source/client/manifestwork.go index f0b56a31..3da87c86 100644 --- a/pkg/cloudevents/work/source/client/manifestwork.go +++ b/pkg/cloudevents/work/source/client/manifestwork.go @@ -52,21 +52,27 @@ func (c *ManifestWorkSourceClient) SetNamespace(namespace string) { func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) { if manifestWork.Namespace != "" && manifestWork.Namespace != c.namespace { - return nil, errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, field.ErrorList{ + returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, field.ErrorList{ field.Invalid( field.NewPath("metadata").Child("namespace"), manifestWork.Namespace, fmt.Sprintf("does not match the namespace %s", c.namespace), ), }) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } _, exists, err := c.watcherStore.Get(c.namespace, manifestWork.Name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if exists { - return nil, errors.NewAlreadyExists(common.ManifestWorkGR, manifestWork.Name) + returnErr := errors.NewAlreadyExists(common.ManifestWorkGR, manifestWork.Name) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // TODO if we support multiple data type in future, we may need to get the data type from @@ -83,21 +89,31 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor newWork.ResourceVersion = getWorkResourceVersion(manifestWork) if err := utils.Encode(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + 
generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if errs := utils.Validate(newWork); len(errs) != 0 { - return nil, errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs) + returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - return nil, workerrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err) + returnErr := workerrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // add the new work to the local cache. if err := c.watcherStore.Add(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + + generic.IncreaseWorkProcessedCounter("create", metav1.StatusSuccess) return newWork.DeepCopy(), nil } @@ -112,7 +128,9 @@ func (c *ManifestWorkSourceClient) UpdateStatus(ctx context.Context, manifestWor func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { work, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return returnErr } if !exists { return nil @@ -131,7 +149,9 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts deletingWork.DeletionTimestamp = &now if err := c.cloudEventsClient.Publish(ctx, eventType, deletingWork); err != nil { - return workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := 
workerrors.NewPublishError(common.ManifestWorkGR, name, err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return returnErr } if len(work.Finalizers) == 0 { @@ -140,15 +160,22 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts // 2) the agent is running, but the status response does not be handled by source yet, // after the deleted status is back, we need ignore this work in the ManifestWorkSourceHandler. if err := c.watcherStore.Delete(deletingWork); err != nil { - return errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return returnErr } + + generic.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess) return nil } // update the work with deletion timestamp in the local cache. if err := c.watcherStore.Update(deletingWork); err != nil { - return errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return returnErr } + return nil } @@ -160,12 +187,17 @@ func (c *ManifestWorkSourceClient) Get(ctx context.Context, name string, opts me klog.V(4).Infof("getting manifestwork %s", name) work, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("get", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if !exists { - return nil, errors.NewNotFound(common.ManifestWorkGR, name) + returnErr := errors.NewNotFound(common.ManifestWorkGR, name) + generic.IncreaseWorkProcessedCounter("get", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("get", metav1.StatusSuccess) return work, nil } @@ -173,17 +205,24 @@ func (c *ManifestWorkSourceClient) List(ctx context.Context, opts 
metav1.ListOpt klog.V(4).Infof("list manifestworks") works, err := c.watcherStore.List(c.namespace, opts) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("list", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess) return works, nil } func (c *ManifestWorkSourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { watcher, err := c.watcherStore.GetWatcher(c.namespace, opts) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("watch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + + generic.IncreaseWorkProcessedCounter("watch", metav1.StatusSuccess) return watcher, nil } @@ -192,20 +231,28 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku if len(subresources) != 0 { msg := fmt.Sprintf("unsupported to update subresources %v", subresources) - return nil, errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, msg, 0, false) + returnErr := errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, msg, 0, false) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } lastWork, exists, err := c.watcherStore.Get(c.namespace, name) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if !exists { - return nil, errors.NewNotFound(common.ManifestWorkGR, name) + returnErr := errors.NewNotFound(common.ManifestWorkGR, name) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } patchedWork, 
err := utils.Patch(pt, lastWork, data) if err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // TODO if we support multiple data type in future, we may need to get the data type from @@ -220,18 +267,25 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku newWork.ResourceVersion = getWorkResourceVersion(patchedWork) if errs := utils.Validate(newWork); len(errs) != 0 { - return nil, errors.NewInvalid(common.ManifestWorkGK, name, errs) + returnErr := errors.NewInvalid(common.ManifestWorkGK, name, errs) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - return nil, workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } // modify the updated work in the local cache. if err := c.watcherStore.Update(newWork); err != nil { - return nil, errors.NewInternalError(err) + returnErr := errors.NewInternalError(err) + generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) + return nil, returnErr } + generic.IncreaseWorkProcessedCounter("patch", metav1.StatusSuccess) return newWork.DeepCopy(), nil }