Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
Merge pull request #231 from konstellation-io/fix/update-metrics
Browse files (browse the repository at this point in the history)
Fix/update metrics
  • Loading branch information
kafkaphoenix authored Feb 19, 2024
2 parents: cd0f596 + ccb31e0; commit: c67de17
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 199 deletions.
19 changes: 9 additions & 10 deletions go-sdk/runner/exit/exit_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ type Handler common.Handler
type Postprocessor common.Handler

type Runner struct {
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandlers map[string]Handler
initializer common.Initializer
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandlers map[string]Handler
initializer common.Initializer
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
messagesMetric metric.Int64Histogram
}

func NewExitRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner {
Expand Down
19 changes: 4 additions & 15 deletions go-sdk/runner/exit/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,16 @@ func (er *Runner) startSubscriber() {

var err error

er.elapsedTimeMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
er.messagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
)
if err != nil {
er.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

er.numberOfMessagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
er.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

subscriptions := make([]*nats.Subscription, 0, len(inputSubjects))

for _, subject := range inputSubjects {
Expand Down Expand Up @@ -125,11 +116,9 @@ func (er *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
er.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", er.sdk.Metadata.GetProcess(), executionTime))

er.elapsedTimeMetric.Record(context.Background(), executionTime,
er.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId)),
)

er.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId)))
}()

er.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
20 changes: 4 additions & 16 deletions go-sdk/runner/task/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,10 @@ func (tr *Runner) startSubscriber() {

var err error

tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 1000),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
Expand Down Expand Up @@ -124,11 +114,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime))

tr.elapsedTimeMetric.Record(context.Background(), executionTime,
tr.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)),
)

tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)))
}()

tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
19 changes: 9 additions & 10 deletions go-sdk/runner/task/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ type Handler common.Handler
type Postprocessor common.Handler

type Runner struct {
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandlers map[string]Handler
initializer common.Initializer
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandlers map[string]Handler
initializer common.Initializer
preprocessor Preprocessor
postprocessor Postprocessor
finalizer common.Finalizer
messagesMetric metric.Int64Histogram
}

func NewTaskRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner {
Expand Down
20 changes: 4 additions & 16 deletions go-sdk/runner/trigger/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,10 @@ func (tr *Runner) startSubscriber() {

var err error

tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-time",
metric.WithDescription("How long it takes to process a message."),
tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram(
"runner-process-message-metric",
metric.WithDescription("How long it takes to process a message and times called."),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 1000),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
os.Exit(1)
}

tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter(
"runner-processed-messages",
metric.WithDescription("Number of messages processed."),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error initializing metric")
Expand Down Expand Up @@ -126,11 +116,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) {
executionTime := time.Since(start).Milliseconds()
tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime))

tr.elapsedTimeMetric.Record(context.Background(), executionTime,
tr.messagesMetric.Record(context.Background(), executionTime,
metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)),
)

tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)))
}()

tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s",
Expand Down
19 changes: 9 additions & 10 deletions go-sdk/runner/trigger/trigger_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ type RunnerFunc func(tr *Runner, sdk sdk.KaiSDK)
type ResponseHandler func(sdk sdk.KaiSDK, response *anypb.Any) error

type Runner struct {
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandler ResponseHandler
responseChannels sync.Map
initializer common.Initializer
runner RunnerFunc
finalizer common.Finalizer
elapsedTimeMetric metric.Int64Histogram
numberOfMessagesMetric metric.Int64Counter
sdk sdk.KaiSDK
nats *nats.Conn
jetstream nats.JetStreamContext
responseHandler ResponseHandler
responseChannels sync.Map
initializer common.Initializer
runner RunnerFunc
finalizer common.Finalizer
messagesMetric metric.Int64Histogram
}

var wg sync.WaitGroup //nolint:gochecknoglobals // WaitGroup is used to wait for goroutines to finish
Expand Down
15 changes: 6 additions & 9 deletions py-sdk/runner/exit/exit_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from loguru import logger
from nats.aio.client import Client as NatsClient
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram

from runner.common.common import Finalizer, Handler, Initializer, Task
from runner.exit.exceptions import FailedToInitializeMetricsError, UndefinedDefaultHandlerFunctionError
Expand Down Expand Up @@ -38,8 +38,7 @@ class ExitRunner:
preprocessor: Optional[Preprocessor] = None
postprocessor: Optional[Postprocessor] = None
finalizer: Optional[Finalizer] = None
elapsed_time_metric: Histogram = field(init=False)
number_of_messages_metric: Counter = field(init=False)
messages_metric: Histogram = field(init=False)

def __post_init__(self) -> None:
logger.configure(extra={"context": "", "metadata": {}, "origin": "[EXIT]"})
Expand All @@ -49,12 +48,10 @@ def __post_init__(self) -> None:

def _init_metrics(self) -> None:
try:
self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-time", unit="ms", description="How long it takes to process a message."
)

self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter(
name="runner-processed-messages", description="Number of messages processed."
self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-metric",
unit="ms",
description="How long it takes to process a message and times called.",
)
except Exception as e:
self.logger.error(f"error initializing metrics: {e}")
Expand Down
38 changes: 6 additions & 32 deletions py-sdk/runner/exit/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, Exception(f"no handler defined for {from_node}"))
return

Expand All @@ -116,12 +111,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler preprocessor")
)
Expand All @@ -133,12 +123,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e))
return

Expand All @@ -149,12 +134,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler postprocessor")
)
Expand All @@ -166,19 +146,13 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.logger.error(f"error acknowledging message: {e}")

end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.exit_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id))
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))

async def _process_runner_error(self, msg: Msg, error: Exception) -> None:
error_msg = str(error)
Expand Down
8 changes: 3 additions & 5 deletions py-sdk/runner/exit/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram
from vyper import v

from runner.exit.exceptions import NewRequestMsgError
Expand Down Expand Up @@ -59,8 +59,7 @@ def m_exit_runner(_: ModelRegistry, __: PersistentStorage, ___: Predictions, ___
exit_runner.sdk = m_sdk
exit_runner.sdk.metadata = Mock(spec=Metadata)
exit_runner.sdk.metadata.get_process = Mock(return_value="test.process")
exit_runner.elapsed_time_metric = Mock(spec=Histogram)
exit_runner.number_of_messages_metric = Mock(spec=Counter)
exit_runner.messages_metric = Mock(spec=Histogram)

return exit_runner

Expand Down Expand Up @@ -211,8 +210,7 @@ async def test_process_message_ok(m_msg, m_exit_subscriber):
assert m_exit_subscriber._get_response_handler.called
assert m_handler.called
assert m_msg.ack.called
assert m_exit_subscriber.exit_runner.elapsed_time_metric.record.called
assert m_exit_subscriber.exit_runner.number_of_messages_metric.add.called
assert m_exit_subscriber.exit_runner.messages_metric.record.called


async def test_process_message_not_valid_protobuf_ko(m_msg, m_exit_subscriber):
Expand Down
Loading

0 comments on commit c67de17

Please sign in to comment.