Replace chill-akka with akka-kryo-serialization #3288

Merged (3 commits) on Mar 3, 2025
3 changes: 1 addition & 2 deletions core/amber/build.sbt
@@ -222,8 +222,7 @@ libraryDependencies += "org.jgrapht" % "jgrapht-core" % "1.4.0"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "4.5.4"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "4.5.4" classifier "models"

- // https://mvnrepository.com/artifact/com.twitter/chill-akka
- libraryDependencies += "com.twitter" %% "chill-akka" % "0.10.0"
+ libraryDependencies += "io.altoo" %% "akka-kryo-serialization" % "2.5.0"

// https://mvnrepository.com/artifact/com.twitter/util-core
libraryDependencies += "com.twitter" %% "util-core" % "22.12.0"
2 changes: 1 addition & 1 deletion core/amber/src/main/resources/cluster.conf
@@ -22,7 +22,7 @@ akka {
enable-additional-serialization-bindings = on
allow-java-serialization = off
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
"java.io.Serializable" = kryo
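
For context, a minimal sketch of how this binding swap behaves (names like KryoBindingSketch and Ping are illustrative, and the config shown is only the fragment relevant here, not the project's full cluster.conf): both libraries register under the same `kryo` alias, so only the serializer class name changes, and any `java.io.Serializable` message is now routed to akka-kryo-serialization's KryoSerializer.

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.typesafe.config.ConfigFactory

// Stand-in message type; case classes extend java.io.Serializable, so the
// "java.io.Serializable" = kryo binding below applies to it.
final case class Ping(n: Int)

object KryoBindingSketch extends App {
  // Same shape as the cluster.conf fragment above; withFallback(ConfigFactory.load())
  // pulls in the reference.conf defaults shipped with akka-kryo-serialization.
  val config = ConfigFactory
    .parseString(
      """akka.actor {
        |  allow-java-serialization = off
        |  serializers { kryo = "io.altoo.akka.serialization.kryo.KryoSerializer" }
        |  serialization-bindings { "java.io.Serializable" = kryo }
        |}""".stripMargin
    )
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("kryo-binding-sketch", config)

  // Akka resolves the serializer from the message's runtime class and the bindings.
  val serializer = SerializationExtension(system).findSerializerFor(Ping(1))
  println(serializer.getClass.getName) // expected: io.altoo.akka.serialization.kryo.KryoSerializer

  system.terminate()
}
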
ReplayLogRecord.scala
@@ -8,7 +8,7 @@ import edu.uci.ics.amber.engine.common.storage.{EmptyRecordStorage, SequentialRe
import edu.uci.ics.amber.core.virtualidentity.{ChannelIdentity, ChannelMarkerIdentity}

//In-mem formats:
- sealed trait ReplayLogRecord
+ sealed trait ReplayLogRecord extends Serializable

case class MessageContent(message: WorkflowFIFOMessage) extends ReplayLogRecord

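
Why the trait now extends Serializable: Akka picks a serializer by matching the message's runtime class against the serialization-bindings above, so making the whole record hierarchy Serializable guarantees the kryo binding applies to every subtype. A small illustrative sketch (hypothetical names, not the project's code):

// Hypothetical stand-in for the ReplayLogRecord hierarchy.
// Declaring Serializable on the sealed trait covers every subtype, including
// plain objects and non-case classes (case classes pick it up on their own).
sealed trait StandInRecord extends Serializable
final case class StandInMessage(payload: String) extends StandInRecord
case object StandInTerminate extends StandInRecord

object StandInRecordCheck extends App {
  // Compiles only because the bound holds at the trait level, so the guarantee
  // also covers future subtypes, not just the ones defined today.
  implicitly[StandInRecord <:< java.io.Serializable]
  println("every StandInRecord is java.io.Serializable")
}
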
EmptyRecordStorage.scala
@@ -8,8 +8,9 @@ import org.apache.commons.io.input.NullInputStream
import org.apache.hadoop.io.IOUtils.NullOutputStream

import java.io.{DataInputStream, DataOutputStream}
+ import scala.reflect.ClassTag

- class EmptyRecordStorage[T >: Null <: AnyRef] extends SequentialRecordStorage[T] {
+ class EmptyRecordStorage[T >: Null <: AnyRef: ClassTag] extends SequentialRecordStorage[T] {
override def getWriter(fileName: String): SequentialRecordWriter[T] = {
new SequentialRecordWriter(
new DataOutputStream(
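
The ClassTag bound added here (and to HDFSRecordStorage, VFSRecordStorage, SequentialRecordReader, and getStorage below) exists so the storage layer can hand Akka's deserializer a runtime Class[T]. A minimal sketch of the mechanism, with illustrative names:

import scala.reflect.{ClassTag, classTag}

// ": ClassTag" is sugar for an implicit ClassTag[T] parameter; it survives type
// erasure and lets the code recover the runtime Class[T] that the
// deserialize(bytes, clazz) overload needs.
class RuntimeClassSketch[T >: Null <: AnyRef: ClassTag] {
  // Equivalent to: class RuntimeClassSketch[T >: Null <: AnyRef](implicit ct: ClassTag[T])
  val clazz: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
}

object RuntimeClassSketchDemo extends App {
  println(new RuntimeClassSketch[String].clazz) // class java.lang.String
}
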
HDFSRecordStorage.scala
@@ -6,8 +6,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.net.URI
+ import scala.reflect.ClassTag

- class HDFSRecordStorage[T >: Null <: AnyRef](hdfsLogFolderURI: URI)
+ class HDFSRecordStorage[T >: Null <: AnyRef: ClassTag](hdfsLogFolderURI: URI)
extends SequentialRecordStorage[T]
with LazyLogging {

SequentialRecordStorage.scala
@@ -1,13 +1,7 @@
package edu.uci.ics.amber.engine.common.storage

import com.esotericsoftware.kryo.io.{Input, Output}
- import com.twitter.chill.{KryoBase, KryoPool, KryoSerializer, ScalaKryoInstantiator}
- import edu.uci.ics.amber.engine.architecture.logreplay.{
- MessageContent,
- ProcessingStep,
- ReplayLogRecord
- }
- import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState
+ import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage.{
SequentialRecordReader,
SequentialRecordWriter
@@ -16,22 +10,9 @@ import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage.{
import java.io.{DataInputStream, DataOutputStream}
import java.net.URI
import scala.collection.mutable.ArrayBuffer
+ import scala.reflect.{ClassTag, classTag}

object SequentialRecordStorage {
- private val kryoPool = {
- val r = KryoSerializer.registerAll
- val ki = new ScalaKryoInstantiator {
- override def newKryo(): KryoBase = {
- val kryo = super.newKryo()
- kryo.register(classOf[ReplayLogRecord])
- kryo.register(classOf[MessageContent])
- kryo.register(classOf[ProcessingStep])
- kryo.register(classOf[WorkerState])
- kryo
- }
- }.withRegistrar(r)
- KryoPool.withByteArrayOutputStream(Runtime.getRuntime.availableProcessors * 2, ki)
- }

// For debugging purpose only
def fetchAllRecords[T >: Null <: AnyRef](
@@ -50,7 +31,7 @@
class SequentialRecordWriter[T >: Null <: AnyRef](outputStream: DataOutputStream) {
lazy val output = new Output(outputStream)
def writeRecord(obj: T): Unit = {
- val bytes = kryoPool.toBytesWithClass(obj)
+ val bytes = AmberRuntime.serde.serialize(obj).get
output.writeInt(bytes.length)
output.write(bytes)
}
@@ -62,7 +43,11 @@
}
}

- class SequentialRecordReader[T >: Null <: AnyRef](inputStreamGen: () => DataInputStream) {
+ class SequentialRecordReader[T >: Null <: AnyRef: ClassTag](
+ inputStreamGen: () => DataInputStream
+ ) {
+ val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
+
def mkRecordIterator(): Iterator[T] = {
lazy val input = new Input(inputStreamGen())
new Iterator[T] {
@@ -71,7 +56,7 @@
try {
val len = input.readInt()
val bytes = input.readBytes(len)
- kryoPool.fromBytes(bytes).asInstanceOf[T]
+ AmberRuntime.serde.deserialize(bytes, clazz).get
} catch {
case e: Throwable =>
input.close()
@@ -88,7 +73,9 @@
}
}

- def getStorage[T >: Null <: AnyRef](storageLocation: Option[URI]): SequentialRecordStorage[T] = {
+ def getStorage[T >: Null <: AnyRef: ClassTag](
+ storageLocation: Option[URI]
+ ): SequentialRecordStorage[T] = {
storageLocation match {
case Some(location) =>
if (location.getScheme.toLowerCase == "hdfs") {
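
Taken together, the writer and reader now delegate to Kryo through Akka's Serialization facade instead of a hand-built chill KryoPool, while keeping the same length-prefixed framing. Below is a self-contained sketch of that round trip under the new scheme; SketchRecord, the object name, and the inline config are illustrative, and it assumes akka-kryo-serialization's default settings (which accept unregistered classes).

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.ConfigFactory

// Stand-in for a ReplayLogRecord-like payload (illustrative, not the project's type).
final case class SketchRecord(step: Long, note: String)

object FramedRoundTripSketch extends App {
  // Same binding style as cluster.conf; the fallback brings in the library's reference.conf defaults.
  val config = ConfigFactory
    .parseString(
      """akka.actor {
        |  serializers { kryo = "io.altoo.akka.serialization.kryo.KryoSerializer" }
        |  serialization-bindings { "java.io.Serializable" = kryo }
        |}""".stripMargin
    )
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("framed-roundtrip-sketch", config)
  // This Serialization instance plays the role AmberRuntime.serde now plays.
  val serde: Serialization = SerializationExtension(system)

  // Write side: length-prefixed frame, mirroring SequentialRecordWriter.writeRecord.
  val buffer = new ByteArrayOutputStream()
  val out = new DataOutputStream(buffer)
  val bytes = serde.serialize(SketchRecord(42L, "hello")).get // Try[Array[Byte]]
  out.writeInt(bytes.length)
  out.write(bytes)
  out.flush()

  // Read side: length, then payload, then deserialize against the runtime class,
  // which is where SequentialRecordReader's new ClassTag-derived clazz comes in.
  val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
  val payload = new Array[Byte](in.readInt())
  in.readFully(payload)
  val restored = serde.deserialize(payload, classOf[SketchRecord]).get

  println(restored) // SketchRecord(42,hello)
  system.terminate()
}
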
VFSRecordStorage.scala
@@ -9,8 +9,9 @@ import org.apache.commons.vfs2.{FileObject, FileSystemManager, VFS}

import java.io.{DataInputStream, DataOutputStream}
import java.net.URI
+ import scala.reflect.ClassTag

- class VFSRecordStorage[T >: Null <: AnyRef](vfsLogFolderURI: URI)
+ class VFSRecordStorage[T >: Null <: AnyRef: ClassTag](vfsLogFolderURI: URI)
extends SequentialRecordStorage[T]
with LazyLogging {

LoggingSpec.scala
@@ -1,6 +1,7 @@
package edu.uci.ics.amber.engine.faulttolerance

import akka.actor.ActorSystem
+ import akka.serialization.SerializationExtension
import akka.testkit.{ImplicitSender, TestKit}
import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, TupleLike}
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayLogManager, ReplayLogRecord}
@@ -32,16 +33,22 @@ import edu.uci.ics.amber.core.virtualidentity.{
PhysicalOpIdentity
}
import edu.uci.ics.amber.core.workflow.{PhysicalLink, PortIdentity}
+ import edu.uci.ics.amber.engine.common.AmberRuntime
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike

import java.net.URI

class LoggingSpec
extends TestKit(ActorSystem("LoggingSpec"))
extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ AmberRuntime.serde = SerializationExtension(system)
+ }
+
private val identifier1 = ActorVirtualIdentity("Worker:WF1-E1-op-layer-1")
private val identifier2 = ActorVirtualIdentity("Worker:WF1-E1-op-layer-2")
private val operatorIdentity = OperatorIdentity("testOperator")
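
The spec now boots its TestKit system with the runtime's Akka config and assigns AmberRuntime.serde in beforeAll, so the test exercises the same Kryo path the engine uses. A self-contained sketch of that wiring pattern (the class, inline config, and Probe type are illustrative stand-ins, not the project's test):

import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike

// Stand-in payload for the round-trip assertion below.
final case class Probe(n: Int)

class SerdeWiringSpec
    extends TestKit(
      ActorSystem(
        "SerdeWiringSpec",
        // Plays the role of AmberRuntime.akkaConfig: the TestKit system must carry the
        // same kryo bindings as the runtime, or SerializationExtension resolves differently.
        ConfigFactory
          .parseString(
            """akka.actor {
              |  serializers { kryo = "io.altoo.akka.serialization.kryo.KryoSerializer" }
              |  serialization-bindings { "java.io.Serializable" = kryo }
              |}""".stripMargin
          )
          .withFallback(ConfigFactory.load())
      )
    )
    with AnyFlatSpecLike
    with BeforeAndAfterAll {

  // Captured once per suite, like AmberRuntime.serde = SerializationExtension(system).
  private var serde: Serialization = _

  override def beforeAll(): Unit = {
    serde = SerializationExtension(system)
  }

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "the configured serde" should "round-trip a Serializable value" in {
    val bytes = serde.serialize(Probe(7)).get
    assert(serde.deserialize(bytes, classOf[Probe]).get == Probe(7))
  }
}
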