From 3c6474ed3a989ab1a7f79015e55e7aa65d8bfd6f Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Sun, 14 Jul 2024 19:51:39 -0500 Subject: [PATCH] Add Optional OpenTelemetry Instrumentation So we get spans around `once` mostly. --- composer.json | 26 ++- src/Otel/PmgQueueInstrumentation.php | 188 +++++++++++++++++++ src/Otel/_register.php | 36 ++++ test/integration/ConsumerOtelIntTest.php | 139 ++++++++++++++ test/integration/OtelIntegrationTestCase.php | 60 ++++++ test/integration/ProducerOtelIntTest.php | 83 ++++++++ 6 files changed, 529 insertions(+), 3 deletions(-) create mode 100644 src/Otel/PmgQueueInstrumentation.php create mode 100644 src/Otel/_register.php create mode 100644 test/integration/ConsumerOtelIntTest.php create mode 100644 test/integration/OtelIntegrationTestCase.php create mode 100644 test/integration/ProducerOtelIntTest.php diff --git a/composer.json b/composer.json index ec305b1..95f5deb 100644 --- a/composer.json +++ b/composer.json @@ -13,10 +13,22 @@ }, "require-dev": { "phpunit/phpunit": "^9.5", - "symfony/phpunit-bridge": "^5.4" + "symfony/phpunit-bridge": "^5.4", + "open-telemetry/api": "^1.0", + "open-telemetry/context": "^1.0", + "open-telemetry/sem-conv": "^1.25", + "open-telemetry/sdk": "^1.0", + "symfony/http-client": "^7.1", + "nyholm/psr7": "^1.8" }, "suggest": { - "pmg/queue-pheanstalk": "Power pmg/queue with Beanstalkd" + "pmg/queue-pheanstalk": "Power pmg/queue with Beanstalkd", + "open-telemetry/api": "enables open telemetry auto instrumentation", + "open-telemetry/context": "enables open telemetry auto instrumentation", + "open-telemetry/sem-conv": "enables open telemetry auto instrumentation" + }, + "conflict": { + "open-telemetry/sem-conv": "<1.25" }, "autoload": { "psr-4": { @@ -29,7 +41,10 @@ "test/unit/", "test/integration/" ] - } + }, + "files": [ + "src/Otel/_register.php" + ] }, "extra": { "branch-alias": { @@ -37,5 +52,10 @@ "dev-version-3": "3.0-dev", "dev-version-2": "2.0-dev" } + }, + "config": { + "allow-plugins": { + "php-http/discovery": true + } } } diff --git a/src/Otel/PmgQueueInstrumentation.php b/src/Otel/PmgQueueInstrumentation.php new file mode 100644 index 0000000..36900ec --- /dev/null +++ b/src/Otel/PmgQueueInstrumentation.php @@ -0,0 +1,188 @@ + + * + * For full copyright information see the LICENSE file distributed + * with this source code. + * + * @license http://opensource.org/licenses/Apache-2.0 Apache-2.0 + */ + +namespace PMG\Queue\Otel; + +use function OpenTelemetry\Instrumentation\hook; +use OpenTelemetry\API\Instrumentation\CachedInstrumentation; +use OpenTelemetry\API\Trace\Span; +use OpenTelemetry\API\Trace\SpanBuilderInterface; +use OpenTelemetry\API\Trace\SpanKind; +use OpenTelemetry\API\Trace\StatusCode; +use OpenTelemetry\Context\Context; +use OpenTelemetry\SemConv\TraceAttributes; +use PMG\Queue\Consumer; +use PMG\Queue\Driver; +use PMG\Queue\Envelope; + +final class PmgQueueInstrumentation +{ + public const NAME = 'pmg-queue'; + public const INSTRUMENTATION_NAME = 'com.pmg.opentelemetry.'.self::NAME; + + // these two are in semconv, but have not yet maded it to the PHP SDK + // type is generic and defined in semconv where name is system specific + public const OPERATION_TYPE = 'messaging.operation.type'; + public const OPERATION_NAME = 'messaging.operation.name'; + + public static bool $registered = false; + + public static function register(): bool + { + if (self::$registered) { + return false; + } + + if (!extension_loaded('opentelemetry')) { + return false; + } + + self::$registered = true; + + $instrumentation = new CachedInstrumentation(self::INSTRUMENTATION_NAME); + + hook( + Consumer::class, + 'once', + pre: static function ( + Consumer $consumer, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + $queueName = $params[0]; + assert(is_string($queueName)); + + $builder = $instrumentation + ->tracer() + ->spanBuilder($queueName.' receive') + ->setSpanKind(SpanKind::KIND_CONSUMER) + ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $queueName) + ->setAttribute(self::OPERATION_TYPE, 'receive') // generic + ->setAttribute(self::OPERATION_NAME, 'once') // system specific + ; + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + Consumer $consumer, + array $params, + mixed $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $queueName = $params[0]; + assert(is_string($queueName)); + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception, [ + TraceAttributes::EXCEPTION_ESCAPED => true, + ]); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } elseif ($result === false) { + $span->setStatus(StatusCode::STATUS_ERROR, 'Message was not handled successfully'); + } + + $span->end(); + } + ); + + hook( + Driver::class, + 'enqueue', + pre: static function ( + Driver $bus, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + $queueName = $params[0]; + assert(is_string($queueName)); + + $message = $params[1]; + assert(is_object($message)); + + $builder = $instrumentation + ->tracer() + ->spanBuilder($queueName.' publish') + ->setSpanKind(SpanKind::KIND_PRODUCER) + ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $queueName) + ->setAttribute(self::OPERATION_TYPE, 'publish') + ->setAttribute(self::OPERATION_NAME, 'enqueue') + ; + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + Driver $driver, + array $params, + ?Envelope $envelope, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception, [ + TraceAttributes::EXCEPTION_ESCAPED => true, + ]); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + + return self::$registered; + } +} diff --git a/src/Otel/_register.php b/src/Otel/_register.php new file mode 100644 index 0000000..732f816 --- /dev/null +++ b/src/Otel/_register.php @@ -0,0 +1,36 @@ + + * + * For full copyright information see the LICENSE file distributed + * with this source code. + * + * @license http://opensource.org/licenses/Apache-2.0 Apache-2.0 + */ + +use PMG\Queue\Otel\PmgQueueInstrumentation; +use OpenTelemetry\API\Trace\Span; +use OpenTelemetry\Context\Context; +use OpenTelemetry\SemConv\TraceAttributes; +use OpenTelemetry\SDK\Sdk; + +// look for deps and if we have them all we'll load the instrumentation. +if ( + !extension_loaded('opentelemetry') || + !class_exists(Span::class) || + !class_exists(Context::class) || + !interface_exists(TraceAttributes::class) +) { + return; +} + + +// allow disabling instrumentation via the SDK's supported environment variables +if (class_exists(Sdk::class) && Sdk::isInstrumentationDisabled(PmgQueueInstrumentation::NAME)) { + return; +} + +PmgQueueInstrumentation::register(); diff --git a/test/integration/ConsumerOtelIntTest.php b/test/integration/ConsumerOtelIntTest.php new file mode 100644 index 0000000..3eeec74 --- /dev/null +++ b/test/integration/ConsumerOtelIntTest.php @@ -0,0 +1,139 @@ + + * + * For full copyright information see the LICENSE file distributed + * with this source code. + * + * @license http://opensource.org/licenses/Apache-2.0 Apache-2.0 + */ + +namespace PMG\Queue; + +use OpenTelemetry\API\Trace\SpanKind; +use OpenTelemetry\API\Trace\StatusCode; +use OpenTelemetry\SemConv\TraceAttributes; +use PMG\Queue\Otel\PmgQueueInstrumentation; +use PMG\Queue\Exception\SimpleMustStop; + +/** + * @requires extension opentelemetry + */ +class ConsumerOtelIntTest extends OtelIntegrationTestCase +{ + const Q = 'test'; + + private Driver $driver; + private Producer $producer; + + public function testConsumerOnceEmitsSpansWhenNoMessagesAreHandled() : void + { + $called = false; + $consumer = $this->createConsumer(function () use (&$called) { + $called = true; + return false; + }); + + $result = $consumer->once(self::Q); + + $this->assertNull($result); + $this->assertFalse($called); + $this->assertCount(1, $this->spans); + $span = $this->spans[0]; + $this->assertSame(self::Q.' receive', $span->getName()); + $attr = $span->getAttributes(); + $this->assertSame(self::Q, $attr->get(TraceAttributes::MESSAGING_DESTINATION_NAME)); + $this->assertSame('receive', $attr->get(PmgQueueInstrumentation::OPERATION_TYPE)); + $this->assertSame('once', $attr->get(PmgQueueInstrumentation::OPERATION_NAME)); + } + + public function testSuccessfullyHandledMessagesDoNotSetSpanStatusCode() : void + { + // this will produce a span + $this->producer->send(new SimpleMessage('test')); + $called = false; + $consumer = $this->createConsumer(function () use (&$called) : bool { + $called = true; + return true; + }); + + $result = $consumer->once(self::Q); + + $this->assertTrue($result); + $this->assertTrue($called); + $this->assertCount(2, $this->spans, 'one span for the enqueue one for consume'); + $span = $this->spans[1]; + $this->assertSame(self::Q.' receive', $span->getName()); + $status = $span->getStatus(); + $this->assertSame(StatusCode::STATUS_UNSET, $status->getCode()); + } + + public function testUnsuccessfulMessagesSetSpanStatusAsErrored() : void + { + $this->producer->send(new SimpleMessage('test')); + $called = false; + $consumer = $this->createConsumer(function () use (&$called) : bool { + $called = true; + return false; + }); + + $result = $consumer->once(self::Q); + + $this->assertFalse($result); + $this->assertTrue($called); + $this->assertCount(2, $this->spans, 'one span for the enqueue one for consume'); + $span = $this->spans[1]; + $this->assertSame(self::Q.' receive', $span->getName()); + $status = $span->getStatus(); + $this->assertSame(StatusCode::STATUS_ERROR, $status->getCode()); + $this->assertStringContainsStringIgnoringCase( + 'not handled successfully', + $status->getDescription() + ); + } + + public function testErrorsDuringOnceMarkSpanAsErrored() : void + { + // this will produce a span + $this->producer->send(new SimpleMessage('test')); + $consumer = $this->createConsumer(function () { + throw new SimpleMustStop('oh noz'); + }); + + $e = null; + try { + $consumer->once(self::Q); + } catch (SimpleMustStop $e) { + } + + $this->assertInstanceOf(SimpleMustStop::class, $e); + $this->assertCount(2, $this->spans, 'one span for the enqueue one for consume'); + $span = $this->spans[1]; + $this->assertSame(self::Q.' receive', $span->getName()); + $status = $span->getStatus(); + $this->assertSame(StatusCode::STATUS_ERROR, $status->getCode()); + $this->assertSame('oh noz', $status->getDescription()); + } + + protected function setUp() : void + { + parent::setUp(); + $this->driver = new Driver\MemoryDriver(); + $this->producer = new DefaultProducer( + $this->driver, + new Router\SimpleRouter(self::Q) + ); + } + + private function createConsumer(callable $handler) : DefaultConsumer + { + return new DefaultConsumer( + $this->driver, + new Handler\CallableHandler($handler), + new Retry\NeverSpec() + ); + } +} diff --git a/test/integration/OtelIntegrationTestCase.php b/test/integration/OtelIntegrationTestCase.php new file mode 100644 index 0000000..f8901bc --- /dev/null +++ b/test/integration/OtelIntegrationTestCase.php @@ -0,0 +1,60 @@ + + * + * For full copyright information see the LICENSE file distributed + * with this source code. + * + * @license http://opensource.org/licenses/Apache-2.0 Apache-2.0 + */ + +namespace PMG\Queue; + +use ArrayObject; +use OpenTelemetry\API\Instrumentation\Configurator; +use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator; +use OpenTelemetry\API\Baggage\Propagation\BaggagePropagator; +use OpenTelemetry\Context\ScopeInterface; +use OpenTelemetry\Context\Propagation\MultiTextMapPropagator; +use OpenTelemetry\SDK\Trace\ImmutableSpan; +use OpenTelemetry\SDK\Trace\SpanExporter\InMemoryExporter; +use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor; +use OpenTelemetry\SDK\Trace\TracerProvider; + +abstract class OtelIntegrationTestCase extends IntegrationTestCase +{ + private ScopeInterface $scope; + + /** + * @var ArrayObject $storage + */ + protected ArrayObject $spans; + + protected function setUp(): void + { + $this->spans = new ArrayObject(); + $tracerProvider = new TracerProvider( + new SimpleSpanProcessor( + new InMemoryExporter($this->spans) + ) + ); + + $propagator = new MultiTextMapPropagator([ + TraceContextPropagator::getInstance(), + BaggagePropagator::getInstance(), + ]); + + $this->scope = Configurator::create() + ->withTracerProvider($tracerProvider) + ->withPropagator($propagator) + ->activate(); + } + + protected function tearDown(): void + { + $this->scope->detach(); + } +} diff --git a/test/integration/ProducerOtelIntTest.php b/test/integration/ProducerOtelIntTest.php new file mode 100644 index 0000000..f9e4846 --- /dev/null +++ b/test/integration/ProducerOtelIntTest.php @@ -0,0 +1,83 @@ + + * + * For full copyright information see the LICENSE file distributed + * with this source code. + * + * @license http://opensource.org/licenses/Apache-2.0 Apache-2.0 + */ + +namespace PMG\Queue; + +use LogicException; +use OpenTelemetry\API\Trace\SpanKind; +use OpenTelemetry\API\Trace\StatusCode; +use OpenTelemetry\SemConv\TraceAttributes; +use PMG\Queue\Otel\PmgQueueInstrumentation; +use PMG\Queue\Exception\SimpleMustStop; + +/** + * @requires extension opentelemetry + */ +class ProducerOtelIntTest extends OtelIntegrationTestCase +{ + const Q = 'test'; + + private Driver $driver; + private Producer $producer; + + public function testSendingMessagesProducesSpans() : void + { + $this->producer->send(new SimpleMessage('test')); + + $this->assertCount(1, $this->spans); + $span = $this->spans[0]; + $this->assertSame(self::Q.' publish', $span->getName()); + $status = $span->getStatus(); + $this->assertSame(StatusCode::STATUS_UNSET, $status->getCode()); + $attr = $span->getAttributes(); + $this->assertSame(self::Q, $attr->get(TraceAttributes::MESSAGING_DESTINATION_NAME)); + $this->assertSame('publish', $attr->get(PmgQueueInstrumentation::OPERATION_TYPE)); + $this->assertSame('enqueue', $attr->get(PmgQueueInstrumentation::OPERATION_NAME)); + } + + public function testErrorsDuringSendMarkSpansAsErrored() : void + { + $driver = $this->createMock(Driver::class); + $driver->expects($this->once()) + ->method('enqueue') + ->willThrowException(new LogicException('ope')); + $producer = new DefaultProducer( + $driver, + new Router\SimpleRouter(self::Q) + ); + + $e = null; + try { + $producer->send(new SimpleMessage('test')); + } catch (LogicException $e) { + } + + $this->assertInstanceOf(LogicException::class, $e); + $this->assertCount(1, $this->spans); + $span = $this->spans[0]; + $this->assertSame(self::Q.' publish', $span->getName()); + $status = $span->getStatus(); + $this->assertSame(StatusCode::STATUS_ERROR, $status->getCode()); + $this->assertSame('ope', $status->getDescription()); + } + + protected function setUp() : void + { + parent::setUp(); + $this->driver = new Driver\MemoryDriver(); + $this->producer = new DefaultProducer( + $this->driver, + new Router\SimpleRouter(self::Q) + ); + } +}