Skip to content

Commit

Permalink
Merge pull request mDialog#46 from damienlevin/sentinel_wip
Browse files Browse the repository at this point in the history
Added Sentinel support
  • Loading branch information
chrisdinn committed May 26, 2015
2 parents d8e0e66 + 06a5e90 commit 87d1883
Show file tree
Hide file tree
Showing 24 changed files with 2,835 additions and 538 deletions.
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ language: scala
scala:
- "2.10.4"
- "2.11.0"
services:
- redis-server
cache:
directories:
- $HOME/.ivy2
before_script:
- sudo redis-server `pwd`/test-config/sentinel.conf --sentinel &
- sudo redis-server `pwd`/test-config/redis.conf --loglevel verbose
- sudo mkdir /var/lib/redis-slave
- sudo redis-server `pwd`/test-config/redis-slave.conf --loglevel verbose
- cat /var/log/redis/redis-slave-server.log
- cat /var/log/redis/redis-server.log
- sleep 5
108 changes: 86 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ In your build.sbt

resolvers += "http://chrisdinn.github.io/releases/"

libraryDependencies += "com.digital-achiever" %% "brando" % "2.1.2"
libraryDependencies += "com.digital-achiever" %% "brando" % "3.0.0-SNAPSHOT"

### Getting started

Brando is a lightweight wrapper around the [Redis protocol](http://redis.io/topics/protocol).

Create a Brando actor with your server host and port.
Create a Redis actor with your server host and port.

import brando._

val redis = system.actorOf(Brando("localhost", 6379))
val redis = system.actorOf(Redis("localhost", 6379))

You should specify a database and password if you intend to use them.

val redis = system.actorOf(Brando("localhost", 6379, database = Some(5), auth = Some("password")))
val redis = system.actorOf(Redis("localhost", 6379, database = 5, auth = "password"))

This is important; if your Brando actor restarts you want be sure it reconnects successfully and to the same database.
This is important; if your Redis actor restarts you want be sure it reconnects successfully and to the same database.

Next, send it a command and get your response as a reply.

Expand All @@ -47,7 +47,7 @@ Error replies are returned as `akka.actor.Status.Failure` objects containing an

redis ! Request("EXPIRE", "1", "key")
// Response: Failure(brando.BrandoException: ERR value is not an integer or out of range)
// Response: Failure(brando.RedisException: ERR value is not an integer or out of range)

Integer replies are returned as `Option[Long]`.

Expand Down Expand Up @@ -75,7 +75,7 @@ NULL replies are returned as `None` and may appear either on their own or nested

If you're not sure what to expect in response to a request, please refer to the Redis command documentation at [http://redis.io/commands](http://redis.io/commands) where the reply type for each is clearly stated.

To ensure that a list of requests are executed back to back, the brando actor can receive the following message :
To ensure that a list of requests are executed back to back, the Redis actor can receive the following message :

redis ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("EXEC"))

Expand Down Expand Up @@ -106,28 +106,65 @@ Use the provided response extractors to map your Redis reply to a more appropria
### Monitoring Connection State Changes

If a set of listeners is provided to the Brando actor when it is created , it will inform the those listeners about state changes to the underlying Redis connection. For example (from inside an actor):
If a set of listeners is provided to the Redis actor when it is created , it will inform the those listeners about state changes to the underlying Redis connection. For example (from inside an actor):

val redis = context.actorOf(Brando("localhost", 6379, listeners = Set(self)))
val redis = context.actorOf(Redis("localhost", 6379, listeners = Set(self)))

Currently, the possible messages sent to each listener include the following:

* `Connecting`: When creating a TCP connection.
* `Connected`: When a TCP connection has been created, and Authentication (if applicable) has succeeded.
* `Disconnected`: The connection has been lost. Brando transparently handles disconnects and will automatically reconnect, so typically no user action at all is needed here. During the time that Brando is disconnected, Redis commands sent to Brando will be queued, and will be processed when a connection is established.
* `Disconnected`: The connection has been lost. Redis transparently handles disconnects and will automatically reconnect, so typically no user action at all is needed here. During the time that Redis is disconnected, Redis commands sent will be queued be processed once the connection is reestablished.
* `AuthenticationFailed`: The TCP connected was made, but Redis auth failed.
* `ConnectionFailed`: A connection could not be (re-) established after three attempts. Brando will not attempt to recover from this state; the user should take action.
* `ConnectionFailed`: A connection could not be established after the number of attempts defined during creation `connectionRetryAttempts`. Brando will not attempt to recover from this state; the user should take action.

All these messages inherit from the `BrandoStateChange` trait.
All these messages inherit from the `Connection.StateChange` trait.


### Sentinel

#### Sentinel Client

