From 1554b2a69ff74f0f4ac0815ab4afa44ad8e6bf40 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 13:23:48 -0400 Subject: [PATCH 1/6] Adds compute metrics --- core/capabilities/compute/compute.go | 20 ++++++++++++-- core/capabilities/compute/monitoring.go | 36 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 78a4cc1e033..95a9828b04b 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -20,6 +20,7 @@ import ( capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" "github.com/smartcontractkit/chainlink-common/pkg/services" coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" @@ -75,8 +76,9 @@ var ( var _ capabilities.ActionCapability = (*Compute)(nil) type Compute struct { - stopCh services.StopChan - log logger.Logger + stopCh services.StopChan + log logger.Logger + metrics computeMetricsLabeler // emitter is used to emit messages from the WASM module to a configured collector. emitter custmsg.MessageEmitter @@ -245,6 +247,11 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error) func (c *Compute) Start(ctx context.Context) error { c.modules.start() + err := initMonitoringResources() + if err != nil { + return fmt.Errorf("failed to initialize monitoring resources: %w", err) + } + c.wg.Add(c.numWorkers) for i := 0; i < c.numWorkers; i++ { go func() { @@ -329,6 +336,14 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err) } + c.metrics.with( + "status", fmt.Sprintf("%d", response.StatusCode), + platform.KeyWorkflowID, req.Metadata.WorkflowId, + platform.KeyWorkflowName, req.Metadata.WorkflowName, + platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, + platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId, + ).incrementHTTPRequestCounter(ctx) + // Only log if the response is not in the 200 range if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices { msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode) @@ -369,6 +384,7 @@ func NewAction( stopCh: make(services.StopChan), log: lggr, emitter: labeler, + metrics: computeMetricsLabeler{metrics.NewLabeler().With("capability", CapabilityIDCompute)}, registry: registry, modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3), transformer: NewTransformer(lggr, labeler), diff --git a/core/capabilities/compute/monitoring.go b/core/capabilities/compute/monitoring.go index 4b676c25f7d..b2e0c110a8a 100644 --- a/core/capabilities/compute/monitoring.go +++ b/core/capabilities/compute/monitoring.go @@ -1,3 +1,39 @@ package compute +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" + + localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring" +) + const timestampKey = "computeTimestamp" + +var computeHTTPRequestCounter metric.Int64Counter + +func initMonitoringResources() (err error) { + computeHTTPRequestCounter, err = beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count") + if err != nil { + return fmt.Errorf("failed to register compute http request counter: %w", err) + } + + return nil +} + +type computeMetricsLabeler struct { + metrics.Labeler +} + +func (c computeMetricsLabeler) with(keyValues ...string) computeMetricsLabeler { + return computeMetricsLabeler{c.With(keyValues...)} +} + +func (c computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) { + otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) + computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} From 8c7de513bb50b8ee428c0945d96a9560e39bb4d1 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 13:35:19 -0400 Subject: [PATCH 2/6] Fixes lint --- core/capabilities/compute/compute.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 95a9828b04b..ad046eaca06 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "strconv" "strings" "sync" "time" @@ -337,7 +338,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq } c.metrics.with( - "status", fmt.Sprintf("%d", response.StatusCode), + "status", strconv.FormatUint(uint64(response.StatusCode), 10), platform.KeyWorkflowID, req.Metadata.WorkflowId, platform.KeyWorkflowName, req.Metadata.WorkflowName, platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, From 5595d0186b52fb8ca6d99aff2633237cd367bb40 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 13:37:15 -0400 Subject: [PATCH 3/6] Removes execution ID from metrics labels --- core/capabilities/compute/compute.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index ad046eaca06..3d3a734bec9 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -342,7 +342,6 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq platform.KeyWorkflowID, req.Metadata.WorkflowId, platform.KeyWorkflowName, req.Metadata.WorkflowName, platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, - platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId, ).incrementHTTPRequestCounter(ctx) // Only log if the response is not in the 200 range From 331bcedcf4f5bceded7871e6102b62045ad3ae5d Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 14:18:52 -0400 Subject: [PATCH 4/6] Fixes CI --- core/capabilities/compute/compute.go | 17 ++++++++--------- core/capabilities/compute/monitoring.go | 25 ++++++++++++------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 3d3a734bec9..89675c0e7cc 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -79,7 +79,7 @@ var _ capabilities.ActionCapability = (*Compute)(nil) type Compute struct { stopCh services.StopChan log logger.Logger - metrics computeMetricsLabeler + metrics *computeMetricsLabeler // emitter is used to emit messages from the WASM module to a configured collector. emitter custmsg.MessageEmitter @@ -248,11 +248,6 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error) func (c *Compute) Start(ctx context.Context) error { c.modules.start() - err := initMonitoringResources() - if err != nil { - return fmt.Errorf("failed to initialize monitoring resources: %w", err) - } - c.wg.Add(c.numWorkers) for i := 0; i < c.numWorkers; i++ { go func() { @@ -373,10 +368,14 @@ func NewAction( handler *webapi.OutgoingConnectorHandler, idGenerator func() string, opts ...func(*Compute), -) *Compute { +) (*Compute, error) { if config.NumWorkers == 0 { config.NumWorkers = defaultNumWorkers } + metricsLabeler, err := newComputeMetricsLabeler(metrics.NewLabeler().With("capability", CapabilityIDCompute)) + if err != nil { + return nil, fmt.Errorf("failed to create compute metrics labeler: %w", err) + } var ( lggr = logger.Named(log, "CustomCompute") labeler = custmsg.NewLabeler() @@ -384,7 +383,7 @@ func NewAction( stopCh: make(services.StopChan), log: lggr, emitter: labeler, - metrics: computeMetricsLabeler{metrics.NewLabeler().With("capability", CapabilityIDCompute)}, + metrics: metricsLabeler, registry: registry, modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3), transformer: NewTransformer(lggr, labeler), @@ -399,5 +398,5 @@ func NewAction( opt(compute) } - return compute + return compute, nil } diff --git a/core/capabilities/compute/monitoring.go b/core/capabilities/compute/monitoring.go index b2e0c110a8a..71354c014a7 100644 --- a/core/capabilities/compute/monitoring.go +++ b/core/capabilities/compute/monitoring.go @@ -14,26 +14,25 @@ import ( const timestampKey = "computeTimestamp" -var computeHTTPRequestCounter metric.Int64Counter +type computeMetricsLabeler struct { + metrics.Labeler + computeHTTPRequestCounter metric.Int64Counter +} -func initMonitoringResources() (err error) { - computeHTTPRequestCounter, err = beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count") +func newComputeMetricsLabeler(l metrics.Labeler) (*computeMetricsLabeler, error) { + computeHTTPRequestCounter, err := beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count") if err != nil { - return fmt.Errorf("failed to register compute http request counter: %w", err) + return nil, fmt.Errorf("failed to register compute http request counter: %w", err) } - return nil -} - -type computeMetricsLabeler struct { - metrics.Labeler + return &computeMetricsLabeler{Labeler: l, computeHTTPRequestCounter: computeHTTPRequestCounter}, nil } -func (c computeMetricsLabeler) with(keyValues ...string) computeMetricsLabeler { - return computeMetricsLabeler{c.With(keyValues...)} +func (c *computeMetricsLabeler) with(keyValues ...string) *computeMetricsLabeler { + return &computeMetricsLabeler{c.With(keyValues...), c.computeHTTPRequestCounter} } -func (c computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) { +func (c *computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) { otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } From 1c30ee5b6f4e8e9fcd305db64373534cf014bed7 Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 14:34:05 -0400 Subject: [PATCH 5/6] Fixes CI --- core/capabilities/compute/compute_test.go | 4 +++- core/services/workflows/engine_test.go | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/capabilities/compute/compute_test.go b/core/capabilities/compute/compute_test.go index 719bff82edf..ef622c21787 100644 --- a/core/capabilities/compute/compute_test.go +++ b/core/capabilities/compute/compute_test.go @@ -18,6 +18,7 @@ import ( cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/values" + corecapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -60,7 +61,8 @@ func setup(t *testing.T, config Config) testHarness { connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log) require.NoError(t, err) - compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn) + compute, err := NewAction(config, log, registry, connectorHandler, idGeneratorFn) + require.NoError(t, err) compute.modules.clock = clockwork.NewFakeClock() return testHarness{ diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index e6667fe0bc6..3a2bc17bc36 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -1448,7 +1448,8 @@ func TestEngine_WithCustomComputeStep(t *testing.T) { require.NoError(t, err) idGeneratorFn := func() string { return "validRequestID" } - compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + require.NoError(t, err) require.NoError(t, compute.Start(ctx)) defer compute.Close() @@ -1513,7 +1514,8 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) { require.NoError(t, err) idGeneratorFn := func() string { return "validRequestID" } - compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + require.NoError(t, err) require.NoError(t, compute.Start(ctx)) defer compute.Close() From ec81c232958ae6b2ab65b510f14cb8f66b9a534d Mon Sep 17 00:00:00 2001 From: vyzaldysanchez Date: Thu, 14 Nov 2024 14:39:33 -0400 Subject: [PATCH 6/6] Fixes CI --- core/services/standardcapabilities/delegate.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index a92e082dead..08972c95152 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/compute" gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" @@ -253,7 +254,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return uuid.New().String() } - computeSrvc := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn) + computeSrvc, err := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn) + if err != nil { + return nil, err + } return []job.ServiceCtx{computeSrvc}, nil }