diff --git a/.travis.yml b/.travis.yml index a971878e..9252bc13 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,8 @@ dist: trusty jdk: oraclejdk8 language: scala scala: - - 2.12.8 # keep build.sbt crossScalaVersions synced with these versions, top version should be newest - - 2.11.8 + - 2.13.4 # keep build.sbt crossScalaVersions synced with these versions, top version should be newest + - 2.12.12 stages: - name: test - name: release @@ -12,7 +12,7 @@ jobs: include: - stage: release script: .travis/publish.sh - - scala: "2.11.8" + - scala: "2.12.12" script: .travis/publish.sh before_install: diff --git a/build.sbt b/build.sbt index 1d5726bf..e991bcf4 100644 --- a/build.sbt +++ b/build.sbt @@ -16,10 +16,10 @@ import java.lang.{Runtime => JRuntime} name := "sirius" -version := "2.2.1" +version := "2.3.0" -scalaVersion := "2.12.8" -crossScalaVersions := Seq("2.11.8", "2.12.8") // NOTE: keep sync'd with .travis.yml +scalaVersion := "2.13.4" +crossScalaVersions := Seq("2.12.13", "2.13.4") // NOTE: keep sync'd with .travis.yml organization := "com.comcast" @@ -29,19 +29,21 @@ resolvers += "Typesafe Public Repo" at "https://repo.typesafe.com/typesafe/relea resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" libraryDependencies ++= { - val akkaV = "2.4.20" + val akkaV = "2.6.11" Seq( "com.typesafe.akka" %% "akka-actor" % akkaV, "com.typesafe.akka" %% "akka-remote" % akkaV, + "com.typesafe.akka" %% "akka-cluster" % akkaV, "com.typesafe.akka" %% "akka-slf4j" % akkaV, - "com.typesafe.akka" %% "akka-agent" % akkaV, - "org.slf4j" % "slf4j-api" % "1.7.7", - "com.github.pathikrit" %% "better-files" % "3.8.0", - "org.scalatest" %% "scalatest" % "3.0.5" % "test", + "io.netty" % "netty" % "3.10.6.Final", + "org.scala-stm" %% "scala-stm" % "0.11.0", + "org.slf4j" % "slf4j-api" % "1.7.30", + "com.github.pathikrit" %% "better-files" % "3.9.1", + "org.scalatest" %% "scalatest" % "3.0.8" % "test", "org.mockito" % "mockito-core" % "1.10.19" % "test", "junit" % "junit" % "4.12" % "test", - "org.slf4j" % "slf4j-log4j12" % "1.7.7" % "test", + "org.slf4j" % "slf4j-log4j12" % "1.7.30" % "test", "log4j" % "log4j" % "1.2.17" % "test", "com.typesafe.akka" %% "akka-testkit" % akkaV % "test" ) diff --git a/project/build.properties b/project/build.properties index a82bb05e..d91c272d 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.7 +sbt.version=1.4.6 diff --git a/src/main/resources/sirius-akka-base.conf b/src/main/resources/sirius-akka-base.conf index 7d43d0c2..26e844c3 100644 --- a/src/main/resources/sirius-akka-base.conf +++ b/src/main/resources/sirius-akka-base.conf @@ -4,40 +4,40 @@ akka { log-config-on-startup = off actor { - provider = "akka.remote.RemoteActorRefProvider" + provider = "cluster" } remote { - netty.tcp = ${common-netty-settings} - netty.ssl = ${common-netty-settings} - netty.ssl.security { - protocol = "TLSv1" - # random-number-generator defaults to SecureRandom - enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA] + artery { + enabled = off } - gate-invalid-addresses-for = 5 s - quarantine-systems-for = off - transport-failure-detector { - acceptable-heartbeat-pause = 20 s - } - watch-failure-detector { - acceptable-heartbeat-pause = 10 s + classic { + enabled-transports = ["akka.remote.classic.netty.tcp"] + netty.tcp = ${common-netty-settings} + netty.ssl = ${common-netty-settings} + + netty.ssl.security { + protocol = "TLSv1" + random-number-generator = "" + enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA] + } } - retry-gate-closed-for = 0.2 s } } + + common-netty-settings { - # defaults to InetAddress.getLocalHost.getHostAddress - hostname = "" - port = 2552 - - # this is probably too liberal - connection-timeout = 2s - - # how long to try to send a message before deciding - # it ain't happening, this is important to make sure - # we don't spend too long trying to talk to a node - # that clearly isn't online - backoff-timeout = 250ms -} +# defaults to InetAddress.getLocalHost.getHostAddress +hostname = "" +port = 2552 + +# this is probably too liberal +connection-timeout = 2s + +# how long to try to send a message before deciding +# it ain't happening, this is important to make sure +# we don't spend too long trying to talk to a node +# that clearly isn't online +backoff-timeout = 250ms +} \ No newline at end of file diff --git a/src/main/scala/akka/agent/Agent.scala b/src/main/scala/akka/agent/Agent.scala new file mode 100644 index 00000000..db2c2eb4 --- /dev/null +++ b/src/main/scala/akka/agent/Agent.scala @@ -0,0 +1,261 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.agent + +import scala.concurrent.stm._ +import scala.concurrent.{ ExecutionContext, Future, Promise } +import akka.util.SerializedSuspendableExecutionContext + +@deprecated( + "Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", + since = "2.5.0") +object Agent { + + /** + * Factory method for creating an Agent. + */ + @deprecated( + "Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", + since = "2.5.0") + def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context) + + /** + * Java API: Factory method for creating an Agent. + * @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.i + */ + @Deprecated + @deprecated( + "Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", + since = "2.5.0") + def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context) + + /** + * Default agent implementation. + */ + private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] { + private val ref = Ref(initialValue) + private val updater = SerializedSuspendableExecutionContext(10)(context) + + def get(): T = ref.single.get + + def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) }) + + def send(f: T => T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) }) + + def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit = + withinTransaction(new Runnable { + def run = + try updater.suspend() + finally ec.execute(new Runnable { + def run = + try ref.single.transform(f) + finally updater.resume() + }) + }) + + def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue }) + + def alter(f: T => T): Future[T] = doAlter(ref.single.transformAndGet(f)) + + def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] = { + val result = Promise[T]() + withinTransaction(new Runnable { + def run = { + updater.suspend() + result.completeWith( + Future(try ref.single.transformAndGet(f) + finally updater.resume())) + } + }) + result.future + } + + /** + * Internal helper method + */ + private final def withinTransaction(run: Runnable): Unit = { + Txn.findCurrent match { + case Some(txn) => Txn.afterCommit(_ => updater.execute(run))(txn) + case _ => updater.execute(run) + } + } + + /** + * Internal helper method + */ + private final def doAlter(f: => T): Future[T] = { + Txn.findCurrent match { + case Some(txn) => + val result = Promise[T]() + Txn.afterCommit(status => result.completeWith(Future(f)(updater)))(txn) + result.future + case _ => Future(f)(updater) + } + } + + def future(): Future[T] = Future(ref.single.get)(updater) + + def map[B](f: T => B): Agent[B] = Agent(f(get))(updater) + + def flatMap[B](f: T => Agent[B]): Agent[B] = f(get) + + def foreach[U](f: T => U): Unit = f(get) + } +} + +/** + * The Agent class was inspired by agents in Clojure. + * + * Agents provide asynchronous change of individual locations. Agents + * are bound to a single storage location for their lifetime, and only + * allow mutation of that location (to a new state) to occur as a result + * of an action. Update actions are functions that are asynchronously + * applied to the Agent's state and whose return value becomes the + * Agent's new state. The state of an Agent should be immutable. + * + * While updates to Agents are asynchronous, the state of an Agent is + * always immediately available for reading by any thread (using ''get'' + * or ''apply'') without any messages. + * + * Agents are reactive. The update actions of all Agents get interleaved + * amongst threads in a thread pool. At any point in time, at most one + * ''send'' action for each Agent is being executed. Actions dispatched to + * an agent from another thread will occur in the order they were sent, + * potentially interleaved with actions dispatched to the same agent from + * other sources. + * + * Example of usage: + * {{{ + * val agent = Agent(5) + * + * agent send (_ * 2) + * + * ... + * + * val result = agent() + * // use result ... + * + * }}} + * + * Agent is also monadic, which means that you can compose operations using + * for-comprehensions. In monadic usage the original agents are not touched + * but new agents are created. So the old values (agents) are still available + * as-is. They are so-called 'persistent'. + * + * Example of monadic usage: + * {{{ + * val agent1 = Agent(3) + * val agent2 = Agent(5) + * + * for (value <- agent1) { + * result = value + 1 + * } + * + * val agent3 = for (value <- agent1) yield value + 1 + * + * val agent4 = for { + * value1 <- agent1 + * value2 <- agent2 + * } yield value1 + value2 + * + * }}} + * + * ==DEPRECATED STM SUPPORT== + * + * Agents participating in enclosing STM transaction is a deprecated feature in 2.3. + * + * If an Agent is used within an enclosing transaction, then it will + * participate in that transaction. Agents are integrated with the STM - + * any dispatches made in a transaction are held until that transaction + * commits, and are discarded if it is retried or aborted. + * + * @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead. + */ +@deprecated( + "Agents are deprecated and scheduled for removal in the next major version, use Actors instead.", + since = "2.5.0") +abstract class Agent[T] { + + /** + * Java API: Read the internal state of the agent. + */ + def get(): T + + /** + * Read the internal state of the agent. + */ + def apply(): T = get + + /** + * Dispatch a new value for the internal state. Behaves the same + * as sending a function (x => newValue). + */ + def send(newValue: T): Unit + + /** + * Dispatch a function to update the internal state. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def send(f: T => T): Unit + + /** + * Dispatch a function to update the internal state but on its own thread. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `sendOff` or `send` will + * still be executed in order. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit + + /** + * Dispatch an update to the internal state, and return a Future where + * that new state can be obtained. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alter(newValue: T): Future[T] + + /** + * Dispatch a function to update the internal state, and return a Future where + * that new state can be obtained. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alter(f: T => T): Future[T] + + /** + * Dispatch a function to update the internal state but on its own thread, + * and return a Future where that new state can be obtained. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `alterOff` or `alter` will + * still be executed in order. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] + + /** + * A future to the current value that will be completed after any currently + * queued updates. + */ + def future(): Future[T] + + /** + * Map this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def map[B](f: T => B): Agent[B] + + /** + * Flatmap this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def flatMap[B](f: T => Agent[B]): Agent[B] + + /** + * Applies the function to the internal state. Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Foreach`. + */ + def foreach[U](f: T => U): Unit +} diff --git a/src/main/scala/com/comcast/xfinity/sirius/admin/SiriusMonitorReader.scala b/src/main/scala/com/comcast/xfinity/sirius/admin/SiriusMonitorReader.scala index ed5e1fdc..e97bfb89 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/admin/SiriusMonitorReader.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/admin/SiriusMonitorReader.scala @@ -16,7 +16,7 @@ package com.comcast.xfinity.sirius.admin import com.comcast.xfinity.sirius.api.SiriusConfiguration -import collection.JavaConversions.asScalaSet +import scala.collection.JavaConverters._ import javax.management.{ObjectName, MBeanServer} /** @@ -54,7 +54,7 @@ class SiriusMonitorReader { } private def readMonitors(mBeanServer: MBeanServer) = { - val objectNames = asScalaSet(mBeanServer.queryNames(siriusDomainQuery, null)) + val objectNames = mBeanServer.queryNames(siriusDomainQuery, null).asScala objectNames.foldLeft(Map[String, Map[String, Any]]()) ( (monitorsMap, objectName) => diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala index c80f1c24..507e5158 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala @@ -155,10 +155,10 @@ object SiriusFactory { private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = { val configMap = new JHashMap[String, Any]() val sslEnabled = siriusConfig.getProp(SiriusConfiguration.ENABLE_SSL, false) - val transportPrefix = if (sslEnabled) "akka.remote.netty.ssl" else "akka.remote.netty.tcp" + val transportPrefix = if (sslEnabled) "akka.remote.classic.netty.ssl" else "akka.remote.classic.netty.tcp" traceLog.info(s"AKKA using transport: $transportPrefix") - configMap.put("akka.remote.enabled-transports", List(transportPrefix).asJava) + configMap.put("akka.remote.classic.enabled-transports", List(transportPrefix).asJava) configMap.put(s"$transportPrefix.hostname", siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName)) configMap.put(s"$transportPrefix.port", siriusConfig.getProp(SiriusConfiguration.PORT, 2552)) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala index bb892567..157afae5 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala @@ -16,25 +16,27 @@ package com.comcast.xfinity.sirius.api.impl import bridge.PaxosStateBridge -import com.comcast.xfinity.sirius.api.impl.membership.{MembershipHelper, MembershipActor} +import com.comcast.xfinity.sirius.api.impl.membership.{MembershipActor, MembershipHelper} import paxos.PaxosMessages.PaxosMessage import akka.actor._ import akka.agent.Agent import com.comcast.xfinity.sirius.api.impl.paxos.Replica + import scala.concurrent.duration._ import paxos.PaxosSupervisor import state.SiriusPersistenceActor.LogQuery import state.StateSup import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import akka.event.Logging -import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler} +import com.comcast.xfinity.sirius.api.{RequestHandler, SiriusConfiguration} import status.StatusWorker import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver import status.StatusWorker.StatusQuery import com.comcast.xfinity.sirius.uberstore.CompactionManager import com.comcast.xfinity.sirius.uberstore.CompactionManager.CompactionMessage -import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{ChildProvider, CheckPaxosMembership} +import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{CheckPaxosMembership, ChildProvider} import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.MembershipMessage + import scala.language.postfixOps object SiriusSupervisor { diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderHelper.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderHelper.scala index 121f8f94..a5ac61a4 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderHelper.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderHelper.scala @@ -54,7 +54,7 @@ class LeaderHelper { pvals => pvals.maxBy(_.ballot) ) - val slotsToCommands = maxPValBySlot.mapValues(_.proposedCommand) + val slotsToCommands = maxPValBySlot.mapValues(_.proposedCommand).toMap RichJTreeMap(slotsToCommands) } diff --git a/src/main/scala/com/comcast/xfinity/sirius/util/RichJTreeMap.scala b/src/main/scala/com/comcast/xfinity/sirius/util/RichJTreeMap.scala index ef2f00cf..cd970b98 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/util/RichJTreeMap.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/util/RichJTreeMap.scala @@ -15,8 +15,9 @@ */ package com.comcast.xfinity.sirius.util -import collection.JavaConversions.asScalaIterator import java.util.{TreeMap => JTreeMap} + +import scala.collection.JavaConverters._ import scala.util.control.Breaks._ object RichJTreeMap { @@ -58,7 +59,7 @@ class RichJTreeMap[K, V] private extends JTreeMap[K, V] { * @param fun function to execute on each entry */ def foreach(fun: (K, V) => Unit) { - asScalaIterator(entrySet.iterator).foreach( + entrySet.iterator.asScala.foreach( entry => fun(entry.getKey, entry.getValue) ) } diff --git a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala index 52bb4060..72dc8539 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala @@ -16,7 +16,7 @@ package com.comcast.xfinity.sirius.writeaheadlog import com.comcast.xfinity.sirius.api.impl.OrderedEvent -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import java.util.{TreeMap => JTreeMap} import com.comcast.xfinity.sirius.api.SiriusConfiguration @@ -105,7 +105,8 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog { */ private def foldLeftRangeCached[T](startSeq: Long, endSeq: Long) (acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized { - writeCache.subMap(startSeq, true, endSeq, true).values.foldLeft(acc0)(foldFun) + writeCache.subMap(startSeq, true, endSeq, true) + .values.asScala.foldLeft(acc0)(foldFun) } def getNextSeq = log.getNextSeq diff --git a/src/test/resources/sirius-akka-base.conf b/src/test/resources/sirius-akka-base.conf index c3261c12..8b7e70d7 100644 --- a/src/test/resources/sirius-akka-base.conf +++ b/src/test/resources/sirius-akka-base.conf @@ -1,26 +1,30 @@ akka { - //loggers = ["com.comcast.xfinity.sirius.util.Slf4jEventHandlerWithRemotingSilencer"] loglevel = "ERROR" stdout-loglevel = "ERROR" log-config-on-startup = off actor { - provider = "akka.remote.RemoteActorRefProvider" + provider = "cluster" } remote { - netty.tcp = ${common-netty-settings} - netty.ssl = ${common-netty-settings} + artery { + enabled = off + } + classic { + enabled-transports = ["akka.remote.classic.netty.tcp"] + netty.tcp = ${common-netty-settings} + netty.ssl = ${common-netty-settings} - netty.ssl.security { - protocol = "TLSv1" - random-number-generator = "" - enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA] + netty.ssl.security { + protocol = "TLSv1" + random-number-generator = "" + enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA] + } } } -} -common-netty-settings { + common-netty-settings { # defaults to InetAddress.getLocalHost.getHostAddress hostname = "" port = 2552 @@ -34,4 +38,5 @@ common-netty-settings { # that clearly isn't online backoff-timeout = 250ms -} + } +} \ No newline at end of file diff --git a/src/test/scala/com/comcast/xfinity/sirius/admin/ObjectNameHelperTest.scala b/src/test/scala/com/comcast/xfinity/sirius/admin/ObjectNameHelperTest.scala index d7386d25..732c453b 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/admin/ObjectNameHelperTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/admin/ObjectNameHelperTest.scala @@ -76,8 +76,10 @@ class ObjectNameHelperTest extends NiceTest with BeforeAndAfterAll { val configMap = new JHashMap[String, Any]() configMap.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider") configMap.put("akka.remote.transport", "akka.remote.netty.NettyRemoteTransport") - configMap.put("akka.remote.netty.tcp.hostname", "127.0.0.1") - configMap.put("akka.remote.netty.tcp.port", 2556) + configMap.put("akka.remote.artery.enabled", "false") + configMap.put("akka.remote.classic.transport", "akka.remote.netty.NettyRemoteTransport") + configMap.put("akka.remote.classic.netty.tcp.hostname", "127.0.0.1") + configMap.put("akka.remote.classic.netty.tcp.port", 2556) // this makes intellij not get mad val akkaConfig = configMap.asInstanceOf[java.util.Map[String, _ <: AnyRef]] implicit val actorSystem = ActorSystem("test-system", ConfigFactory.parseMap(akkaConfig)) diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala index a0a2d30e..7305d57e 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala @@ -37,6 +37,8 @@ import com.comcast.xfinity.sirius.api.impl.SiriusImplTestCompanion.ProbeWrapper import scala.concurrent.Await +import scala.language.postfixOps + object SiriusImplTestCompanion { object ProbeWrapper{ diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/PaxosStateBridgeTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/PaxosStateBridgeTest.scala index 4318a99e..d1e2bef1 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/PaxosStateBridgeTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/PaxosStateBridgeTest.scala @@ -35,6 +35,7 @@ import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{EmptySu class PaxosStateBridgeTest extends NiceTest with BeforeAndAfterAll with TimedTest { implicit val actorSystem = ActorSystem("PaxosStateBridgeTest") + import scala.language.postfixOps val config = new SiriusConfiguration val defaultMockMembershipHelper = mock[MembershipHelper] diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/membership/MembershipActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/membership/MembershipActorTest.scala index f06b2508..f2be29cf 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/membership/MembershipActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/membership/MembershipActorTest.scala @@ -32,6 +32,8 @@ import org.mockito.ArgumentCaptor import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.{CheckMembershipHealth, MembershipInfoMBean} import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver +import scala.language.postfixOps + class MembershipActorTest extends NiceTest with TimedTest { val pingInterval = 120 seconds diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderPingerTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderPingerTest.scala index 28ec824a..e1278b6d 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderPingerTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderPingerTest.scala @@ -17,14 +17,17 @@ package com.comcast.xfinity.sirius.api.impl.paxos import com.comcast.xfinity.sirius.NiceTest import akka.testkit.{TestActorRef, TestProbe} + import scala.concurrent.duration._ -import com.comcast.xfinity.sirius.api.impl.paxos.LeaderPinger.{Pong, Ping} +import com.comcast.xfinity.sirius.api.impl.paxos.LeaderPinger.{Ping, Pong} import akka.actor._ -import com.comcast.xfinity.sirius.api.impl.paxos.LeaderWatcher.{LeaderPong, LeaderGone, DifferentLeader} +import com.comcast.xfinity.sirius.api.impl.paxos.LeaderWatcher.{DifferentLeader, LeaderGone, LeaderPong} import com.comcast.xfinity.sirius.api.impl.paxos.LeaderWatcher.LeaderPong import com.comcast.xfinity.sirius.api.impl.paxos.LeaderWatcher.DifferentLeader import com.comcast.xfinity.sirius.api.impl.paxos.LeaderPinger.Pong + import scala.Some +import scala.language.postfixOps class LeaderPingerTest extends NiceTest { diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderTest.scala index 7798936c..17bff952 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/LeaderTest.scala @@ -17,13 +17,14 @@ package com.comcast.xfinity.sirius.api.impl.paxos import org.scalatest.BeforeAndAfterAll import com.comcast.xfinity.sirius.{NiceTest, TimedTest} -import akka.testkit.{TestProbe, TestActorRef} +import akka.testkit.{TestActorRef, TestProbe} import org.mockito.Mockito._ import org.mockito.Matchers._ import akka.actor._ + import collection.immutable.SortedMap -import java.util.{TreeMap => JTreeMap} -import scala.collection.JavaConversions._ +import java.util.{TreeMap => JTreeMap, SortedMap => JSortedMap} + import com.comcast.xfinity.sirius.api.impl.paxos.PaxosMessages._ import com.comcast.xfinity.sirius.api.impl.Delete import com.comcast.xfinity.sirius.api.impl.paxos.LeaderWatcher.{Close, LeaderGone} @@ -32,8 +33,9 @@ import com.comcast.xfinity.sirius.api.impl.paxos.Leader._ import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper.ClusterInfo import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.FiniteDuration +import scala.collection.JavaConverters._ class LeaderTest extends NiceTest with TimedTest with BeforeAndAfterAll { implicit val actorSystem = ActorSystem("LeaderTest") @@ -204,7 +206,7 @@ class LeaderTest extends NiceTest with TimedTest with BeforeAndAfterAll { leader ! Propose(slotNum, command) - assert(new JTreeMap[Long, Command](SortedMap(slotNum -> command)) === leader.underlyingActor.proposals) + assert(new JTreeMap[Long, Command](SortedMap(slotNum -> command).toMap.asJava) === leader.underlyingActor.proposals) assert(false === commanderStarted) } @@ -252,7 +254,7 @@ class LeaderTest extends NiceTest with TimedTest with BeforeAndAfterAll { leader ! Propose(slotNum, command) - assert(new JTreeMap[Long, Command](SortedMap(slotNum -> command)) === leader.underlyingActor.proposals) + assert(new JTreeMap[Long, Command](SortedMap(slotNum -> command).toMap.asJava) === leader.underlyingActor.proposals) assert(true === commanderStarted) } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosITest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosITest.scala index ed219e31..59794722 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosITest.scala @@ -18,17 +18,22 @@ package com.comcast.xfinity.sirius.api.impl.paxos import com.comcast.xfinity.sirius.NiceTest import akka.agent.Agent import akka.actor.{ActorRef, ActorSystem} -import com.comcast.xfinity.sirius.api.impl.{Delete, Put, NonCommutativeSiriusRequest} +import com.comcast.xfinity.sirius.api.impl.{Delete, NonCommutativeSiriusRequest, Put} + import scala.concurrent.duration._ import org.scalatest.BeforeAndAfterAll import akka.testkit.{TestLatch, TestProbe} + import scala.concurrent.Await import com.comcast.xfinity.sirius.api.impl.paxos.PaxosMessages.Decision import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper + import scala.concurrent.ExecutionContext.Implicits.global import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver +import scala.language.postfixOps + object PaxosITest { class TestNode(membership: MembershipHelper, decisionLatch: TestLatch)(implicit as: ActorSystem) { diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosSupervisorTest.scala index 7e26e14f..fb340ac4 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/PaxosSupervisorTest.scala @@ -18,13 +18,16 @@ package com.comcast.xfinity.sirius.api.impl.paxos import org.scalatest.BeforeAndAfterAll import com.comcast.xfinity.sirius.NiceTest import akka.testkit.{TestActorRef, TestProbe} -import akka.actor.{ActorRef, ActorContext, Props, ActorSystem} +import akka.actor.{ActorContext, ActorRef, ActorSystem, Props} import com.comcast.xfinity.sirius.api.impl.paxos.PaxosMessages._ + import scala.concurrent.duration._ -import com.comcast.xfinity.sirius.api.impl.{Put, Delete} +import com.comcast.xfinity.sirius.api.impl.{Delete, Put} import com.comcast.xfinity.sirius.api.SiriusConfiguration import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver +import scala.language.postfixOps + class PaxosSupervisorTest extends NiceTest with BeforeAndAfterAll { implicit val actorSystem = ActorSystem("PaxosSupTest") diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/ReplicaTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/ReplicaTest.scala index 52cac19b..7bc1e36e 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/ReplicaTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/paxos/ReplicaTest.scala @@ -27,10 +27,10 @@ import akka.actor.ActorSystem import akka.agent.Agent import akka.testkit.TestActorRef import akka.testkit.TestProbe -import scala.collection.JavaConversions._ import collection.SortedMap import com.comcast.xfinity.sirius.api.impl.paxos.Replica.Reap import com.comcast.xfinity.sirius.api.SiriusConfiguration +import scala.collection.JavaConverters._ class ReplicaTest extends NiceTest with BeforeAndAfterAll { @@ -228,7 +228,7 @@ class ReplicaTest extends NiceTest with BeforeAndAfterAll { 1L -> Command(null, now - 15000, Delete("1")), 2L -> Command(null, now, Delete("2")), 3L -> Command(null, now - 12000, Delete("3")) - )) + ).toMap.asJava) replica ! Reap diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/status/StatusWorkerTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/status/StatusWorkerTest.scala index 81ab94bd..86e990c9 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/status/StatusWorkerTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/status/StatusWorkerTest.scala @@ -21,9 +21,11 @@ import akka.actor.ActorSystem import com.comcast.xfinity.sirius.admin.SiriusMonitorReader import com.comcast.xfinity.sirius.api.SiriusConfiguration import org.mockito.Mockito._ -import akka.testkit.{TestProbe, TestActorRef} -import com.comcast.xfinity.sirius.api.impl.status.NodeStats.{NodeConfig, MonitorStats, MemoryUsage, FullNodeStatus} +import akka.testkit.{TestActorRef, TestProbe} +import com.comcast.xfinity.sirius.api.impl.status.NodeStats.{FullNodeStatus, MemoryUsage, MonitorStats, NodeConfig} + import scala.concurrent.duration._ +import scala.language.postfixOps class StatusWorkerTest extends NiceTest with BeforeAndAfterAll { diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index bb956037..0ea7c112 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -43,7 +43,7 @@ object FullSystemITest { def getProtocol(sslEnabled: Boolean): String = sslEnabled match { - case true => "akka.ssl.tcp" + case true => "akka.ssl" case false => "akka.tcp" } diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentTest.scala index 64dd55ac..a8aea9db 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentTest.scala @@ -26,7 +26,6 @@ import java.io.{File => JFile} import better.files.File import org.scalatest.BeforeAndAfterAll -import scala.collection.immutable.StringOps import com.comcast.xfinity.sirius.api.impl.{Delete, OrderedEvent, Put} object SegmentTest { @@ -171,14 +170,14 @@ class SegmentTest extends NiceTest with BeforeAndAfterAll { } it ("Should return the correct number of keys if the set of keys is not empty. With Puts only"){ val underTest = buildSegment(tempDir, "hasKeys-Put") - val newByteArray = new StringOps("data").getBytes + val newByteArray = "data".getBytes underTest.writeEntry(OrderedEvent(1, 678, Put("yarr",newByteArray))) underTest.writeEntry(OrderedEvent(2, 1000, Put("secondYarr",newByteArray))) assert(Set("yarr", "secondYarr") === underTest.keys) } it ("Should return the correct number of keys if the set of keys is not empty. With Puts & Deletes"){ val underTest = buildSegment(tempDir, "hasKeys-All") - val newByteArray = new StringOps("data").getBytes + val newByteArray = "data".getBytes underTest.writeEntry(OrderedEvent(1, 678, Put("yarr",newByteArray))) underTest.writeEntry(OrderedEvent(2, 1000, Put("secondYarr",newByteArray))) underTest.writeEntry(OrderedEvent(3, 1100, Delete("thirdYarr"))) @@ -187,7 +186,7 @@ class SegmentTest extends NiceTest with BeforeAndAfterAll { } it ("should return a unique number of keys if the set of keys is not empty and has duplicates"){ val underTest = buildSegment(tempDir, "hasUniqueKeys") - val newByteArray = new StringOps("data").getBytes + val newByteArray = "data".getBytes underTest.writeEntry(OrderedEvent(1, 678, Delete("yarr"))) underTest.writeEntry(OrderedEvent(2, 1200, Delete("secondYarr"))) underTest.writeEntry(OrderedEvent(3, 1300, Delete("secondYarr"))) diff --git a/src/test/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndexBinaryFileOpsTest.scala b/src/test/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndexBinaryFileOpsTest.scala index fb71caf8..a7f93d85 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndexBinaryFileOpsTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndexBinaryFileOpsTest.scala @@ -18,18 +18,23 @@ package com.comcast.xfinity.sirius.uberstore.seqindex import com.comcast.xfinity.sirius.NiceTest import java.io.RandomAccessFile + import org.mockito.stubbing.Answer import org.mockito.invocation.InvocationOnMock import org.mockito.ArgumentCaptor import org.mockito.Mockito._ -import java.nio.{ByteOrder, ByteBuffer} +import java.nio.{ByteBuffer, ByteOrder} import java.util.Arrays + import org.mockito.Matchers.{any, anyInt} + import collection.immutable.SortedMap import java.util.{TreeMap => JTreeMap} -import collection.JavaConversions._ + import com.comcast.xfinity.sirius.uberstore.common.Checksummer +import scala.collection.JavaConverters._ + object SeqIndexBinaryFileOpsTest { def mockReadAnswerForBytes(bytes: Array[Byte], toReturn: Int) = @@ -95,7 +100,7 @@ class SeqIndexBinaryFileOpsTest extends NiceTest { doAnswer(chunkAnswer).when(mockHandle).read(any[Array[Byte]]) val actual = underTest.loadIndex(mockHandle) - val expected = new JTreeMap[Long, Long](SortedMap(1L -> 2L, 2L -> 3L)) + val expected = new JTreeMap[Long, Long](SortedMap(1L -> 2L, 2L -> 3L).toMap.asJava) assert(actual === expected) } @@ -145,7 +150,7 @@ class SeqIndexBinaryFileOpsTest extends NiceTest { doAnswer(chunk1Answer).doAnswer(chunk2Answer).when(mockHandle).read(any[Array[Byte]]) val actual = underTest.loadIndex(mockHandle) - val expected = new JTreeMap[Long, Long](SortedMap(1L -> 2L, 2L -> 3L, 3L -> 4L)) + val expected = new JTreeMap[Long, Long](SortedMap(1L -> 2L, 2L -> 3L, 3L -> 4L).asJava) assert(actual === expected) } diff --git a/src/test/scala/com/comcast/xfinity/sirius/util/AkkaExternalAddressResolverITest.scala b/src/test/scala/com/comcast/xfinity/sirius/util/AkkaExternalAddressResolverITest.scala index 97dead08..e97defdb 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/util/AkkaExternalAddressResolverITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/util/AkkaExternalAddressResolverITest.scala @@ -44,8 +44,9 @@ class AkkaExternalAddressResolverITest extends NiceTest with BeforeAndAfterAll { val configMap = new JHashMap[String, Any]() configMap.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider") configMap.put("akka.remote.transport", "akka.remote.netty.NettyRemoteTransport") - configMap.put("akka.remote.netty.tcp.hostname", "127.0.0.1") - configMap.put("akka.remote.netty.tcp.port", 2559) + configMap.put("akka.remote.artery.enabled", "false") + configMap.put("akka.remote.classic.netty.tcp.hostname", "127.0.0.1") + configMap.put("akka.remote.classic.netty.tcp.port", 2559) // this makes intellij not get mad val config = configMap.asInstanceOf[java.util.Map[String, _ <: AnyRef]] val siriusConfig = new SiriusConfiguration