diff --git a/build.sbt b/build.sbt index 111594f17..a8b60805a 100644 --- a/build.sbt +++ b/build.sbt @@ -242,9 +242,9 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io")) kanelaAgent % "provided", { if(scalaBinaryVersion.value == "2.11") - "org.typelevel" %% "cats-effect" % "2.0.0" % "provided" + "org.typelevel" %% "cats-effect" % "3.3.5" % "provided" else - "org.typelevel" %% "cats-effect" % "2.1.2" % "provided" + "org.typelevel" %% "cats-effect" % "3.3.5" % "provided" }, scalatest % "test", logbackClassic % "test" diff --git a/instrumentation/kamon-cats-io/src/main/resources/reference.conf b/instrumentation/kamon-cats-io/src/main/resources/reference.conf index 9d31317c2..30ffac0c8 100644 --- a/instrumentation/kamon-cats-io/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io/src/main/resources/reference.conf @@ -1,10 +1,19 @@ ############################################# # Kamon Cats IO Reference Configuration # ############################################# - kanela.modules { executor-service { - within += "cats.effect.internals.IOShift\\$Tick" - within += "cats.effect.internals.IOTimer\\$ShiftTick" + within += "cats.effect.*" + } + cats-fibers { + name = "Cats-IO Instrumentation" + description = "Provides instrumentation for Cats IO Fibers" + + instrumentations = [ + "kamon.instrumentation.cats.IOFiberInstrumentation" + ] + within = [ + "^cats.effect.*" + ] } } \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala new file mode 100644 index 000000000..5db08cbdc --- /dev/null +++ b/instrumentation/kamon-cats-io/src/main/scala/kamon/instrumentation/cats/IOFiberInstrumentation.scala @@ -0,0 +1,161 @@ +package kamon.instrumentation.cats + +import cats.effect.{IO, IOLocal} +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.context.HasContext +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + + +class IOFiberInstrumentation extends InstrumentationBuilder { + + /**Approach: RunLoop Instrumentation**/ + //onTypes("cats.effect.IOFiber") + // .advise(anyMethods( + // "runLoop", + // ), InstrumentRunLoop) + /**Approach: RunLoop Instrumentation**/ + + + + /**Approach: Instrumenting run() and "forks" **/ + onTypes("cats.effect.IOFiber") + .advise(method("run"), RestoreContextFromFiber) + .advise(method("run"), SaveCurrentContextOnExit) + + onTypes("cats.effect.IOFiber") + .advise(anyMethods( + "rescheduleFiber", + "scheduleFiber", + "scheduleOnForeignEC", + ), SetContextOnNewFiber) + onTypes("cats.effect.unsafe.WorkStealingThreadPool") + .advise(anyMethods("scheduleFiber", "rescheduleFiber", "scheduleExternal"),SetContextOnNewFiberForWSTP) + /**Approach: More efficient solution**/ + + + + /**Debug: begin**/ + onTypes("cats.effect.IOFiber") + .advise(anyMethods( + "runLoop", + ), Debug) + /**Debug: end**/ + +} +object Helper { + def padTo(obj: Any, len: Int): String = + obj.toString.take(len).padTo(len, " ").mkString("") + + def setIfNotEmpty(f: HasContext)(ctx: Context): Unit = + if(ctx.nonEmpty()){ + f.setContext(ctx) + } + + def setCurrentCtxIfNotEmpty(ctx: Context): Unit = + if(ctx.nonEmpty()){ + Kamon.storeContext(ctx) + } +} +import Helper._ + + +object RestoreContextFromFiber { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val ctxFiber = fiber.asInstanceOf[HasContext].context + setCurrentCtxIfNotEmpty(ctxFiber) + } +} + +object SaveCurrentContextOnExit { + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) + } +} + + +object SetContextOnNewFiber { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This currFiber: Any, @Advice.Argument(1) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(currFiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) + } +} + +object SetContextOnNewFiberForWSTP { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.Argument(0) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"ScheduleNew | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("unknown", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + setIfNotEmpty(fiber.asInstanceOf[HasContext])(currentCtx) + } +} + +object InstrumentRunLoop { + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"run(enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val ctxFiber = fiber.asInstanceOf[HasContext].context + Kamon.storeContext(ctxFiber) + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"run(exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: {${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(fiber.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + val currentCtx = Kamon.currentContext() + fiber.asInstanceOf[HasContext].setContext(currentCtx) + } +} + +object Debug { + + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"runLoop(Enter) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.This fiber: Any, @Advice.Argument(0) io :Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"runLoop(Exit) | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo(fiber.hashCode(), 10)} | ToBeScheduledFiberId: ${padTo("NA", 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(io.getClass.getCanonicalName, 25)} | Thread ${Thread.currentThread().getName}") + } +} + +object DebugWT { + + @Advice.OnMethodEnter(suppress = classOf[Throwable]) + def enter(@Advice.Argument(0) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") + } + + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def exit(@Advice.Argument(0) fiber: Any): Unit = { + val field = fiber.getClass.getDeclaredField("resumeTag") + field.setAccessible(true) + //println(s"WorkerThread | Resume Tag: ${field.get(fiber)} | CurrFiberId: ${padTo("undefined", 10)} | ToBeScheduledFiberId: ${padTo(fiber.hashCode(), 10)} | Fiber: ${padTo(fiber.asInstanceOf[HasContext].context.tags, 15)} | Thread ${padTo(Kamon.currentContext().tags, 15)} | IO: ${padTo(-1, 25)} | Thread ${Thread.currentThread().getName}") + } +} diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala deleted file mode 100644 index d8298338f..000000000 --- a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIOInstrumentationSpec.scala +++ /dev/null @@ -1,60 +0,0 @@ -package kamon.instrumentation.futures.cats - -import cats.effect.{ContextShift, IO} -import kamon.Kamon -import kamon.context.Context -import kamon.tag.Lookups.plain -import org.scalatest.OptionValues -import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -import java.util.concurrent.Executors -import scala.concurrent.ExecutionContext -import scala.concurrent.ExecutionContext.global -import scala.concurrent.duration._ - -class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration - with OptionValues with Eventually { - - // NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there - // is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the - // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. - - "an cats.effect IO created when instrumentation is active" should { - "capture the active span available when created" which { - "must be available across asynchronous boundaries" in { - implicit val ctxShift: ContextShift[IO] = IO.contextShift(global) - val anotherExecutionContext: ExecutionContext = - ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) - val context = Context.of("key", "value") - - implicit val timer = IO.timer(global) - - val contextTagAfterTransformations = - for { - scope <- IO { - Kamon.storeContext(context) - } - len <- IO("Hello Kamon!").map(_.length) - _ <- IO(len.toString) - _ <- IO.shift(global) - _ <- IO.shift - _ <- IO.sleep(Duration.Zero) - _ <- IO.shift(anotherExecutionContext) - } yield { - val tagValue = Kamon.currentContext().getTag(plain("key")) - scope.close() - tagValue - } - - val contextTagFuture = contextTagAfterTransformations.unsafeToFuture() - - - eventually(timeout(10 seconds)) { - contextTagFuture.value.get.get shouldBe "value" - } - } - } - } -} \ No newline at end of file diff --git a/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala new file mode 100644 index 000000000..7b6579a57 --- /dev/null +++ b/instrumentation/kamon-cats-io/src/test/scala/kamon/instrumentation/futures/cats/CatsIoInstrumentationSpec.scala @@ -0,0 +1,151 @@ +package kamon.instrumentation.futures.cats + +import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler} +import cats.effect.{IO, Resource, Spawn} +import kamon.Kamon +import kamon.context.Context +import kamon.tag.Lookups.plain +import org.scalatest.OptionValues +import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.util.concurrent.Executors +import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.ExecutionContext.global +import scala.concurrent.duration._ +import cats.implicits._ +import kamon.trace.Identifier.Scheme +import kamon.trace.{Identifier, Span, Trace} + +class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration + with OptionValues with Eventually { + + // NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there + // is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the + // kamon-executors module should take care of all non-JDK Runnable/Callable implementations. + "an cats.effect IO created when instrumentation is active" should { + "capture the active span available when created" which { + + "must allow the context to be cleaned" in { + val runtime = IORuntime.global + val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) + val context = Context.of("key", "value") + val test = + for { + _ <- IO.delay(Kamon.storeContext(context)) + _ <- Spawn[IO].evalOn(IO.sleep(0.seconds), anotherExecutionContext) + _ <- IO.delay(Kamon.storeContext(Context.Empty)) + _ <- Spawn[IO].evalOn(IO.sleep(0.seconds), anotherExecutionContext) + afterCleaning <- IO.delay(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + } + + test.unsafeRunSync()(runtime) + } + + "must be available across asynchronous boundaries" in { + val runtime = IORuntime.apply( + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), //pool 4 + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), // pool 5 + Scheduler.fromScheduledExecutor(Executors.newSingleThreadScheduledExecutor()), //pool 6 + () => (), + IORuntimeConfig.apply() + ) + val anotherExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) //pool 7 + val context = Context.of("key", "value") + val test = + for { + scope <- IO.delay({println("Context will be set"); Kamon.storeContext(context);}) + len <- IO("Hello Kamon!").map(_.length) + _ <- IO(len.toString) + beforeChanging <- getKey() + evalOnGlobalRes <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), global) + outerSpanIdBeginning <- IO.delay(Kamon.currentSpan().id.string) + innerSpan <- IO.delay(Kamon.clientSpanBuilder("Foo", "attempt").context(context).start()) + innerSpanId1 <- Spawn[IO].evalOn(IO.delay(Kamon.currentSpan()), anotherExecutionContext) + innerSpanId2 <- IO.delay(Kamon.currentSpan()) + _ <- IO.delay(innerSpan.finish()) + outerSpanIdEnd <- IO.delay(Kamon.currentSpan().id.string) + evalOnAnotherEx <- Spawn[IO].evalOn(IO.sleep(Duration.Zero).flatMap(_ => getKey()), anotherExecutionContext) + } yield { + scope.close() + withClue("before changing")(beforeChanging shouldBe "value") + withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") + withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") + withClue("final result")(evalOnAnotherEx shouldBe "value") + withClue("inner span should be the same on different exec")(innerSpanId1 shouldBe innerSpan) + withClue("inner span should be the same on same exec")(innerSpanId2 shouldBe innerSpan) + withClue("inner and outer should be different")(outerSpanIdBeginning should not equal innerSpan) + } + + test.unsafeRunSync()(runtime) + } + + "must allow complex Span topologies to be created" in { + val parentSpan = Span.Remote( + Scheme.Single.spanIdFactory.generate(), + Identifier.Empty, + Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample) + ) + val context = Context.of(Span.Key, parentSpan) + implicit val ec = ExecutionContext.global + /** + * test + * - nestedLevel0 + * - nestedUpToLevel2 + * - nestedUpToLevel2._2._1 + * - fiftyInParallel + */ + val test = for { + span <- IO.delay(Kamon.currentSpan()) + nestedLevel0 <- meteredWithSpanCapture("level1-A")(IO.sleep(100.millis)) + nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(IO.sleep(100.millis))) + fiftyInParallel <- (0 to 49).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(IO.sleep(100.millis))) + afterCede <- meteredWithSpanCapture("cede")(IO.cede *> IO.delay(Kamon.currentSpan())) + afterEverything <- IO.delay(Kamon.currentSpan()) + } yield { + span.id.string should not be empty + span.id.string shouldBe nestedLevel0._1.parentId.string + span.id.string shouldBe nestedUpToLevel2._1.parentId.string + nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string + fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) + fiftyInParallel.map(_._1.id.string).toSet should have size 50 + afterCede._1.id.string shouldBe afterCede._2.id.string //A cede should not cause the span to be lost + afterEverything.id.string shouldBe span.id.string + } + val runtime = IORuntime.global + + val result = scala.concurrent.Future.sequence( + (1 to 100).toList.map(_ => (IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test).unsafeToFuture()(runtime)) + ) + Await.result(result, 10.seconds) + } + } + } + private def getKey(): IO[String] = { + IO.delay(Kamon.currentContext().getTag(plain("key"))) + } + + private def meteredWithSpanCapture[A](operation: String)(io: IO[A]): IO[(Span, A)] = { + Resource.make{ + for { + initialCtx <- IO(Kamon.currentContext()) + parentSpan <- IO(Kamon.currentSpan()) + newSpan <- IO(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) + _ <- IO(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) + } yield (initialCtx, newSpan) + }{ + case (initialCtx, span) => + for { + _ <- IO.delay(span.finish()) + _ <- IO.delay(Kamon.storeContext(initialCtx)) + } yield () + } + .use(_ => (IO.delay(Kamon.currentSpan()), io).parBisequence) + } + + + +} \ No newline at end of file diff --git a/instrumentation/kamon-jdbc/src/test/resources/logback.xml b/instrumentation/kamon-jdbc/src/test/resources/logback.xml index c336bbfe4..e69de29bb 100644 --- a/instrumentation/kamon-jdbc/src/test/resources/logback.xml +++ b/instrumentation/kamon-jdbc/src/test/resources/logback.xml @@ -1,12 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - \ No newline at end of file