Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
fix: metrics golang
Browse files Browse the repository at this point in the history
  • Loading branch information
kafkaphoenix committed Feb 19, 2024
1 parent fd501a6 commit ffef6bc
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 53 deletions.
3 changes: 1 addition & 2 deletions go-sdk/runner/exit/exit_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type Runner struct {
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
messagesMetric metric.Int64Histogram
}

func NewExitRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner {
Expand Down
19 changes: 4 additions & 15 deletions go-sdk/runner/exit/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,16 @@ func (er *Runner) startSubscriber() {

var err error

er.elapsedTimeMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
er.messagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
)
if err != nil {
er.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

er.numberOfMessagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
er.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

subscriptions := make([]*nats.Subscription, 0, len(inputSubjects))

for _, subject := range inputSubjects {
Expand Down Expand Up @@ -125,11 +116,9 @@ func (er *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
er.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", er.sdk.Metadata.GetProcess(), executionTime))

er.elapsedTimeMetric.Record(context.Background(), executionTime,
er.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId)),
)

er.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId)))
}()

er.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
20 changes: 4 additions & 16 deletions go-sdk/runner/task/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,10 @@ func (tr *Runner) startSubscriber() {

var err error

tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 1000),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
Expand Down Expand Up @@ -124,11 +114,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime))

tr.elapsedTimeMetric.Record(context.Background(), executionTime,
tr.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)),
)

tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)))
}()

tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
3 changes: 1 addition & 2 deletions go-sdk/runner/task/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type Runner struct {
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
messagesMetric metric.Int64Histogram
}

func NewTaskRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner {
Expand Down
20 changes: 4 additions & 16 deletions go-sdk/runner/trigger/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,10 @@ func (tr *Runner) startSubscriber() {

var err error

tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 1000),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
Expand Down Expand Up @@ -126,11 +116,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime))

tr.elapsedTimeMetric.Record(context.Background(), executionTime,
tr.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)),
)

tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)))
}()

tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
3 changes: 1 addition & 2 deletions go-sdk/runner/trigger/trigger_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ type Runner struct {
initializer common.Initializer
runner RunnerFunc
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
messagesMetric metric.Int64Histogram
}

var wg sync.WaitGroup //nolint:gochecknoglobals // WaitGroup is used to wait for goroutines to finish
Expand Down

0 comments on commit ffef6bc

Please sign in to comment.