diff --git a/src/main/scala/Redis.scala b/src/main/scala/Redis.scala index 05f5bfa..68d664f 100644 --- a/src/main/scala/Redis.scala +++ b/src/main/scala/Redis.scala @@ -63,11 +63,13 @@ class Redis( disconnectedWithRetry orElse super.disconnected def disconnectedWithRetry: Receive = { + case _@ (_: Request | _: Batch) ⇒ + sender ! Status.Failure(new RedisDisconnectedException(s"Disconnected from $host:$port")) + case ("auth_ok", x: Connection.Connected) ⇒ retries = 0 notifyStateChange(x) context.become(connected) - unstashAll() case Reconnect ⇒ (connectionRetryDelay, connectionRetryAttempts) match { diff --git a/src/main/scala/RedisConnectionSupervisor.scala b/src/main/scala/RedisConnectionSupervisor.scala index 4e4d7d9..a40c4c3 100644 --- a/src/main/scala/RedisConnectionSupervisor.scala +++ b/src/main/scala/RedisConnectionSupervisor.scala @@ -17,7 +17,7 @@ private[brando] abstract class RedisConnectionSupervisor( auth: Option[String], var listeners: Set[ActorRef], connectionTimeout: FiniteDuration, - connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor with Stash { + connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor { import ConnectionSupervisor.{ Connect, Reconnect } import context.dispatcher @@ -29,11 +29,8 @@ private[brando] abstract class RedisConnectionSupervisor( def receive = disconnected def connected: Receive = handleListeners orElse { - case request: Request ⇒ - connection forward request - - case batch: Batch ⇒ - connection forward batch + case m @ (_: Request | _: Batch) ⇒ + connection forward m case x: Connection.Disconnected ⇒ notifyStateChange(x) @@ -42,12 +39,6 @@ private[brando] abstract class RedisConnectionSupervisor( } def disconnected: Receive = handleListeners orElse { - case request: Request ⇒ - stash() - - case batch: Batch ⇒ - stash() - case Connect(host, port) ⇒ connection ! PoisonPill connection = context.actorOf(Props(classOf[Connection], @@ -62,7 +53,6 @@ private[brando] abstract class RedisConnectionSupervisor( case ("auth_ok", x: Connection.Connected) ⇒ notifyStateChange(x) context.become(connected) - unstashAll() case x: Connection.ConnectionFailed ⇒ notifyStateChange(x) diff --git a/src/main/scala/RedisSentinel.scala b/src/main/scala/RedisSentinel.scala index b3baa50..4910a26 100644 --- a/src/main/scala/RedisSentinel.scala +++ b/src/main/scala/RedisSentinel.scala @@ -62,6 +62,9 @@ class RedisSentinel( disconnectedWithSentinel orElse super.disconnected def disconnectedWithSentinel: Receive = { + case _@ (_: Request | _: Batch) ⇒ + sender ! Status.Failure(new RedisDisconnectedException(s"Disconnected from $master")) + case Reconnect ⇒ context.system.scheduler.scheduleOnce(connectionRetryDelay, self, SentinelConnect) diff --git a/src/main/scala/Response.scala b/src/main/scala/Response.scala index 0248aad..75b63f4 100644 --- a/src/main/scala/Response.scala +++ b/src/main/scala/Response.scala @@ -4,7 +4,10 @@ import akka.util.ByteString case class PubSubMessage(channel: String, message: String) -class RedisException(message: String) extends Exception(message) { +case class RedisException(message: String) extends Exception(message) { + override lazy val toString = "%s: %s\n".format(getClass.getName, message) +} +case class RedisDisconnectedException(message: String) extends Exception(message) { override lazy val toString = "%s: %s\n".format(getClass.getName, message) } diff --git a/src/main/scala/Sentinel.scala b/src/main/scala/Sentinel.scala index ae09eb7..9edab87 100644 --- a/src/main/scala/Sentinel.scala +++ b/src/main/scala/Sentinel.scala @@ -33,7 +33,7 @@ class Sentinel( var sentinels: Seq[Sentinel.Server], var listeners: Set[ActorRef], connectionTimeout: FiniteDuration, - connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor with Stash { + connectionHeartbeatDelay: Option[FiniteDuration]) extends Actor { import Sentinel._ import context.dispatcher @@ -72,7 +72,6 @@ class Sentinel( case x: Connection.Connected ⇒ context.become(connected) - unstashAll() retries = 0 val Server(host, port) = sentinels.head notifyStateChange(Connection.Connected(host, port)) @@ -88,7 +87,7 @@ class Sentinel( } case request: Request ⇒ - stash() + sender ! Status.Failure(new RedisException("Disconnected from the sentinel cluster")) case _ ⇒ } diff --git a/src/test/scala/RedisSentinelTest.scala b/src/test/scala/RedisClientSentinelTest.scala similarity index 78% rename from src/test/scala/RedisSentinelTest.scala rename to src/test/scala/RedisClientSentinelTest.scala index 9396163..b511d31 100644 --- a/src/test/scala/RedisSentinelTest.scala +++ b/src/test/scala/RedisClientSentinelTest.scala @@ -81,35 +81,16 @@ class RedisClientSentinelTest extends TestKit(ActorSystem("RedisClientSentinelTe Connected("127.0.0.1", 6379)) } - it("should stash requests") { - val redisProbe = TestProbe() - val sentinelProbe = TestProbe() - + it("should return a failure when disconnected") { val sentinel = system.actorOf(Sentinel( - sentinels = Seq(Server("localhost", 26379)), - listeners = Set(sentinelProbe.ref))) + sentinels = Seq(Server("localhost", 26379)))) val brando = system.actorOf(RedisSentinel( master = "mymaster", - sentinelClient = sentinel, - listeners = Set(redisProbe.ref))) - - redisProbe.expectMsg( - Connecting("127.0.0.1", 6379)) - redisProbe.expectMsg( - Connected("127.0.0.1", 6379)) - - brando ! Disconnected("127.0.0.1", 6379) - redisProbe.expectMsg( - Disconnected("127.0.0.1", 6379)) + sentinelClient = sentinel)) brando ! Request("PING") - redisProbe.expectMsg( - Connecting("127.0.0.1", 6379)) - redisProbe.expectMsg( - Connected("127.0.0.1", 6379)) - - expectMsg(Some(Pong)) + expectMsg(Status.Failure(new RedisDisconnectedException("Disconnected from mymaster"))) } } } diff --git a/src/test/scala/RedisTest.scala b/src/test/scala/RedisTest.scala index 4d1c1ce..e7c61eb 100644 --- a/src/test/scala/RedisTest.scala +++ b/src/test/scala/RedisTest.scala @@ -18,7 +18,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("ping") { it("should respond with Pong") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("PING") @@ -28,7 +30,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("flushdb") { it("should respond with OK") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("FLUSHDB") @@ -38,7 +42,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("set") { it("should respond with OK") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "mykey", "somevalue") @@ -51,7 +57,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("get") { it("should respond with value option for existing key") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "mykey", "somevalue") @@ -66,7 +74,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } it("should respond with None for non-existent key") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("GET", "mykey") @@ -76,7 +86,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("incr") { it("should increment and return value for existing key") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "incr-test", "10") @@ -91,7 +103,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } it("should return 1 for non-existent key") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("INCR", "incr-test") @@ -104,7 +118,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("sadd") { it("should return number of members added to set") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SADD", "sadd-test", "one") @@ -125,7 +141,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("smembers") { it("should return all members in a set") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SADD", "smembers-test", "one", "two", "three", "four") @@ -146,7 +164,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("pipelining") { it("should respond to a Seq of multiple requests all at once") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + val ping = Request("PING") brando ! ping @@ -160,7 +181,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } it("should support pipelines of setex commands") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + val setex = Request("SETEX", "pipeline-setex-path", "10", "Some data") brando ! setex @@ -173,7 +197,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } it("should receive responses in the right order") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + val ping = Request("PING") val setex = Request("SETEX", "pipeline-setex-path", "10", "Some data") @@ -203,7 +230,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike val largeText = new String(bytes, "UTF-8") - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "crime+and+punishment", largeText) @@ -220,7 +249,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("error reply") { it("should receive a failure with the redis error message") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "key") @@ -242,7 +273,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("select") { it("should execute commands on the selected database") { - val brando = system.actorOf(Redis("localhost", 6379, 5)) + val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) brando ! Request("SET", "mykey", "somevalue") @@ -270,7 +303,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike describe("multi/exec requests") { it("should support multi requests as an atomic transaction") { - val brando = system.actorOf(Redis("localhost", 6379, 5)) + val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + brando ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("EXEC")) expectMsg(List(Some(Ok), Some(Queued), @@ -279,7 +315,10 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } it("should support multi requests with multiple results") { - val brando = system.actorOf(Redis("localhost", 6379, 5)) + val brando = system.actorOf(Redis("localhost", 6379, 5, listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + brando ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("GET", "mykey"), Request("EXEC")) expectMsg(List(Some(Ok), Some(Queued), @@ -294,7 +333,9 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike it("should be able to subscribe to a pubsub channel") { val channel = UUID.randomUUID().toString - val subscriber = system.actorOf(Redis()) + val subscriber = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) subscriber ! Request("SUBSCRIBE", channel) @@ -306,8 +347,13 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike it("should receive published messages from a pubsub channel") { val channel = UUID.randomUUID().toString - val subscriber = system.actorOf(Redis()) - val publisher = system.actorOf(Redis()) + val subscriber = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + + val publisher = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) subscriber ! Request("SUBSCRIBE", channel) @@ -324,8 +370,13 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike it("should be able to unsubscribe from a pubsub channel") { val channel = UUID.randomUUID().toString - val subscriber = system.actorOf(Redis()) - val publisher = system.actorOf(Redis()) + val subscriber = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + + val publisher = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) subscriber ! Request("SUBSCRIBE", channel) @@ -349,10 +400,15 @@ class RedisClientTest extends TestKit(ActorSystem("RedisTest")) with FunSpecLike } describe("should be able to block on blpop") { - val brando = system.actorOf(Redis()) + val brando = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) + try { val channel = UUID.randomUUID().toString - val popRedis = system.actorOf(Redis()) + val popRedis = system.actorOf(Redis(listeners = Set(self))) + expectMsg(Connecting("localhost", 6379)) + expectMsg(Connected("localhost", 6379)) val probeRedis = TestProbe() val probePopRedis = TestProbe() diff --git a/src/test/scala/SentinelTest.scala b/src/test/scala/SentinelTest.scala index 56687e0..377b7d7 100644 --- a/src/test/scala/SentinelTest.scala +++ b/src/test/scala/SentinelTest.scala @@ -99,32 +99,28 @@ class SentinelTest extends TestKit(ActorSystem("SentinelTest")) with FunSpecLike } describe("Request") { - it("should stash requests when disconnected") { - val probe = TestProbe() + it("should return a failure when disconnected") { val sentinel = system.actorOf(Sentinel(Seq( - Server("localhost", 26379)), Set(probe.ref))) - - probe.expectMsg(Connecting("localhost", 26379)) - probe.expectMsg(Connected("localhost", 26379)) - - sentinel ! Disconnected("localhost", 26379) - probe.expectMsg(Disconnected("localhost", 26379)) + Server("localhost", 26379)))) sentinel ! Request("PING") - probe.expectMsg(Connecting("localhost", 26379)) - probe.expectMsg(Connected("localhost", 26379)) + expectMsg(Status.Failure(new RedisException("Disconnected from the sentinel cluster"))) - expectMsg(Some(Pong)) } } describe("Subscriptions") { it("should receive pub/sub notifications") { val sentinel = system.actorOf(Sentinel(Seq( - Server("localhost", 26379)))) + Server("localhost", 26379)), Set(self))) val sentinel2 = system.actorOf(Sentinel(Seq( - Server("localhost", 26379)))) + Server("localhost", 26379)), Set(self))) + + expectMsg(Connecting("localhost", 26379)) + expectMsg(Connecting("localhost", 26379)) + expectMsg(Connected("localhost", 26379)) + expectMsg(Connected("localhost", 26379)) sentinel ! Request("subscribe", "+failover-end") diff --git a/src/test/scala/ShardManagerTest.scala b/src/test/scala/ShardManagerTest.scala index a698918..da6b101 100644 --- a/src/test/scala/ShardManagerTest.scala +++ b/src/test/scala/ShardManagerTest.scala @@ -51,13 +51,19 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest")) describe("sending requests") { describe("using sentinel") { it("should forward each request to the appropriate client transparently") { + val sentinelProbe = TestProbe() val redisProbe = TestProbe() - val sentinel = system.actorOf(Sentinel()) + val sentinel = system.actorOf(Sentinel(listeners = Set(sentinelProbe.ref))) val shardManager = system.actorOf(ShardManager( shards = Seq(SentinelShard("mymaster", 0)), sentinelClient = Some(sentinel), listeners = Set(redisProbe.ref))) + sentinelProbe.expectMsg( + Connecting("localhost", 26379)) + sentinelProbe.expectMsg( + Connected("localhost", 26379)) + redisProbe.expectMsg( Connecting("127.0.0.1", 6379)) redisProbe.expectMsg( @@ -75,12 +81,36 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest")) it("should forward each request to the appropriate client transparently") { val shards = Seq( - RedisShard("server1", "localhost", 6379, 0), - RedisShard("server2", "localhost", 6379, 1), - RedisShard("server3", "localhost", 6379, 2)) - - val shardManager = TestActorRef[ShardManager](ShardManager( - shards)) + RedisShard("server1", "127.0.0.1", 6379, 0), + RedisShard("server2", "127.0.0.1", 6379, 1), + RedisShard("server3", "127.0.0.1", 6379, 2)) + + val sentinelProbe = TestProbe() + val redisProbe = TestProbe() + val sentinel = system.actorOf(Sentinel(listeners = Set(sentinelProbe.ref))) + val shardManager = system.actorOf(ShardManager( + shards = shards, + sentinelClient = Some(sentinel), + listeners = Set(redisProbe.ref))) + + sentinelProbe.expectMsg( + Connecting("localhost", 26379)) + sentinelProbe.expectMsg( + Connected("localhost", 26379)) + + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) shardManager ! ("key", Request("SET", "shard_manager_test", "some value")) @@ -93,12 +123,27 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest")) it("should infer the key from the params list") { val shards = Seq( - RedisShard("server1", "localhost", 6379, 0), - RedisShard("server2", "localhost", 6379, 1), - RedisShard("server3", "localhost", 6379, 2)) + RedisShard("server1", "127.0.0.1", 6379, 0), + RedisShard("server2", "127.0.0.1", 6379, 1), + RedisShard("server3", "127.0.0.1", 6379, 2)) + val redisProbe = TestProbe() val shardManager = TestActorRef[ShardManager](ShardManager( - shards)) + shards, listeners = Set(redisProbe.ref))) + + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) shardManager ! Request("SET", "shard_manager_test", "some value") @@ -111,12 +156,27 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest")) it("should fail with IllegalArgumentException when params is empty") { val shards = Seq( - RedisShard("server1", "localhost", 6379, 0), - RedisShard("server2", "localhost", 6379, 1), - RedisShard("server3", "localhost", 6379, 2)) + RedisShard("server1", "127.0.0.1", 6379, 0), + RedisShard("server2", "127.0.0.1", 6379, 1), + RedisShard("server3", "127.0.0.1", 6379, 2)) + val redisProbe = TestProbe() val shardManager = TestActorRef[ShardManager](ShardManager( - shards)) + shards, listeners = Set(redisProbe.ref))) + + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) shardManager ! Request("SET") @@ -125,12 +185,27 @@ class ShardManagerTest extends TestKit(ActorSystem("ShardManagerTest")) it("should broadcast a Request to all shards") { val shards = Seq( - RedisShard("server1", "localhost", 6379, 0), - RedisShard("server2", "localhost", 6379, 1), - RedisShard("server3", "localhost", 6379, 2)) + RedisShard("server1", "127.0.0.1", 6379, 0), + RedisShard("server2", "127.0.0.1", 6379, 1), + RedisShard("server3", "127.0.0.1", 6379, 2)) + val redisProbe = TestProbe() val shardManager = TestActorRef[ShardManager](ShardManager( - shards)) + shards, listeners = Set(redisProbe.ref))) + + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connecting("127.0.0.1", 6379)) + + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) + redisProbe.expectMsg( + Connected("127.0.0.1", 6379)) val listName = scala.util.Random.nextString(5)