Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Bump Akka to 2.6 with Scala 2.13 #152

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ dist: trusty
jdk: oraclejdk8
language: scala
scala:
- 2.12.8 # keep build.sbt crossScalaVersions synced with these versions, top version should be newest
- 2.11.8
- 2.13.4 # keep build.sbt crossScalaVersions synced with these versions, top version should be newest
- 2.12.12
stages:
- name: test
- name: release
Expand All @@ -12,7 +12,7 @@ jobs:
include:
- stage: release
script: .travis/publish.sh
- scala: "2.11.8"
- scala: "2.12.12"
script: .travis/publish.sh

before_install:
Expand Down
20 changes: 11 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import java.lang.{Runtime => JRuntime}

name := "sirius"

version := "2.2.1"
version := "2.3.0"

scalaVersion := "2.12.8"
crossScalaVersions := Seq("2.11.8", "2.12.8") // NOTE: keep sync'd with .travis.yml
scalaVersion := "2.13.4"
crossScalaVersions := Seq("2.12.13", "2.13.4") // NOTE: keep sync'd with .travis.yml

organization := "com.comcast"

Expand All @@ -29,19 +29,21 @@ resolvers += "Typesafe Public Repo" at "https://repo.typesafe.com/typesafe/relea
resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"

libraryDependencies ++= {
val akkaV = "2.4.20"
val akkaV = "2.6.11"

Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-remote" % akkaV,
"com.typesafe.akka" %% "akka-cluster" % akkaV,
"com.typesafe.akka" %% "akka-slf4j" % akkaV,
"com.typesafe.akka" %% "akka-agent" % akkaV,
"org.slf4j" % "slf4j-api" % "1.7.7",
"com.github.pathikrit" %% "better-files" % "3.8.0",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"io.netty" % "netty" % "3.10.6.Final",
"org.scala-stm" %% "scala-stm" % "0.11.0",
"org.slf4j" % "slf4j-api" % "1.7.30",
"com.github.pathikrit" %% "better-files" % "3.9.1",
"org.scalatest" %% "scalatest" % "3.0.8" % "test",
"org.mockito" % "mockito-core" % "1.10.19" % "test",
"junit" % "junit" % "4.12" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.7" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.30" % "test",
"log4j" % "log4j" % "1.2.17" % "test",
"com.typesafe.akka" %% "akka-testkit" % akkaV % "test"
)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.7
sbt.version=1.4.6
56 changes: 28 additions & 28 deletions src/main/resources/sirius-akka-base.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,40 @@ akka {
log-config-on-startup = off

actor {
provider = "akka.remote.RemoteActorRefProvider"
provider = "cluster"
}

remote {
netty.tcp = ${common-netty-settings}
netty.ssl = ${common-netty-settings}
netty.ssl.security {
protocol = "TLSv1"
# random-number-generator defaults to SecureRandom
enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
artery {
enabled = off
}
gate-invalid-addresses-for = 5 s
quarantine-systems-for = off
transport-failure-detector {
acceptable-heartbeat-pause = 20 s
}
watch-failure-detector {
acceptable-heartbeat-pause = 10 s
classic {
enabled-transports = ["akka.remote.classic.netty.tcp"]
netty.tcp = ${common-netty-settings}
netty.ssl = ${common-netty-settings}

netty.ssl.security {
protocol = "TLSv1"
random-number-generator = ""
enabled-algorithms = [TLS_RSA_WITH_AES_128_CBC_SHA]
}
}
retry-gate-closed-for = 0.2 s
}
}



common-netty-settings {
# defaults to InetAddress.getLocalHost.getHostAddress
hostname = ""
port = 2552

# this is probably too liberal
connection-timeout = 2s

# how long to try to send a message before deciding
# it ain't happening, this is important to make sure
# we don't spend too long trying to talk to a node
# that clearly isn't online
backoff-timeout = 250ms
}
# defaults to InetAddress.getLocalHost.getHostAddress
hostname = ""
port = 2552

# this is probably too liberal
connection-timeout = 2s

# how long to try to send a message before deciding
# it ain't happening, this is important to make sure
# we don't spend too long trying to talk to a node
# that clearly isn't online
backoff-timeout = 250ms
}
261 changes: 261 additions & 0 deletions src/main/scala/akka/agent/Agent.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.agent

import scala.concurrent.stm._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.util.SerializedSuspendableExecutionContext

@deprecated(
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
since = "2.5.0")
object Agent {

/**
* Factory method for creating an Agent.
*/
@deprecated(
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
since = "2.5.0")
def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context)

/**
* Java API: Factory method for creating an Agent.
* @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.i
*/
@Deprecated
@deprecated(
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
since = "2.5.0")
def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context)

