Skip to content

Commit

Permalink
feat: add more metrics and logs for measurement (#324)
Browse files Browse the repository at this point in the history
* temp

* add more metrics and logs for measurement

* address comment

Co-authored-by: Ryan Zhang <[email protected]>
  • Loading branch information
ryanzhang-oss and Ryan Zhang authored Sep 30, 2022
1 parent cfb41a1 commit bbc7b34
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
REGISTRY ?= ghcr.io
KIND_IMAGE ?= kindest/node:v1.23.3
KIND_IMAGE ?= kindest/node:v1.24.6
ifndef TAG
TAG ?= $(shell git rev-parse --short=7 HEAD)
endif
Expand Down
2 changes: 1 addition & 1 deletion cmd/hubagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
//+kubebuilder:scaffold:scheme
klog.InitFlags(nil)

metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics)
metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics, fleetmetrics.PlacementApplyFailedCount, fleetmetrics.PlacementApplySucceedCount)
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/memberagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
utilruntime.Must(workv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics)
metrics.Registry.MustRegister(fleetmetrics.JoinResultMetrics, fleetmetrics.LeaveResultMetrics, fleetmetrics.WorkApplyTime)
}

func main() {
Expand Down
27 changes: 18 additions & 9 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
Expand Down Expand Up @@ -59,6 +60,7 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
name, ok := key.(string)
if !ok {
err := fmt.Errorf("get place key %+v not of type string", key)
Expand All @@ -73,9 +75,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
}
placeRef := klog.KObj(placementOld)
placementNew := placementOld.DeepCopy()
// add latency log
defer func() {
klog.V(2).InfoS("ClusterResourcePlacement reconciliation loop ends", "placement", placeRef, "latency", time.Since(startTime).Milliseconds())
}()

// TODO: add finalizer logic if we need it in the future

klog.V(2).InfoS("Start to reconcile a ClusterResourcePlacement", "placement", placeRef)
// select the new clusters and record that in the placementNew status
selectedClusters, scheduleErr := r.selectClusters(placementNew)
Expand Down Expand Up @@ -120,7 +125,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
}
klog.V(2).InfoS("Successfully persisted the intermediate scheduling result", "placement", placementOld.Name,
"totalClusters", totalCluster, "totalResources", totalResources)
// pick up the new version so placementNew can continue to update
// pick up the newly updated schedule condition so that the last schedule time will change every time we run the reconcile loop
meta.SetStatusCondition(&placementNew.Status.Conditions, *placementOld.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled)))
// pick up the new version so that we can update placementNew without getting it again
placementNew.SetResourceVersion(placementOld.GetResourceVersion())

