Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
fix: update python metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kafkaphoenix committed Feb 19, 2024
1 parent 3858cf3 commit fd501a6
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 122 deletions.
15 changes: 6 additions & 9 deletions py-sdk/runner/exit/exit_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from loguru import logger
from nats.aio.client import Client as NatsClient
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram

from runner.common.common import Finalizer, Handler, Initializer, Task
from runner.exit.exceptions import FailedToInitializeMetricsError, UndefinedDefaultHandlerFunctionError
Expand Down Expand Up @@ -38,8 +38,7 @@ class ExitRunner:
preprocessor: Optional[Preprocessor] = None
postprocessor: Optional[Postprocessor] = None
finalizer: Optional[Finalizer] = None
elapsed_time_metric: Histogram = field(init=False)
number_of_messages_metric: Counter = field(init=False)
messages_metric: Histogram = field(init=False)

def __post_init__(self) -> None:
logger.configure(extra={"context": "", "metadata": {}, "origin": "[EXIT]"})
Expand All @@ -49,12 +48,10 @@ def __post_init__(self) -> None:

def _init_metrics(self) -> None:
try:
self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-time", unit="ms", description="How long it takes to process a message."
)

self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter(
name="runner-processed-messages", description="Number of messages processed."
self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-metric",
unit="ms",
description="How long it takes to process a message and times called.",
)
except Exception as e:
self.logger.error(f"error initializing metrics: {e}")
Expand Down
38 changes: 6 additions & 32 deletions py-sdk/runner/exit/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, Exception(f"no handler defined for {from_node}"))
return

Expand All @@ -116,12 +111,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler preprocessor")
)
Expand All @@ -133,12 +123,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e))
return

Expand All @@ -149,12 +134,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler postprocessor")
)
Expand All @@ -166,19 +146,13 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.logger.error(f"error acknowledging message: {e}")

end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.exit_runner.elapsed_time_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.exit_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id))
self.exit_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))

async def _process_runner_error(self, msg: Msg, error: Exception) -> None:
error_msg = str(error)
Expand Down
8 changes: 3 additions & 5 deletions py-sdk/runner/exit/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram
from vyper import v

from runner.exit.exceptions import NewRequestMsgError
Expand Down Expand Up @@ -59,8 +59,7 @@ def m_exit_runner(_: ModelRegistry, __: PersistentStorage, ___: Predictions, ___
exit_runner.sdk = m_sdk
exit_runner.sdk.metadata = Mock(spec=Metadata)
exit_runner.sdk.metadata.get_process = Mock(return_value="test.process")
exit_runner.elapsed_time_metric = Mock(spec=Histogram)
exit_runner.number_of_messages_metric = Mock(spec=Counter)
exit_runner.messages_metric = Mock(spec=Histogram)

return exit_runner

Expand Down Expand Up @@ -211,8 +210,7 @@ async def test_process_message_ok(m_msg, m_exit_subscriber):
assert m_exit_subscriber._get_response_handler.called
assert m_handler.called
assert m_msg.ack.called
assert m_exit_subscriber.exit_runner.elapsed_time_metric.record.called
assert m_exit_subscriber.exit_runner.number_of_messages_metric.add.called
assert m_exit_subscriber.exit_runner.messages_metric.record.called


async def test_process_message_not_valid_protobuf_ko(m_msg, m_exit_subscriber):
Expand Down
38 changes: 6 additions & 32 deletions py-sdk/runner/task/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, Exception(f"no handler defined for {from_node}"))
return

Expand All @@ -118,12 +113,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler preprocessor")
)
Expand All @@ -135,12 +125,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e))
return

Expand All @@ -151,12 +136,7 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
await self._process_runner_error(
msg, HandlerError(from_node, to_node, error=e, type="handler postprocessor")
)
Expand All @@ -168,19 +148,13 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.logger.error(f"error acknowledging message: {e}")

end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.task_runner.elapsed_time_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))
self.task_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id))
self.task_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))

async def _process_runner_error(self, msg: Msg, error: Exception) -> None:
error_msg = str(error)
Expand Down
8 changes: 3 additions & 5 deletions py-sdk/runner/task/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram
from vyper import v

