Merge master - prepare rc2
simonsouter committed Aug 12, 2016
1 parent 3d82b15 commit 8177b26
Showing 18 changed files with 489 additions and 195 deletions.
@@ -5,7 +5,6 @@ import akka.testkit.TestActor.AutoPilot
 import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
 import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
 import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
-import cakesolutions.kafka.testkit.TestUtils
 import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
@@ -15,6 +14,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.Promise
+import scala.util.Random
 
 /**
  * Ad hoc performance test for validating async consumer performance. Pass environment variable KAFKA with contact point for
@@ -52,8 +52,10 @@ class KafkaConsumerActorPerfSpec(system_ : ActorSystem)
   def actorConf: KafkaConsumerActor.Conf =
     KafkaConsumerActor.Conf(config.getConfig("consumer"))
 
+  private def randomString: String = Random.alphanumeric.take(5).mkString("")
+
   "KafkaConsumerActor with single partition topic" should "perform" in {
-    val topic = TestUtils.randomString(5)
+    val topic = randomString
    val totalMessages = 100000
 
     val producerConf = KafkaProducer.Conf(config.getConfig("producer"), new StringSerializer, new StringSerializer)
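Across this commit the removed cakesolutions.kafka.testkit.TestUtils dependency is replaced by the same local helper in each spec — the pattern repeats in KafkaE2EActorPerfSpec and the integration specs below. A minimal sketch, assuming only scala.util.Random; the five-character length mirrors the old TestUtils.randomString(5) calls:

    import scala.util.Random

    // Five random alphanumeric characters, e.g. a fresh topic name per test run.
    private def randomString: String = Random.alphanumeric.take(5).mkString("")

    val topic = randomString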
@@ -5,7 +5,6 @@ import akka.testkit.TestActor.AutoPilot
 import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
 import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
 import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
-import cakesolutions.kafka.testkit.TestUtils
 import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
@@ -15,6 +14,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.Promise
+import scala.util.Random
 
 /**
  * Ad hoc performance test for validating async consumer performance. Pass environment variable KAFKA with contact point for
@@ -42,6 +42,8 @@ class KafkaE2EActorPerfSpec(system_ : ActorSystem)
 
   val msg1k = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/1k.txt")).mkString
 
+  private def randomString: String = Random.alphanumeric.take(5).mkString("")
+
   val consumerConf: KafkaConsumer.Conf[String, String] = {
     KafkaConsumer.Conf(config.getConfig("consumer"),
       new StringDeserializer,
@@ -55,8 +57,8 @@
   val producerConf = KafkaProducer.Conf(config.getConfig("producer"), new StringSerializer, new StringSerializer)
 
   "KafkaConsumerActor to KafkaProducer with async commit" should "perform" in {
-    val sourceTopic = TestUtils.randomString(5)
-    val targetTopic = TestUtils.randomString(5)
+    val sourceTopic = randomString
+    val targetTopic = randomString
     val totalMessages = 100000
 
     // For loading the source topic with test data
36 changes: 16 additions & 20 deletions akka/src/main/scala/cakesolutions/kafka/akka/ConsumerRecords.scala
@@ -12,9 +12,6 @@ import scala.reflect.runtime.universe.TypeTag
  * Helper functions for [[ConsumerRecords]].
  */
 object ConsumerRecords {
-  type Pair[Key, Value] = (Option[Key], Value)
-  type Partition = (String, Int)
-
   /**
    * Create consumer records for a single partition from values only.
    *
@@ -24,7 +21,7 @@
    * The offsets will contain only one partition.
    * The partition offset will be set according to the size of the given sequence.
    */
-  def fromValues[Key >: Null : TypeTag, Value: TypeTag](partition: Partition, values: Seq[Value]): ConsumerRecords[Key, Value] =
+  def fromValues[Key >: Null : TypeTag, Value: TypeTag](partition: TopicPartition, values: Seq[Value]): ConsumerRecords[Key, Value] =
     fromPairs(partition, values.map(None -> _))
 
   /**
@@ -36,7 +33,7 @@
    * The offsets will contain only one partition.
    * The partition offset will be set according to the size of the given sequence.
    */
-  def fromPairs[Key >: Null : TypeTag, Value: TypeTag](partition: Partition, pairs: Seq[Pair[Key, Value]]): ConsumerRecords[Key, Value] =
+  def fromPairs[Key >: Null : TypeTag, Value: TypeTag](partition: TopicPartition, pairs: Seq[(Option[Key], Value)]): ConsumerRecords[Key, Value] =
     fromMap(Map(partition -> pairs))
 
   /**
@@ -47,18 +44,17 @@
    *
    * The partition offsets will be set according to the number of messages in a partition.
    */
-  def fromMap[Key >: Null : TypeTag, Value: TypeTag](values: Map[Partition, Seq[Pair[Key, Value]]]): ConsumerRecords[Key, Value] = {
-    def createConsumerRecords(topic: String, partition: Int, pairs: Seq[Pair[Key, Value]]) =
+  def fromMap[Key >: Null : TypeTag, Value: TypeTag](values: Map[TopicPartition, Seq[(Option[Key], Value)]]): ConsumerRecords[Key, Value] = {
+    def createConsumerRecords(topicPartition: TopicPartition, pairs: Seq[(Option[Key], Value)]) =
       pairs.zipWithIndex.map {
         case ((key, value), offset) =>
-          new JConsumerRecord[Key, Value](topic, partition, offset, key.orNull, value)
+          new JConsumerRecord[Key, Value](topicPartition.topic(), topicPartition.partition(), offset, key.orNull, value)
       }
 
     val recordsMap = values.map {
-      case ((topic, partition), pairs) =>
-        val tp = new TopicPartition(topic, partition)
-        val rs = createConsumerRecords(topic, partition, pairs)
-        tp -> rs
+      case (topicPartition, pairs) =>
+        val rs = createConsumerRecords(topicPartition, pairs)
+        topicPartition -> rs
     }
 
     val offsets = Offsets(recordsMap.mapValues(_.maxBy(_.offset()).offset()))
@@ -107,10 +103,10 @@
  * @tparam Key type of the key in records
  * @tparam Value type of the value in records
  */
-final case class ConsumerRecords[Key: TypeTag, Value: TypeTag](offsets: Offsets, records: JConsumerRecords[Key, Value])
-  extends TypeTagged[ConsumerRecords[Key, Value]] with HasOffsets {
-
-  import ConsumerRecords.Pair
+final case class ConsumerRecords[Key: TypeTag, Value: TypeTag](
+  offsets: Offsets,
+  records: JConsumerRecords[Key, Value]
+) extends TypeTagged[ConsumerRecords[Key, Value]] with HasOffsets {
 
   /**
    * Convert to Kafka's `ProducerRecord`s.
@@ -133,20 +129,20 @@ final case class ConsumerRecords[Key: TypeTag, Value: TypeTag](offsets: Offsets,
   /**
    * All the records as a list.
    */
-  val recordsList: List[JConsumerRecord[Key, Value]] = records.asScala.toList
+  def recordsList: List[JConsumerRecord[Key, Value]] = records.asScala.toList
 
   /**
    * All the keys and values as a sequence.
    */
-  val pairs: Seq[Pair[Key, Value]] = recordsList.map(r => (Option(r.key()), r.value()))
+  def pairs: Seq[(Option[Key], Value)] = recordsList.map(r => (Option(r.key()), r.value()))
 
   /**
    * All the values as a sequence.
    */
-  val values: Seq[Value] = recordsList.map(_.value())
+  def values: Seq[Value] = recordsList.map(_.value())
 
   /**
    * The number of records.
    */
-  val size: Int = records.count()
+  def size: Int = records.count()
 }
72 changes: 36 additions & 36 deletions akka/src/main/scala/cakesolutions/kafka/akka/ProducerRecords.scala
@@ -24,10 +24,12 @@ object ProducerRecords {
     values: Seq[Value],
     successResponse: Option[Any],
     failureResponse: Option[Any]
-  ): ProducerRecords[Nothing, Value] = {
-    val records = values.map(value => KafkaProducerRecord(topic, value))
-    ProducerRecords[Nothing, Value](records, successResponse, failureResponse)
-  }
+  ): ProducerRecords[Nothing, Value] =
+    ProducerRecords[Nothing, Value](
+      KafkaProducerRecord.fromValues(topic, values),
+      successResponse,
+      failureResponse
+    )
 
   /**
    * Create producer records from a single key and multiple values.
@@ -45,11 +47,12 @@
     values: Seq[Value],
     successResponse: Option[Any],
     failureResponse: Option[Any]
-  ): ProducerRecords[Key, Value] = {
-
-    val records = values.map(value => KafkaProducerRecord(topic, key, value))
-    ProducerRecords(records, successResponse, failureResponse)
-  }
+  ): ProducerRecords[Key, Value] =
+    ProducerRecords(
+      KafkaProducerRecord.fromValuesWithKey(topic, key, values),
+      successResponse,
+      failureResponse
+    )
 
   /**
    * Create producer records from topics and values.
@@ -63,13 +66,12 @@
     valuesWithTopic: Seq[(String, Value)],
     successResponse: Option[Any],
     failureResponse: Option[Any]
-  ): ProducerRecords[Nothing, Value] = {
-
-    val records = valuesWithTopic.map {
-      case (topic, value) => KafkaProducerRecord(topic, value)
-    }
-    ProducerRecords[Nothing, Value](records, successResponse)
-  }
+  ): ProducerRecords[Nothing, Value] =
+    ProducerRecords[Nothing, Value](
+      KafkaProducerRecord.fromValuesWithTopic(valuesWithTopic),
+      successResponse,
+      failureResponse
+    )
 
   /**
    * Create producer records from key-value pairs.
@@ -85,13 +87,12 @@
     keyValues: Seq[(Key, Value)],
     successResponse: Option[Any],
     failureResponse: Option[Any]
-  ): ProducerRecords[Key, Value] = {
-
-    val records = keyValues.map {
-      case (key, value) => KafkaProducerRecord(topic, key, value)
-    }
-    ProducerRecords(records, successResponse, failureResponse)
-  }
+  ): ProducerRecords[Key, Value] =
+    ProducerRecords(
+      KafkaProducerRecord.fromKeyValues(topic, keyValues),
+      successResponse,
+      failureResponse
+    )
 
   /**
    * Create producer records from topic, key, and value triples.
@@ -104,13 +105,12 @@
     keyValuesWithTopic: Seq[(String, Key, Value)],
     successResponse: Option[Any],
     failureResponse: Option[Any]
-  ): ProducerRecords[Key, Value] = {
-
-    val records = keyValuesWithTopic.map {
-      case (topic, key, value) => KafkaProducerRecord(topic, key, value)
-    }
-    ProducerRecords(records, successResponse, failureResponse)
-  }
+  ): ProducerRecords[Key, Value] =
+    ProducerRecords(
+      KafkaProducerRecord.fromKeyValuesWithTopic(keyValuesWithTopic),
+      successResponse,
+      failureResponse
+    )
 
   /**
    * Convert consumer records to key-value pairs.
@@ -124,12 +124,12 @@
     topic: String,
     consumerRecords: ConsumerRecords[Key, Value],
     failureResponse: Option[Any]
-  ) =
-    ProducerRecords(
-      consumerRecords.toProducerRecords(topic),
-      Some(consumerRecords.offsets),
-      Some(failureResponse)
-    )
+  ): ProducerRecords[Key, Value] =
+    ProducerRecords(
+      consumerRecords.toProducerRecords(topic),
+      Some(consumerRecords.offsets),
+      Some(failureResponse)
+    )
 
   /**
    * Create an extractor for pattern matching any value with a specific [[ProducerRecords]] type.
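Each factory above now delegates record construction to a KafkaProducerRecord helper instead of mapping inline. A hedged sketch of the call shapes — the helper names appear in this diff, but their signatures are inferred from the sequences the old bodies mapped over, and the topic names are illustrative:

    // Assumed: each helper returns one ProducerRecord per input element,
    // mirroring the per-element KafkaProducerRecord(topic, ...) calls it replaces.
    val byValue   = KafkaProducerRecord.fromValues("topic-a", Seq("v1", "v2"))
    val keyed     = KafkaProducerRecord.fromValuesWithKey("topic-a", "key", Seq("v1", "v2"))
    val keyValues = KafkaProducerRecord.fromKeyValues("topic-a", Seq("k1" -> "v1", "k2" -> "v2"))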
@@ -1,10 +1,11 @@
 package cakesolutions.kafka.akka
 
+import cakesolutions.kafka.KafkaTopicPartition
 import org.scalatest.{FlatSpecLike, Inside, Matchers}
 
 class ConsumerRecordsSpec extends FlatSpecLike with Matchers with Inside {
 
-  val partition = ("sometopic", 0)
+  val partition = KafkaTopicPartition("sometopic", 0)
   val knownInput: ConsumerRecords[String, Int] = ConsumerRecords.fromPairs(partition, Seq(Some("foo") -> 1))
   val partiallyKnownInput: ConsumerRecords[_, _] = knownInput
   val anyInput: Any = knownInput
@@ -3,13 +3,13 @@ package cakesolutions.kafka.akka
 import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
 import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.{ManualOffset, ManualPartition}
 import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, TriggerConsumerFailure, Unsubscribe}
-import cakesolutions.kafka.testkit.TestUtils
 import cakesolutions.kafka.{KafkaConsumer, KafkaProducerRecord, KafkaTopicPartition}
 import org.apache.kafka.clients.consumer.OffsetResetStrategy
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration._
+import scala.util.Random
 
 class KafkaConsumerActorRecoverySpec(_system: ActorSystem) extends KafkaIntSpec(_system) {
 
@@ -29,7 +29,9 @@
       autoOffsetReset = OffsetResetStrategy.EARLIEST)
   }
 
-  private def randomTopicPartition = KafkaTopicPartition(TestUtils.randomString(5), 0)
+  private def randomString: String = Random.alphanumeric.take(5).mkString("")
+
+  private def randomTopicPartition = KafkaTopicPartition(randomString, 0)
 
   "KafkaConsumerActor with manual commit" should "recover to a commit point on resubscription" in {
     val topicPartition = randomTopicPartition
@@ -3,14 +3,14 @@ package cakesolutions.kafka.akka
 import akka.actor.ActorSystem
 import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
 import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, Unsubscribe}
-import cakesolutions.kafka.testkit.TestUtils
 import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord, KafkaTopicPartition}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.kafka.clients.consumer.OffsetResetStrategy
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration._
+import scala.util.Random
 
 object KafkaConsumerActorSpec {
   def kafkaProducer(kafkaHost: String, kafkaPort: Int): KafkaProducer[String, String] =
@@ -25,12 +25,14 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
 
   val log = LoggerFactory.getLogger(getClass)
 
+  private def randomString: String = Random.alphanumeric.take(5).mkString("")
+
   val consumerConfFromConfig: KafkaConsumer.Conf[String, String] = {
     KafkaConsumer.Conf(
       ConfigFactory.parseString(
         s"""
         | bootstrap.servers = "localhost:$kafkaPort",
-        | group.id = "${TestUtils.randomString(5)}"
+        | group.id = "$randomString"
         | enable.auto.commit = false
         | auto.offset.reset = "earliest"
        """.stripMargin), new StringDeserializer, new StringDeserializer)
@@ -41,7 +43,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
       new StringDeserializer,
       new StringDeserializer,
       bootstrapServers = s"localhost:$kafkaPort",
-      groupId = TestUtils.randomString(5),
+      groupId = randomString,
       enableAutoCommit = false,
       autoOffsetReset = OffsetResetStrategy.EARLIEST
     )
@@ -59,7 +61,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
       ConfigFactory.parseString(
         s"""
         | bootstrap.servers = "localhost:$kafkaPort",
-        | group.id = "${TestUtils.randomString(5)}"
+        | group.id = "$randomString"
         | enable.auto.commit = false
         | auto.offset.reset = "earliest"
         | topics = ["$topic"]
@@ -74,7 +76,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
     (List(consumerConfFromConfig, consumerConf) zip List(KafkaConsumerActor.Conf(), actorConfFromConfig))
       .foreach {
         case (consumerConfig, actorConf) =>
-          val topic = TestUtils.randomString(5)
+          val topic = randomString
 
           val producer = kafkaProducer("localhost", kafkaPort)
           producer.send(KafkaProducerRecord(topic, None, "value"))
@@ -93,7 +95,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
   }
 
   "KafkaConsumerActor configured via props" should "consume a sequence of messages" in {
-    val topic = TestUtils.randomString(5)
+    val topic = randomString
 
     val producer = kafkaProducer("localhost", kafkaPort)
     producer.send(KafkaProducerRecord(topic, None, "value"))
@@ -112,7 +114,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
   }
 
   "KafkaConsumerActor configured in manual partition mode" should "consume a sequence of messages" in {
-    val topic = TestUtils.randomString(5)
+    val topic = randomString
     val topicPartition = KafkaTopicPartition(topic, 0)
 
     val producer = kafkaProducer("localhost", kafkaPort)
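Randomising the group.id per spec instance matters here: Kafka stores committed offsets per consumer group, so a reused id could make a re-run resume from a previous test's commit point. The first test above also pins down that the Typesafe-config-derived and builder-style configurations are interchangeable by zipping them and running the same cycle against each — a condensed sketch of that pattern, with the test body elided:

    // Pair each consumer conf with an actor conf and exercise them identically.
    (List(consumerConfFromConfig, consumerConf) zip List(KafkaConsumerActor.Conf(), actorConfFromConfig))
      .foreach { case (consumerConfig, actorConf) =>
        val topic = randomString
        // ... produce to the topic, Subscribe, consume, Confirm, Unsubscribe ...
      }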