Issue 207: Add Sentinels support for spark-redis library #245

Open · wants to merge 9 commits into base: master
33 changes: 33 additions & 0 deletions doc/cluster.md
@@ -17,3 +17,36 @@ def twoEndpointExample ( sc: SparkContext) = {
}
```
If you want to use multiple Redis clusters/instances, an implicit RedisConfig can be used in a code block to specify the target cluster/instance.
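For example, a minimal sketch of scoping a second Redis instance with an implicit `RedisConfig` (the endpoint and key pattern below are placeholders, not part of this change):

```scala
import com.redislabs.provider.redis._

// Code outside this block keeps using the configuration taken from the Spark context;
// inside it, RDD functions such as fromRedisKeyPattern pick up the implicit config.
{
  implicit val otherConfig: RedisConfig =
    new RedisConfig(RedisEndpoint(host = "other-redis-host", port = 6379))

  val keysRDD = sc.fromRedisKeyPattern("user:*")
}
```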

### Connecting to Sentinels
#### Using parameters
```scala
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "table")
  .option("key.column", "key")
  .option("host", "host1,host2,host3")
  .option("port", "6000")
  .option("dbNum", "0")
  .option("timeout", "2000")
  .option("auth", "pwd")
  .option("ssl", "true")
  .option("sentinel.master", "mymaster")
  .option("sentinel.auth", "sentinelPwd")
  .save()
```
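The same options apply when loading the table back; a minimal read sketch under the same assumed sentinel setup:

```scala
val loadedDf = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "table")
  .option("key.column", "key")
  .option("host", "host1,host2,host3")
  .option("port", "6000")
  .option("sentinel.master", "mymaster")
  .option("sentinel.auth", "sentinelPwd")
  .load()
```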

#### Using sparkContext
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("myApp")
.master("local[*]")
.config("spark.redis.host", "host1,host2,host3")
.config("spark.redis.port", "6000")
.config("spark.redis.auth", "passwd")
.config("spark.redis.ssl", "true")
.config("spark.redis.sentinel.master", "mymaster")
.config("spark.redis.sentinel.auth", "sentinelPwd")
.getOrCreate()

val sc = spark.sparkContext
```
5 changes: 4 additions & 1 deletion doc/configuration.md
@@ -3,14 +3,17 @@ The supported configuration parameters are:
## Spark Context configuration parameters

* `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
topology from the initial node, so there is no need to provide the rest of the cluster nodes.
topology from the initial node, so there is no need to provide the rest of the cluster nodes. In sentinel mode, all sentinels should be added comma-separated, e.g. `sentinel1,sentinel2,...` (see the example after this list).
* `spark.redis.port` - the initial node's TCP Redis port.
* `spark.redis.auth` - the initial node's AUTH password
* `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
* `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
* `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
* `spark.redis.ssl` - set to true to use TLS
* `spark.redis.sentinel.master` - master node name in Sentinel mode
* `spark.redis.sentinel.auth` - the sentinels' password
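
A minimal sketch of wiring these parameters through a `SparkConf` for sentinel mode (host names, port, master name and passwords below are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("redis-sentinel-example")
  .setMaster("local[*]")
  .set("spark.redis.host", "sentinel1,sentinel2,sentinel3") // all sentinels, comma separated
  .set("spark.redis.port", "26379")                         // port the sentinels listen on
  .set("spark.redis.sentinel.master", "mymaster")           // name of the monitored master
  .set("spark.redis.sentinel.auth", "sentinelPwd")          // sentinels' password, if any
  .set("spark.redis.auth", "passwd")                        // Redis AUTH password
  .set("spark.redis.timeout", "2000")

val sc = new SparkContext(conf)
```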




2 changes: 1 addition & 1 deletion pom.xml
@@ -49,7 +49,7 @@
<java.version>1.8</java.version>
<scala.major.version>2.11</scala.major.version>
<scala.complete.version>${scala.major.version}.12</scala.complete.version>
<jedis.version>3.2.0</jedis.version>
<jedis.version>3.3.0</jedis.version>
<spark.version>2.4.1</spark.version>
<plugins.scalatest.version>1.0</plugins.scalatest.version>
</properties>
62 changes: 43 additions & 19 deletions src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,31 +1,58 @@
package com.redislabs.provider.redis

import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool}
import redis.clients.jedis.exceptions.JedisConnectionException

import java.util.concurrent.ConcurrentHashMap

import redis.clients.jedis.util.Pool

import scala.collection.JavaConversions._


object ConnectionPool {
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
new ConcurrentHashMap[RedisEndpoint, JedisPool]()
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] =
new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]()

private lazy val buildPoolConfig = {
val poolConfig: JedisPoolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(250)
poolConfig.setMaxIdle(32)
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setMinEvictableIdleTimeMillis(60000)
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
poolConfig.setNumTestsPerEvictionRun(-1)

poolConfig
}

def connect(re: RedisEndpoint): Jedis = {
val pool = pools.getOrElseUpdate(re,
{
val poolConfig: JedisPoolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(250)
poolConfig.setMaxIdle(32)
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setMinEvictableIdleTimeMillis(60000)
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
poolConfig.setNumTestsPerEvictionRun(-1)

new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
val poolConfig = buildPoolConfig

if (null == re.master || re.master.trim.isEmpty) {
new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
} else {
val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet
new JedisSentinelPool(
re.master.trim, // master name
sentinels, // sentinel nodes as host:port
poolConfig, // pool config
re.timeout, // connection timeout
2000, // socket timeout
null, // user
re.auth, // password
re.dbNum, // database number
null, // client name
2000, // sentinel connection timeout
2000, // sentinel socket timeout
null, // sentinel user
re.sentinelAuth, // sentinel password
null // sentinel client name
)
}
}
)
var sleepTime: Int = 4
@@ -35,15 +62,12 @@ object ConnectionPool {
conn = pool.getResource
}
catch {
case e: JedisConnectionException if e.getCause.toString.
contains("ERR max number of clients reached") => {
case e: JedisConnectionException if e.getCause.toString.contains("ERR max number of clients reached") =>
if (sleepTime < 500) sleepTime *= 2
Thread.sleep(sleepTime)
}
case e: Exception => throw e
}
}
conn
}
}

16 changes: 10 additions & 6 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -24,7 +24,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
auth: String = null,
dbNum: Int = Protocol.DEFAULT_DATABASE,
timeout: Int = Protocol.DEFAULT_TIMEOUT,
ssl: Boolean = false)
ssl: Boolean = false,
master: String = null,
sentinelAuth: String = null)
extends Serializable {

/**
@@ -39,7 +41,9 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
conf.get("spark.redis.auth", null),
conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
conf.getBoolean("spark.redis.ssl", false)
conf.getBoolean("spark.redis.ssl", defaultValue = false),
conf.get("spark.redis.sentinel.master", null),
conf.get("spark.redis.sentinel.auth", null)
)
}

@@ -253,8 +257,8 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt

// simply re-enter this function with the master host/port
getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
getNonClusterNodes(initialHost = RedisEndpoint(host, port,
initialHost.auth, initialHost.dbNum, initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth))

} else {
//this is a master - take its slaves
@@ -270,7 +274,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val range = nodes.length
(0 until range).map(i =>
RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
0, 16383, i, range)).toArray
}
}
@@ -300,7 +304,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
initialHost.timeout, initialHost.ssl, initialHost.master, initialHost.sentinelAuth),
sPos,
ePos,
i,
@@ -40,8 +40,10 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
val auth = parameters.getOrElse("auth", null)
val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
val ssl = parameters.get("ssl").exists(_.toBoolean)
val master = parameters.getOrElse("sentinel.master", null)
val sentinelAuth = parameters.getOrElse("sentinel.auth", null)
RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master, sentinelAuth)
}
)
}