// schedule works for each cluster by placing them in the cluster scoped namespace
Expand Down Expand Up @@ -254,17 +261,17 @@ func (r *Reconciler) updatePlacementScheduledCondition(placement *fleetv1alpha1.
placementRef := klog.KObj(placement)
schedCond := placement.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled))
if scheduleErr == nil {
if schedCond == nil || schedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully scheduled all selected resources to their clusters", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceScheduled", "successfully scheduled all selected resources to their clusters")
}
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionTrue,
Type: string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled),
Reason: "ScheduleSucceeded",
Message: "Successfully scheduled resources for placement",
ObservedGeneration: placement.Generation,
})
if schedCond == nil || schedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully scheduled all selected resources to their clusters", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceScheduled", "successfully scheduled all selected resources to their clusters")
}
} else {
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionFalse,
Expand Down Expand Up @@ -293,8 +300,9 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: "Successfully applied resources to member clusters",
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("successfully applied all selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionTrue {
klog.V(2).InfoS("successfully applied all selected resources", "placement", placementRef)
metrics.PlacementApplySucceedCount.WithLabelValues(placement.GetName()).Inc()
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceApplied", "successfully applied all selected resources")
}
case errors.Is(applyErr, ErrStillPendingManifest):
Expand All @@ -305,8 +313,8 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("Some selected resources are still waiting to be applied", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status == metav1.ConditionTrue {
klog.V(2).InfoS("Some selected resources are still waiting to be applied", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyPending", "Some applied resources are now waiting to be applied to the member cluster")
}
default:
Expand All @@ -318,8 +326,9 @@ func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.Cl
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(2).InfoS("failed to apply some selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionFalse {
klog.V(2).InfoS("failed to apply some selected resources", "placement", placementRef)
metrics.PlacementApplyFailedCount.WithLabelValues(placement.GetName()).Inc()
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyFailed", "failed to apply some selected resources")
}
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/controllers/clusterresourceplacement/work_propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (
)

const (
LastUpdateAnnotationKey = "work.fleet.azure.com/last-update-time"
SpecHashAnnotationKey = "work.fleet.azure.com/spec-hash-value"
specHashAnnotationKey = "work.fleet.azure.com/spec-hash-value"
)

// scheduleWork creates or updates the work object to reflect the new placement decision.
Expand Down Expand Up @@ -63,8 +62,8 @@ func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.
utils.LabelFleetObj: utils.LabelFleetObjValue,
}
workAnnotation := map[string]string{
LastUpdateAnnotationKey: time.Now().Format(time.RFC3339),
SpecHashAnnotationKey: specHash,
utils.LastWorkUpdateTimeAnnotationKey: time.Now().Format(time.RFC3339),
specHashAnnotationKey: specHash,
}
changed := false
for _, memberClusterName := range memberClusterNames {
Expand Down Expand Up @@ -98,7 +97,7 @@ func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.
changed = true
continue
}
existingHash := curWork.GetAnnotations()[SpecHashAnnotationKey]
existingHash := curWork.GetAnnotations()[specHashAnnotationKey]
if existingHash == specHash || reflect.DeepEqual(curWork.Spec.Workload.Manifests, workerSpec.Workload.Manifests) {
klog.V(2).InfoS("skip updating work spec as its identical",
"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/membercluster/membercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func markMemberClusterLeft(recorder record.EventRecorder, mc apis.ConditionedObj
if existingCondition == nil || existingCondition.Status != newCondition.Status {
recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterJoined, "member cluster left")
klog.V(2).InfoS("memberCluster left", "memberCluster", klog.KObj(mc))
metrics.ReportJoinResultMetric()
metrics.ReportLeaveResultMetric()
}

mc.SetConditions(newCondition, notReadyCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package memberclusterplacement
import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -34,20 +35,27 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
memberClusterName, ok := key.(string)
if !ok {
err := fmt.Errorf("got a resource key %+v not of type cluster wide key", key)
klog.ErrorS(err, "we have encountered a fatal error that can't be retried")
return ctrl.Result{}, err
}

// add latency log
defer func() {
klog.V(2).InfoS("MemberClusterPlacement reconciliation loop ends", "memberCluster", memberClusterName, "latency", time.Since(startTime).Milliseconds())
}()

klog.V(2).InfoS("Start to reconcile a member cluster to enqueue placement events", "memberCluster", memberClusterName)
mObj, err := r.InformerManager.Lister(utils.MemberClusterGVR).Get(memberClusterName)
if err != nil {
klog.ErrorS(err, "failed to get the member cluster", "memberCluster", memberClusterName)
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
mObj = nil //guard against unexpected informer lib behavior
}
crpList, err := r.InformerManager.Lister(utils.ClusterResourcePlacementGVR).List(labels.Everything())
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/resourcechange/resourcechange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package resourcechange
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -50,6 +51,7 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
startTime := time.Now()
clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
err := fmt.Errorf("got a resource key %+v not of type cluster wide key", key)
Expand All @@ -58,6 +60,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
}
klog.V(2).InfoS("Reconciling object", "obj", clusterWideKey)

// add latency log
defer func() {
klog.V(2).InfoS("ResourceChange reconciliation loop ends", "obj", clusterWideKey, "latency", time.Since(startTime).Milliseconds())
}()

// the clusterObj is set to be the object that the placement direct selects,
// in the case of a deleted namespace scoped object, the clusterObj is set to be its parent namespace object.
clusterObj, isClusterScoped, err := r.getUnstructuredObject(clusterWideKey)
Expand Down
21 changes: 19 additions & 2 deletions pkg/controllers/work/apply_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
)

const (
Expand Down Expand Up @@ -139,9 +141,24 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
BlockOwnerDeletion: pointer.Bool(false),
}

// Apply the manifests to the member cluster
// apply the manifests to the member cluster
results := r.applyManifests(ctx, work.Spec.Workload.Manifests, owner)

// collect the latency from the work update time to now.
lastUpdateTime, ok := work.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey]
if ok {
workUpdateTime, parseErr := time.Parse(time.RFC3339, lastUpdateTime)
if parseErr != nil {
klog.ErrorS(parseErr, "failed to parse the last work update time", "work", logObjRef)
} else {
latency := time.Since(workUpdateTime)
metrics.WorkApplyTime.WithLabelValues(work.GetName()).Observe(latency.Seconds())
klog.V(2).InfoS("work is applied", "work", work.GetName(), "latency", latency.Milliseconds())
}
} else {
klog.V(2).InfoS("work has no last update time", "work", work.GetName())
}

// generate the work condition based on the manifest apply result
errs := r.generateWorkCondition(results, work)

Expand Down
14 changes: 14 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ var (
Name: "leave_result_counter",
Help: "Number of successful Leave operations",
}, []string{"result"})
WorkApplyTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "work_apply_time_seconds",
Help: "Length of time between when a work resource is created/updated to when it is applied on the member cluster",
Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.7, 0.9, 1.0,
1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 7, 9, 10, 15, 20, 30, 60, 120},
}, []string{"name"})
PlacementApplyFailedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "placement_apply_failed_counter",
Help: "Number of failed to apply cluster resource placement",
}, []string{"name"})
PlacementApplySucceedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "placement_apply_succeed_counter",
Help: "Number of successfully applied cluster resource placement",
}, []string{"name"})
)

