Skip to content

Commit

Permalink
add cloudevents metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Nov 15, 2024
1 parent f50d6e8 commit 70b15f7
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 80 deletions.
1 change: 1 addition & 0 deletions pkg/cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewCloudEventAgentClient[T ResourceObject](
codecs ...Codec[T],
) (*CloudEventAgentClient[T], error) {
baseClient := &baseClient{
clientID: agentOptions.AgentID,
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
reconnectedChan: make(chan struct{}),
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudevents/generic/agentclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestAgentResync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

lister := newMockResourceLister(c.resources...)
agent, err := NewCloudEventAgentClient[*mockResource](ctx, fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName), lister, statusHash, newMockResourceCodec())
agent, err := NewCloudEventAgentClient[*mockResource](ctx, fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName), lister, statusHash, newMockResourceCodec())
require.NoError(t, err)

// start a cloudevents receiver client go to receive the event
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestAgentPublish(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName)
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
lister := newMockResourceLister()
agent, err := NewCloudEventAgentClient[*mockResource](context.TODO(), agentOptions, lister, statusHash, newMockResourceCodec())
require.Nil(t, err)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestStatusResyncResponse(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName)
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
lister := newMockResourceLister(c.resources...)
agent, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, lister, statusHash, newMockResourceCodec())
require.NoError(t, err)
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestReceiveResourceSpec(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName)
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
lister := newMockResourceLister(c.resources...)
agent, err := NewCloudEventAgentClient[*mockResource](context.TODO(), agentOptions, lister, statusHash, newMockResourceCodec())
require.NoError(t, err)
Expand Down
17 changes: 17 additions & 0 deletions pkg/cloudevents/generic/baseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/utils/clock"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

const (
Expand All @@ -35,6 +36,7 @@ type receiveFn func(ctx context.Context, evt cloudevents.Event)

type baseClient struct {
sync.RWMutex
clientID string // the client id is used to identify the client, either a source or an agent ID
cloudEventsOptions options.CloudEventsOptions
cloudEventsProtocol options.CloudEventsProtocol
cloudEventsClient cloudevents.Client
Expand Down Expand Up @@ -68,6 +70,7 @@ func (c *baseClient) connect(ctx context.Context) error {
}
// the cloudevents network connection is back, mark the client ready and send the receiver restart signal
klog.V(4).Infof("the cloudevents client is reconnected")
increaseClientReconnectedCounter(c.clientID)
c.setClientReady(true)
c.sendReceiverSignal(restartReceiverSignal)
c.sendReconnectedSignal()
Expand Down Expand Up @@ -130,6 +133,14 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
return fmt.Errorf("failed to send event %s, %v", evt.Context, result)
}

clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err == nil {
// only increase the sent counter for the known event types
increaseCloudEventsSentCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String())
}

return nil
}

Expand All @@ -155,6 +166,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
go func() {
if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event: %s", evt)
clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string)
eventType, err := types.ParseCloudEventsType(evt.Type())
if err == nil {
// only increase the received counter for the known event types
increaseCloudEventsReceivedCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String())
}
receive(receiverCtx, evt)
}); err != nil {
runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err))
Expand Down
161 changes: 140 additions & 21 deletions pkg/cloudevents/generic/metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,74 @@ import (
)

// Subsystem used to define the metrics:
const metricsSubsystem = "resources"
const (
cloudeventsMetricsSubsystem = "cloudevents"
resourcesMetricsSubsystem = "resources"
manifestworkMetricsSubsystem = "manifestworks"
)

// Names of the labels added to metrics:
const (
metricsSourceLabel = "source"
metricsClusterLabel = "cluster"
metrucsDataTypeLabel = "type"
metricsSourceLabel = "source"
metricsClusterLabel = "cluster"
metricsDataTypeLabel = "type"
metricsClientIDLabel = "client_id"
metricsWorkActionLabel = "action"
metricsWorkCodeLabel = "code"
)

// metricsLabels - Array of labels added to metrics:
var metricsLabels = []string{
// cloudeventsMetricsLabels - Array of labels added to cloudevents metrics:
var cloudeventsMetricsLabels = []string{
metricsSourceLabel, // source
metricsClusterLabel, // cluster
metrucsDataTypeLabel, // resource type
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
}

// cloudeventsClientMetricsLabels - Array of labels added to cloudevents client metrics:
var cloudeventsClientMetricsLabels = []string{
metricsClientIDLabel, // client_id
}

// workMetricsLabels - Array of labels added to manifestwork metrics:
var workMetricsLabels = []string{
metricsWorkActionLabel, // action
metricsWorkCodeLabel, // code
}

// Names of the metrics:
const (
receivedCounterMetric = "received_total"
sentCounterMetric = "sent_total"
specResyncDurationMetric = "spec_resync_duration_seconds"
statusResyncDurationMetric = "status_resync_duration_seconds"
clientReconnectedCounter = "client_reconnected_total"
workProcessedCounter = "processed_total"
)

// The cloudevents received counter metric is a counter with a base metric name of 'received_total'
// and a help string of 'The total number of received CloudEvents.'
// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests would result in the following metrics:
// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2
var cloudeventsReceivedCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: cloudeventsMetricsSubsystem,
Name: receivedCounterMetric,
Help: "The total number of received CloudEvents.",
},
cloudeventsMetricsLabels,
)

// The cloudevents sent counter metric is a counter with a base metric name of 'sent_total'
// and a help string of 'The total number of sent CloudEvents.'
// For example, 2 CloudEvents sent from agent on cluster1 to source1 with data type manifestbundles would result in the following metrics:
// cloudevents_sent_total{source="cluster1-work-agent",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2
var cloudeventsSentCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: cloudeventsMetricsSubsystem,
Name: sentCounterMetric,
Help: "The total number of sent CloudEvents.",
},
cloudeventsMetricsLabels,
)

