Skip to content

Commit

Permalink
feat: concurrency policy triggered counter metric
Browse files Browse the repository at this point in the history
Add a new metric called `cronworkflows_concurrencypolicy_triggered`
which counts the number of times a CronWorkflows `concurrencyPolicy`
has changed something.

The `concurency_policy` attribute can be:
* `Forbid`: Counts the number of times a workflow was not started
because one was already running
* `Replace`: Counts the number of times a workflow was terminated
because it was being replaced

This PR moves the call to increment `cronworkflows_triggered_total` so
that it doesn't count workflow runs suppressed by `Forbid`. This is
documented and the test updated to test it.

This PR also tidies some of the metrics documentation and language around
CronWorkflows.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Sep 5, 2024
1 parent 7173a27 commit 148c990
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 16 deletions.
21 changes: 16 additions & 5 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,25 @@ Metrics for the [Four Golden Signals](https://sre.google/sre-book/monitoring-dis
Some metric attributes may have high cardinality and are marked with ⚠️ to warn you. You may need to disable this metric or disable the attribute.
<!-- titles should be the exact metric name for deep-linking, alphabetical ordered -->
<!-- titles are without argo_workflows prefix -->
#### `cronworkflows_concurrencypolicy_triggered`

A counter of the number of times a CronWorkflow has triggered it's `concurrencyPolicy` to limit the number of workflows running.

| attribute | explanation |
|-------------|-------------------------------------------|
| `name` | ⚠️ The name of the CronWorkflow |
| `namespace` | The namespace of the CronWorkflow |
| `concurrency_policy` | The concurrency policy which was triggered, will be either `Forbid` or `Replace` |

#### `cronworkflows_triggered_total`

A counter of the number of times a CronWorkflow has been
A counter of the number of times a CronWorkflow has been triggered.
Suppressed runs due to `concurrencyPolicy: Forbid` will not be counted.

| attribute | explanation |
|-------------|-------------------------------------------|
| `name` | ⚠️ The name of the CronWorkflow. |
| `namespace` | The namespace in which the pod is running |
| `name` | ⚠️ The name of the CronWorkflow |
| `namespace` | The namespace of the CronWorkflow |

#### `gauge`

Expand All @@ -229,8 +240,8 @@ A counter of certain errors incurred by the controller.
The currently tracked specific errors are

- `OperationPanic` - the controller `panic()` on a programming bug
- `CronWorkflowSubmissionError` - A cron workflow failed submission
- `CronWorkflowSpecError` - A cron workflow has an invalid specification
- `CronWorkflowSubmissionError` - A CronWorkflow failed submission
- `CronWorkflowSpecError` - A CronWorkflow has an invalid specification

#### `k8s_request_total`

Expand Down
1 change: 1 addition & 0 deletions docs/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ These notes explain the differences in using the Prometheus `/metrics` endpoint

The following are new metrics:

* `cronworkflows_concurrencypolicy_triggered`
* `cronworkflows_triggered_total`
* `is_leader`
* `k8s_request_duration`
Expand Down
26 changes: 22 additions & 4 deletions test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,37 @@ func (s *MetricsSuite) TestFailedMetric() {
})
}

func (s *MetricsSuite) TestCronTriggeredCounter() {
func (s *MetricsSuite) TestCronCountersForbid() {
s.Given().
CronWorkflow(`@testdata/cronworkflow-metrics.yaml`).
CronWorkflow(`@testdata/cronworkflow-metrics-forbid.yaml`).
When().
CreateCronWorkflow().
Wait(1 * time.Minute). // This pattern is used in cron_test.go too
Wait(2 * time.Minute). // This pattern is used in cron_test.go too
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
s.e(s.T()).GET("").
Expect().
Status(200).
Body().
Contains(`cronworkflows_triggered_total{name="test-cron-metric",namespace="argo"} 1`)
Contains(`cronworkflows_triggered_total{name="test-cron-metric-forbid",namespace="argo"} 1`). // 2nd run was Forbid
Contains(`cronworkflows_concurrencypolicy_triggered{concurrency_policy="Forbid",name="test-cron-metric-forbid",namespace="argo"} 1`)
})
}

