Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduler] pekko integration #1032

Merged
merged 4 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ lazy val kyoJVM = project
`kyo-scheduler-zio`.jvm,
`kyo-scheduler-cats`.jvm,
`kyo-scheduler-finagle`.jvm,
`kyo-scheduler-pekko`.jvm,
`kyo-data`.jvm,
`kyo-kernel`.jvm,
`kyo-prelude`.jvm,
Expand Down Expand Up @@ -200,6 +201,7 @@ lazy val `kyo-scheduler-zio` = sbtcrossproject.CrossProject("kyo-scheduler-zio",
scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value,
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-cats` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
Expand All @@ -216,6 +218,23 @@ lazy val `kyo-scheduler-cats` =
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-pekko` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.dependsOn(`kyo-scheduler`)
.in(file("kyo-scheduler-pekko"))
.settings(
`kyo-settings`,
libraryDependencies += "org.apache.pekko" %%% "pekko-actor" % "1.1.3",
libraryDependencies += "org.apache.pekko" %%% "pekko-testkit" % "1.1.3" % Test
)
.jvmSettings(mimaCheck(false))
.settings(
scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value,
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-finagle` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kyo.scheduler

import com.typesafe.config.Config
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadFactory
import org.apache.pekko.dispatch.DispatcherPrerequisites
import org.apache.pekko.dispatch.ExecutorServiceConfigurator
import org.apache.pekko.dispatch.ExecutorServiceFactory

/** A Pekko ExecutorServiceConfigurator that integrates Kyo's adaptive scheduling capabilities with Pekko's dispatcher system. The
* configurator enables Kyo's scheduler to handle all actor executions within your Pekko application, allowing it to make optimal thread
* utilization decisions by having full visibility of the workload.
*
* To use Kyo's scheduler in your Pekko application, configure it as the default dispatcher:
*
* {{{
* pekko.actor.default-dispatcher {
* type = "Dispatcher"
* executor = "kyo.scheduler.KyoExecutorServiceConfigurator"
* }
* }}}
*
* The configurator uses Kyo's scheduler singleton instance, allowing it to share resources and optimization decisions across the entire
* application. By handling all actor executions, it can efficiently adapt to varying workloads and system conditions, optimizing thread
* utilization across your entire application.
*
* For effective load management, use Kyo's admission control through Scheduler.get.reject() methods at the boundaries of your application
* where external work enters the system. See the Admission class documentation for details on admission control behavior and
* configuration.
*
* @param config
* The dispatcher configuration from Pekko
* @param prerequisites
* Core Pekko prerequisites for dispatcher creation
*
* @see
* [[kyo.scheduler.Scheduler]] for details on the underlying scheduling capabilities, admisison `reject` methods, and available
* configurations.
* @see
* [[kyo.scheduler.regulator.Admission]] for details on admission control behavior
* @see
* [[org.apache.pekko.dispatch.ExecutorServiceConfigurator]] for the Pekko dispatcher interface
*/
class KyoExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we configure the Kyo scheduler in detail? Seems it's just using the default one.

Copy link
Collaborator Author

@fwbrasil fwbrasil Jan 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kyo's scheduler is designed to work as a JVM-global resource so it doesn't allow creating separate instances. It has several configs that can be set via system properties but I'd recommend trying the defaults first.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elegant


override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory) = {
val exec = Scheduler.get.asExecutorService
new ExecutorServiceFactory {
def createExecutorService = exec
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package kyo.scheduler

import com.typesafe.config.ConfigFactory
import java.util.concurrent.CountDownLatch
import org.apache.pekko.actor.Actor
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.actor.Props
import org.apache.pekko.pattern.ask
import org.apache.pekko.testkit.TestKit
import org.apache.pekko.testkit.TestProbe
import org.apache.pekko.util.Timeout
import org.scalatest.BeforeAndAfterAll
import org.scalatest.NonImplicitAssertions
import org.scalatest.freespec.AnyFreeSpecLike
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.*

class KyoExecutorServiceConfiguratorTest
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@He-Pin I think the last time I used Akka/Pekko was a decade ago :) Does it have custom locals/threadlocals that we need to check for propagation like in Finagle's integration?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, NO, thread local is rarely used.

extends TestKit(ActorSystem(
"KyoTest",
ConfigFactory.parseString("""
pekko.actor.default-dispatcher {
type = "Dispatcher"
executor = "kyo.scheduler.KyoExecutorServiceConfigurator"
}
""")
))
with AnyFreeSpecLike
with NonImplicitAssertions
with BeforeAndAfterAll {

implicit def timeout: Timeout = Timeout(5.seconds)
implicit def execCtx: ExecutionContext = Scheduler.get.asExecutionContext

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

"executes tasks on kyo threads" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))

val threadName = Await.result((actor ? "test").mapTo[String], 5.seconds)
assert(threadName.contains("kyo"))
}

"handles concurrent messages" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))

val futures = (1 to 1000).map(_ => (actor ? "test").mapTo[String])
val threadNames = Await.result(Future.sequence(futures), 5.seconds)

assert(threadNames.forall(_.contains("kyo")))
assert(threadNames.toSet.size > 1)
}

"handles multiple actors" in {
val actors =
(1 to 10).map { i =>
system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))
}

val futures =
for {
actor <- actors
_ <- 1 to 10
} yield (actor ? "test").mapTo[String]

val threadNames = Await.result(Future.sequence(futures), 5.seconds)
assert(threadNames.forall(_.contains("kyo")))
assert(threadNames.toSet.size > 1)
}
}
Loading