from runner.task.exceptions import NewRequestMsgError
Expand Down Expand Up @@ -59,8 +59,7 @@ def m_task_runner(_: ModelRegistry, __: PersistentStorage, ___: Predictions, ___
task_runner.sdk = m_sdk
task_runner.sdk.metadata = Mock(spec=Metadata)
task_runner.sdk.metadata.get_process = Mock(return_value="test.process")
task_runner.elapsed_time_metric = Mock(spec=Histogram)
task_runner.number_of_messages_metric = Mock(spec=Counter)
task_runner.messages_metric = Mock(spec=Histogram)

return task_runner

Expand Down Expand Up @@ -211,8 +210,7 @@ async def test_process_message_ok(m_msg, m_task_subscriber):
assert m_task_subscriber._get_response_handler.called
assert m_handler.called
assert m_msg.ack.called
assert m_task_subscriber.task_runner.elapsed_time_metric.record.called
assert m_task_subscriber.task_runner.number_of_messages_metric.add.called
assert m_task_subscriber.task_runner.messages_metric.record.called


async def test_process_message_not_valid_protobuf_ko(m_msg, m_task_subscriber):
Expand Down
15 changes: 6 additions & 9 deletions py-sdk/runner/task/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from loguru import logger
from nats.aio.client import Client as NatsClient
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram

from runner.common.common import Finalizer, Handler, Initializer, Task
from runner.task.exceptions import FailedToInitializeMetricsError, UndefinedDefaultHandlerFunctionError
Expand Down Expand Up @@ -38,8 +38,7 @@ class TaskRunner:
preprocessor: Optional[Preprocessor] = None
postprocessor: Optional[Postprocessor] = None
finalizer: Optional[Finalizer] = None
elapsed_time_metric: Histogram = field(init=False)
number_of_messages_metric: Counter = field(init=False)
messages_metric: Histogram = field(init=False)

def __post_init__(self) -> None:
logger.configure(extra={"context": "", "metadata": {}, "origin": "[TASK]"})
Expand All @@ -49,12 +48,10 @@ def __post_init__(self) -> None:

def _init_metrics(self) -> None:
try:
self.elapsed_time_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-time", unit="ms", description="How long it takes to process a message."
)

self.number_of_messages_metric = self.sdk.measurements.get_metrics_client().create_counter(
name="runner-processed-messages", description="Number of messages processed."
self.messages_metric = self.sdk.measurements.get_metrics_client().create_histogram(
name="runner-process-message-metric",
unit="ms",
description="How long it takes to process a message and times called.",
)
except Exception as e:
self.logger.error(f"error initializing metrics: {e}")
Expand Down
20 changes: 4 additions & 16 deletions py-sdk/runner/trigger/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,9 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.trigger_runner.elapsed_time_metric.record(
self.trigger_runner.messages_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.trigger_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
await self._process_runner_error(msg, UndefinedResponseHandlerError(msg.subject))
return

Expand All @@ -113,12 +110,9 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.trigger_runner.elapsed_time_metric.record(
self.trigger_runner.messages_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.trigger_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
from_node = request_msg.from_node
to_node = self.trigger_runner.sdk.metadata.get_process()
await self._process_runner_error(msg, HandlerError(from_node, to_node, error=e))
Expand All @@ -130,21 +124,15 @@ async def _process_message(self, msg: Msg) -> None:
end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.trigger_runner.elapsed_time_metric.record(
self.trigger_runner.messages_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.trigger_runner.number_of_messages_metric.add(
1, attributes=self.get_attributes(request_msg.request_id)
)
self.logger.error(f"error acknowledging message: {e}")

end = time.time() * 1000
elapsed = end - start
self.logger.info(f"{Metadata.get_process()} execution time: {elapsed} ms")
self.trigger_runner.elapsed_time_metric.record(
elapsed, attributes=self.get_attributes(request_msg.request_id)
)
self.trigger_runner.number_of_messages_metric.add(1, attributes=self.get_attributes(request_msg.request_id))
self.trigger_runner.messages_metric.record(elapsed, attributes=self.get_attributes(request_msg.request_id))

async def _process_runner_error(self, msg: Msg, error: Exception) -> None:
error_msg = str(error)
Expand Down
8 changes: 3 additions & 5 deletions py-sdk/runner/trigger/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from opentelemetry.metrics._internal.instrument import Counter, Histogram
from opentelemetry.metrics._internal.instrument import Histogram
from vyper import v

from runner.trigger.exceptions import NewRequestMsgError
Expand Down Expand Up @@ -65,8 +65,7 @@ def m_trigger_runner(
trigger_runner.sdk = m_sdk
trigger_runner.sdk.metadata = Mock(spec=Metadata)
trigger_runner.sdk.metadata.get_process = Mock(return_value="test.process")
trigger_runner.elapsed_time_metric = Mock(spec=Histogram)
trigger_runner.number_of_messages_metric = Mock(spec=Counter)
trigger_runner.messages_metric = Mock(spec=Histogram)

return trigger_runner

Expand Down Expand Up @@ -228,8 +227,7 @@ async def test_process_message_ok(m_getattr, m_msg, m_trigger_subscriber):
assert m_getattr.called
assert m_handler.called
assert m_msg.ack.called
assert m_trigger_subscriber.trigger_runner.elapsed_time_metric.record.called
assert m_trigger_subscriber.trigger_runner.number_of_messages_metric.add.called
assert m_trigger_subscriber.trigger_runner.messages_metric.record.called


async def test_process_message_not_valid_protobuf_ko(m_msg, m_trigger_subscriber):
Expand Down
Loading

0 comments on commit fd501a6

Please sign in to comment.