Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add Scala 2.12 and drop 2.10 support #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ version := "0.2.3-SNAPSHOT"

scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.5", "2.11.8")
crossScalaVersions := Seq("2.11.11", "2.12.2")

libraryDependencies ++= List(
"com.twitter" %% "finagle-core" % "6.43.0",
"org.apache.kafka" %% "kafka" % "0.8.2.1"
"com.twitter" %% "finagle-core" % "6.44.0",
"org.apache.kafka" %% "kafka" % "0.10.2.1"
exclude("com.101tec", "zkclient")
exclude("com.yammer.metrics", "metrics-core")
exclude("net.sf.jopt-simple", "jopt-simple")
exclude("org.apache.zookeeper", "zookeeper")
exclude("org.xerial.snappy", "snappy-java"),
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.scalatest" %% "scalatest" % "3.0.3" % "test",
// dependencies for kafka-test
"junit" % "junit" % "4.11" % "test",
"org.apache.curator" % "curator-test" % "2.11.0" % "test",
"com.101tec" % "zkclient" % "0.8" % "test",
"com.yammer.metrics" % "metrics-core" % "2.2.0" % "test",
"org.apache.kafka" %% "kafka" % "0.8.2.1" % "test" classifier "test"
"org.apache.kafka" %% "kafka" % "0.10.2.1" % "test" classifier "test"
)

publishTo := {
Expand Down
19 changes: 11 additions & 8 deletions src/test/scala/okapies/finagle/kafka/ClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import org.scalatest.matchers._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Span, Seconds, Millis}

import java.io.File
import java.util.Properties
import java.nio.charset.Charset

import com.twitter.util.Await
import _root_.kafka.admin.AdminUtils
import _root_.kafka.utils.{Utils, TestUtils, ZKStringSerializer}
import _root_.kafka.utils.{TestUtils, ZkUtils}
import _root_.kafka.server.{KafkaConfig, KafkaServer}
import org.apache.curator.test.TestingServer
import org.apache.kafka.common.utils.Utils
import org.jboss.netty.buffer.ChannelBuffers
import org.I0Itec.zkclient.ZkClient

Expand All @@ -31,6 +33,7 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>

var zkServer: TestingServer = _
var zkClient: ZkClient = _
var zkUtils: ZkUtils = _
var kafkaServer: KafkaServer = _
var kafkaConn: String = _
var kafkaConfig: Properties = _
Expand All @@ -40,8 +43,7 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>

val zkConn = zkServer.getConnectString

kafkaConfig = TestUtils.createBrokerConfig(1)
kafkaConfig.put("zookeeper.connect", zkConn)
kafkaConfig = TestUtils.createBrokerConfig(1, zkConn)
kafkaConfig.put("host.name", "127.0.0.1")

// https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/
Expand All @@ -50,15 +52,16 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>
kafkaConn = s"""${kafkaConfig.get("host.name")}:${kafkaConfig.get("port")}"""
kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaConfig))

zkClient = new ZkClient(zkConn, 5000, 5000, ZKStringSerializer)
zkClient = ZkUtils.createZkClient(zkConn, 5000, 5000)
zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
}

override def afterAll {
kafkaServer.shutdown()
Utils.rm(kafkaConfig.getProperty("log.dir"))
Utils.delete(new File(kafkaConfig.getProperty("log.dir")))
zkClient.close()
zkServer.stop()
Utils.rm(zkServer.getTempDirectory)
Utils.delete(zkServer.getTempDirectory)
}

}
Expand Down Expand Up @@ -89,10 +92,10 @@ with KafkaTest {
client = Kafka.newRichClient(kafkaConn)

// create the topic for testing
AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties)
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties)

// Make sure the topic leader is available before running tests
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
}

