Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Cooperative stickies w/ metrics #43

Open
wants to merge 12 commits into
base: qox-stats-callback
Choose a base branch
from
12 changes: 12 additions & 0 deletions nix/sources.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
{
"hw-kafka-client": {
"branch": "main",
"description": "Kafka client for Haskell, including auto-rebalancing consumers",
"homepage": null,
"owner": "NoRedInk",
"repo": "hw-kafka-client",
"rev": "afb77994286f9c4876f562fb0a8c7b098b56248f",
"sha256": "1ygmvw508n7dc6is9yzz8yc1k8nhz66f6snagvb7sjijfsym31lw",
"type": "tarball",
"url": "https://github.com/NoRedInk/hw-kafka-client/archive/afb77994286f9c4876f562fb0a8c7b098b56248f.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
},
"niv": {
"branch": "master",
"description": "Easy dependency management for Nix projects",
Expand Down
4 changes: 2 additions & 2 deletions nri-kafka/nri-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ library
, bytestring >=0.10.8.2 && <0.12
, conduit >=1.3.0 && <1.4
, containers >=0.6.0.1 && <0.7
, hw-kafka-client >=4.0.3 && <5.0
, hw-kafka-client >=5.0.0 && <6.0
, nri-env-parser >=0.1.0.0 && <0.2
, nri-observability >=0.1.1.1 && <0.2
, nri-prelude >=0.1.0.0 && <0.7
Expand Down Expand Up @@ -135,7 +135,7 @@ test-suite tests
, bytestring >=0.10.8.2 && <0.12
, conduit >=1.3.0 && <1.4
, containers >=0.6.0.1 && <0.7
, hw-kafka-client >=4.0.3 && <5.0
, hw-kafka-client >=5.0.0 && <6.0
, nri-env-parser >=0.1.0.0 && <0.2
, nri-observability >=0.1.1.1 && <0.2
, nri-prelude >=0.1.0.0 && <0.7
Expand Down
2 changes: 1 addition & 1 deletion nri-kafka/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies:
- bytestring >= 0.10.8.2 && < 0.12
- conduit >= 1.3.0 && < 1.4
- containers >= 0.6.0.1 && < 0.7
- hw-kafka-client >=4.0.3 && < 5.0
- hw-kafka-client >=5.0.0 && < 6.0
- nri-env-parser >= 0.1.0.0 && < 0.2
- nri-observability >= 0.1.1.1 && < 0.2
- nri-prelude >= 0.1.0.0 && < 0.7
Expand Down
19 changes: 10 additions & 9 deletions nri-kafka/src/Kafka.hs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ record msg = do
|> ByteString.Lazy.toStrict
)
(Internal.payload msg)
, Producer.prHeaders = Prelude.mempty
}

-- | The topic of a message. This function might sometimes be useful in tests.
Expand All @@ -159,32 +160,32 @@ key msg = Maybe.map Internal.unKey (Internal.key msg)
-- | Acquire a producer 'Internal.Handler'.
--
-- Starts the Kafka producer, then a background flush loop that is told to
-- stop (via its 'Terminate' 'TMVar.TMVar') when the handler is released.
-- The producer itself is closed on release as well.
handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler
handler settings maybeStatsCallback = do
  producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer
  _ <- Conduit.mkAcquire (startFlushLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate))
  liftIO (mkHandler settings producer)

-- | One-shot signal used to stop the background flush loop.
data Terminate = Terminate

-- | By default events only get polled right before sending a record to kafka.
-- This means that the deliveryCallback only gets fired on the next call to produceMessage'.
-- We want to be informed about delivery status as soon as possible though.
-- The only way to do that right now in hw-kafka-client is by flushing the queue.
--
-- Returns the 'TMVar.TMVar' the caller fills to terminate the loop.
startFlushLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b)
startFlushLoop producer = do
  terminator <- STM.atomically TMVar.newEmptyTMVar
  _ <-
    -- Race the endless flush loop against the terminator signal: whichever
    -- finishes first (always the terminator) tears the other down.
    Async.race_
      (flushProducer producer)
      (STM.atomically <| TMVar.readTMVar terminator)
      |> Async.async
  Prelude.pure terminator

-- | Endless loop: flush the producer's outgoing queue (which also fires any
-- pending delivery callbacks), wait 100ms, repeat.
--
-- NOTE(review): 'Producer.flushProducer' blocks until the queue is empty;
-- reported in the past to slow down sending when used for synchronous
-- sends — confirm this background usage does not regress produce latency.
flushProducer :: Producer.KafkaProducer -> Prelude.IO ()
flushProducer producer = do
  Producer.flushProducer producer
  -- threadDelay takes microseconds: 100_000 µs = 100ms between flushes.
  Control.Concurrent.threadDelay 100_000 {- 100ms -}
  flushProducer producer

-- |
mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler
Expand Down
23 changes: 22 additions & 1 deletion nri-kafka/src/Kafka/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Kafka.Settings
where

