From b07ac6522e4fb73601b5d984f1fda43d8a41b390 Mon Sep 17 00:00:00 2001
From: Wennn
Date: Sun, 24 Feb 2019 21:39:21 +0800
Subject: [PATCH] use JavaConverters to replace JavaConversions

---
 .../provider/redis/ConnectionPool.scala       |  4 +--
 .../provider/redis/RedisConfig.scala          |  6 ++---
 .../provider/redis/rdd/RedisRDD.scala         | 26 +++++++++----------
 .../redis/streaming/RedisStreamReceiver.scala | 12 ++++-----
 .../spark/sql/redis/RedisSourceRelation.scala |  3 +--
 .../redis/stream/RedisXStreamSuite.scala      | 20 +++++++-------
 6 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
index 22bfe897..5eee2422 100644
--- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
+++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -5,14 +5,14 @@ import redis.clients.jedis.exceptions.JedisConnectionException
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object ConnectionPool {
   @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] =
     new ConcurrentHashMap[RedisEndpoint, JedisPool]()
 
   def connect(re: RedisEndpoint): Jedis = {
-    val pool = pools.getOrElseUpdate(re,
+    val pool = pools.asScala.getOrElseUpdate(re,
       {
         val poolConfig: JedisPoolConfig = new JedisPoolConfig();
         poolConfig.setMaxTotal(250)
diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
index 073edc24..12dda0ee 100644
--- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
+++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -6,7 +6,7 @@ import org.apache.spark.SparkConf
 import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
 import redis.clients.jedis.{Jedis, Protocol}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 
 /**
@@ -253,7 +253,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
     */
   private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
     val conn = initialHost.connect()
-    val res = conn.clusterSlots().flatMap {
+    val res = conn.clusterSlots().asScala.flatMap {
       slotInfoObj => {
         val slotInfo = slotInfoObj.asInstanceOf[java.util.List[java.lang.Object]]
         val sPos = slotInfo.get(0).toString.toInt
@@ -268,7 +268,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
            * filter master.
            */
           (0 until (slotInfo.size - 2)).map(i => {
-            val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
+            val node = slotInfo.asScala(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
             val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
             val port = node.get(1).toString.toInt
             RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
index 18dd424d..324315ab 100644
--- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
+++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
@@ -10,7 +10,7 @@ import org.apache.spark.rdd.RDD
 import redis.clients.jedis.{Jedis, ScanParams}
 import redis.clients.jedis.util.JedisClusterCRC16
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.{ClassTag, classTag}
 
 
@@ -52,7 +52,7 @@ class RedisKVRDD(prev: RDD[String],
     groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
       val conn = node.endpoint.connect()
       val hashKeys = filterKeysByType(conn, nodeKeys, "hash")
-      val res = hashKeys.flatMap(conn.hgetAll).iterator
+      val res = hashKeys.flatMap(conn.hgetAll(_).asScala).iterator
       conn.close()
       res
     }
@@ -81,7 +81,7 @@ class RedisListRDD(prev: RDD[String],
     groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
       val conn = node.endpoint.connect()
       val setKeys = filterKeysByType(conn, nodeKeys, "set")
-      val res = setKeys.flatMap(conn.smembers).iterator
+      val res = setKeys.flatMap(conn.smembers(_).asScala).iterator
       conn.close()
       res
     }
@@ -91,7 +91,7 @@ class RedisListRDD(prev: RDD[String],
     groupKeysByNode(nodes, keys).flatMap { case (node, nodeKeys) =>
       val conn = node.endpoint.connect()
      val listKeys = filterKeysByType(conn, nodeKeys, "list")
-      val res = listKeys.flatMap(conn.lrange(_, 0, -1)).iterator
+      val res = listKeys.flatMap(conn.lrange(_, 0, -1).asScala).iterator
       conn.close()
       res
     }
@@ -136,10 +136,10 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String],
         val zsetKeys = filterKeysByType(conn, nodeKeys, "zset")
         val res = {
           if (classTag[T] == classTag[(String, Double)]) {
-            zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos)).
+            zsetKeys.flatMap(k => conn.zrangeWithScores(k, startPos, endPos).asScala).
               map(tup => (tup.getElement, tup.getScore)).iterator
           } else if (classTag[T] == classTag[String]) {
-            zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos)).iterator
+            zsetKeys.flatMap(k => conn.zrange(k, startPos, endPos).asScala).iterator
           } else {
             throw new scala.Exception("Unknown RedisZSetRDD type")
           }
@@ -158,10 +158,10 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String],
         val zsetKeys = filterKeysByType(conn, nodeKeys, "zset")
         val res = {
           if (classTag[T] == classTag[(String, Double)]) {
-            zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore)).
+            zsetKeys.flatMap(k => conn.zrangeByScoreWithScores(k, startScore, endScore).asScala).
               map(tup => (tup.getElement, tup.getScore)).iterator
           } else if (classTag[T] == classTag[String]) {
-            zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore)).iterator
+            zsetKeys.flatMap(k => conn.zrangeByScore(k, startScore, endScore).asScala).iterator
           } else {
             throw new scala.Exception("Unknown RedisZSetRDD type")
           }
@@ -428,17 +428,17 @@ trait Keys {
     val endpoints = nodes.map(_.endpoint).distinct
 
     if (isRedisRegex(keyPattern)) {
-      endpoints.iterator.map { endpoint =>
+      endpoints.iterator.flatMap { endpoint =>
         val keys = new util.HashSet[String]()
         val conn = endpoint.connect()
         val params = new ScanParams().`match`(keyPattern).count(readWriteConfig.scanCount)
-        keys.addAll(scanKeys(conn, params).filter { key =>
+        keys.addAll(scanKeys(conn, params).asScala.filter { key =>
           val slot = JedisClusterCRC16.getSlot(key)
           slot >= sPos && slot <= ePos
-        })
+        }.asJava)
         conn.close()
-        keys.iterator()
-      }.flatten
+        keys.iterator().asScala
+      }
     } else {
       val slot = JedisClusterCRC16.getSlot(keyPattern)
       if (slot >= sPos && slot <= ePos) Iterator(keyPattern) else Iterator()
diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala b/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala
index b05d8dde..3cc1d068 100644
--- a/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala
+++ b/src/main/scala/com/redislabs/provider/redis/streaming/RedisStreamReceiver.scala
@@ -11,7 +11,7 @@ import org.apache.spark.streaming.receiver.Receiver
 import org.spark_project.guava.util.concurrent.RateLimiter
 import redis.clients.jedis.{EntryID, Jedis, StreamEntry}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Receives messages from Redis Stream
@@ -81,12 +81,12 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
         false,
         unackId)
 
-      val unackMessagesMap = response.map(e => (e.getKey, e.getValue)).toMap
+      val unackMessagesMap = response.asScala.map(e => (e.getKey, e.getValue)).toMap
       val entries = unackMessagesMap(conf.streamKey)
       if (entries.isEmpty) {
         continue = false
       }
-      storeAndAck(conf.streamKey, entries)
+      storeAndAck(conf.streamKey, entries.asScala)
     }
   }
 
@@ -103,9 +103,9 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
         false,
         newMessId)
 
-      for (streamMessages <- response) {
+      for (streamMessages <- response.asScala) {
         val key = streamMessages.getKey
-        val entries = streamMessages.getValue
+        val entries = streamMessages.getValue.asScala
         storeAndAck(key, entries)
       }
     }
@@ -128,7 +128,7 @@ class RedisStreamReceiver(consumersConfig: Seq[ConsumerConfig],
   def entriesToItems(key: String, entries: Seq[StreamEntry]): Seq[StreamItem] = {
     entries.map { e =>
       val itemId = ItemId(e.getID.getTime, e.getID.getSequence)
-      StreamItem(key, itemId, e.getFields.toMap)
+      StreamItem(key, itemId, e.getFields.asScala.toMap)
     }
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
index 90b6feb9..dd868e62 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala
@@ -16,7 +16,6 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import redis.clients.jedis.{PipelineBase, Protocol}
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 class RedisSourceRelation(override val sqlContext: SQLContext,
@@ -72,7 +71,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
   private val keysPatternOpt: Option[String] = parameters.get(SqlOptionKeysPattern)
   private val numPartitions = parameters.get(SqlOptionNumPartitions).map(_.toInt)
     .getOrElse(SqlOptionNumPartitionsDefault)
-  private val persistenceModel = parameters.getOrDefault(SqlOptionModel, SqlOptionModelHash)
+  private val persistenceModel = parameters.asJava.getOrDefault(SqlOptionModel, SqlOptionModelHash)
   private val persistence = RedisPersistence(persistenceModel)
   private val tableNameOpt: Option[String] = parameters.get(SqlOptionTableName)
   private val ttl = parameters.get(SqlOptionTTL).map(_.toInt).getOrElse(0)
diff --git a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala b/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
index 7b14a2e4..117c04e4 100644
--- a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
@@ -10,7 +10,7 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.{Millis, Span}
 import redis.clients.jedis.EntryID
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {
 
@@ -40,9 +40,9 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {
 
     // write to stream
     withConnection(redisConfig.connectionForKey(streamKey)) { conn =>
-      conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
-      conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2"))
-      conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3"))
+      conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
+      conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2").asJava)
+      conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3").asJava)
     }
 
     ssc.start()
@@ -85,9 +85,9 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {
 
     // write to stream
     withConnection(redisConfig.connectionForKey(streamKey)) { conn =>
-      conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
-      conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2"))
-      conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3"))
+      conn.xadd(streamKey, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
+      conn.xadd(streamKey, new EntryID(1, 1), Map("b" -> "2").asJava)
+      conn.xadd(streamKey, new EntryID(2, 0), Map("c" -> "3").asJava)
     }
 
     ssc.start()
@@ -136,11 +136,11 @@ trait RedisXStreamSuite extends SparkStreamingRedisSuite with Matchers {
 
     // write to stream
     withConnection(redisConfig.connectionForKey(stream1Key)) { conn =>
-      conn.xadd(stream1Key, new EntryID(1, 0), Map("a" -> "1", "z" -> "4"))
+      conn.xadd(stream1Key, new EntryID(1, 0), Map("a" -> "1", "z" -> "4").asJava)
     }
     withConnection(redisConfig.connectionForKey(stream2Key)) { conn =>
-      conn.xadd(stream2Key, new EntryID(1, 1), Map("b" -> "2"))
-      conn.xadd(stream2Key, new EntryID(2, 0), Map("c" -> "3"))
+      conn.xadd(stream2Key, new EntryID(1, 1), Map("b" -> "2").asJava)
+      conn.xadd(stream2Key, new EntryID(2, 0), Map("c" -> "3").asJava)
     }
 
     ssc.start()
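Background for this change: scala.collection.JavaConversions converts Java and Scala collections implicitly and was deprecated in Scala 2.12 (and removed in 2.13), while scala.collection.JavaConverters makes every Java/Scala boundary an explicit .asScala or .asJava call. That is the mechanical rewrite applied in every hunk above. A minimal sketch of the idiom, using only the standard library (the object and variable names here are illustrative, not from this patch):

import java.util.{HashMap => JHashMap}

import scala.collection.JavaConverters._

// Illustrative sketch of the explicit-conversion idiom this patch adopts.
object ConvertersSketch extends App {

  // A Java map, like the ones Jedis returns from hgetAll().
  val javaMap = new JHashMap[String, String]()
  javaMap.put("a", "1")

  // .asScala wraps the Java collection in a mutable Scala view; no copy is made.
  val scalaView: scala.collection.mutable.Map[String, String] = javaMap.asScala

  // .asJava wraps a Scala collection for Java APIs, as the tests above do with
  // conn.xadd(key, id, map.asJava).
  val javaView: java.util.Map[String, String] = Map("b" -> "2").asJava

  // Round-tripping unwraps rather than double-wrapping: same underlying object.
  assert(scalaView.asJava eq javaMap)

  println(scalaView("a"))    // 1
  println(javaView.get("b")) // 2
}

One behavioral note on the ConnectionPool hunk: pools.asScala wraps the ConcurrentHashMap as a scala.collection.concurrent.Map, and depending on the Scala version getOrElseUpdate on that wrapper is not guaranteed to be atomic, so two threads could briefly race to create a pool. The implicit JavaConversions version had the same property, so the patch preserves the original semantics.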