Skip to content

Commit

Permalink
Dynamic subscription to new topic #81 (#82)
Browse files Browse the repository at this point in the history
* Update KafkaConsumer.scala

Added Meta Data Max configuration to KafkaConsumer

* Fixed Typo
  • Loading branch information
Nitendra Gautam authored and simonsouter committed Jan 20, 2017
1 parent 382e99d commit 0bd8be0
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object KafkaConsumer {
* @param maxPartitionFetchBytes the maximum amount of data per-partition the server will return
* @param maxPollRecords the maximum number of records returned in a single call to poll()
* @param maxPollInterval the maximum delay between invocations of poll() when using consumer group management
* @param maxMetaDataAge period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
* @param autoOffsetReset what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
* @tparam K key deserialiser type
* @tparam V value deserialiser type
Expand All @@ -61,6 +62,7 @@ object KafkaConsumer {
maxPartitionFetchBytes: Int = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
maxPollRecords: Int = 500,
maxPollInterval: Int = 300000,
maxMetaDataAge : Long = 300000,
autoOffsetReset: OffsetResetStrategy = OffsetResetStrategy.LATEST): Conf[K, V] = {

val configMap = Map[String, AnyRef](
Expand All @@ -72,6 +74,7 @@ object KafkaConsumer {
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> maxPartitionFetchBytes.toString,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPollRecords.toString,
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> maxPollInterval.toString,
ConsumerConfig.METADATA_MAX_AGE_CONFIG ->maxMetaDataAge.toString,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset.toString.toLowerCase
)

Expand Down Expand Up @@ -141,4 +144,4 @@ object KafkaConsumer {
*/
def apply[K, V](conf: Conf[K, V]): JKafkaConsumer[K, V] =
new JKafkaConsumer[K, V](conf.props, conf.keyDeserializer, conf.valueDeserializer)
}
}

0 comments on commit 0bd8be0

Please sign in to comment.