import qualified Environment
import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy
import qualified Kafka.Producer
import qualified Kafka.Settings.Internal as Internal
import qualified Prelude
Expand All @@ -27,7 +28,11 @@ data Settings = Settings
-- | librdkafka statistics emit interval. The application also needs to
-- register a stats callback using rd_kafka_conf_set_stats_cb(). The
-- granularity is 1000ms. A value of 0 disables statistics.
statisticsIntervalMs :: StatisticsIntervalMs
statisticsIntervalMs :: StatisticsIntervalMs,
-- | partition assignment strategy for workers. one of
-- RangeAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
-- CooperativeStickyAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html
partitionAssignmentStrategy :: AssignmentStrategy.ConsumerAssignmentStrategy
}

-- | Number of messages to batch together before sending to Kafka.
Expand All @@ -52,6 +57,7 @@ decoder =
|> andMap decoderDeliveryTimeout
|> andMap decoderBatchNumMessages
|> andMap decoderStatisticsIntervalMs
|> andMap decoderPartitionAssignmentStrategy

decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout
decoderDeliveryTimeout =
Expand Down Expand Up @@ -82,3 +88,18 @@ decoderStatisticsIntervalMs =
Environment.defaultValue = "0"
}
(map StatisticsIntervalMs Environment.int)

-- | Decoder for the @KAFKA_PARTITION_ASSIGNMENT_STRATEGY@ environment
-- variable.
--
-- Accepts @"cooperative-sticky"@ or @"range-assignor"@ and defaults to the
-- cooperative sticky assignor; any other value is a decode error.
decoderPartitionAssignmentStrategy :: Environment.Decoder AssignmentStrategy.ConsumerAssignmentStrategy
decoderPartitionAssignmentStrategy =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_PARTITION_ASSIGNMENT_STRATEGY",
        Environment.description = "sets the kafka partition assignment strategy. one of: {cooperative-sticky,range-assignor}",
        Environment.defaultValue = "cooperative-sticky"
      }
    ( Environment.custom Environment.text
        <| \str -> case str of
          "cooperative-sticky" -> Ok AssignmentStrategy.CooperativeStickyAssignor
          "range-assignor" -> Ok AssignmentStrategy.RangeAssignor
          invalidValue -> Err ("Invalid value: " ++ invalidValue)
    )
2 changes: 2 additions & 0 deletions nri-kafka/src/Kafka/Worker/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import qualified Data.UUID.V4
import qualified Dict
import qualified GHC.Clock
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy
import qualified Kafka.Internal as Kafka
import qualified Kafka.Metadata
import qualified Kafka.Stats as Stats
Expand Down Expand Up @@ -269,6 +270,7 @@ createConsumer
++ Consumer.logLevel logLevel
++ Consumer.setCallback (Consumer.rebalanceCallback rebalance)
++ Consumer.compression Consumer.Snappy
++ Consumer.setAssignmentStrategy [AssignmentStrategy.CooperativeStickyAssignor]
++ Consumer.extraProps
( Dict.fromList
[ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)),
Expand Down
1 change: 1 addition & 0 deletions nri-kafka/test/Helpers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ record topicName partitionId val =
{ Producer.prTopic = Producer.TopicName (Internal.unTopic topicName),
Producer.prPartition = Producer.SpecifiedPartition (Prelude.fromIntegral partitionId),
Producer.prKey = Nothing,
Producer.prHeaders = Prelude.mempty,
Producer.prValue =
Internal.MsgWithMetaData
{ Internal.metaData =
Expand Down
17 changes: 12 additions & 5 deletions run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@ pg_ctl start -o '-k .'
mkdir -p ./_build/redis/data
redis-server --daemonize yes --dir ./_build/redis/data

## start zookeeper (for kafka)
# Locate the sample config shipped next to the zkServer.sh binary instead of
# relying on a bare "zoo_sample.cfg" resolving from the current directory.
zk_server_properties_path=$(dirname "$(which zkServer.sh)")/../conf/zoo_sample.cfg
mkdir -p /tmp/zookeeper /tmp/zookeeper-logs
# Stop any leftover instance, wipe its state, then start fresh.
ZOOPIDFILE=/tmp/zookeeper-logs/pid \
  ZOO_LOG_DIR=/tmp/zookeeper-logs \
  zkServer.sh stop "$zk_server_properties_path"
rm -rf /tmp/zookeeper/* /tmp/zookeeper-logs/*
ZOOPIDFILE=/tmp/zookeeper-logs/pid \
  ZOO_LOG_DIR=/tmp/zookeeper-logs \
  zkServer.sh start "$zk_server_properties_path"

## wait for zookeeper
echo "waiting for zookeeper to start"
until nc -vz localhost 2181
do
  sleep 1
done
echo "zookeeper available"

## start kafka
Expand All @@ -50,4 +55,6 @@ cabal test all

# cleanup
kafka-server-stop.sh
# Stop zookeeper using the same config path the setup section resolved.
ZOOPIDFILE=/tmp/zookeeper-logs/pid \
  ZOO_LOG_DIR=/tmp/zookeeper-logs \
  zkServer.sh stop "$zk_server_properties_path"
2 changes: 2 additions & 0 deletions shell-ghc-8-10.nix
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ in import nix/mk-shell.nix {
safe-coloured-text-terminfo =
super.callCabal2nix "safe-coloured-text-terminfo"
"${sources.safe-coloured-text}/safe-coloured-text-terminfo" { };
hw-kafka-client = pkgs.haskell.lib.dontCheck
(super.callCabal2nix "hw-kafka-client" sources.hw-kafka-client { });
});
}