[pull] master from spreedly:master #8

Open — wants to merge 31 commits into base: master

31 commits
c16eee9
Fix Confluent Kafka connectivity with SSL support
andyleclair May 7, 2019
7b4f932
Fix compatibilty with Brod versions > 3.4
Jun 4, 2019
6ef394d
Update ex_docs
Jun 5, 2019
cc984b3
Allow :already_present as a valid result from brod.create_client
Aug 15, 2019
ff860d0
Handle client down error
MortezaHosseini Mar 4, 2020
de80b73
Bump version to 1.17.0
Mar 23, 2020
d77fba0
Update README
fatcatt316 Aug 14, 2020
f8a526e
Bump to version 1.18.0
fatcatt316 Aug 14, 2020
f77735a
Allow Kaffe Group Manager to recover when Kafka is unreachable
redmaner Jan 26, 2021
fcd3e4b
Refactor Group Manager recovery with Retry using exponential backoff
redmaner Jan 28, 2021
b62b0ee
Update tests to include new config: client_down_retry_expire
redmaner Jan 29, 2021
065608c
Properly ignore unused variable in Kaffe.GroupManager.do_a_retry?/2
redmaner Jan 29, 2021
fccc9ae
Bump to version 1.19.0
fatcatt316 Jan 29, 2021
6d6a890
Fix undefined function exponential_backoff not found
redmaner Mar 4, 2021
6d7b860
Bump to version 1.20.0
fatcatt316 Mar 9, 2021
3ad74e2
Fix crypto:rand_uniform deprecation (#121)
rodrigues Feb 9, 2022
978320d
Feat added support sha256/sha_512 mechanism for sasl auth (#112)
EdmondFrank Feb 10, 2022
bceba75
Remove deprecated supervisor spec (#119)
rodrigues Feb 10, 2022
4f7a0bc
Misc doc changes (#111)
kianmeng Feb 11, 2022
921fd1a
Bump kaffe version to 1.21 (#122)
britth Feb 11, 2022
621e33b
Update retry dependency
redmaner Mar 15, 2022
303fbfa
producer: Support producing messages with headers
redmaner Apr 19, 2022
2556210
Bump kaffe version to 1.22
fatcatt316 Apr 22, 2022
388a3a4
Bump kaffe version to 1.23
schwarzgeist Jul 28, 2023
5d80548
Match on subscribe_to_topics success to avoid getting stuck
rodrigues Dec 15, 2021
8ab0225
Bump kaffe version to 1.24
fatcatt316 Sep 29, 2023
28f9548
Fix Logger.warn deprecation
Jan 2, 2024
590a331
Merge pull request #138 from StanAnsems/fix-warn-deprecation
hdeters Feb 23, 2024
4895a48
Bump version to 1.25.0
hdeters Feb 23, 2024
320b448
Upgrade to Elixir 1.14
schwarzgeist Oct 9, 2024
ee74e39
Bump version to 1.26.0
schwarzgeist Oct 22, 2024
2 changes: 2 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 120
]
20 changes: 15 additions & 5 deletions .gitignore
@@ -1,20 +1,30 @@
# The directory Mix will write compiled artifacts to.
/_build
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover
/cover/

# The directory Mix downloads your dependencies sources to.
/deps
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc
# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
kaffe-*.tar

# Temporary files for e.g. tests.
/tmp/

# Misc.
/priv
.tool-versions
4 changes: 2 additions & 2 deletions LICENSE → LICENSE.md
@@ -1,6 +1,6 @@
MIT License
# MIT License

Copyright (c) [2017] [Spreedly, Inc.]
Copyright (c) 2017 Spreedly, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
113 changes: 65 additions & 48 deletions README.md
@@ -1,11 +1,34 @@
# Kaffe

An opinionated, highly specific, Elixir wrapper around
[Brod](https://github.com/klarna/brod): the Erlang Kafka client.
:coffee:
[![Module Version](https://img.shields.io/hexpm/v/kaffe.svg)](https://hex.pm/packages/kaffe)
[![Hex Docs](https://img.shields.io/badge/hex-docs-lightgreen.svg)](https://hexdocs.pm/kaffe/)
[![Total Download](https://img.shields.io/hexpm/dt/kaffe.svg)](https://hex.pm/packages/kaffe)
[![License](https://img.shields.io/hexpm/l/kaffe.svg)](https://github.com/spreedly/kaffe/blob/master/LICENSE.md)
[![Last Updated](https://img.shields.io/github/last-commit/spreedly/kaffe.svg)](https://github.com/spreedly/kaffe/commits/master)

An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com/klarna/brod): the Erlang Kafka client. :coffee:

**NOTE**: Although we're using this in production at Spreedly it is still under active development. The API may change and there may be serious bugs we've yet to encounter.

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Installation](#installation)
- [Kaffe Consumer Usage](#kaffe-consumer-usage)
- [Kaffe GroupMember - Batch Message Consumer](#kaffe-groupmember---batch-message-consumer)
- [Managing how offsets are committed](#managing-how-offsets-are-committed)
- [Kaffe Consumer - Single Message Consumer (Deprecated)](#kaffe-consumer---single-message-consumer-deprecated)
- [async message acknowledgement](#async-message-acknowledgement)
- [Kaffe Producer Usage](#kaffe-producer-usage)
- [Heroku Configuration](#heroku-configuration)
- [Producing to Kafka](#producing-to-kafka)
- [Testing](#testing)
- [Setup](#setup)
- [Running](#running)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

## Installation

1. Add `kaffe` to your list of dependencies in `mix.exs`:
@@ -56,44 +79,29 @@ There is also legacy support for single message consumers, which process one mes
end
```

2. The configuration options for the `GroupMember` consumer are a
superset of those for `Kaffe.Consumer`, except for
`:async_message_ack`, which is not supported. The additional options
are:
2. The configuration options for the `GroupMember` consumer are a superset of those for `Kaffe.Consumer`, except for `:async_message_ack`, which is not supported. The additional options are:

* `:rebalance_delay_ms` The time to allow for rebalancing among workers. The default is 10,000, which should give the consumers time to rebalance when scaling.

`:rebalance_delay_ms` which is the time to allow for rebalancing
among workers. The default is 10,000, which should give the
consumers time to rebalance when scaling.
* `:max_bytes` Limits the number of message bytes received from Kafka for a particular topic subscriber. The default is 1MB. This parameter might need tuning depending on the number of partitions in the topics being read (there is one subscriber per topic per partition). For example, if you are reading from two topics, each with 32 partitions, there is the potential of 64MB in buffered messages at any one time.

`:max_bytes` limits the number of message bytes received from Kafka
for a particular topic subscriber. The default is 1MB. This
parameter might need tuning depending on the number of partitions
in the topics being read (there is one subscriber per topic per
partition). For example, if you are reading from two topics, each
with 32 partitions, there is the potential of 64MB in buffered
messages at any one time.
* `:min_bytes` Sets a minimum threshold for the number of bytes to fetch for a batch of messages. The default is 0MB.

`:min_bytes` Sets a minimum threshold for the number of
bytes to fetch for a batch of messages. The default is 0MB.
* `:max_wait_time` Sets the maximum number of milliseconds the broker is allowed to wait while collecting `min_bytes` of messages for a batch.

`:max_wait_time` Sets the maximum number of milliseconds that the
broker is allowed to collect min_bytes of messages in a batch of messages
* `:offset_reset_policy` Controls how the subscriber handles an expired offset. See the Kafka consumer option, [`auto.offset.reset`](https://kafka.apache.org/documentation/#newconsumerconfigs). Valid values for this option are:

`:offset_reset_policy` controls how the subscriber handles an
expired offset. See the Kafka consumer option,
[`auto.offset.reset`](https://kafka.apache.org/documentation/#newconsumerconfigs).
Valid values for this option are:
* `:reset_to_earliest` Reset to the earliest available offset.
* `:reset_to_latest` Reset to the latest offset.
* `:reset_by_subscriber` The subscriber receives the `OffsetOutOfRange` error.

- `:reset_to_earliest` - reset to the earliest available offset
- `:reset_to_latest` - reset to the latest offset
- `:reset_by_subscriber` - The subscriber receives the `OffsetOutOfRange` error
More information in the [Brod consumer](https://github.com/klarna/brod/blob/master/src/brod_consumer.erl).

More information in the [Brod
consumer](https://github.com/klarna/brod/blob/master/src/brod_consumer.erl).
* `:worker_allocation_strategy` Controls how workers are allocated with respect to consumed topics and partitions.

`:worker_allocation_strategy` controls how workers are allocated with respect to consumed topics and partitions.
- `:worker_per_partition` - this is the default (for backward compatibilty) and allocates a single worker per partition across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic.
- `:worker_per_topic_partition` - this strategy allocates a worker per topic partition. This means there will be a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should use this strategy.
* `:worker_per_partition` The default (for backward compatibility); allocates a single worker per partition across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic.

* `:worker_per_topic_partition` This strategy allocates a worker per topic partition. This means there will be a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should use this strategy.

```elixir
config :kaffe,
@@ -115,8 +123,7 @@ There is also legacy support for single message consumers, which process one mes
],
```

3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your
supervision tree
3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree.

```elixir
defmodule MyApp.Application do
@@ -131,24 +138,26 @@ There is also legacy support for single message consumers, which process one mes
}
]

opts = [strategy: :one_for_one, name: Sample.Supervisor]
opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor]
Supervisor.start_link(children, opts)
end
end
```

#### Managing how offsets are committed

In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages. Your message handler can respond in the following ways to manage how offsets are committed back:
In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages.

Your message handler can respond in the following ways to manage how offsets are committed back:

`:ok` - commit back the most recent offset and request more messages
`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more message from the offset of the last message
`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more messages from the offset of the last message
`{:ok, offset}` - commit back at the offset specified and request messages from that point forward

Example:

```elixir
defmodule MessageProcessor
defmodule MessageProcessor do
def handle_messages(messages) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
@@ -161,11 +170,11 @@ end

### Kaffe Consumer - Single Message Consumer (Deprecated)

_For backward compatiblitly only, you should use `Kaffe.GroupMemberSupervisor` instead!_
_For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended instead!_

1. Add a `handle_message/1` function to a local module (e.g. `MessageProcessor`). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

The module's `handle_message/1` function _must_ return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`.
The module's `handle_message/1` function _must_ return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`.

### Example

@@ -212,7 +221,7 @@ _For backward compatiblitly only, you should use `Kaffe.GroupMemberSupervisor` i
],
```

The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.
The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka they will supersede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

### Heroku Configuration

@@ -269,7 +278,7 @@ If you need asynchronous message consumption:
Kaffe.Consumer.ack(pid, message)
```

**NOTE**: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. IE if you spawn 5 workers processing a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement then it's possible and likely that the message will be skipped entirely.
**NOTE**: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. I.e., if you spawn 5 workers processing a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement then it's possible and likely that the message will be skipped entirely.

Kafka only tracks a single numeric offset, not individual messages. If a message fails and a later offset is committed then the failed message will _not_ be sent again.

@@ -289,6 +298,7 @@ config :kaffe,

# optional
partition_strategy: :md5,
ssl: true,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
@@ -299,17 +309,19 @@ config :kaffe,

The `partition_strategy` setting can be one of:

- `:md5`: (default) provides even and deterministic distrbution of the messages over the available partitions based on an MD5 hash of the key
- `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key
- `:random`: select a random partition for each message
- function: a given function to call to determine the correct partition

You can also set any of the Brod producer configuration options in the `producer` section - see [the Brod sources](https://github.com/klarna/brod/blob/master/src/brod_producer.erl#L90) for a list of keys and their meaning.

If kafka broker configured with `SASL_PLAINTEXT` auth, `sasl` option can be added
If the Kafka broker is configured with `SASL_PLAINTEXT` auth, the `sasl` option can be added.

If using Confluent Hosted Kafka, also add `ssl: true` as shown above.

## Heroku Configuration

To configure a Kaffe Producer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true`
To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the `endpoint` and instead set `heroku_kafka_env: true`

```elixir
config :kaffe,
@@ -371,12 +383,11 @@ There are several ways to produce:

**NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.


## Testing

### Setup

In order to run the end to end tests, a Kafka topic is required. It must:
In order to run the end-to-end tests, a Kafka topic is required. It must:

* be named `kaffe-test`
* have 32 partitions
@@ -395,3 +406,9 @@ mix test
# end to end test
mix test --only e2e
```

## Copyright and License

Copyright (c) 2017 Spreedly, Inc.

This software is released under the [MIT License](./LICENSE.md).
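The offset-management contract described in the README diff can be sketched as a batch handler. This is an illustrative sketch only — the module name, `process/1` helper, and batching logic are hypothetical, not part of Kaffe; the three return shapes are the ones the README documents:

```elixir
defmodule MyApp.BatchHandler do
  # Hypothetical handler showing the three return shapes Kaffe accepts.
  # Kaffe passes each message as a map that includes an :offset key.
  def handle_messages(messages) do
    case Enum.split_with(messages, &process/1) do
      {_processed, []} ->
        # Everything succeeded: commit the most recent offset.
        :ok

      {[], _failed} ->
        # Nothing succeeded: keep the old offset so the batch is redelivered.
        {:ok, :no_commit}

      {processed, _failed} ->
        # Partial success: commit at the last processed message so the
        # consumer restarts from the first unprocessed one.
        {:ok, List.last(processed).offset}
    end
  end

  # Placeholder for real per-message work; returns true on success.
  defp process(%{key: _key, value: _value}), do: true
end
```

Note that `Enum.split_with/2` preserves ordering, which matters here: committing `List.last(processed).offset` is only safe if messages are processed in offset order.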
2 changes: 1 addition & 1 deletion config/config.exs
@@ -1,4 +1,4 @@
use Mix.Config
import Config

config :kaffe,
kafka_mod: :brod,
2 changes: 1 addition & 1 deletion config/dev.exs
@@ -1 +1 @@
use Mix.Config
import Config
3 changes: 2 additions & 1 deletion config/test.exs
@@ -1,4 +1,4 @@
use Mix.Config
import Config

config :kaffe,
kafka_mod: TestBrod,
@@ -16,6 +16,7 @@ config :kaffe,
max_bytes: 10_000,
subscriber_retries: 1,
subscriber_retry_delay_ms: 5,
client_down_retry_expire: 15_000,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
29 changes: 24 additions & 5 deletions lib/kaffe/config.ex
@@ -5,8 +5,19 @@ defmodule Kaffe.Config do
|> parse_endpoints()
end

def parse_endpoints(endpoints) when is_list(endpoints), do: endpoints
@doc """
Transform the list of endpoints into a list of `{charlist, port}` tuples.
"""
def parse_endpoints(endpoints) when is_list(endpoints) do
endpoints
|> Enum.map(fn {host, port} ->
{to_charlist(host), port}
end)
end

@doc """
Transform the encoded string into a list of `{charlist, port}` tuples.
"""
def parse_endpoints(url) when is_binary(url) do
url
|> String.replace("kafka+ssl://", "")
@@ -17,23 +28,31 @@ defmodule Kaffe.Config do

def url_endpoint_to_tuple(endpoint) do
[ip, port] = endpoint |> String.split(":")
{ip |> String.to_atom(), port |> String.to_integer()}
{ip |> String.to_charlist(), port |> String.to_integer()}
end

def sasl_config(%{mechanism: :plain, login: login, password: password})
when not is_nil(password) and not is_nil(login),
do: [sasl: {:plain, login, password}]

def sasl_config(%{mechanism: :scram_sha_256, login: login, password: password})
when not is_nil(password) and not is_nil(login),
do: [sasl: {:scram_sha_256, login, password}]

def sasl_config(%{mechanism: :scram_sha_512, login: login, password: password})
when not is_nil(password) and not is_nil(login),
do: [sasl: {:scram_sha_512, login, password}]

def sasl_config(_), do: []

def ssl_config do
ssl_config(client_cert(), client_cert_key())
end

def ssl_config(_client_cert = nil, _client_cert_key = nil) do
[]
end
def ssl_config(true), do: [ssl: true]
def ssl_config(_), do: []

def ssl_config(_client_cert = nil, _client_cert_key = nil), do: []
def ssl_config(client_cert, client_cert_key) do
[
ssl: [
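The substantive fix in the `Kaffe.Config` diff is that endpoint hosts are now converted to charlists — which the Erlang `brod` client expects — instead of atoms, avoiding unbounded atom creation for URL-style endpoints. A rough usage sketch, assuming the module as patched above (the hostname is illustrative):

```elixir
# Sketch only — assumes the patched Kaffe.Config shown in this diff.

# List form: {host, port} tuples; hosts are normalized to charlists.
Kaffe.Config.parse_endpoints([{"kafka1.example.com", 9092}])
# => [{~c"kafka1.example.com", 9092}]

# URL form: the "kafka+ssl://" prefix is stripped, then each
# "host:port" segment goes through url_endpoint_to_tuple/1, which
# now calls String.to_charlist/1 rather than String.to_atom/1.
Kaffe.Config.url_endpoint_to_tuple("192.168.1.100:9096")
# => {~c"192.168.1.100", 9096}
```

The atom-leak risk with the old `String.to_atom/1` call is worth noting: atoms are never garbage-collected on the BEAM, so parsing attacker- or config-controlled host strings into atoms can exhaust the atom table.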
15 changes: 13 additions & 2 deletions lib/kaffe/config/consumer.ex
@@ -18,7 +18,8 @@ defmodule Kaffe.Config.Consumer do
subscriber_retries: subscriber_retries(),
subscriber_retry_delay_ms: subscriber_retry_delay_ms(),
offset_reset_policy: offset_reset_policy(),
worker_allocation_strategy: worker_allocation_strategy()
worker_allocation_strategy: worker_allocation_strategy(),
client_down_retry_expire: client_down_retry_expire()
}
end

@@ -72,7 +73,7 @@ defmodule Kaffe.Config.Consumer do
end

def client_consumer_config do
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options()
default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
end

def sasl_options do
@@ -81,6 +82,12 @@ defmodule Kaffe.Config.Consumer do
|> Kaffe.Config.sasl_config()
end

def ssl_options do
:ssl
|> config_get(false)
|> Kaffe.Config.ssl_config()
end

def default_client_consumer_config do
[
auto_start_producers: false,
@@ -104,6 +111,10 @@ defmodule Kaffe.Config.Consumer do
config_get(:worker_allocation_strategy, :worker_per_partition)
end

def client_down_retry_expire do
config_get(:client_down_retry_expire, 30_000)
end

def maybe_heroku_kafka_ssl do
case heroku_kafka?() do
true -> Kaffe.Config.ssl_config()
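Taken together, the `Kaffe.Config.Consumer` changes mean `client_consumer_config/0` now appends plain-SSL and SASL options alongside the Heroku SSL path, and a `client_down_retry_expire` setting (default 30,000 ms) bounds the Group Manager's recovery retries. A hedged sketch of an app config exercising the new options — hostnames, group name, and credentials env vars are illustrative, not from the patch:

```elixir
# Illustrative config sketch mirroring the patched consumer options.
config :kaffe,
  consumer: [
    endpoints: [{"kafka.example.com", 9096}],  # hypothetical broker
    topics: ["kaffe-test"],
    consumer_group: "example-group",           # hypothetical group
    ssl: true,                                 # picked up by ssl_options/0
    client_down_retry_expire: 30_000,          # new option; default 30_000 ms
    sasl: %{
      mechanism: :scram_sha_512,               # newly supported with :plain and :scram_sha_256
      login: System.get_env("KAFKA_USER"),
      password: System.get_env("KAFKA_PASSWORD")
    }
  ]
```

Note the guard clauses on `sasl_config/1` in the diff: if either `login` or `password` resolves to `nil`, the SASL options are silently dropped rather than raising, so missing environment variables surface later as broker auth failures.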