From b0ab7dab0b63022fa9751f95879a779dc0d86f1c Mon Sep 17 00:00:00 2001 From: Yuta Okamoto Date: Wed, 31 May 2017 08:06:40 +0900 Subject: [PATCH] add Scala 2.12 support (still has a compile error) --- build.sbt | 10 +++++----- .../okapies/finagle/kafka/ClientTest.scala | 19 +++++++++++-------- .../okapies/finagle/kafka/ServerTest.scala | 4 ++-- .../kafka/protocol/MessageSetTest.scala | 13 +++++++------ .../finagle/kafka/protocol/MessageTest.scala | 14 ++++++-------- .../okapies/finagle/kafka/util/Helper.scala | 12 ++++++++++++ 6 files changed, 43 insertions(+), 29 deletions(-) diff --git a/build.sbt b/build.sbt index df69583..1bb7c6e 100644 --- a/build.sbt +++ b/build.sbt @@ -10,23 +10,23 @@ version := "0.2.3-SNAPSHOT" scalaVersion := "2.11.8" -crossScalaVersions := Seq("2.10.5", "2.11.8") +crossScalaVersions := Seq("2.11.11", "2.12.2") libraryDependencies ++= List( - "com.twitter" %% "finagle-core" % "6.43.0", - "org.apache.kafka" %% "kafka" % "0.8.2.1" + "com.twitter" %% "finagle-core" % "6.44.0", + "org.apache.kafka" %% "kafka" % "0.10.2.1" exclude("com.101tec", "zkclient") exclude("com.yammer.metrics", "metrics-core") exclude("net.sf.jopt-simple", "jopt-simple") exclude("org.apache.zookeeper", "zookeeper") exclude("org.xerial.snappy", "snappy-java"), - "org.scalatest" %% "scalatest" % "2.2.4" % "test", + "org.scalatest" %% "scalatest" % "3.0.3" % "test", // dependencies for kafka-test "junit" % "junit" % "4.11" % "test", "org.apache.curator" % "curator-test" % "2.11.0" % "test", "com.101tec" % "zkclient" % "0.8" % "test", "com.yammer.metrics" % "metrics-core" % "2.2.0" % "test", - "org.apache.kafka" %% "kafka" % "0.8.2.1" % "test" classifier "test" + "org.apache.kafka" %% "kafka" % "0.10.2.1" % "test" classifier "test" ) publishTo := { diff --git a/src/test/scala/okapies/finagle/kafka/ClientTest.scala b/src/test/scala/okapies/finagle/kafka/ClientTest.scala index 2c56f75..4667c57 100644 --- a/src/test/scala/okapies/finagle/kafka/ClientTest.scala +++ b/src/test/scala/okapies/finagle/kafka/ClientTest.scala @@ -5,14 +5,16 @@ import org.scalatest.matchers._ import org.scalatest.concurrent.Eventually import org.scalatest.time.{Span, Seconds, Millis} +import java.io.File import java.util.Properties import java.nio.charset.Charset import com.twitter.util.Await import _root_.kafka.admin.AdminUtils -import _root_.kafka.utils.{Utils, TestUtils, ZKStringSerializer} +import _root_.kafka.utils.{TestUtils, ZkUtils} import _root_.kafka.server.{KafkaConfig, KafkaServer} import org.apache.curator.test.TestingServer +import org.apache.kafka.common.utils.Utils import org.jboss.netty.buffer.ChannelBuffers import org.I0Itec.zkclient.ZkClient @@ -31,6 +33,7 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite => var zkServer: TestingServer = _ var zkClient: ZkClient = _ + var zkUtils: ZkUtils = _ var kafkaServer: KafkaServer = _ var kafkaConn: String = _ var kafkaConfig: Properties = _ @@ -40,8 +43,7 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite => val zkConn = zkServer.getConnectString - kafkaConfig = TestUtils.createBrokerConfig(1) - kafkaConfig.put("zookeeper.connect", zkConn) + kafkaConfig = TestUtils.createBrokerConfig(1, zkConn) kafkaConfig.put("host.name", "127.0.0.1") // https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/ @@ -50,15 +52,16 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite => kafkaConn = s"""${kafkaConfig.get("host.name")}:${kafkaConfig.get("port")}""" kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaConfig)) - zkClient = new ZkClient(zkConn, 5000, 5000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConn, 5000, 5000) + zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) } override def afterAll { kafkaServer.shutdown() - Utils.rm(kafkaConfig.getProperty("log.dir")) + Utils.delete(new File(kafkaConfig.getProperty("log.dir"))) zkClient.close() zkServer.stop() - Utils.rm(zkServer.getTempDirectory) + Utils.delete(zkServer.getTempDirectory) } } @@ -89,10 +92,10 @@ with KafkaTest { client = Kafka.newRichClient(kafkaConn) // create the topic for testing - AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties) + AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties) // Make sure the topic leader is available before running tests - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) } "A kafka client" should "return metadata" in { diff --git a/src/test/scala/okapies/finagle/kafka/ServerTest.scala b/src/test/scala/okapies/finagle/kafka/ServerTest.scala index 9cea2c8..326528c 100644 --- a/src/test/scala/okapies/finagle/kafka/ServerTest.scala +++ b/src/test/scala/okapies/finagle/kafka/ServerTest.scala @@ -120,14 +120,14 @@ with BeforeAndAfterEach { var server: ListeningServer = _ var rand = new scala.util.Random() - private def newAddr:String = { + private def newAddr: String = { s":${10000 + rand.nextInt(1000)}" } override def beforeEach = { val addr = newAddr server = Kafka.serve(addr, service) - client = Kafka.newService(server) + client = Kafka.newService(addr) } override def afterEach = { diff --git a/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala b/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala index ae060d2..b95f122 100644 --- a/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala +++ b/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala @@ -11,6 +11,7 @@ import org.jboss.netty.buffer.ChannelBuffers class MessageSetTest extends FlatSpec with Matchers { + import kafka.common.LongRef import kafka.message.{ ByteBufferMessageSet, Message => KafkaMessage, @@ -65,9 +66,9 @@ class MessageSetTest extends FlatSpec with Matchers { it should "decode bytes into a MessageSet" in { val kafkaMsgs1 = new ByteBufferMessageSet( NoCompressionCodec, - new AtomicLong(1), - new KafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)), - new KafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8)) + new LongRef(1L), + kafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)), + kafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8)) ) val size1 = kafkaMsgs1.sizeInBytes val buf1 = ByteBuffer.allocateDirect(4 /* Size */ + size1) @@ -93,9 +94,9 @@ class MessageSetTest extends FlatSpec with Matchers { it should "decode bytes including a partial message into a MessageSet" in { val kafkaMsgs1 = new ByteBufferMessageSet( NoCompressionCodec, - new AtomicLong(1), - new KafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)), - new KafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8)) + new LongRef(1L), + kafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)), + kafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8)) ) val size1 = kafkaMsgs1.sizeInBytes - 5 // make 2nd message partial val buf1 = ByteBuffer.allocateDirect(4 /* Size */ + size1) diff --git a/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala b/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala index 729ed72..65c0fe9 100644 --- a/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala +++ b/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala @@ -22,7 +22,7 @@ class MessageTest extends FlatSpec with Matchers { Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))), // key 0 // codec ) - val kafkaMsg1 = new KafkaMessage(msg1.underlying.toByteBuffer) + val kafkaMsg1 = new KafkaMessage(buffer = msg1.underlying.toByteBuffer) assert(kafkaMsg1.checksum === msg1.crc) assert(kafkaMsg1.magic === msg1.magicByte) @@ -35,7 +35,7 @@ class MessageTest extends FlatSpec with Matchers { None, // key 0 // codec ) - val kafkaMsg2 = new KafkaMessage(msg2.underlying.toByteBuffer) + val kafkaMsg2 = new KafkaMessage(buffer = msg2.underlying.toByteBuffer) assert(kafkaMsg2.checksum === msg2.crc) assert(kafkaMsg2.magic === msg2.magicByte) @@ -45,10 +45,9 @@ class MessageTest extends FlatSpec with Matchers { } it should "decode a no compressed message" in { - val kafkaMsg1 = new KafkaMessage( - "value1".getBytes(utf8), // value - "key1".getBytes(utf8), // key - NoCompressionCodec // codec + val kafkaMsg1 = kafkaMessage( + bytes = "value1".getBytes(utf8), + key = "key1".getBytes(utf8) ) val msg1 = Message(ChannelBuffers.wrappedBuffer(kafkaMsg1.buffer)) @@ -59,8 +58,7 @@ class MessageTest extends FlatSpec with Matchers { assert(msg1.value.toString(utf8) === "value1") val kafkaMsg2 = new KafkaMessage( - "value2".getBytes(utf8), // value - NoCompressionCodec // codec + bytes = "value2".getBytes(utf8) ) val msg2 = Message(ChannelBuffers.wrappedBuffer(kafkaMsg2.buffer)) diff --git a/src/test/scala/okapies/finagle/kafka/util/Helper.scala b/src/test/scala/okapies/finagle/kafka/util/Helper.scala index dec1c9f..7bb0185 100644 --- a/src/test/scala/okapies/finagle/kafka/util/Helper.scala +++ b/src/test/scala/okapies/finagle/kafka/util/Helper.scala @@ -8,6 +8,10 @@ import java.nio.charset.Charset object Helper { import scala.language.implicitConversions + import kafka.message.{ + Message => KafkaMessage, + NoCompressionCodec + } val utf8 = Charset.forName("UTF-8") @@ -22,4 +26,12 @@ object Helper { } } + def kafkaMessage(bytes: Array[Byte], key: Array[Byte]) = new KafkaMessage( + bytes = bytes, + key = key, + timestamp = KafkaMessage.NoTimestamp, + codec = NoCompressionCodec, + magicValue = KafkaMessage.CurrentMagicValue + ) + }