Skip to content

Commit

Permalink
Remove stash when disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
Damien Levin committed May 22, 2015
1 parent 2413a0e commit 06a5e90
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 98 deletions.
4 changes: 3 additions & 1 deletion src/main/scala/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ class Redis(
disconnectedWithRetry orElse super.disconnected

def disconnectedWithRetry: Receive = {
case _@ (_: Request | _: Batch)
sender ! Status.Failure(new RedisDisconnectedException(s"Disconnected from $host:$port"))

case ("auth_ok", x: Connection.Connected)
retries = 0
notifyStateChange(x)
context.become(connected)
unstashAll()

case Reconnect
(connectionRetryDelay, connectionRetryAttempts) match {
Expand Down
16 changes: 3 additions & 13 deletions src/main/scala/RedisConnectionSupervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private[brando] abstract class RedisConnectionSupervisor(
auth: Option[String],
var listeners: Set[ActorRef],
connectionTimeout: FiniteDuration,
connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor with Stash {
connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor {

import ConnectionSupervisor.{ Connect, Reconnect }
import context.dispatcher
Expand All @@ -29,11 +29,8 @@ private[brando] abstract class RedisConnectionSupervisor(
def receive = disconnected

def connected: Receive = handleListeners orElse {
case request: Request
connection forward request

case batch: Batch
connection forward batch
case m @ (_: Request | _: Batch)
connection forward m

case x: Connection.Disconnected
notifyStateChange(x)
Expand All @@ -42,12 +39,6 @@ private[brando] abstract class RedisConnectionSupervisor(
}

def disconnected: Receive = handleListeners orElse {
case request: Request
stash()

case batch: Batch
stash()

case Connect(host, port)
connection ! PoisonPill
connection = context.actorOf(Props(classOf[Connection],
Expand All @@ -62,7 +53,6 @@ private[brando] abstract class RedisConnectionSupervisor(
case ("auth_ok", x: Connection.Connected)
notifyStateChange(x)
context.become(connected)
unstashAll()

case x: Connection.ConnectionFailed
notifyStateChange(x)
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/RedisSentinel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class RedisSentinel(
disconnectedWithSentinel orElse super.disconnected

def disconnectedWithSentinel: Receive = {
case _@ (_: Request | _: Batch)
sender ! Status.Failure(new RedisDisconnectedException(s"Disconnected from $master"))

case Reconnect
context.system.scheduler.scheduleOnce(connectionRetryDelay, self, SentinelConnect)

Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import akka.util.ByteString

case class PubSubMessage(channel: String, message: String)

class RedisException(message: String) extends Exception(message) {
case class RedisException(message: String) extends Exception(message) {
override lazy val toString = "%s: %s\n".format(getClass.getName, message)
}
case class RedisDisconnectedException(message: String) extends Exception(message) {
override lazy val toString = "%s: %s\n".format(getClass.getName, message)
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/Sentinel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Sentinel(
var sentinels: Seq[Sentinel.Server],
var listeners: Set[ActorRef],
connectionTimeout: FiniteDuration,
connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor with Stash {
connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor {

import Sentinel._
import context.dispatcher
Expand Down Expand Up @@ -72,7 +72,6 @@ class Sentinel(

case x: Connection.Connected
context.become(connected)
unstashAll()
retries = 0
val Server(host, port) = sentinels.head
notifyStateChange(Connection.Connected(host, port))
Expand All @@ -88,7 +87,7 @@ class Sentinel(
}

case request: Request
stash()
sender ! Status.Failure(new RedisException("Disconnected from the sentinel cluster"))

case _
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,16 @@ class RedisClientSentinelTest extends TestKit(ActorSystem("RedisClientSentinelTe
Connected("127.0.0.1", 6379))
}

it("should stash requests") {
val redisProbe = TestProbe()
val sentinelProbe = TestProbe()

it("should return a failure when disconnected") {
val sentinel = system.actorOf(Sentinel(
sentinels = Seq(Server("localhost", 26379)),
listeners = Set(sentinelProbe.ref)))
sentinels = Seq(Server("localhost", 26379))))
val brando = system.actorOf(RedisSentinel(
master = "mymaster",
sentinelClient = sentinel,
listeners = Set(redisProbe.ref)))

redisProbe.expectMsg(
Connecting("127.0.0.1", 6379))
redisProbe.expectMsg(
Connected("127.0.0.1", 6379))

brando ! Disconnected("127.0.0.1", 6379)
redisProbe.expectMsg(
Disconnected("127.0.0.1", 6379))
sentinelClient = sentinel))

brando ! Request("PING")

redisProbe.expectMsg(
Connecting("127.0.0.1", 6379))
redisProbe.expectMsg(
Connected("127.0.0.1", 6379))

expectMsg(Some(Pong))
expectMsg(Status.Failure(new RedisDisconnectedException("Disconnected from mymaster")))
}
}
}
Expand Down
104 changes: 80 additions & 24 deletions src/test/scala/RedisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("ping") {
it("should respond with Pong") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("PING")

Expand All @@ -28,7 +30,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("flushdb") {
it("should respond with OK") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("FLUSHDB")

Expand All @@ -38,7 +42,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("set") {
it("should respond with OK") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "mykey", "somevalue")

Expand All @@ -51,7 +57,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("get") {
it("should respond with value option for existing key") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "mykey", "somevalue")

Expand All @@ -66,7 +74,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

it("should respond with None for non-existent key") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("GET", "mykey")

Expand All @@ -76,7 +86,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("incr") {
it("should increment and return value for existing key") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "incr-test", "10")

Expand All @@ -91,7 +103,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

it("should return 1 for non-existent key") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("INCR", "incr-test")

Expand All @@ -104,7 +118,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("sadd") {
it("should return number of members added to set") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SADD", "sadd-test", "one")

Expand All @@ -125,7 +141,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("smembers") {
it("should return all members in a set") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SADD", "smembers-test", "one", "two", "three", "four")

Expand All @@ -146,7 +164,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("pipelining") {
it("should respond to a Seq of multiple requests all at once") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val ping = Request("PING")

brando ! ping
Expand All @@ -160,7 +181,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

it("should support pipelines of setex commands") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val setex = Request("SETEX", "pipeline-setex-path", "10", "Some data")

brando ! setex
Expand All @@ -173,7 +197,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

it("should receive responses in the right order") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val ping = Request("PING")
val setex = Request("SETEX", "pipeline-setex-path", "10", "Some data")

Expand Down Expand Up @@ -203,7 +230,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

val largeText = new String(bytes, "UTF-8")

val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "crime+and+punishment", largeText)

Expand All @@ -220,7 +249,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("error reply") {
it("should receive a failure with the redis error message") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "key")

Expand All @@ -242,7 +273,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("select") {
it("should execute commands on the selected database") {
val brando = system.actorOf(Redis("localhost", 6379, 5))
val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Request("SET", "mykey", "somevalue")

Expand Down Expand Up @@ -270,7 +303,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

describe("multi/exec requests") {
it("should support multi requests as an atomic transaction") {
val brando = system.actorOf(Redis("localhost", 6379, 5))
val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("EXEC"))
expectMsg(List(Some(Ok),
Some(Queued),
Expand All @@ -279,7 +315,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

it("should support multi requests with multiple results") {
val brando = system.actorOf(Redis("localhost", 6379, 5))
val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

brando ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("GET", "mykey"), Request("EXEC"))
expectMsg(List(Some(Ok),
Some(Queued),
Expand All @@ -294,7 +333,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

it("should be able to subscribe to a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Redis())
val subscriber = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

subscriber ! Request("SUBSCRIBE", channel)

Expand All @@ -306,8 +347,13 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

it("should receive published messages from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Redis())
val publisher = system.actorOf(Redis())
val subscriber = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val publisher = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

subscriber ! Request("SUBSCRIBE", channel)

Expand All @@ -324,8 +370,13 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike

it("should be able to unsubscribe from a pubsub channel") {
val channel = UUID.randomUUID().toString
val subscriber = system.actorOf(Redis())
val publisher = system.actorOf(Redis())
val subscriber = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val publisher = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

subscriber ! Request("SUBSCRIBE", channel)

Expand All @@ -349,10 +400,15 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike
}

describe("should be able to block on blpop") {
val brando = system.actorOf(Redis())
val brando = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

try {
val channel = UUID.randomUUID().toString
val popRedis = system.actorOf(Redis())
val popRedis = system.actorOf(Redis(listeners = Set(self)))
expectMsg(Connecting("localhost", 6379))
expectMsg(Connected("localhost", 6379))

val probeRedis = TestProbe()
val probePopRedis = TestProbe()
Expand Down
Loading

0 comments on commit 06a5e90

Please sign in to comment.