"A kafka client" should "return metadata" in {
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/okapies/finagle/kafka/ServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ with BeforeAndAfterEach {
var server: ListeningServer = _
var rand = new scala.util.Random()

private def newAddr:String = {
private def newAddr: String = {
s":${10000 + rand.nextInt(1000)}"
}

override def beforeEach = {
val addr = newAddr
server = Kafka.serve(addr, service)
client = Kafka.newService(server)
client = Kafka.newService(addr)
}

override def afterEach = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.jboss.netty.buffer.ChannelBuffers

class MessageSetTest extends FlatSpec with Matchers {

import kafka.common.LongRef
import kafka.message.{
ByteBufferMessageSet,
Message => KafkaMessage,
Expand Down Expand Up @@ -65,9 +66,9 @@ class MessageSetTest extends FlatSpec with Matchers {
it should "decode bytes into a MessageSet" in {
val kafkaMsgs1 = new ByteBufferMessageSet(
NoCompressionCodec,
new AtomicLong(1),
new KafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)),
new KafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8))
new LongRef(1L),
kafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)),
kafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8))
)
val size1 = kafkaMsgs1.sizeInBytes
val buf1 = ByteBuffer.allocateDirect(4 /* Size */ + size1)
Expand All @@ -93,9 +94,9 @@ class MessageSetTest extends FlatSpec with Matchers {
it should "decode bytes including a partial message into a MessageSet" in {
val kafkaMsgs1 = new ByteBufferMessageSet(
NoCompressionCodec,
new AtomicLong(1),
new KafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)),
new KafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8))
new LongRef(1L),
kafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)),
kafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8))
)
val size1 = kafkaMsgs1.sizeInBytes - 5 // make 2nd message partial
val buf1 = ByteBuffer.allocateDirect(4 /* Size */ + size1)
Expand Down
14 changes: 6 additions & 8 deletions src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MessageTest extends FlatSpec with Matchers {
Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))), // key
0 // codec
)
val kafkaMsg1 = new KafkaMessage(msg1.underlying.toByteBuffer)
val kafkaMsg1 = new KafkaMessage(buffer = msg1.underlying.toByteBuffer)

assert(kafkaMsg1.checksum === msg1.crc)
assert(kafkaMsg1.magic === msg1.magicByte)
Expand All @@ -35,7 +35,7 @@ class MessageTest extends FlatSpec with Matchers {
None, // key
0 // codec
)
val kafkaMsg2 = new KafkaMessage(msg2.underlying.toByteBuffer)
val kafkaMsg2 = new KafkaMessage(buffer = msg2.underlying.toByteBuffer)

assert(kafkaMsg2.checksum === msg2.crc)
assert(kafkaMsg2.magic === msg2.magicByte)
Expand All @@ -45,10 +45,9 @@ class MessageTest extends FlatSpec with Matchers {
}

it should "decode a no compressed message" in {
val kafkaMsg1 = new KafkaMessage(
"value1".getBytes(utf8), // value
"key1".getBytes(utf8), // key
NoCompressionCodec // codec
val kafkaMsg1 = kafkaMessage(
bytes = "value1".getBytes(utf8),
key = "key1".getBytes(utf8)
)
val msg1 = Message(ChannelBuffers.wrappedBuffer(kafkaMsg1.buffer))

Expand All @@ -59,8 +58,7 @@ class MessageTest extends FlatSpec with Matchers {
assert(msg1.value.toString(utf8) === "value1")

val kafkaMsg2 = new KafkaMessage(
"value2".getBytes(utf8), // value
NoCompressionCodec // codec
bytes = "value2".getBytes(utf8)
)
val msg2 = Message(ChannelBuffers.wrappedBuffer(kafkaMsg2.buffer))

Expand Down
12 changes: 12 additions & 0 deletions src/test/scala/okapies/finagle/kafka/util/Helper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import java.nio.charset.Charset
object Helper {

import scala.language.implicitConversions
import kafka.message.{
Message => KafkaMessage,
NoCompressionCodec
}

val utf8 = Charset.forName("UTF-8")

Expand All @@ -22,4 +26,12 @@ object Helper {
}
}

def kafkaMessage(bytes: Array[Byte], key: Array[Byte]) = new KafkaMessage(
bytes = bytes,
key = key,
timestamp = KafkaMessage.NoTimestamp,
codec = NoCompressionCodec,
magicValue = KafkaMessage.CurrentMagicValue
)

}