diff --git a/.formatter.exs b/.formatter.exs index b9eb5ad..0a70dc0 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,3 +1,5 @@ +# Used by "mix format" [ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], line_length: 120 ] diff --git a/.gitignore b/.gitignore index a2318a8..b29c7cc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,17 @@ # The directory Mix will write compiled artifacts to. -/_build +/_build/ # If you run "mix test --cover", coverage assets end up here. -/cover +/cover/ # The directory Mix downloads your dependencies sources to. -/deps +/deps/ -# Where 3rd-party dependencies like ExDoc output generated docs. -/doc +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch # If the VM crashes, it generates a dump, let's ignore it too. erl_crash.dump @@ -16,5 +19,12 @@ erl_crash.dump # Also ignore archive artifacts (built via "mix archive.build"). *.ez +# Ignore package tarball (built via "mix hex.build"). +kaffe-*.tar + +# Temporary files for e.g. tests. +/tmp/ + +# Misc. /priv .tool-versions diff --git a/LICENSE b/LICENSE.md similarity index 95% rename from LICENSE rename to LICENSE.md index f175159..2d93f87 100644 --- a/LICENSE +++ b/LICENSE.md @@ -1,6 +1,6 @@ -MIT License +# MIT License -Copyright (c) [2017] [Spreedly, Inc.] +Copyright (c) 2017 Spreedly, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 20f4bd1..bdcf549 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,34 @@ # Kaffe -An opinionated, highly specific, Elixir wrapper around -[Brod](https://github.com/klarna/brod): the Erlang Kafka client. -:coffee: +[![Module Version](https://img.shields.io/hexpm/v/kaffe.svg)](https://hex.pm/packages/kaffe) +[![Hex Docs](https://img.shields.io/badge/hex-docs-lightgreen.svg)](https://hexdocs.pm/kaffe/) +[![Total Download](https://img.shields.io/hexpm/dt/kaffe.svg)](https://hex.pm/packages/kaffe) +[![License](https://img.shields.io/hexpm/l/kaffe.svg)](https://github.com/spreedly/kaffe/blob/master/LICENSE.md) +[![Last Updated](https://img.shields.io/github/last-commit/spreedly/kaffe.svg)](https://github.com/spreedly/kaffe/commits/master) + +An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com/klarna/brod): the Erlang Kafka client. :coffee: **NOTE**: Although we're using this in production at Spreedly it is still under active development. The API may change and there may be serious bugs we've yet to encounter. + + +**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)* + +- [Installation](#installation) +- [Kaffe Consumer Usage](#kaffe-consumer-usage) + - [Kaffe GroupMember - Batch Message Consumer](#kaffe-groupmember---batch-message-consumer) + - [Managing how offsets are committed](#managing-how-offsets-are-committed) + - [Kaffe Consumer - Single Message Consumer (Deprecated)](#kaffe-consumer---single-message-consumer-deprecated) + - [async message acknowledgement](#async-message-acknowledgement) +- [Kaffe Producer Usage](#kaffe-producer-usage) +- [Heroku Configuration](#heroku-configuration) +- [Producing to Kafka](#producing-to-kafka) +- [Testing](#testing) + - [Setup](#setup) + - [Running](#running) + + + ## Installation 1. Add `kaffe` to your list of dependencies in `mix.exs`: @@ -56,44 +79,29 @@ There is also legacy support for single message consumers, which process one mes end ``` -2. The configuration options for the `GroupMember` consumer are a - superset of those for `Kaffe.Consumer`, except for - `:async_message_ack`, which is not supported. The additional options - are: +2. The configuration options for the `GroupMember` consumer are a superset of those for `Kaffe.Consumer`, except for `:async_message_ack`, which is not supported. The additional options are: + + * `:rebalance_delay_ms` The time to allow for rebalancing among workers. The default is 10,000, which should give the consumers time to rebalance when scaling. - `:rebalance_delay_ms` which is the time to allow for rebalancing - among workers. The default is 10,000, which should give the - consumers time to rebalance when scaling. + * `:max_bytes` Limits the number of message bytes received from Kafka for a particular topic subscriber. The default is 1MB. This parameter might need tuning depending on the number of partitions in the topics being read (there is one subscriber per topic per partition). For example, if you are reading from two topics, each with 32 partitions, there is the potential of 64MB in buffered messages at any one time. - `:max_bytes` limits the number of message bytes received from Kafka - for a particular topic subscriber. The default is 1MB. This - parameter might need tuning depending on the number of partitions - in the topics being read (there is one subscriber per topic per - partition). For example, if you are reading from two topics, each - with 32 partitions, there is the potential of 64MB in buffered - messages at any one time. + * `:min_bytes` Sets a minimum threshold for the number of bytes to fetch for a batch of messages. The default is 0MB. - `:min_bytes` Sets a minimum threshold for the number of - bytes to fetch for a batch of messages. The default is 0MB. + * `:max_wait_time` Sets the maximum number of milliseconds that the broker is allowed to collect min_bytes of messages in a batch of messages. - `:max_wait_time` Sets the maximum number of milliseconds that the - broker is allowed to collect min_bytes of messages in a batch of messages + * `:offset_reset_policy` Controls how the subscriber handles an expired offset. See the Kafka consumer option, [`auto.offset.reset`](https://kafka.apache.org/documentation/#newconsumerconfigs). Valid values for this option are: - `:offset_reset_policy` controls how the subscriber handles an - expired offset. See the Kafka consumer option, - [`auto.offset.reset`](https://kafka.apache.org/documentation/#newconsumerconfigs). - Valid values for this option are: + * `:reset_to_earliest` Reset to the earliest available offset. + * `:reset_to_latest` Reset to the latest offset. + * `:reset_by_subscriber` The subscriber receives the `OffsetOutOfRange` error. - - `:reset_to_earliest` - reset to the earliest available offset - - `:reset_to_latest` - reset to the latest offset - - `:reset_by_subscriber` - The subscriber receives the `OffsetOutOfRange` error + More information in the [Brod consumer](https://github.com/klarna/brod/blob/master/src/brod_consumer.erl). - More information in the [Brod - consumer](https://github.com/klarna/brod/blob/master/src/brod_consumer.erl). + * `:worker_allocation_strategy` Controls how workers are allocated with respect to consumed topics and partitions. - `:worker_allocation_strategy` controls how workers are allocated with respect to consumed topics and partitions. - - `:worker_per_partition` - this is the default (for backward compatibilty) and allocates a single worker per partition across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic. - - `:worker_per_topic_partition` - this strategy allocates a worker per topic partition. This means there will be a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should use this strategy. + * `:worker_per_partition` The default (for backward compatibilty) and allocates a single worker per partition across topics. This is useful for managing concurrent processing of messages that may be received from any consumed topic. + + * `:worker_per_topic_partition` This strategy allocates a worker per topic partition. This means there will be a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should use this strategy. ```elixir config :kaffe, @@ -115,8 +123,7 @@ There is also legacy support for single message consumers, which process one mes ], ``` -3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your - supervision tree +3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree. ```elixir defmodule MyApp.Application do @@ -131,7 +138,7 @@ There is also legacy support for single message consumers, which process one mes } ] - opts = [strategy: :one_for_one, name: Sample.Supervisor] + opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor] Supervisor.start_link(children, opts) end end @@ -139,16 +146,18 @@ There is also legacy support for single message consumers, which process one mes #### Managing how offsets are committed -In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages. Your message handler can respond in the following ways to manage how offsets are committed back: +In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages. + +Your message handler can respond in the following ways to manage how offsets are committed back: `:ok` - commit back the most recent offset and request more messages -`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more message from the offset of the last message +`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more messages from the offset of the last message `{:ok, offset}` - commit back at the offset specified and request messages from that point forward Example: ```elixir -defmodule MessageProcessor +defmodule MessageProcessor do def handle_messages(messages) do for %{key: key, value: value} = message <- messages do IO.inspect message @@ -161,11 +170,11 @@ end ### Kaffe Consumer - Single Message Consumer (Deprecated) -_For backward compatiblitly only, you should use `Kaffe.GroupMemberSupervisor` instead!_ +_For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended instead!_ 1. Add a `handle_message/1` function to a local module (e.g. `MessageProcessor`). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata. - The module's `handle_message/1` function _must_ return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`. + The module's `handle_message/1` function _must_ return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`. ### Example @@ -212,7 +221,7 @@ _For backward compatiblitly only, you should use `Kaffe.GroupMemberSupervisor` i ], ``` - The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. + The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. ### Heroku Configuration @@ -269,7 +278,7 @@ If you need asynchronous message consumption: Kaffe.Consumer.ack(pid, message) ``` -**NOTE**: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. IE if you spawn 5 workers processing a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement then it's possible and likely that the message will be skipped entirely. +**NOTE**: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. I.e., if you spawn 5 workers processing a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement then it's possible and likely that the message will be skipped entirely. Kafka only tracks a single numeric offset, not individual messages. If a message fails and a later offset is committed then the failed message will _not_ be sent again. @@ -289,6 +298,7 @@ config :kaffe, # optional partition_strategy: :md5, + ssl: true, sasl: %{ mechanism: :plain, login: System.get_env("KAFFE_PRODUCER_USER"), @@ -299,17 +309,19 @@ config :kaffe, The `partition_strategy` setting can be one of: -- `:md5`: (default) provides even and deterministic distrbution of the messages over the available partitions based on an MD5 hash of the key +- `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key - `:random`: select a random partition for each message - function: a given function to call to determine the correct partition You can also set any of the Brod producer configuration options in the `producer` section - see [the Brod sources](https://github.com/klarna/brod/blob/master/src/brod_producer.erl#L90) for a list of keys and their meaning. -If kafka broker configured with `SASL_PLAINTEXT` auth, `sasl` option can be added +If the Kafka broker is configured with `SASL_PLAINTEXT` auth, the `sasl` option can be added. + +If using Confluent Hosted Kafka, also add `ssl: true` as shown above. ## Heroku Configuration -To configure a Kaffe Producer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true` +To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the `endpoint` and instead set `heroku_kafka_env: true` ```elixir config :kaffe, @@ -371,12 +383,11 @@ There are several ways to produce: **NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition. - ## Testing ### Setup -In order to run the end to end tests, a Kafka topic is required. It must: +In order to run the end-to-end tests, a Kafka topic is required. It must: * be named `kaffe-test` * have 32 partitions @@ -395,3 +406,9 @@ mix test # end to end test mix test --only e2e ``` + +## Copyright and License + +Copyright (c) 2017 Spreedly, Inc. + +This software is released under the [MIT License](./LICENSE.md). diff --git a/config/config.exs b/config/config.exs index 437231f..885b957 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :kaffe, kafka_mod: :brod, diff --git a/config/dev.exs b/config/dev.exs index d2d855e..becde76 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1 +1 @@ -use Mix.Config +import Config diff --git a/config/test.exs b/config/test.exs index 284076a..a8f6344 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :kaffe, kafka_mod: TestBrod, @@ -16,6 +16,7 @@ config :kaffe, max_bytes: 10_000, subscriber_retries: 1, subscriber_retry_delay_ms: 5, + client_down_retry_expire: 15_000, sasl: %{ mechanism: :plain, login: System.get_env("KAFFE_PRODUCER_USER"), diff --git a/lib/kaffe/config.ex b/lib/kaffe/config.ex index 0c78b1e..064be7f 100644 --- a/lib/kaffe/config.ex +++ b/lib/kaffe/config.ex @@ -5,8 +5,19 @@ defmodule Kaffe.Config do |> parse_endpoints() end - def parse_endpoints(endpoints) when is_list(endpoints), do: endpoints + @doc """ + Transform the list of endpoints into a list of `{charlist, port}` tuples. + """ + def parse_endpoints(endpoints) when is_list(endpoints) do + endpoints + |> Enum.map(fn {host, port} -> + {to_charlist(host), port} + end) + end + @doc """ + Transform the encoded string into a list of `{charlist, port}` tuples. + """ def parse_endpoints(url) when is_binary(url) do url |> String.replace("kafka+ssl://", "") @@ -17,23 +28,31 @@ defmodule Kaffe.Config do def url_endpoint_to_tuple(endpoint) do [ip, port] = endpoint |> String.split(":") - {ip |> String.to_atom(), port |> String.to_integer()} + {ip |> String.to_charlist(), port |> String.to_integer()} end def sasl_config(%{mechanism: :plain, login: login, password: password}) when not is_nil(password) and not is_nil(login), do: [sasl: {:plain, login, password}] + def sasl_config(%{mechanism: :scram_sha_256, login: login, password: password}) + when not is_nil(password) and not is_nil(login), + do: [sasl: {:scram_sha_256, login, password}] + + def sasl_config(%{mechanism: :scram_sha_512, login: login, password: password}) + when not is_nil(password) and not is_nil(login), + do: [sasl: {:scram_sha_512, login, password}] + def sasl_config(_), do: [] def ssl_config do ssl_config(client_cert(), client_cert_key()) end - def ssl_config(_client_cert = nil, _client_cert_key = nil) do - [] - end + def ssl_config(true), do: [ssl: true] + def ssl_config(_), do: [] + def ssl_config(_client_cert = nil, _client_cert_key = nil), do: [] def ssl_config(client_cert, client_cert_key) do [ ssl: [ diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 95a2e8d..781ff99 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -18,7 +18,8 @@ defmodule Kaffe.Config.Consumer do subscriber_retries: subscriber_retries(), subscriber_retry_delay_ms: subscriber_retry_delay_ms(), offset_reset_policy: offset_reset_policy(), - worker_allocation_strategy: worker_allocation_strategy() + worker_allocation_strategy: worker_allocation_strategy(), + client_down_retry_expire: client_down_retry_expire() } end @@ -72,7 +73,7 @@ defmodule Kaffe.Config.Consumer do end def client_consumer_config do - default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() + default_client_consumer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options() end def sasl_options do @@ -81,6 +82,12 @@ defmodule Kaffe.Config.Consumer do |> Kaffe.Config.sasl_config() end + def ssl_options do + :ssl + |> config_get(false) + |> Kaffe.Config.ssl_config() + end + def default_client_consumer_config do [ auto_start_producers: false, @@ -104,6 +111,10 @@ defmodule Kaffe.Config.Consumer do config_get(:worker_allocation_strategy, :worker_per_partition) end + def client_down_retry_expire do + config_get(:client_down_retry_expire, 30_000) + end + def maybe_heroku_kafka_ssl do case heroku_kafka?() do true -> Kaffe.Config.ssl_config() diff --git a/lib/kaffe/config/producer.ex b/lib/kaffe/config/producer.ex index 54e4c05..d2e09a1 100644 --- a/lib/kaffe/config/producer.ex +++ b/lib/kaffe/config/producer.ex @@ -22,7 +22,7 @@ defmodule Kaffe.Config.Producer do end def client_producer_config do - default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() + default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options() end def sasl_options do @@ -38,6 +38,12 @@ defmodule Kaffe.Config.Producer do end end + def ssl_options do + :ssl + |> config_get(false) + |> Kaffe.Config.ssl_config() + end + def default_client_producer_config do [ auto_start_producers: true, diff --git a/lib/kaffe/consumer_group/group_manager.ex b/lib/kaffe/consumer_group/group_manager.ex index fda0915..0a854c9 100644 --- a/lib/kaffe/consumer_group/group_manager.ex +++ b/lib/kaffe/consumer_group/group_manager.ex @@ -14,6 +14,7 @@ defmodule Kaffe.GroupManager do """ use GenServer + use Retry require Logger defmodule State do @@ -28,6 +29,14 @@ defmodule Kaffe.GroupManager do worker_manager_pid: nil end + defmodule ClientDownException do + defexception [:message] + + def exception(_term) do + %ClientDownException{message: "Kafka client is down"} + end + end + ## ========================================================================== ## Public API ## ========================================================================== @@ -57,7 +66,14 @@ defmodule Kaffe.GroupManager do Logger.info("event#startup=#{__MODULE__} name=#{name()}") config = Kaffe.Config.Consumer.configuration() - :ok = kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) + + case kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) do + :ok -> + :ok + + {_, :already_present} -> + Logger.info("The brod client is already present, continuing.") + end GenServer.cast(self(), {:start_group_members}) @@ -79,12 +95,14 @@ defmodule Kaffe.GroupManager do def handle_cast({:start_group_members}, state) do Logger.debug("Starting worker supervisors for group manager: #{inspect(self())}") - {:ok, worker_supervisor_pid} = group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name) + {:ok, worker_supervisor_pid} = + group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name) + {:ok, worker_manager_pid} = worker_supervisor().start_worker_manager(worker_supervisor_pid, state.subscriber_name) state = %State{state | worker_manager_pid: worker_manager_pid} - subscribe_to_topics(state, state.topics) + :ok = subscribe_to_topics(state, state.topics) {:noreply, state} end @@ -95,7 +113,7 @@ defmodule Kaffe.GroupManager do """ def handle_call({:subscribe_to_topics, requested_topics}, _from, %State{topics: topics} = state) do new_topics = requested_topics -- topics - subscribe_to_topics(state, new_topics) + :ok = subscribe_to_topics(state, new_topics) {:reply, {:ok, new_topics}, %State{state | topics: state.topics ++ new_topics}} end @@ -110,13 +128,42 @@ defmodule Kaffe.GroupManager do ## ========================================================================== ## Helpers ## ========================================================================== + defp subscribe_to_topics(state, topics) do - for topic <- topics do - Logger.debug("Starting group member for topic: #{topic}") - {:ok, _pid} = subscribe_to_topic(state, topic) + Logger.debug("Starting group members for the following topics: #{inspect(topics)}") + + retry with: exponential_backoff() |> expiry(client_down_retry_expire()), + rescue_only: [Kaffe.GroupManager.ClientDownException] do + Enum.each(topics, fn topic -> + case subscribe_to_topic(state, topic) do + {:ok, _pid} -> + Logger.debug("Started group member for topic: #{topic}") + :ok + + error -> + Logger.debug("Starting group member for #{topic} failed, attempting retry with exponential backoff") + + is_client_down_error?(error) + |> do_a_retry?(error) + end + end) + after + :ok -> + Logger.debug("Group members succesfully started") + else + {:error, reason} = error -> + Logger.error("Starting group members failed: #{inspect(reason)}") + error + + _ = error -> + Logger.error("Starting group members failed: #{inspect(error)}") + {:error, error} end end + defp do_a_retry?(true, _error), do: raise(Kaffe.GroupManager.ClientDownException) + defp do_a_retry?(false, error), do: raise(error) + defp subscribe_to_topic(state, topic) do group_member_supervisor().start_group_member( state.supervisor_pid, @@ -146,4 +193,25 @@ defmodule Kaffe.GroupManager do defp worker_supervisor do Application.get_env(:kaffe, :worker_supervisor_mod, Kaffe.WorkerSupervisor) end + + defp client_down_retry_expire() do + Kaffe.Config.Consumer.configuration().client_down_retry_expire + end + + # Brod client errors are erlang exceptions and are hard to pattern match correctly. + # This function casts the error to a string, and returns if the error is a client down error + defp is_client_down_error?({:error, error}) do + error_string = "#{inspect(error)}" + + cond do + String.match?(error_string, ~r(:econnrefused)) -> + true + + String.match?(error_string, ~r({:error, :client_down})) -> + true + + true -> + false + end + end end diff --git a/lib/kaffe/consumer_group/group_member_supervisor.ex b/lib/kaffe/consumer_group/group_member_supervisor.ex index c68aaa0..0dc53ba 100644 --- a/lib/kaffe/consumer_group/group_member_supervisor.ex +++ b/lib/kaffe/consumer_group/group_member_supervisor.ex @@ -32,7 +32,14 @@ defmodule Kaffe.GroupMemberSupervisor do end def start_worker_supervisor(supervisor_pid, subscriber_name) do - Supervisor.start_child(supervisor_pid, supervisor(Kaffe.WorkerSupervisor, [subscriber_name])) + Supervisor.start_child( + supervisor_pid, + %{ + id: :"Kaffe.WorkerSupervisor.#{subscriber_name}", + start: {Kaffe.WorkerSupervisor, :start_link, [subscriber_name]}, + type: :supervisor + } + ) end def start_group_member( @@ -44,11 +51,14 @@ defmodule Kaffe.GroupMemberSupervisor do ) do Supervisor.start_child( supervisor_pid, - worker( - Kaffe.GroupMember, - [subscriber_name, consumer_group, worker_manager_pid, topic], - id: :"group_member_#{subscriber_name}_#{topic}" - ) + %{ + id: :"group_member_#{subscriber_name}_#{topic}", + start: { + Kaffe.GroupMember, + :start_link, + [subscriber_name, consumer_group, worker_manager_pid, topic] + } + } ) end @@ -56,11 +66,14 @@ defmodule Kaffe.GroupMemberSupervisor do Logger.info("event#starting=#{__MODULE__}") children = [ - worker(Kaffe.GroupManager, []) + %{ + id: Kaffe.GroupManager, + start: {Kaffe.GroupManager, :start_link, []} + } ] # If we get a failure, we need to reset so the states are all consistent. - supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) + Supervisor.init(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) end defp name do diff --git a/lib/kaffe/consumer_group/subscriber/group_member.ex b/lib/kaffe/consumer_group/subscriber/group_member.ex index 5487bb8..ffcc306 100644 --- a/lib/kaffe/consumer_group/subscriber/group_member.ex +++ b/lib/kaffe/consumer_group/subscriber/group_member.ex @@ -57,12 +57,12 @@ defmodule Kaffe.GroupMember do # Should not receive this def get_committed_offsets(_group_member_pid, _topic_partitions) do - Logger.warn("event#get_committed_offsets") + Logger.warning("event#get_committed_offsets") end # Should not receive this def assign_partitions(_pid, _members, _topic_partitions) do - Logger.warn("event#assign_partitions") + Logger.warning("event#assign_partitions") end def assignments_received(pid, _member_id, generation_id, assignments) do diff --git a/lib/kaffe/consumer_group/subscriber/subscriber.ex b/lib/kaffe/consumer_group/subscriber/subscriber.ex index c2b85c9..96eb32d 100644 --- a/lib/kaffe/consumer_group/subscriber/subscriber.ex +++ b/lib/kaffe/consumer_group/subscriber/subscriber.ex @@ -144,12 +144,12 @@ defmodule Kaffe.Subscriber do def handle_info({:DOWN, _ref, _process, pid, reason}, %{subscriber_pid: subscriber_pid} = state) when pid == subscriber_pid do - Logger.warn("event#consumer_down=#{inspect(self())} reason=#{inspect(reason)}") + Logger.warning("event#consumer_down=#{inspect(self())} reason=#{inspect(reason)}") {:stop, {:shutdown, {:consumer_down, reason}}, state} end def handle_info(unknown, state) do - Logger.warn("event#unknown_message=#{inspect(self())} reason=#{inspect(unknown)}") + Logger.warning("event#unknown_message=#{inspect(self())} reason=#{inspect(unknown)}") {:noreply, state} end @@ -196,7 +196,7 @@ defmodule Kaffe.Subscriber do end defp handle_subscribe({:error, reason}, state) do - Logger.warn("event#subscribe_failed=#{inspect(self())} reason=#{inspect(reason)}") + Logger.warning("event#subscribe_failed=#{inspect(self())} reason=#{inspect(reason)}") {:stop, {:subscribe_failed, :retries_exceeded, reason}, state} end diff --git a/lib/kaffe/consumer_group/worker/worker_supervisor.ex b/lib/kaffe/consumer_group/worker/worker_supervisor.ex index a19bddb..7a617a3 100644 --- a/lib/kaffe/consumer_group/worker/worker_supervisor.ex +++ b/lib/kaffe/consumer_group/worker/worker_supervisor.ex @@ -11,7 +11,10 @@ defmodule Kaffe.WorkerSupervisor do end def start_worker_manager(pid, subscriber_name) do - Supervisor.start_child(pid, worker(Kaffe.WorkerManager, [subscriber_name])) + Supervisor.start_child( + pid, + {Kaffe.WorkerManager, subscriber_name} + ) end def start_worker(pid, message_handler, subscriber_name, worker_name) do @@ -19,9 +22,14 @@ defmodule Kaffe.WorkerSupervisor do Supervisor.start_child( pid, - worker(Kaffe.Worker, [message_handler, subscriber_name, worker_name], - id: :"worker_#{subscriber_name}_#{worker_name}" - ) + %{ + id: :"worker_#{subscriber_name}_#{worker_name}", + start: { + Kaffe.Worker, + :start_link, + [message_handler, subscriber_name, worker_name] + } + } ) end @@ -33,7 +41,7 @@ defmodule Kaffe.WorkerSupervisor do # If anything fails, the state is inconsistent with the state of # `Kaffe.Subscriber` and `Kaffe.GroupMember`. We need the failure # to cascade all the way up so that they are terminated. - supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) + Supervisor.init(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) end defp name(subscriber_name) do diff --git a/lib/kaffe/partition_selector.ex b/lib/kaffe/partition_selector.ex index e80f34d..76312f0 100644 --- a/lib/kaffe/partition_selector.ex +++ b/lib/kaffe/partition_selector.ex @@ -24,8 +24,8 @@ defmodule Kaffe.PartitionSelector do end end - def random(total) do - :crypto.rand_uniform(0, total) + def random(total) when total >= 1 do + :rand.uniform(total) - 1 end def md5(key, total) do diff --git a/lib/kaffe/producer.ex b/lib/kaffe/producer.ex index 3418878..22d6657 100644 --- a/lib/kaffe/producer.ex +++ b/lib/kaffe/producer.ex @@ -21,6 +21,25 @@ defmodule Kaffe.Producer do @kafka Application.get_env(:kaffe, :kafka_mod, :brod) + @typedoc """ + A Kafka message can be represented as a tuple containing a key value pair of type binary() + """ + @type message :: {key :: binary(), value :: binary()} + + @typedoc """ + A Kafka message can also be represented as a map, containing `:key`, `:value`, and `:headers` + """ + @type message_object :: %{ + key: key :: binary(), + value: value :: binary(), + headers: headers :: headers() + } + + @typedoc """ + Headers represent a list of tuples containing key, value pairs of type binary() + """ + @type headers :: [{key :: binary(), value :: binary()}] + require Logger ## ------------------------------------------------------------------------- @@ -34,7 +53,7 @@ defmodule Kaffe.Producer do @doc """ Synchronously produce the `messages_list` to `topic` - - `messages_list` must be a list of `{key, value}` tuples + - `messages_list` must be a list of type `message()` or `message_object()` - `opts` may include the partition strategy to use, `partition_strategy: :md5`, or `:random` or a function. @@ -50,7 +69,7 @@ defmodule Kaffe.Producer do @doc """ Synchronously produce the `message_list` to `topic` - `messages` must be a list of `{key, value}` tuples + `messages` must be a list of type `message()` or `message_object()` Returns: @@ -80,7 +99,7 @@ defmodule Kaffe.Producer do @doc """ Synchronously produce the `message_list` to `topic`/`partition` - `message_list` must be a list of `{key, value}` tuples + `message_list` must be a list of type `message()` or `message_type()` Returns: @@ -119,7 +138,12 @@ defmodule Kaffe.Producer do message_list |> add_timestamp |> group_by_partition(topic, partition_strategy) - |> produce_list_to_topic(topic) + |> case do + messages = %{} -> produce_list_to_topic(messages, topic) + {:error, reason} -> + Logger.warning("Error while grouping by partition #{inspect(reason)}") + {:error, reason} + end end defp produce_value(topic, key, value) do @@ -133,7 +157,7 @@ defmodule Kaffe.Producer do @kafka.produce_sync(client_name(), topic, partition, key, value) error -> - Logger.warn( + Logger.warning( "event#produce topic=#{topic} key=#{key} error=#{inspect(error)}" ) @@ -143,18 +167,27 @@ defmodule Kaffe.Producer do defp add_timestamp(messages) do messages - |> Enum.map(fn {key, message} -> - {System.system_time(:millisecond), key, message} - end) + |> Enum.map(&add_timestamp_to_message/1) end - defp group_by_partition(messages, topic, partition_strategy) do - {:ok, partitions_count} = @kafka.get_partitions_count(client_name(), topic) + defp add_timestamp_to_message(message) when is_map(message) and not :erlang.is_map_key(:ts, message), + do: message |> Map.put(:ts, System.system_time(:millisecond)) - messages - |> Enum.group_by(fn {_timestamp, key, message} -> - choose_partition(topic, partitions_count, key, message, partition_strategy) - end) + defp add_timestamp_to_message(message) when is_map(message), do: message + + defp add_timestamp_to_message({key, message}), do: {System.system_time(:millisecond), key, message} + + defp group_by_partition(messages, topic, partition_strategy) do + with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(), topic) do + messages + |> Enum.group_by(fn + {_timestamp, key, message} -> + choose_partition(topic, partitions_count, key, message, partition_strategy) + + %{key: key, value: message} -> + choose_partition(topic, partitions_count, key, message, partition_strategy) + end) + end end defp produce_list_to_topic(message_list, topic) do diff --git a/mix.exs b/mix.exs index a6916f7..c6541a0 100644 --- a/mix.exs +++ b/mix.exs @@ -1,25 +1,29 @@ defmodule Kaffe.Mixfile do use Mix.Project + @source_url "https://github.com/spreedly/kaffe" + @version "1.26.0" + def project do [ app: :kaffe, - version: "1.12.0", - description: - "An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.", + version: @version, name: "Kaffe", - source_url: "https://github.com/spreedly/kaffe", - package: package(), - elixir: "~> 1.3", + elixir: "~> 1.14", build_embedded: Mix.env() == :prod, start_permanent: Mix.env() == :prod, elixirc_paths: elixirc_paths(Mix.env()), - deps: deps() + deps: deps(), + docs: docs(), + package: package() ] end def application do - [applications: [:logger, :brod], mod: {Kaffe, []}] + [ + applications: [:logger, :brod, :retry], + mod: {Kaffe, []} + ] end defp elixirc_paths(:test), do: ["lib", "test/support"] @@ -27,17 +31,35 @@ defmodule Kaffe.Mixfile do defp deps do [ - {:brod, ">= 3.0.0 and < 3.5.0"}, - {:ex_doc, "~> 0.14", only: :dev, runtime: false}, + {:brod, "~> 3.0"}, + {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, + {:retry, "~> 0.15.0"} ] end defp package do [ name: :kaffe, - licenses: ["MIT License"], - maintainers: ["Kevin Lewis", "David Santoso", "Ryan Daigle", "Spreedly"], - links: %{"GitHub" => "https://github.com/spreedly/kaffe"} + description: + "An opinionated Elixir wrapper around brod, the Erlang Kafka client, " <> + "that supports encrypted connections to Heroku Kafka out of the box.", + licenses: ["MIT"], + maintainers: ["Kevin Lewis", "David Santoso", "Ryan Daigle", "Spreedly", "Joe Peck", "Brittany Hayes", "Anthony Walker"], + links: %{ + "GitHub" => @source_url + } + ] + end + + defp docs do + [ + extras: [ + "README.md": [title: "Overview"], + "LICENSE.md": [title: "License"] + ], + main: "readme", + source_url: @source_url, + formatters: ["html"] ] end end diff --git a/mix.lock b/mix.lock index 354d013..721513f 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,16 @@ -%{"brod": {:hex, :brod, "3.2.0", "64f0778a7a32ec0a39cec9a564f4686bdfe72b147b48076e114a156fd0a30222", [:make, :rebar, :rebar3], [{:kafka_protocol, "1.1.0", [repo: "hexpm", hex: :kafka_protocol, optional: false]}, {:supervisor3, "1.1.5", [repo: "hexpm", hex: :supervisor3, optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []}, - "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, - "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"}, +%{ + "brod": {:hex, :brod, "3.2.0", "64f0778a7a32ec0a39cec9a564f4686bdfe72b147b48076e114a156fd0a30222", [:make, :rebar, :rebar3], [{:kafka_protocol, "1.1.0", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.5", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "50232a668d3b495f129e8d588fd1ebe36bdc0df3ff43a5b77a1c056bea1df032"}, + "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"}, + "ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"}, + "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "9cbb8830c7bc0b9942e1f03b9b132d5ea548516a3a67249c156654f2a902a34f"}, "logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]}, - "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"}, - "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}} + "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "retry": {:hex, :retry, "0.15.0", "ba6aaeba92905a396c18c299a07e638947b2ba781e914f803202bc1b9ae867c3", [:mix], [], "hexpm", "93d3310bce78c0a30cc94610684340a14adfc9136856a3f662e4d9ce6013c784"}, + "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm", "e09171f1c7106d4082db88a409d5648425b3699d55319c2cd09c4bb8cd1ba8a2"}, + "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], [], "hexpm", "e6f489d6b819df4d8f202eb00a77515a949bf87dae4d0a060f534722a63d8977"}, +} diff --git a/test/kaffe/config/consumer_test.exs b/test/kaffe/config/consumer_test.exs index d55b9f8..c5a2095 100644 --- a/test/kaffe/config/consumer_test.exs +++ b/test/kaffe/config/consumer_test.exs @@ -6,6 +6,7 @@ defmodule Kaffe.Config.ConsumerTest do consumer_config = Application.get_env(:kaffe, :consumer) |> Keyword.delete(:offset_reset_policy) + |> Keyword.delete(:ssl) |> Keyword.put(:start_with_earliest_message, true) Application.put_env(:kaffe, :consumer, consumer_config) @@ -20,7 +21,7 @@ defmodule Kaffe.Config.ConsumerTest do Application.put_env(:kaffe, :consumer, no_sasl_config) expected = %{ - endpoints: [kafka: 9092], + endpoints: [{'kafka', 9092}], subscriber_name: :"kaffe-test-group", consumer_group: "kaffe-test-group", topics: ["kaffe-test"], @@ -42,7 +43,8 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retries: 1, subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, - worker_allocation_strategy: :worker_per_partition + worker_allocation_strategy: :worker_per_partition, + client_down_retry_expire: 15_000 } assert Kaffe.Config.Consumer.configuration() == expected @@ -54,7 +56,7 @@ defmodule Kaffe.Config.ConsumerTest do Application.put_env(:kaffe, :consumer, Keyword.put(config, :endpoints, "kafka:9092,localhost:9092")) expected = %{ - endpoints: [kafka: 9092, localhost: 9092], + endpoints: [{'kafka', 9092}, {'localhost', 9092}], subscriber_name: :"kaffe-test-group", consumer_group: "kaffe-test-group", topics: ["kaffe-test"], @@ -76,7 +78,8 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retries: 1, subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, - worker_allocation_strategy: :worker_per_partition + worker_allocation_strategy: :worker_per_partition, + client_down_retry_expire: 15_000 } on_exit(fn -> @@ -95,7 +98,7 @@ defmodule Kaffe.Config.ConsumerTest do Application.put_env(:kaffe, :consumer, sasl_config) expected = %{ - endpoints: [kafka: 9092], + endpoints: [{'kafka', 9092}], subscriber_name: :"kaffe-test-group", consumer_group: "kaffe-test-group", topics: ["kaffe-test"], @@ -118,7 +121,8 @@ defmodule Kaffe.Config.ConsumerTest do subscriber_retries: 1, subscriber_retry_delay_ms: 5, offset_reset_policy: :reset_by_subscriber, - worker_allocation_strategy: :worker_per_partition + worker_allocation_strategy: :worker_per_partition, + client_down_retry_expire: 15_000 } on_exit(fn -> @@ -128,6 +132,48 @@ defmodule Kaffe.Config.ConsumerTest do assert Kaffe.Config.Consumer.configuration() == expected end + test "correct settings with ssl are extracted" do + config = Application.get_env(:kaffe, :consumer) + ssl = Keyword.get(config, :ssl) + ssl_config = Keyword.put(config, :ssl, true) + + Application.put_env(:kaffe, :consumer, ssl_config) + + expected = %{ + endpoints: [{'kafka', 9092}], + subscriber_name: :"kaffe-test-group", + consumer_group: "kaffe-test-group", + topics: ["kaffe-test"], + group_config: [ + offset_commit_policy: :commit_to_kafka_v2, + offset_commit_interval_seconds: 10 + ], + consumer_config: [ + auto_start_producers: false, + allow_topic_auto_creation: false, + begin_offset: :earliest, + ssl: true + ], + message_handler: SilentMessage, + async_message_ack: false, + rebalance_delay_ms: 100, + max_bytes: 10_000, + min_bytes: 0, + max_wait_time: 10_000, + subscriber_retries: 1, + subscriber_retry_delay_ms: 5, + offset_reset_policy: :reset_by_subscriber, + worker_allocation_strategy: :worker_per_partition, + client_down_retry_expire: 15_000 + } + + on_exit(fn -> + Application.put_env(:kaffe, :consumer, Keyword.put(config, :ssl, ssl)) + end) + + assert Kaffe.Config.Consumer.configuration() == expected + end + describe "offset_reset_policy" do test "computes correctly from start_with_earliest_message == true" do consumer_config = diff --git a/test/kaffe/config/producer_test.exs b/test/kaffe/config/producer_test.exs index d373ec1..a804658 100644 --- a/test/kaffe/config/producer_test.exs +++ b/test/kaffe/config/producer_test.exs @@ -11,7 +11,7 @@ defmodule Kaffe.Config.ProducerTest do Application.put_env(:kaffe, :producer, no_sasl_config) expected = %{ - endpoints: [kafka: 9092], + endpoints: [{'kafka', 9092}], producer_config: [ auto_start_producers: true, allow_topic_auto_creation: false, @@ -43,7 +43,7 @@ defmodule Kaffe.Config.ProducerTest do Application.put_env(:kaffe, :producer, sasl_config) expected = %{ - endpoints: [kafka: 9092], + endpoints: [{'kafka', 9092}], producer_config: [ auto_start_producers: true, allow_topic_auto_creation: false, @@ -79,7 +79,7 @@ defmodule Kaffe.Config.ProducerTest do Application.put_env(:kaffe, :producer, Keyword.put(config, :endpoints, "kafka:9092,localhost:9092")) expected = %{ - endpoints: [kafka: 9092, localhost: 9092], + endpoints: [{'kafka', 9092}, {'localhost', 9092}], producer_config: [ auto_start_producers: true, allow_topic_auto_creation: false, @@ -106,4 +106,39 @@ defmodule Kaffe.Config.ProducerTest do assert Kaffe.Config.Producer.configuration() == expected end + + test "adds ssl when true" do + config = Application.get_env(:kaffe, :producer) + ssl = Keyword.get(config, :ssl) + Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, true)) + + expected = %{ + endpoints: [{'kafka', 9092}], + producer_config: [ + auto_start_producers: true, + allow_topic_auto_creation: false, + default_producer_config: [ + required_acks: -1, + ack_timeout: 1000, + partition_buffer_limit: 512, + partition_onwire_limit: 1, + max_batch_size: 1_048_576, + max_retries: 3, + retry_backoff_ms: 500, + compression: :no_compression, + min_compression_batch_size: 1024 + ], + ssl: true + ], + topics: ["kaffe-test"], + client_name: :kaffe_producer_client, + partition_strategy: :md5 + } + + on_exit(fn -> + Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, ssl)) + end) + + assert Kaffe.Config.Producer.configuration() == expected + end end diff --git a/test/kaffe/config_test.exs b/test/kaffe/config_test.exs index cae80e3..b4c40e3 100644 --- a/test/kaffe/config_test.exs +++ b/test/kaffe/config_test.exs @@ -8,7 +8,7 @@ defmodule Kaffe.ConfigTest do "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" ) - expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + expected = [{'192.168.1.100', 9096}, {'192.168.1.101', 9096}, {'192.168.1.102', 9096}] on_exit(fn -> System.delete_env("KAFKA_URL") @@ -19,7 +19,7 @@ defmodule Kaffe.ConfigTest do test "transforms endpoints into the correct format" do kafka_url = "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" - expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + expected = [{'192.168.1.100', 9096}, {'192.168.1.101', 9096}, {'192.168.1.102', 9096}] assert Kaffe.Config.parse_endpoints(kafka_url) == expected end diff --git a/test/kaffe/consumer_group/subscriber/group_member_startup_test.exs b/test/kaffe/consumer_group/subscriber/group_member_startup_test.exs index db0f79b..697dbab 100644 --- a/test/kaffe/consumer_group/subscriber/group_member_startup_test.exs +++ b/test/kaffe/consumer_group/subscriber/group_member_startup_test.exs @@ -33,7 +33,7 @@ defmodule Kaffe.GroupMemberStartupTest do Application.put_env(:kaffe, :consumer, Keyword.put(consumer_config, :subscriber_name, "s2")) {:ok, _pid} = Kaffe.GroupMemberSupervisor.start_link() - :timer.sleep(Kaffe.Config.Consumer.configuration().rebalance_delay_ms + 100) + Process.sleep(Kaffe.Config.Consumer.configuration().rebalance_delay_ms + 100) assignments = Enum.reduce(0..31, %{}, fn _partition, map -> diff --git a/test/kaffe/consumer_group/subscriber/subscriber_test.exs b/test/kaffe/consumer_group/subscriber/subscriber_test.exs index 4d55522..95c1da2 100644 --- a/test/kaffe/consumer_group/subscriber/subscriber_test.exs +++ b/test/kaffe/consumer_group/subscriber/subscriber_test.exs @@ -121,11 +121,8 @@ defmodule Kaffe.SubscriberTest do Enum.map(1..10, fn n -> Subscriber.kafka_message( offset: n, - magic_byte: 0, - attributes: 0, key: "key-#{n}", - value: "#{n}", - crc: -1 + value: "#{n}" ) end) end