// The resource spec resync duration metric is a histogram with a base metric name of 'resource_spec_resync_duration_second'
Expand All @@ -47,7 +95,7 @@ const (
// resource_spec_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2
var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: metricsSubsystem,
Subsystem: resourcesMetricsSubsystem,
Name: specResyncDurationMetric,
Help: "The duration of the resource spec resync in seconds.",
Buckets: []float64{
Expand All @@ -60,7 +108,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec(
30.0,
},
},
metricsLabels,
cloudeventsMetricsLabels,
)

// The resource status resync duration metric is a histogram with a base metric name of 'resource_status_resync_duration_second'
Expand All @@ -81,7 +129,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec(
// resource_status_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2
var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: metricsSubsystem,
Subsystem: resourcesMetricsSubsystem,
Name: statusResyncDurationMetric,
Help: "The duration of the resource status resync in seconds.",
Buckets: []float64{
Expand All @@ -94,33 +142,87 @@ var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec(
30.0,
},
},
metricsLabels,
cloudeventsMetricsLabels,
)

// The cloudevents client reconnected counter metric is a counter with a base metric name of 'client_reconnected_total'
// and a help string of 'The total number of reconnects for the CloudEvents client.'
// For example, 2 reconnects for the CloudEvents client with client_id=client1 would result in the following metrics:
// client_reconnected_total{client_id="client1"} 2
var clientReconnectedCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: cloudeventsMetricsSubsystem,
Name: clientReconnectedCounter,
Help: "The total number of reconnects for the CloudEvents client.",
},
cloudeventsClientMetricsLabels,
)

var workProcessedCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: manifestworkMetricsSubsystem,
Name: workProcessedCounter,
Help: "The total number of processed manifestworks.",
},
workMetricsLabels,
)

// Register the metrics:
func RegisterResourceResyncMetrics() {
prometheus.MustRegister(resourceSpecResyncDurationMetric)
prometheus.MustRegister(resourceStatusResyncDurationMetric)
func RegisterCloudEventsMetrics(register prometheus.Registerer) {
register.MustRegister(cloudeventsReceivedCounterMetric)
register.MustRegister(cloudeventsSentCounterMetric)
register.MustRegister(resourceSpecResyncDurationMetric)
register.MustRegister(resourceStatusResyncDurationMetric)
register.MustRegister(clientReconnectedCounterMetric)
register.MustRegister(workProcessedCounterMetric)
}

// Unregister the metrics:
func UnregisterResourceResyncMetrics() {
prometheus.Unregister(resourceStatusResyncDurationMetric)
prometheus.Unregister(resourceStatusResyncDurationMetric)
func UnregisterCloudEventsMetrics(register prometheus.Registerer) {
register.Unregister(cloudeventsReceivedCounterMetric)
register.Unregister(cloudeventsSentCounterMetric)
register.Unregister(resourceStatusResyncDurationMetric)
register.Unregister(resourceStatusResyncDurationMetric)
register.Unregister(clientReconnectedCounterMetric)
register.Unregister(workProcessedCounterMetric)
}

// ResetResourceResyncMetricsCollectors resets all collectors
func ResetResourceResyncMetricsCollectors() {
// ResetCloudEventsMetrics resets all collectors
func ResetCloudEventsMetrics() {
cloudeventsReceivedCounterMetric.Reset()
cloudeventsSentCounterMetric.Reset()
resourceSpecResyncDurationMetric.Reset()
resourceStatusResyncDurationMetric.Reset()
clientReconnectedCounterMetric.Reset()
workProcessedCounterMetric.Reset()
}

// increaseCloudEventsReceivedCounter increases the cloudevents sent counter metric:
func increaseCloudEventsReceivedCounter(source, cluster, dataType string) {
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
}
cloudeventsReceivedCounterMetric.With(labels).Inc()
}

// increaseCloudEventsSentCounter increases the cloudevents sent counter metric:
func increaseCloudEventsSentCounter(source, cluster, dataType string) {
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
}
cloudeventsSentCounterMetric.With(labels).Inc()
}

// updateResourceSpecResyncDurationMetric updates the resource spec resync duration metric:
func updateResourceSpecResyncDurationMetric(source, cluster, dataType string, startTime time.Time) {
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metrucsDataTypeLabel: dataType,
metricsDataTypeLabel: dataType,
}
duration := time.Since(startTime)
resourceSpecResyncDurationMetric.With(labels).Observe(duration.Seconds())
Expand All @@ -131,8 +233,25 @@ func updateResourceStatusResyncDurationMetric(source, cluster, dataType string,
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metrucsDataTypeLabel: dataType,
metricsDataTypeLabel: dataType,
}
duration := time.Since(startTime)
resourceStatusResyncDurationMetric.With(labels).Observe(duration.Seconds())
}

// increaseClientReconnectedCounter increases the client reconnected counter metric:
func increaseClientReconnectedCounter(clientID string) {
labels := prometheus.Labels{
metricsClientIDLabel: clientID,
}
clientReconnectedCounterMetric.With(labels).Inc()
}

// IncreaseWorkProcessedCounter increases the work processed counter metric:
func IncreaseWorkProcessedCounter(action, code string) {
labels := prometheus.Labels{
metricsWorkActionLabel: action,
metricsWorkCodeLabel: code,
}
workProcessedCounterMetric.With(labels).Inc()
}
Loading

0 comments on commit 70b15f7

Please sign in to comment.