/**
* Default agent implementation.
*/
private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] {
private val ref = Ref(initialValue)
private val updater = SerializedSuspendableExecutionContext(10)(context)

def get(): T = ref.single.get

def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })

def send(f: T => T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })

def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit =
withinTransaction(new Runnable {
def run =
try updater.suspend()
finally ec.execute(new Runnable {
def run =
try ref.single.transform(f)
finally updater.resume()
})
})

def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })

def alter(f: T => T): Future[T] = doAlter(ref.single.transformAndGet(f))

def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
withinTransaction(new Runnable {
def run = {
updater.suspend()
result.completeWith(
Future(try ref.single.transformAndGet(f)
finally updater.resume()))
}
})
result.future
}

/**
* Internal helper method
*/
private final def withinTransaction(run: Runnable): Unit = {
Txn.findCurrent match {
case Some(txn) => Txn.afterCommit(_ => updater.execute(run))(txn)
case _ => updater.execute(run)
}
}

/**
* Internal helper method
*/
private final def doAlter(f: => T): Future[T] = {
Txn.findCurrent match {
case Some(txn) =>
val result = Promise[T]()
Txn.afterCommit(status => result.completeWith(Future(f)(updater)))(txn)
result.future
case _ => Future(f)(updater)
}
}

def future(): Future[T] = Future(ref.single.get)(updater)

def map[B](f: T => B): Agent[B] = Agent(f(get))(updater)

def flatMap[B](f: T => Agent[B]): Agent[B] = f(get)

def foreach[U](f: T => U): Unit = f(get)
}
}

/**
* The Agent class was inspired by agents in Clojure.
*
* Agents provide asynchronous change of individual locations. Agents
* are bound to a single storage location for their lifetime, and only
* allow mutation of that location (to a new state) to occur as a result
* of an action. Update actions are functions that are asynchronously
* applied to the Agent's state and whose return value becomes the
* Agent's new state. The state of an Agent should be immutable.
*
* While updates to Agents are asynchronous, the state of an Agent is
* always immediately available for reading by any thread (using ''get''
* or ''apply'') without any messages.
*
* Agents are reactive. The update actions of all Agents get interleaved
* amongst threads in a thread pool. At any point in time, at most one
* ''send'' action for each Agent is being executed. Actions dispatched to
* an agent from another thread will occur in the order they were sent,
* potentially interleaved with actions dispatched to the same agent from
* other sources.
*
* Example of usage:
* {{{
* val agent = Agent(5)
*
* agent send (_ * 2)
*
* ...
*
* val result = agent()
* // use result ...
*
* }}}
*
* Agent is also monadic, which means that you can compose operations using
* for-comprehensions. In monadic usage the original agents are not touched
* but new agents are created. So the old values (agents) are still available
* as-is. They are so-called 'persistent'.
*
* Example of monadic usage:
* {{{
* val agent1 = Agent(3)
* val agent2 = Agent(5)
*
* for (value <- agent1) {
* result = value + 1
* }
*
* val agent3 = for (value <- agent1) yield value + 1
*
* val agent4 = for {
* value1 <- agent1
* value2 <- agent2
* } yield value1 + value2
*
* }}}
*
* ==DEPRECATED STM SUPPORT==
*
* Agents participating in enclosing STM transaction is a deprecated feature in 2.3.
*
* If an Agent is used within an enclosing transaction, then it will
* participate in that transaction. Agents are integrated with the STM -
* any dispatches made in a transaction are held until that transaction
* commits, and are discarded if it is retried or aborted.
*
* @deprecated Agents are deprecated and scheduled for removal in the next major version, use Actors instead.
*/
@deprecated(
"Agents are deprecated and scheduled for removal in the next major version, use Actors instead.",
since = "2.5.0")
abstract class Agent[T] {

/**
* Java API: Read the internal state of the agent.
*/
def get(): T

/**
* Read the internal state of the agent.
*/
def apply(): T = get

/**
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def send(newValue: T): Unit

/**
* Dispatch a function to update the internal state.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def send(f: T => T): Unit

/**
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def sendOff(f: T => T)(implicit ec: ExecutionContext): Unit

/**
* Dispatch an update to the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(newValue: T): Future[T]

/**
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(f: T => T): Future[T]

/**
* Dispatch a function to update the internal state but on its own thread,
* and return a Future where that new state can be obtained.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alterOff(f: T => T)(implicit ec: ExecutionContext): Future[T]

/**
* A future to the current value that will be completed after any currently
* queued updates.
*/
def future(): Future[T]

/**
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def map[B](f: T => B): Agent[B]

/**
* Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def flatMap[B](f: T => Agent[B]): Agent[B]

/**
* Applies the function to the internal state. Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Foreach`.
*/
def foreach[U](f: T => U): Unit
}
Loading