diff --git a/go-sdk/runner/exit/exit_runner.go b/go-sdk/runner/exit/exit_runner.go index f774ea7f..29be226f 100644 --- a/go-sdk/runner/exit/exit_runner.go +++ b/go-sdk/runner/exit/exit_runner.go @@ -20,16 +20,15 @@ type Handler common.Handler type Postprocessor common.Handler type Runner struct { - sdk sdk.KaiSDK - nats *nats.Conn - jetstream nats.JetStreamContext - responseHandlers map[string]Handler - initializer common.Initializer - preprocessor Preprocessor - postprocessor Postprocessor - finalizer common.Finalizer - elapsedTimeMetric metric.Int64Histogram - numberOfMessagesMetric metric.Int64Counter + sdk sdk.KaiSDK + nats *nats.Conn + jetstream nats.JetStreamContext + responseHandlers map[string]Handler + initializer common.Initializer + preprocessor Preprocessor + postprocessor Postprocessor + finalizer common.Finalizer + messagesMetric metric.Int64Histogram } func NewExitRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner { diff --git a/go-sdk/runner/exit/subscriber.go b/go-sdk/runner/exit/subscriber.go index 3957c442..ea410181 100644 --- a/go-sdk/runner/exit/subscriber.go +++ b/go-sdk/runner/exit/subscriber.go @@ -39,9 +39,9 @@ func (er *Runner) startSubscriber() { var err error - er.elapsedTimeMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram( - "runner-process-message-time", - metric.WithDescription("How long it takes to process a message."), + er.messagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Histogram( + "runner-process-message-metric", + metric.WithDescription("How long it takes to process a message and times called."), metric.WithUnit("ms"), ) if err != nil { @@ -49,15 +49,6 @@ func (er *Runner) startSubscriber() { os.Exit(1) } - er.numberOfMessagesMetric, err = er.sdk.Measurements.GetMetricsClient().Int64Counter( - "runner-processed-messages", - metric.WithDescription("Number of messages processed."), - ) - if err != nil { - er.getLoggerWithName().Error(err, "Error initializing metric") - os.Exit(1) - } - subscriptions := make([]*nats.Subscription, 0, len(inputSubjects)) for _, subject := range inputSubjects { @@ -125,11 +116,9 @@ func (er *Runner) processMessage(msg *nats.Msg) { executionTime := time.Since(start).Milliseconds() er.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", er.sdk.Metadata.GetProcess(), executionTime)) - er.elapsedTimeMetric.Record(context.Background(), executionTime, + er.messagesMetric.Record(context.Background(), executionTime, metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId)), ) - - er.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(er.getMetricAttributes(requestMsg.RequestId))) }() er.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s", diff --git a/go-sdk/runner/task/subscriber.go b/go-sdk/runner/task/subscriber.go index 84b629dc..c97338cd 100644 --- a/go-sdk/runner/task/subscriber.go +++ b/go-sdk/runner/task/subscriber.go @@ -38,20 +38,10 @@ func (tr *Runner) startSubscriber() { var err error - tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram( - "runner-process-message-time", - metric.WithDescription("How long it takes to process a message."), + tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram( + "runner-process-message-metric", + metric.WithDescription("How long it takes to process a message and times called."), metric.WithUnit("ms"), - metric.WithExplicitBucketBoundaries(250, 500, 1000), - ) - if err != nil { - tr.getLoggerWithName().Error(err, "Error initializing metric") - os.Exit(1) - } - - tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter( - "runner-processed-messages", - metric.WithDescription("Number of messages processed."), ) if err != nil { tr.getLoggerWithName().Error(err, "Error initializing metric") @@ -124,11 +114,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) { executionTime := time.Since(start).Milliseconds() tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime)) - tr.elapsedTimeMetric.Record(context.Background(), executionTime, + tr.messagesMetric.Record(context.Background(), executionTime, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)), ) - - tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId))) }() tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s", diff --git a/go-sdk/runner/task/task_runner.go b/go-sdk/runner/task/task_runner.go index 73a7c2de..e81bb3d5 100644 --- a/go-sdk/runner/task/task_runner.go +++ b/go-sdk/runner/task/task_runner.go @@ -20,16 +20,15 @@ type Handler common.Handler type Postprocessor common.Handler type Runner struct { - sdk sdk.KaiSDK - nats *nats.Conn - jetstream nats.JetStreamContext - responseHandlers map[string]Handler - initializer common.Initializer - preprocessor Preprocessor - postprocessor Postprocessor - finalizer common.Finalizer - elapsedTimeMetric metric.Int64Histogram - numberOfMessagesMetric metric.Int64Counter + sdk sdk.KaiSDK + nats *nats.Conn + jetstream nats.JetStreamContext + responseHandlers map[string]Handler + initializer common.Initializer + preprocessor Preprocessor + postprocessor Postprocessor + finalizer common.Finalizer + messagesMetric metric.Int64Histogram } func NewTaskRunner(logger logr.Logger, ns *nats.Conn, js nats.JetStreamContext) *Runner { diff --git a/go-sdk/runner/trigger/subscriber.go b/go-sdk/runner/trigger/subscriber.go index da4a7a73..6efd9f68 100644 --- a/go-sdk/runner/trigger/subscriber.go +++ b/go-sdk/runner/trigger/subscriber.go @@ -36,20 +36,10 @@ func (tr *Runner) startSubscriber() { var err error - tr.elapsedTimeMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram( - "runner-process-message-time", - metric.WithDescription("How long it takes to process a message."), + tr.messagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Histogram( + "runner-process-message-metric", + metric.WithDescription("How long it takes to process a message and times called."), metric.WithUnit("ms"), - metric.WithExplicitBucketBoundaries(250, 500, 1000), - ) - if err != nil { - tr.getLoggerWithName().Error(err, "Error initializing metric") - os.Exit(1) - } - - tr.numberOfMessagesMetric, err = tr.sdk.Measurements.GetMetricsClient().Int64Counter( - "runner-processed-messages", - metric.WithDescription("Number of messages processed."), ) if err != nil { tr.getLoggerWithName().Error(err, "Error initializing metric") @@ -126,11 +116,9 @@ func (tr *Runner) processMessage(msg *nats.Msg) { executionTime := time.Since(start).Milliseconds() tr.sdk.Logger.V(1).Info(fmt.Sprintf("%s execution time: %d ms", tr.sdk.Metadata.GetProcess(), executionTime)) - tr.elapsedTimeMetric.Record(context.Background(), executionTime, + tr.messagesMetric.Record(context.Background(), executionTime, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId)), ) - - tr.numberOfMessagesMetric.Add(context.Background(), 1, metric.WithAttributeSet(tr.getMetricAttributes(requestMsg.RequestId))) }() tr.getLoggerWithName().Info(fmt.Sprintf("New message received with subject %s", diff --git a/go-sdk/runner/trigger/trigger_runner.go b/go-sdk/runner/trigger/trigger_runner.go index 93414344..6d821fdc 100644 --- a/go-sdk/runner/trigger/trigger_runner.go +++ b/go-sdk/runner/trigger/trigger_runner.go @@ -18,16 +18,15 @@ type RunnerFunc func(tr *Runner, sdk sdk.KaiSDK) type ResponseHandler func(sdk sdk.KaiSDK, response *anypb.Any) error type Runner struct { - sdk sdk.KaiSDK - nats *nats.Conn - jetstream nats.JetStreamContext - responseHandler ResponseHandler - responseChannels sync.Map - initializer common.Initializer - runner RunnerFunc - finalizer common.Finalizer - elapsedTimeMetric metric.Int64Histogram - numberOfMessagesMetric metric.Int64Counter + sdk sdk.KaiSDK + nats *nats.Conn + jetstream nats.JetStreamContext + responseHandler ResponseHandler + responseChannels sync.Map + initializer common.Initializer + runner RunnerFunc + finalizer common.Finalizer + messagesMetric metric.Int64Histogram } var wg sync.WaitGroup //nolint:gochecknoglobals // WaitGroup is used to wait for goroutines to finish diff --git a/py-sdk/runner/exit/exit_runner.py b/py-sdk/runner/exit/exit_runner.py index fcda9af5..bf2dde02 100644 --- a/py-sdk/runner/exit/exit_runner.py +++ b/py-sdk/runner/exit/exit_runner.py @@ -10,7 +10,7 @@ from loguru import logger from nats.aio.client import Client as NatsClient from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from runner.common.common import Finalizer, Handler, Initializer, Task from runner.exit.exceptions import FailedToInitializeMetricsError, UndefinedDefaultHandlerFunctionError @@ -38,8 +38,7 @@ class ExitRunner: preprocessor: Optional[Preprocessor] = None postprocessor: Optional[Postprocessor] = None finalizer: Optional[Finalizer] = None - elapsed_time_metric: Histogram = field(init=False) - number_of_messages_metric: Counter = field(init=False) + messages_metric: Histogram = field(init=False) def __post_init__(self) -> None: logger.configure(extra={"context": "", "metadata": {}, "origin": "[EXIT]"}) @@ -49,12 +48,10 @@ def __post_init__(self) -> None: def _init_metrics(self) -> None: try: - self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram( - name="runner-process-message-time", unit="ms", description="How long it takes to process a message." - ) - - self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter( - name="runner-processed-messages", description="Number of messages processed." + self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram( + name="runner-process-message-metric", + unit="ms", + description="How long it takes to process a message and times called.", ) except Exception as e: self.logger.error(f"error initializing metrics: {e}") diff --git a/py-sdk/runner/exit/subscriber.py b/py-sdk/runner/exit/subscriber.py index 386cbf4a..74341d98 100644 --- a/py-sdk/runner/exit/subscriber.py +++ b/py-sdk/runner/exit/subscriber.py @@ -100,12 +100,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.exit_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error(msg, Exception(f"no handler defined for {from_node}")) return @@ -116,12 +111,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.exit_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error( msg, HandlerError(from_node, to_node, error=e, type="handler preprocessor") ) @@ -133,12 +123,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.exit_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e)) return @@ -149,12 +134,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.exit_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error( msg, HandlerError(from_node, to_node, error=e, type="handler postprocessor") ) @@ -166,19 +146,13 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.exit_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) self.logger.error(f"error acknowledging message: {e}") end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.exit_runner.elapsed_time_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) - self.exit_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id)) + self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) async def _process_runner_error(self, msg: Msg, error: Exception) -> None: error_msg = str(error) diff --git a/py-sdk/runner/exit/subscriber_test.py b/py-sdk/runner/exit/subscriber_test.py index 7ef83a0d..10da3628 100644 --- a/py-sdk/runner/exit/subscriber_test.py +++ b/py-sdk/runner/exit/subscriber_test.py @@ -7,7 +7,7 @@ from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from vyper import v from runner.exit.exceptions import NewRequestMsgError @@ -59,8 +59,7 @@ def m_exit_runner(_: ModelRegistry, __: PersistentStorage, ___: Predictions, ___ exit_runner.sdk = m_sdk exit_runner.sdk.metadata = Mock(spec=Metadata) exit_runner.sdk.metadata.get_process = Mock(return_value="test.process") - exit_runner.elapsed_time_metric = Mock(spec=Histogram) - exit_runner.number_of_messages_metric = Mock(spec=Counter) + exit_runner.messages_metric = Mock(spec=Histogram) return exit_runner @@ -211,8 +210,7 @@ async def test_process_message_ok(m_msg, m_exit_subscriber): assert m_exit_subscriber._get_response_handler.called assert m_handler.called assert m_msg.ack.called - assert m_exit_subscriber.exit_runner.elapsed_time_metric.record.called - assert m_exit_subscriber.exit_runner.number_of_messages_metric.add.called + assert m_exit_subscriber.exit_runner.messages_metric.record.called async def test_process_message_not_valid_protobuf_ko(m_msg, m_exit_subscriber): diff --git a/py-sdk/runner/task/subscriber.py b/py-sdk/runner/task/subscriber.py index 7d6e70e6..62c29d28 100644 --- a/py-sdk/runner/task/subscriber.py +++ b/py-sdk/runner/task/subscriber.py @@ -102,12 +102,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.task_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error(msg, Exception(f"no handler defined for {from_node}")) return @@ -118,12 +113,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.task_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error( msg, HandlerError(from_node, to_node, error=e, type="handler preprocessor") ) @@ -135,12 +125,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.task_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e)) return @@ -151,12 +136,7 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.task_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) await self._process_runner_error( msg, HandlerError(from_node, to_node, error=e, type="handler postprocessor") ) @@ -168,19 +148,13 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.task_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) self.logger.error(f"error acknowledging message: {e}") end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.task_runner.elapsed_time_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) - self.task_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id)) + self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) async def _process_runner_error(self, msg: Msg, error: Exception) -> None: error_msg = str(error) diff --git a/py-sdk/runner/task/subscriber_test.py b/py-sdk/runner/task/subscriber_test.py index 4258efb2..f6cd82ad 100644 --- a/py-sdk/runner/task/subscriber_test.py +++ b/py-sdk/runner/task/subscriber_test.py @@ -7,7 +7,7 @@ from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from vyper import v from runner.task.exceptions import NewRequestMsgError @@ -59,8 +59,7 @@ def m_task_runner(_: ModelRegistry, __: PersistentStorage, ___: Predictions, ___ task_runner.sdk = m_sdk task_runner.sdk.metadata = Mock(spec=Metadata) task_runner.sdk.metadata.get_process = Mock(return_value="test.process") - task_runner.elapsed_time_metric = Mock(spec=Histogram) - task_runner.number_of_messages_metric = Mock(spec=Counter) + task_runner.messages_metric = Mock(spec=Histogram) return task_runner @@ -211,8 +210,7 @@ async def test_process_message_ok(m_msg, m_task_subscriber): assert m_task_subscriber._get_response_handler.called assert m_handler.called assert m_msg.ack.called - assert m_task_subscriber.task_runner.elapsed_time_metric.record.called - assert m_task_subscriber.task_runner.number_of_messages_metric.add.called + assert m_task_subscriber.task_runner.messages_metric.record.called async def test_process_message_not_valid_protobuf_ko(m_msg, m_task_subscriber): diff --git a/py-sdk/runner/task/task_runner.py b/py-sdk/runner/task/task_runner.py index 9d16eea1..092d3f70 100644 --- a/py-sdk/runner/task/task_runner.py +++ b/py-sdk/runner/task/task_runner.py @@ -10,7 +10,7 @@ from loguru import logger from nats.aio.client import Client as NatsClient from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from runner.common.common import Finalizer, Handler, Initializer, Task from runner.task.exceptions import FailedToInitializeMetricsError, UndefinedDefaultHandlerFunctionError @@ -38,8 +38,7 @@ class TaskRunner: preprocessor: Optional[Preprocessor] = None postprocessor: Optional[Postprocessor] = None finalizer: Optional[Finalizer] = None - elapsed_time_metric: Histogram = field(init=False) - number_of_messages_metric: Counter = field(init=False) + messages_metric: Histogram = field(init=False) def __post_init__(self) -> None: logger.configure(extra={"context": "", "metadata": {}, "origin": "[TASK]"}) @@ -49,12 +48,10 @@ def __post_init__(self) -> None: def _init_metrics(self) -> None: try: - self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram( - name="runner-process-message-time", unit="ms", description="How long it takes to process a message." - ) - - self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter( - name="runner-processed-messages", description="Number of messages processed." + self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram( + name="runner-process-message-metric", + unit="ms", + description="How long it takes to process a message and times called.", ) except Exception as e: self.logger.error(f"error initializing metrics: {e}") diff --git a/py-sdk/runner/trigger/subscriber.py b/py-sdk/runner/trigger/subscriber.py index 1f15a1a0..158f0763 100644 --- a/py-sdk/runner/trigger/subscriber.py +++ b/py-sdk/runner/trigger/subscriber.py @@ -98,12 +98,9 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.trigger_runner.elapsed_time_metric.record( + self.trigger_runner.messages_metric.record( elapsed, attributes=self.get_attributes(request_msg.request_id) ) - self.trigger_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) await self._process_runner_error(msg, UndefinedResponseHandlerError(msg.subject)) return @@ -113,12 +110,9 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.trigger_runner.elapsed_time_metric.record( + self.trigger_runner.messages_metric.record( elapsed, attributes=self.get_attributes(request_msg.request_id) ) - self.trigger_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) from_node = request_msg.from_node to_node = self.trigger_runner.sdk.metadata.get_process() await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e)) @@ -130,21 +124,15 @@ async def _process_message(self, msg: Msg) -> None: end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.trigger_runner.elapsed_time_metric.record( + self.trigger_runner.messages_metric.record( elapsed, attributes=self.get_attributes(request_msg.request_id) ) - self.trigger_runner.number_of_messages_metric.add( - 1, attributes=self.get_attributes(request_msg.request_id) - ) self.logger.error(f"error acknowledging message: {e}") end = time.time() * 1000 elapsed = end - start self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms") - self.trigger_runner.elapsed_time_metric.record( - elapsed, attributes=self.get_attributes(request_msg.request_id) - ) - self.trigger_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id)) + self.trigger_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id)) async def _process_runner_error(self, msg: Msg, error: Exception) -> None: error_msg = str(error) diff --git a/py-sdk/runner/trigger/subscriber_test.py b/py-sdk/runner/trigger/subscriber_test.py index 7eec0de3..9ef0ddc7 100644 --- a/py-sdk/runner/trigger/subscriber_test.py +++ b/py-sdk/runner/trigger/subscriber_test.py @@ -9,7 +9,7 @@ from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from vyper import v from runner.trigger.exceptions import NewRequestMsgError @@ -65,8 +65,7 @@ def m_trigger_runner( trigger_runner.sdk = m_sdk trigger_runner.sdk.metadata = Mock(spec=Metadata) trigger_runner.sdk.metadata.get_process = Mock(return_value="test.process") - trigger_runner.elapsed_time_metric = Mock(spec=Histogram) - trigger_runner.number_of_messages_metric = Mock(spec=Counter) + trigger_runner.messages_metric = Mock(spec=Histogram) return trigger_runner @@ -228,8 +227,7 @@ async def test_process_message_ok(m_getattr, m_msg, m_trigger_subscriber): assert m_getattr.called assert m_handler.called assert m_msg.ack.called - assert m_trigger_subscriber.trigger_runner.elapsed_time_metric.record.called - assert m_trigger_subscriber.trigger_runner.number_of_messages_metric.add.called + assert m_trigger_subscriber.trigger_runner.messages_metric.record.called async def test_process_message_not_valid_protobuf_ko(m_msg, m_trigger_subscriber): diff --git a/py-sdk/runner/trigger/trigger_runner.py b/py-sdk/runner/trigger/trigger_runner.py index eb216f57..8e0ca754 100644 --- a/py-sdk/runner/trigger/trigger_runner.py +++ b/py-sdk/runner/trigger/trigger_runner.py @@ -13,7 +13,7 @@ from loguru import logger from nats.aio.client import Client as NatsClient from nats.js.client import JetStreamContext -from opentelemetry.metrics._internal.instrument import Counter, Histogram +from opentelemetry.metrics._internal.instrument import Histogram from runner.common.common import Finalizer, Initializer from runner.trigger.exceptions import FailedToInitializeMetricsError, UndefinedRunnerFunctionError @@ -37,8 +37,7 @@ class TriggerRunner: subscriber: TriggerSubscriber = field(init=False) finalizer: Optional[Finalizer] = None tasks: list[threading.Thread] = field(init=False, default_factory=list) - elapsed_time_metric: Histogram = field(init=False) - number_of_messages_metric: Counter = field(init=False) + messages_metric: Histogram = field(init=False) def __post_init__(self) -> None: logger.configure(extra={"context": "", "metadata": {}, "origin": "[TRIGGER]"}) @@ -48,12 +47,10 @@ def __post_init__(self) -> None: def _init_metrics(self) -> None: try: - self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram( - name="runner-process-message-time", unit="ms", description="How long it takes to process a message." - ) - - self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter( - name="runner-processed-messages", description="Number of messages processed." + self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram( + name="runner-process-message-metric", + unit="ms", + description="How long it takes to process a message and times called.", ) except Exception as e: self.logger.error(f"error initializing metrics: {e}")