use JavaConverters to replace JavaConversions #136

Open · wants to merge 2 commits into master
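The change replaces the deprecated implicit conversions from scala.collection.JavaConversions with the explicit asScala / asJava decorators from scala.collection.JavaConverters. As a quick illustration of the pattern (a standalone sketch, not code from this repository):

    import scala.collection.JavaConverters._

    val javaList: java.util.List[String] = new java.util.ArrayList[String]()
    javaList.add("spark")
    javaList.add("redis")

    // Java -> Scala: the conversion is now spelled out instead of happening implicitly.
    val scalaSeq: Seq[String] = javaList.asScala.toSeq

    // Scala -> Java: asJava produces a java.util.List view over the Scala collection.
    val backToJava: java.util.List[String] = scalaSeq.asJava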
@@ -5,14 +5,14 @@ import redis.clients.jedis.exceptions.JedisConnectionException

import java.util.concurrent.ConcurrentHashMap

- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._


object ConnectionPool {
@transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
new ConcurrentHashMap[RedisEndpoint, JedisPool]()
def connect(re: RedisEndpoint): Jedis = {
- val pool = pools.getOrElseUpdate(re,
+ val pool = pools.asScala.getOrElseUpdate(re,
{
val poolConfig: JedisPoolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(250)
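In ConnectionPool the pools map is a java.util.concurrent.ConcurrentHashMap, so the Scala-only getOrElseUpdate is reached by wrapping the map with asScala first. A minimal standalone sketch of that wrapping (illustrative names, not the pull request's code):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    val cache = new ConcurrentHashMap[String, Int]()

    // asScala returns a mutable Scala view backed by the same Java map,
    // so getOrElseUpdate reads from and writes to the original ConcurrentHashMap.
    val value = cache.asScala.getOrElseUpdate("answer", 42)

The explicit form mirrors what the implicit conversion was doing before, so the runtime behaviour should be unchanged.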
6 changes: 3 additions & 3 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -6,7 +6,7 @@ import org.apache.spark.SparkConf
import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
import redis.clients.jedis.{Jedis, Protocol}

- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._


/**
@@ -254,7 +254,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
*/
private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
val conn = initialHost.connect()
- val res = conn.clusterSlots().flatMap {
+ val res = conn.clusterSlots().asScala.flatMap {
slotInfoObj => {
val slotInfo = slotInfoObj.asInstanceOf[java.util.List[java.lang.Object]]
val sPos = slotInfo.get(0).toString.toInt
@@ -269,7 +269,7 @@
* filter master.
*/
(0 until (slotInfo.size - 2)).map(i => {
- val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
+ val node = slotInfo.asScala(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
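In RedisConfig, clusterSlots() returns a java.util.List, so the list is wrapped with asScala both before flatMap and before indexed access with apply. The same idiom in isolation (plain Java lists standing in for the Jedis response):

    import scala.collection.JavaConverters._

    val slotInfo: java.util.List[String] = java.util.Arrays.asList("0", "5460", "host")

    // Scala collection operations such as map, take and apply(i)
    // become available once the Java list is wrapped.
    val bounds: Seq[Int] = slotInfo.asScala.take(2).map(_.toInt)
    val host: String     = slotInfo.asScala(2)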
26 changes: 13 additions & 13 deletions src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
@@ -10,7 +10,7 @@ import org.apache.spark.rdd.RDD
import redis.clients.jedis.{Jedis, ScanParams}
import redis.clients.jedis.util.JedisClusterCRC16

- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
import scala.reflect.{ClassTag, classTag}


@@ -52,7 +52,7 @@ class RedisKVRDD(prev: RDD[String],
groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
val conn = node.endpoint.connect()
val hashKeys = filterKeysByType(conn, nodeKeys, "hash")
- val res = hashKeys.flatMap(conn.hgetAll).iterator
+ val res = hashKeys.flatMap(conn.hgetAll(_).asScala).iterator
conn.close()
res
}
@@ -81,7 +81,7 @@ class RedisListRDD(prev: RDD[String],
groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
val conn = node.endpoint.connect()
val setKeys = filterKeysByType(conn, nodeKeys, "set")
- val res = setKeys.flatMap(conn.smembers).iterator
+ val res = setKeys.flatMap(conn.smembers(_).asScala).iterator
conn.close()
res
}
@@ -91,7 +91,7 @@
groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
val conn = node.endpoint.connect()
val listKeys = filterKeysByType(conn, nodeKeys, "list")
- val res = listKeys.flatMap(conn.lrange(_, 0, -1)).iterator
+ val res = listKeys.flatMap(conn.lrange(_, 0, -1).asScala).iterator
conn.close()
res
}
@@ -136,10 +136,10 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String],
val zsetKeys = filterKeysByType(conn, nodeKeys, "zset")
val res = {
if (classTag[T] == classTag[(String, Double)]) {
- zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos)).
+ zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos).asScala).
map(tup => (tup.getElement, tup.getScore)).iterator
} else if (classTag[T] == classTag[String]) {
- zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos)).iterator
+ zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos).asScala).iterator
} else {
throw new scala.Exception("Unknown RedisZSetRDD type")
}
@@ -158,10 +158,10 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String],
val zsetKeys = filterKeysByType(conn, nodeKeys, "zset")
val res = {
if (classTag[T] == classTag[(String, Double)]) {
- zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore)).
+ zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore).asScala).
map(tup => (tup.getElement, tup.getScore)).iterator
} else if (classTag[T] == classTag[String]) {
- zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore)).iterator
+ zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore).asScala).iterator
} else {
throw new scala.Exception("Unknown RedisZSetRDD type")
}
@@ -428,17 +428,17 @@ trait Keys {
val endpoints = nodes.map(_.endpoint).distinct

if (isRedisRegex(keyPattern)) {
- endpoints.iterator.map { endpoint =>
+ endpoints.iterator.flatMap { endpoint =>
val keys = new util.HashSet[String]()
val conn = endpoint.connect()
val params = new ScanParams().`match`(keyPattern).count(readWriteConfig.scanCount)
- keys.addAll(scanKeys(conn, params).filter { key =>
+ keys.addAll(scanKeys(conn, params).asScala.filter { key =>
val slot = JedisClusterCRC16.getSlot(key)
slot >= sPos && slot <= ePos
- })
+ }.asJava)
conn.close()
- keys.iterator()
- }.flatten
+ keys.iterator().asScala
+ }
} else {
val slot = JedisClusterCRC16.getSlot(keyPattern)
if (slot >= sPos && slot <= ePos) Iterator(keyPattern) else Iterator()
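Throughout RedisRDD each Jedis call that returns a Java collection (hgetAll, smembers, lrange, zrange, and the scan results in the Keys trait) is now followed by an explicit asScala before flatMap or iterator. A rough, self-contained sketch of that flatMap pattern, with a stub standing in for the Jedis call:

    import scala.collection.JavaConverters._

    // Stand-in for a Jedis call such as conn.smembers(key), which returns a java.util.Set.
    def membersOf(key: String): java.util.Set[String] =
      new java.util.HashSet[String](java.util.Arrays.asList(key + "-a", key + "-b"))

    val keys = Seq("k1", "k2")

    // Each returned Java Set is converted explicitly so flatMap sees a Scala collection.
    val all: Iterator[String] = keys.flatMap(k => membersOf(k).asScala).iterator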
@@ -11,7 +11,7 @@ import org.apache.spark.streaming.receiver.Receiver
import org.spark_project.guava.util.concurrent.RateLimiter
import redis.clients.jedis.{EntryID, Jedis, StreamEntry}

- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._

/**
* Receives messages from Redis Stream
@@ -81,12 +81,12 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
false,
unackId)

- val unackMessagesMap = response.map(e => (e.getKey, e.getValue)).toMap
+ val unackMessagesMap = response.asScala.map(e => (e.getKey, e.getValue)).toMap
val entries = unackMessagesMap(conf.streamKey)
if (entries.isEmpty) {
continue = false
}
- storeAndAck(conf.streamKey, entries)
+ storeAndAck(conf.streamKey, entries.asScala)
}
}

@@ -103,9 +103,9 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
false,
newMessId)

- for (streamMessages <- response) {
+ for (streamMessages <- response.asScala) {
val key = streamMessages.getKey
- val entries = streamMessages.getValue
+ val entries = streamMessages.getValue.asScala
storeAndAck(key, entries)
}
}
@@ -128,7 +128,7 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
def entriesToItems(key: String, entries: Seq[StreamEntry]): Seq[StreamItem] = {
entries.map { e =>
val itemId = ItemId(e.getID.getTime, e.getID.getSequence)
- StreamItem(key, itemId, e.getFields.toMap)
+ StreamItem(key, itemId, e.getFields.asScala.toMap)
}
}
}
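In RedisStreamReceiver the stream read returns a Java list of key/value entries, which is wrapped with asScala and turned into a Scala Map before the stream key is looked up. The conversion on its own, with a plain SimpleEntry standing in for Jedis' stream types:

    import java.util.AbstractMap.SimpleEntry
    import scala.collection.JavaConverters._

    val entry    = new SimpleEntry("stream-key", java.util.Arrays.asList("e1", "e2"))
    val response = java.util.Arrays.asList(entry)

    // Wrap the Java list, then build an immutable Scala Map keyed by stream name.
    val byKey   = response.asScala.map(e => (e.getKey, e.getValue)).toMap
    val entries = byKey("stream-key").asScala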
@@ -19,7 +19,6 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import redis.clients.jedis.{PipelineBase, Protocol}

- import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

class RedisSourceRelation(override val sqlContext: SQLContext,
@@ -75,7 +74,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
private val keysPatternOpt: Option[String] = parameters.get(SqlOptionKeysPattern)
private val numPartitions = parameters.get(SqlOptionNumPartitions).map(_.toInt)
.getOrElse(SqlOptionNumPartitionsDefault)
- private val persistenceModel = parameters.getOrDefault(SqlOptionModel, SqlOptionModelHash)
+ private val persistenceModel = parameters.asJava.getOrDefault(SqlOptionModel, SqlOptionModelHash)
private val persistence = RedisPersistence(persistenceModel)
private val tableNameOpt: Option[String] = parameters.get(SqlOptionTableName)
private val ttl = parameters.get(SqlOptionTTL).map(_.toInt).getOrElse(0)
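In RedisSourceRelation, parameters appears to be a Scala Map (it is queried with .get elsewhere in this file), so the previous implicit conversion to a Java map, which provided getOrDefault, is now written out as asJava. Assuming parameters is a plain Map[String, String], the two equivalent spellings look like this (sketch only, with made-up keys and defaults):

    import scala.collection.JavaConverters._

    val parameters = Map("model" -> "hash")

    // The form used in this diff: go through a Java view to call getOrDefault.
    val viaJava  = parameters.asJava.getOrDefault("model", "binary")

    // Equivalent lookup staying in Scala collections.
    val viaScala = parameters.getOrElse("model", "binary")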
@@ -10,7 +10,7 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Span}
import redis.clients.jedis.EntryID

- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._

// scalastyle:off multiple.string.literals
trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {
@@ -41,9 +41,9 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {

// write to stream
withConnection(redisConfig.connectionForKey(streamKey)) { conn =>
- conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
- conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2"))
- conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3"))
+ conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
+ conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2").asJava)
+ conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3").asJava)
}

ssc.start()
@@ -86,9 +86,9 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {

// write to stream
withConnection(redisConfig.connectionForKey(streamKey)) { conn =>
- conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
- conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2"))
- conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3"))
+ conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
+ conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2").asJava)
+ conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3").asJava)
}

ssc.start()
@@ -137,11 +137,11 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {

// write to stream
withConnection(redisConfig.connectionForKey(stream1Key)) { conn =>
- conn.xadd(stream1Key, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
+ conn.xadd(stream1Key, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
}
withConnection(redisConfig.connectionForKey(stream2Key)) { conn =>
- conn.xadd(stream2Key, new EntryID(1, 1), Map("b" -> "2"))
- conn.xadd(stream2Key, new EntryID(2, 0), Map("c" -> "3"))
+ conn.xadd(stream2Key, new EntryID(1, 1), Map("b" -> "2").asJava)
+ conn.xadd(stream2Key, new EntryID(2, 0), Map("c" -> "3").asJava)
}

ssc.start()
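In the streaming tests, Jedis' xadd takes a java.util.Map of field/value pairs, so the Scala map literals are now converted with asJava at the call site. The conversion in isolation:

    import scala.collection.JavaConverters._

    val fields = Map("a" -> "1", "z" -> "4")

    // asJava exposes the Scala map as a java.util.Map[String, String],
    // which Java APIs such as xadd can consume directly.
    val javaFields: java.util.Map[String, String] = fields.asJava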