Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: OpenTelemetry metrics #13232

Closed
wants to merge 15 commits into from
4 changes: 4 additions & 0 deletions .spelling
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ CRDs
CloudSQL
ClusterRoleBinding
ClusterRoles
ClusterWorkflowTemplate
Codespaces
ConfigMap
ConfigMaps
Expand Down Expand Up @@ -92,6 +93,7 @@ OAuth
OAuth2
Okta
OpenAPI
OpenTelemetry
PDBs
PProf
PVCs
Expand Down Expand Up @@ -203,6 +205,7 @@ sandboxed
shortcodes
stateful
stderr
temporality
triaged
un-reconciled
v1
Expand Down Expand Up @@ -247,4 +250,5 @@ webHDFS
webhook
webhooks
workflow-controller-configmap
workqueue
yaml
18 changes: 9 additions & 9 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@ func NewRootCommand() *cobra.Command {
if err != nil {
return err
}
// start a controller on instances of our custom resource
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

version := argo.GetVersion()
config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s argo-controller", version.Version))
config.Burst = burst
config.QPS = qps

logs.AddK8SLogTransportWrapper(config)
metrics.AddMetricsTransportWrapper(config)
metrics.AddMetricsTransportWrapper(ctx, config)

namespace, _, err := clientConfig.Namespace()
if err != nil {
Expand All @@ -106,10 +110,6 @@ func NewRootCommand() *cobra.Command {
managedNamespace = namespace
}

// start a controller on instances of our custom resource
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wfController, err := controller.NewWorkflowController(ctx, config, kubeclientset, wfclientset, namespace, managedNamespace, executorImage, executorImagePullPolicy, logFormat, configMap, executorPlugins)
errors.CheckError(err)

Expand All @@ -118,7 +118,7 @@ func NewRootCommand() *cobra.Command {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
go wfController.RunPrometheusServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
if !ok {
Expand All @@ -133,7 +133,7 @@ func NewRootCommand() *cobra.Command {
// for controlling the dummy metrics server
dummyCtx, dummyCancel := context.WithCancel(context.Background())
defer dummyCancel()
go wfController.RunMetricsServer(dummyCtx, true)
go wfController.RunPrometheusServer(dummyCtx, true)

go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
Expand All @@ -148,12 +148,12 @@ func NewRootCommand() *cobra.Command {
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
go wfController.RunPrometheusServer(ctx, false)
},
OnStoppedLeading: func() {
log.WithField("id", nodeID).Info("stopped leading")
cancel()
go wfController.RunMetricsServer(dummyCtx, true)
go wfController.RunPrometheusServer(dummyCtx, true)
},
OnNewLeader: func(identity string) {
log.WithField("leader", identity).Info("new leader")
Expand Down
26 changes: 25 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,24 @@ type MySQLConfig struct {
Options map[string]string `json:"options,omitempty"`
}

// MetricModifier holds the overrides for an individual named metric that change
// how that metric is emitted. Modifiers are looked up by metric name through
// MetricsConfig.Modifiers.
type MetricModifier struct {
	// Disabled disables the emission of this metric completely.
	Disabled bool `json:"disabled,omitempty"`
	// DisabledAttributes lists the attributes (labels) to remove from this
	// metric in order to save on cardinality.
	// The field is omitted from JSON when empty, like every other optional
	// field of this struct, so a zero-value modifier serialises as "{}".
	DisabledAttributes []string `json:"disabledAttributes,omitempty"`
	// HistogramBuckets allows configuring the buckets used in a histogram.
	// Has no effect on non-histogram metrics.
	HistogramBuckets []float64 `json:"histogramBuckets,omitempty"`
}

// MetricsTemporality is the string-typed setting used by
// MetricsConfig.Temporality to select the temporality of emitted metrics.
type MetricsTemporality string

// Accepted MetricsTemporality values.
const (
	// MetricsTemporalityCumulative is the "Cumulative" temporality setting.
	MetricsTemporalityCumulative = MetricsTemporality("Cumulative")
	// MetricsTemporalityDelta is the "Delta" temporality setting.
	MetricsTemporalityDelta = MetricsTemporality("Delta")
)

// MetricsConfig defines a config for a metrics server
type MetricsConfig struct {
// Enabled controls metric emission. Default is true, set "enabled: false" to turn off
Expand All @@ -260,8 +278,14 @@ type MetricsConfig struct {
Port int `json:"port,omitempty"`
// IgnoreErrors is a flag that instructs prometheus to ignore metric emission errors
IgnoreErrors bool `json:"ignoreErrors,omitempty"`
// Secure is a flag that starts the metrics servers using TLS
// Secure is a flag that starts the metrics servers using TLS, defaults to true
Secure *bool `json:"secure,omitempty"`
// Modifiers configure metrics by name
Modifiers map[string]MetricModifier `json:"modifiers,omitempty"`
// Temporality configures the temporality of the OpenTelemetry metrics.
// Valid values are Cumulative and Delta, defaulting to Cumulative.
// This has no effect on Prometheus metrics, which are always cumulative.
Temporality MetricsTemporality `json:"temporality,omitempty"`
}

func (mc MetricsConfig) GetSecure(defaultValue bool) bool {
Expand Down
Loading
Loading