func (s *MetricsSuite) TestCronCountersReplace() {
s.Given().
CronWorkflow(`@testdata/cronworkflow-metrics-replace.yaml`).
When().
CreateCronWorkflow().
Wait(2 * time.Minute). // This pattern is used in cron_test.go too
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
s.e(s.T()).GET("").
Expect().
Status(200).
Body().
Contains(`cronworkflows_triggered_total{name="test-cron-metric-replace",namespace="argo"} 2`).
Contains(`cronworkflows_concurrencypolicy_triggered{concurrency_policy="Replace",name="test-cron-metric-replace",namespace="argo"} 1`)
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-metric
name: test-cron-metric-forbid
spec:
schedule: "* * * * *"
concurrencyPolicy: "Allow"
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
entrypoint: sleep
templates:
- name: whalesay
- name: sleep
container:
image: argoproj/argosay:v2
image: alpine:latest
command: [sh, -c, "sleep 120"]
20 changes: 20 additions & 0 deletions test/e2e/testdata/cronworkflow-metrics-replace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-metric-replace
spec:
schedule: "* * * * *"
concurrencyPolicy: "Replace"
startingDeadlineSeconds: 0
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: sleep
templates:
- name: sleep
container:
image: alpine:latest
command: [sh, -c, "sleep 120"]
3 changes: 2 additions & 1 deletion util/telemetry/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const (
AttribBuildGitTreeState string = `git_treestate`
AttribBuildGitTag string = `git_tag`

AttribCronWFName string = `name`
AttribCronWFName string = `name`
AttribConcurrencyPolicy string = `concurrency_policy`

AttribErrorCause string = "cause"

Expand Down
5 changes: 4 additions & 1 deletion workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti
defer woc.persistUpdate(ctx)

woc.log.Infof("Running %s", woc.name)
woc.metrics.CronWfTrigger(ctx, woc.name, woc.cronWf.ObjectMeta.Namespace)

// If the cron workflow has a schedule that was just updated, update its annotation
if woc.cronWf.IsUsingNewSchedule() {
Expand Down Expand Up @@ -108,6 +107,8 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti
return
}

woc.metrics.CronWfTrigger(ctx, woc.name, woc.cronWf.ObjectMeta.Namespace)

wf := common.ConvertCronWorkflowToWorkflowWithProperties(woc.cronWf, getChildWorkflowName(woc.cronWf.Name, scheduledRuntime), scheduledRuntime)

runWf, err := util.SubmitWorkflow(ctx, woc.wfClient, woc.wfClientset, woc.cronWf.Namespace, wf, &v1alpha1.SubmitOpts{})
Expand Down Expand Up @@ -196,11 +197,13 @@ func (woc *cronWfOperationCtx) enforceRuntimePolicy(ctx context.Context) (bool,
// Do nothing
case v1alpha1.ForbidConcurrent:
if len(woc.cronWf.Status.Active) > 0 {
woc.metrics.CronWfPolicy(ctx, woc.name, woc.cronWf.ObjectMeta.Namespace, v1alpha1.ForbidConcurrent)
woc.log.Infof("%s has 'ConcurrencyPolicy: Forbid' and has an active Workflow so it was not run", woc.name)
return false, nil
}
case v1alpha1.ReplaceConcurrent:
if len(woc.cronWf.Status.Active) > 0 {
woc.metrics.CronWfPolicy(ctx, woc.name, woc.cronWf.ObjectMeta.Namespace, v1alpha1.ReplaceConcurrent)
woc.log.Infof("%s has 'ConcurrencyPolicy: Replace' and has active Workflows", woc.name)
err := woc.terminateOutstandingWorkflows(ctx)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions workflow/metrics/counter_cronworkflow_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metrics

import (
"context"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/telemetry"
)

const (
nameCronPolicy = `cronworkflows_concurrencypolicy_triggered`
)

func addCronWfPolicyCounter(_ context.Context, m *Metrics) error {
return m.CreateInstrument(telemetry.Int64Counter,
nameCronPolicy,
"Total number of times CronWorkflow concurrencyPolicy has triggered",
"{cronworkflow}",
telemetry.WithAsBuiltIn(),
)
}

func (m *Metrics) CronWfPolicy(ctx context.Context, name, namespace string, policy wfv1.ConcurrencyPolicy) {
m.AddInt(ctx, nameCronPolicy, 1, telemetry.InstAttribs{
{Name: telemetry.AttribCronWFName, Value: name},
{Name: telemetry.AttribWorkflowNamespace, Value: namespace},
{Name: telemetry.AttribConcurrencyPolicy, Value: string(policy)},
})
}
1 change: 1 addition & 0 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func New(ctx context.Context, serviceName, prometheusName string, config *teleme
addPodPendingCounter,
addWorkflowPhaseGauge,
addCronWfTriggerCounter,
addCronWfPolicyCounter,
addWorkflowPhaseCounter,
addWorkflowTemplateCounter,
addWorkflowTemplateHistogram,
Expand Down

0 comments on commit 148c990

Please sign in to comment.