var (
Expand Down
5 changes: 4 additions & 1 deletion pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ const (
// This label aims to enable different work objects to be managed by different placement.
LabelWorkPlacementName = "work.fleet.azure.com/placement-name"

// MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster
// MemberClusterFinalizer is used to make sure that we handle gc of all the member cluster resources on the hub cluster.
MemberClusterFinalizer = "work.fleet.azure.com/membercluster-finalizer"

// LastWorkUpdateTimeAnnotationKey is used to mark the last update time on a work object.
LastWorkUpdateTimeAnnotationKey = "work.fleet.azure.com/last-update-time"
)

var (
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,28 @@ kubectl --context=kind-hub-testing apply -f test/integration/manifests/placement
check the logs of the hub cluster controller
```shell
kubectl --context=kind-hub-testing -n fleet-system get pod

NAME READY STATUS RESTARTS AGE
hub-agent-8bb6d658-6jj7n 1/1 Running 0 11m

```

check the logs of the member cluster controller
```shell
kubectl --context=kind-member-testing -n fleet-system get pod
```

check the hub metrics
```shell
kubectl --context=kind-hub-testing -n fleet-system port-forward hub-agent-xxxx-xxx 13622:8080

Forwarding from 127.0.0.1:13622 -> 8080
Forwarding from [::1]:13622 -> 8080

curl http://127.0.0.1:13622/metrics
```


5. uninstall the resources
```shell
make uninstall-helm
Expand Down
3 changes: 1 addition & 2 deletions test/integration/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/controllers/clusterresourceplacement"
"go.goms.io/fleet/pkg/utils"
)

Expand Down Expand Up @@ -283,7 +282,7 @@ func verifyPartialWorkObjects(crp *fleetv1alpha1.ClusterResourcePlacement, expec
}
}
}
lastUpdateTime, err := time.Parse(time.RFC3339, clusterWork.GetAnnotations()[clusterresourceplacement.LastUpdateAnnotationKey])
lastUpdateTime, err := time.Parse(time.RFC3339, clusterWork.GetAnnotations()[utils.LastWorkUpdateTimeAnnotationKey])
Expect(err).Should(Succeed())
return lastUpdateTime
}
Expand Down

0 comments on commit bbc7b34

Please sign in to comment.