Sentinel provides support for `monitoring`, `notification` and `automatic failover` using [sentinel](http://redis.io/topics/sentinel). It is implemented based on the following [guidelines](http://redis.io/topics/sentinel-clients) and requires redis 2.8.12 or later.

A sentinel client can be created like this. Here, we are using two servers and we provide a listener to receive `Connection.StateChange` events.

val sentinel = system.actorOf(Sentinel(Seq(
Server("localhost", 26380),
Server("localhost", 26379)), Set(probe.ref)))

You can listen for events using the following:

sentinel ! Request("SENTINEL","SUBSCRIBE", "failover-end")

You can also send commands such as

sentinel ! Request("SENTINEL", "MASTERS")


#### Redis with Sentinel

Redis can be used with Sentinel to provide automatic failover and discovery. To do so you need to create a `Sentinel` and a `RedisSentinel` actor. In this example we are connecting to the master `mymaster`

val sentinel = system.actorOf(Sentinel(Seq(
Server("localhost", 26380),
Server("localhost", 26379))))

val redis = system.actorOf(RedisSentinel("mymaster", sentinel))

redis ! Request("PING")

For reliability we encourage to pass `connectionHeartbeatDelay` when using RedisSentinel, this will generate a heartbeat to Redis and will improve failures detections in the case of network partitions.

### Sharding

Brando provides support for sharding, as outlined [in the Redis documentation](http://redis.io/topics/partitioning) and in [this blog post from antirez](http://oldblog.antirez.com/post/redis-presharding.html).

To use it, simply create an instance of `ShardManager`, passing it a list of Redis shards you'd like it to connect to. From there, we create a pool of `Brando` instances - one for each shard.
To use it, simply create an instance of `ShardManager`, passing it a list of Redis shards you'd like it to connect to. From there, we create a pool of `Redis` instances - one for each shard.

val shards = Seq(Shard("redis1", "10.0.0.1", 6379),
Shard("redis2", "10.0.0.2", 6379),
Shard("redis3", "10.0.0.3", 6379))
val shards = Seq(RedisShard("redis1", "10.0.0.1", 6379),
RedisShard("redis2", "10.0.0.2", 6379),
RedisShard("redis3", "10.0.0.3", 6379))

val shardManager = context.actorOf(ShardManager(shards))

Expand All @@ -147,19 +184,46 @@ Note that the `ShardManager` explicitly requires a key for all operations except

Individual shards can have their configuration updated on the fly. To do this, send a `Shard` message to `ShardManager`.

shardManager ! Shard("redis1", "10.0.0.4", 6379)
shardManager ! RedisShard("redis1", "10.0.0.4", 6379)


val shardManager = context.actorOf(ShardManager(shards, listeners = Set(self)))

The `ShardManager` will forward all `Connection.StateChange` messages when a shard changes state.


#### Sharding with sentinel

It's possible to use sharding with Sentinel, to do so you need to use `SentinelShard` instead of `RedisShard`

val shards = Seq(
SentinelShard("mymaster1"),
SentinelShard("mymaster2"))

val sentinel = system.actorOf(Sentinel()) //defaults host and port are localhost:26379

val shardManager = context.actorOf(ShardManager(shards,sentinel))

## Run the tests

* Start sentinel

sudo redis-sentinel redis-config/sentinel.conf --sentinel

* Start a Redis master and slave

This is intended to support failover via [Redis Sentinel](http://redis.io/topics/sentinel). Note that the id of the shard __MUST__ match one of the original shards configured when the `ShardManager` instance was created. Adding new shards is not supported.
sudo redis-server test-config/redis.conf --loglevel verbose
sudo mkdir /var/lib/redis-slave
sudo redis-server test-config/redis-slave.conf --loglevel verbose

State changes such as disconnects and connection failures can be monitored by providing a set of listeners to the `ShardManager`:
* Run the tests

val shardManager = context.actorOf(ShardManager(shards, listeners = Set(self)))
sbt test

The `ShardManager` will send a `ShardStateChange(shard, state)` message when a shard changes state; here `shard` is a shard object indicating which shard has changed state, and `state` is a `BrandoStateChange` object, documented above, indicating which new state the shard has entered.

## Documentation

Read the API documentation here: [http://chrisdinn.github.io/api/brando-2.1.2/](http://chrisdinn.github.io/api/brando-2.1.2/)
Read the API documentation here: [http://chrisdinn.github.io/api/brando-3.0.0-SNAPSHOT/](http://chrisdinn.github.io/api/brando-3.0.0-SNAPSHOT/)

## Mailing list

Expand Down
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name := "brando"

organization := "com.digital-achiever"

version := "2.1.2"
version := "3.0.0-SNAPSHOT"

scalaVersion := "2.11.4"

Expand All @@ -11,11 +11,13 @@ crossScalaVersions := Seq("2.10.4", "2.11.4")
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.4",
"com.typesafe.akka" %% "akka-actor" % "2.3.9",
"org.scalatest" %% "scalatest" % "2.1.3" % "test",
"com.typesafe.akka" %% "akka-testkit" % "2.3.4" % "test"
"com.typesafe.akka" %% "akka-testkit" % "2.3.9" % "test"
)

parallelExecution in Test := false

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

publishTo <<= version { (v: String) =>
Expand Down
15 changes: 7 additions & 8 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
brando {
# Default connection retry delay on disconnect
connection_retry = 2s
brando.connection{
timeout = 2s

#Max number of times to attempt connection before failing
#connection_attempts = 3
}
#Delay before trying to reconnect
retry.delay = 1 s

#Redis connection/authentication timeout
redis.timeout = 2s
#Number of connect attempts before failure
retry.attempts = 3
}
Loading

0 comments on commit 87d1883

Please sign in to comment.