Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compute metrics #15246

Merged
merged 7 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -20,6 +21,7 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
"github.com/smartcontractkit/chainlink-common/pkg/services"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
Expand Down Expand Up @@ -75,8 +77,9 @@ var (
var _ capabilities.ActionCapability = (*Compute)(nil)

type Compute struct {
stopCh services.StopChan
log logger.Logger
stopCh services.StopChan
log logger.Logger
metrics computeMetricsLabeler

// emitter is used to emit messages from the WASM module to a configured collector.
emitter custmsg.MessageEmitter
Expand Down Expand Up @@ -245,6 +248,11 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error)
func (c *Compute) Start(ctx context.Context) error {
c.modules.start()

err := initMonitoringResources()
if err != nil {
return fmt.Errorf("failed to initialize monitoring resources: %w", err)
}

c.wg.Add(c.numWorkers)
for i := 0; i < c.numWorkers; i++ {
go func() {
Expand Down Expand Up @@ -329,6 +337,14 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

c.metrics.with(
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
"status", strconv.FormatUint(uint64(response.StatusCode), 10),
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId,
).incrementHTTPRequestCounter(ctx)

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
Expand Down Expand Up @@ -369,6 +385,7 @@ func NewAction(
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
metrics: computeMetricsLabeler{metrics.NewLabeler().With("capability", CapabilityIDCompute)},
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
Expand Down
36 changes: 36 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
package compute

import (
"context"
"fmt"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"

localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

const timestampKey = "computeTimestamp"

var computeHTTPRequestCounter metric.Int64Counter

func initMonitoringResources() (err error) {
computeHTTPRequestCounter, err = beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count")
if err != nil {
return fmt.Errorf("failed to register compute http request counter: %w", err)
}

return nil
}

type computeMetricsLabeler struct {
metrics.Labeler
}

func (c computeMetricsLabeler) with(keyValues ...string) computeMetricsLabeler {
return computeMetricsLabeler{c.With(keyValues...)}
}

func (c computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
Loading