important fixes on kafka & hornbill tests (#28)
* add more options for kafka client

* add nemesis for hornbill

* kafka & hornbill: modify a lot of configs for consumer

- WARNING: there is a false positive for :poll-skip. Please just ignore it for now.
- Default to 2t * 2p
- Set consumer timeouts (rebalance.timeout.ms, session.timeout.ms, max.poll.interval.ms, and poll-ms) to 3000 ms. This keeps polling active (see the sketch below).
- Add some info about groups in :poll/:subscription history
- Set test time to 360s and final poll time to 60s
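
A minimal sketch of how these timeouts line up with the Kafka consumer config keys touched in client.clj below (illustrative map only, not the repo's exact code; poll-ms is the workload's own poll timeout rather than a Kafka config key):

(import '(org.apache.kafka.clients CommonClientConfigs)
        '(org.apache.kafka.clients.consumer ConsumerConfig))

;; The 3000 ms consumer settings introduced by this commit.
(def assumed-consumer-timeouts
  {ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG        3000   ; session.timeout.ms
   ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG      3000   ; max.poll.interval.ms
   CommonClientConfigs/REBALANCE_TIMEOUT_MS_CONFIG 3000}) ; rebalance.timeout.ms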
Commelina authored Jul 16, 2024
1 parent 165af45 commit 0a81c49
Showing 7 changed files with 207 additions and 41 deletions.
23 changes: 16 additions & 7 deletions docker/control-hornbill/Dockerfile
@@ -24,17 +24,17 @@ CMD /init-ssh-control.sh && \
####################### start test #######################
lein with-profile kafka run test \
#-------------- basic options --------------#
--db hstream \
--db hornbill \
--workload queue \
--sub-via subscribe \
--time-limit 180 \
--final-time-limit 120 \
--time-limit 360 \
--final-time-limit 60 \
--key-dist uniform \
--key-count 32 \
--key-count 4 \
##<<<
# This value is enough. A larger one can exhaust elle.
--concurrency 10 \
--rate 10 \
--rate 20 \
##>>>
##<<<
# When writes of a key exceed the limit, the generator will pick
@@ -54,6 +54,16 @@ CMD /init-ssh-control.sh && \
# It may be turned on in the future to perform more tests...
--retries 0 \
##>>>
##<<<
# We allow the producer to send messages in batches, but the
# batches should not be too large.
--producer-linger-ms 3000 \
--batch-max-bytes 10240 \
##>>>
# Auto-reset the offset to earliest to prevent the "offset not start from 0" issue
##<<<
--auto-offset-reset earliest \
##>>>
#-------------- txns options --------------#
##<<<
# We do not support txns and idempotent for now.
@@ -73,15 +83,14 @@ CMD /init-ssh-control.sh && \
##<<<
# The following opts are client defaults (3.7.0), but we
# specify them explicitly here for clarity.
--auto-offset-reset latest \
--enable-auto-commit \
--enable-server-auto-create-topics \
--isolation-level read_uncommitted \
--acks all && \
##>>>
##<<<
# For the kafka test, the following options are hard-coded;
# see workload/queue.clj and workload/list_append.clj.
# - message-prefix-bytes = 1024
# - replication-factor = 3
# - partition-count = 4
# - poll-ms = 100
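
For reference, a rough sketch of what the two new producer flags above contribute to the Kafka producer configuration, following the client.clj changes later in this commit and assuming the CLI values are passed through unchanged (illustrative map only):

(import '(org.apache.kafka.clients.producer ProducerConfig))

;; What --producer-linger-ms 3000, --batch-max-bytes 10240 and --acks all
;; translate to on the producer, per the conditional assocs in client.clj.
(def assumed-producer-batch-opts
  {ProducerConfig/LINGER_MS_CONFIG  3000     ; --producer-linger-ms 3000
   ProducerConfig/BATCH_SIZE_CONFIG 10240    ; --batch-max-bytes 10240
   ProducerConfig/ACKS_CONFIG       "all"})  ; --acks all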
20 changes: 14 additions & 6 deletions docker/control-kafka/Dockerfile
@@ -59,14 +59,14 @@ CMD /init-ssh.sh && \
--db hstream \
--workload queue \
--sub-via subscribe \
--time-limit 180 \
--final-time-limit 120 \
--time-limit 360 \
--final-time-limit 60 \
--key-dist uniform \
--key-count 32 \
--key-count 4 \
##<<<
# This value is enough. A larger one can exhaust elle.
--concurrency 10 \
--rate 10 \
--rate 20 \
##>>>
##<<<
# When writes of a key exceed the limit, the generator will pick
@@ -86,6 +86,16 @@ CMD /init-ssh.sh && \
# It may be turned on in the future to perform more tests...
--retries 0 \
##>>>
##<<<
# We allow the producer to send messages in batches, but the
# batches should not be too large.
--producer-linger-ms 3000 \
--batch-max-bytes 10240 \
##>>>
# Auto-reset the offset to earliest to prevent the "offset not start from 0" issue
##<<<
--auto-offset-reset earliest \
##>>>
#-------------- txns options --------------#
##<<<
# We do not support txns and idempotent for now.
@@ -105,9 +115,7 @@ CMD /init-ssh.sh && \
##<<<
# The following opts are client defaults (3.7.0), but we
# specify them explicitly here for clarity.
--auto-offset-reset latest \
--enable-auto-commit \
--enable-server-auto-create-topics \
--isolation-level read_uncommitted \
--acks all && \
##>>>
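
Similarly, a minimal sketch of what --auto-offset-reset earliest corresponds to on the consumer side (illustrative; a consumer with no committed offset then starts from the earliest available offset, which avoids the "offset not start from 0" issue noted above):

(import '(org.apache.kafka.clients.consumer ConsumerConfig))

;; Start reading from the earliest available offset when no committed
;; offset exists for the consumer group.
(def assumed-offset-reset
  {ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"})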
30 changes: 22 additions & 8 deletions src/jepsen/hstream/kafka/client.clj
@@ -10,6 +10,7 @@
(java.time Duration)
(java.util Properties)
(java.util.concurrent ExecutionException)
(org.apache.kafka.clients CommonClientConfigs)
(org.apache.kafka.clients.admin Admin
AdminClientConfig
NewTopic)
@@ -93,11 +94,17 @@
300

ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG
6000 ; Bounded by server
3000 ; Bounded by server

ConsumerConfig/CONNECTIONS_MAX_IDLE_MS_CONFIG
60000

ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG
3000

CommonClientConfigs/REBALANCE_TIMEOUT_MS_CONFIG
3000

; ConsumerConfig/DEFAULT_ISOLATION_LEVEL
; ???
}
@@ -147,10 +154,13 @@
ProducerConfig/RECONNECT_BACKOFF_MAX_MS_CONFIG 1000
ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG 500
ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG 1000

;;
ProducerConfig/LINGER_MS_CONFIG 1000
}
(not= nil (:producer-linger-ms opts))
(assoc ProducerConfig/LINGER_MS_CONFIG (:producer-linger-ms opts))

(not= nil (:batch-max-bytes opts))
(assoc ProducerConfig/BATCH_SIZE_CONFIG (:batch-max-bytes opts))

(not= nil (:acks opts))
(assoc ProducerConfig/ACKS_CONFIG (:acks opts))

@@ -313,10 +323,14 @@

;; FIXME: Runtime configuration for extra bytes
(defn ^ProducerRecord producer-record
"Constructs a ProducerRecord from a topic, partition, key, and value."
[topic partition key value]
(let [extra-bytes-len (* 1 1024)
value-bytes (long-to-bytes value)
"Constructs a ProducerRecord from a topic, partition, key, value and
how many bytes to prefix the value with. Note the actual value is a
long, and the prefixed bytes are only used to simulate large messages.
The message will be encoded to a byte array of size (8 + extra-bytes-len).
The prefixed bytes will be just dropped when reading the message back.
"
[topic partition key value extra-bytes-len]
(let [value-bytes (long-to-bytes value)
extra-bytes (byte-array extra-bytes-len (byte \A))
value-to-write (byte-array (+ extra-bytes-len 8) (concat value-bytes extra-bytes))]
(ProducerRecord. topic (int partition) key value-to-write)))
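
A minimal sketch of the read side this implies (hypothetical helper, not part of this diff): keep only the leading 8 bytes of the value and decode them back into a long, discarding the padding. This assumes long-to-bytes writes the long big-endian, ByteBuffer's default order.

(import '(java.nio ByteBuffer))

;; Hypothetical inverse of long-to-bytes: recover the long from the first
;; 8 bytes of a record value, ignoring the extra simulated-size bytes.
(defn bytes-to-long
  [^bytes value-bytes]
  (.getLong (ByteBuffer/wrap value-bytes) 0))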
103 changes: 103 additions & 0 deletions src/jepsen/hstream/kafka/db/hornbill.clj
@@ -0,0 +1,103 @@
(ns jepsen.hstream.kafka.db.hornbill
(:require [clojure.tools.logging :refer [info]]
[jepsen.db :as db]
[jepsen.control :as c]
[jepsen.control.util :as cu]
[jepsen.hstream.kafka.db :as redpanda.db]
[jepsen.hstream.common.utils :refer [parse-int]]
[jepsen.hstream.legacy.nemesis :as legacy-nemesis]))

(def hornbill
"Program that launches hornbill server.
WARNING: This module refers to jepsen.hstream.legacy.nemesis,
which hardcodes the server name as 'hstream-server'."
"/usr/local/bin/hstream-server")

(def node-ips
{:n1 "172.20.0.11"
:n2 "172.20.0.12"
:n3 "172.20.0.13"
:n4 "172.20.0.14"
:n5 "172.20.0.15"
})

(defn f-hornbill-log-file
"Generate the name of hornbill log file by node."
[node]
(str "/tmp/" node ".log"))

(defn f-hornbill-pid-file
"Generate the name of hornbill pid file by node."
[node]
(str "/tmp/" node ".pid"))

(defn f-hornbill-args
"Generate the arguments for hornbill server."
[node]
[:--config-path "/etc/hstream/config.yaml"
:--bind-address "0.0.0.0"
:--port 9092
:--metrics-port 6600
:--advertised-address (node-ips (keyword node))
:--meta-servers "http://meta:8964"
:--store-config "/etc/fdb.cluster"
:--server-id (parse-int (subs node 1))
:--log-level "debug"
:--log-with-color
])
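
For reference, a usage sketch of the argv these helpers produce for node n1 (values follow node-ips and parse-int above; the exact command rendering by cu/start-daemon! may differ):

(comment
  (f-hornbill-args "n1")
  ;; => [:--config-path "/etc/hstream/config.yaml"
  ;;     :--bind-address "0.0.0.0"
  ;;     :--port 9092
  ;;     :--metrics-port 6600
  ;;     :--advertised-address "172.20.0.11"
  ;;     :--meta-servers "http://meta:8964"
  ;;     :--store-config "/etc/fdb.cluster"
  ;;     :--server-id 1
  ;;     :--log-level "debug"
  ;;     :--log-with-color]
  )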

(defn db
"Hornbill for a particular version. No action is executed after the DB is ready."
[version tcpdump]
(reify
db/DB
(setup! [this test node]
(when tcpdump
(db/setup! (db/tcpdump {:ports []}) test node))
(info ">>> Setting up DB: Hornbill" version "on node" node
"But in fact we did nothing here."))
(teardown! [this test node]
(when tcpdump
(db/teardown! (db/tcpdump {:ports []}) test node))
(info ">>> Tearing down DB: Hornbill" version "on node" node
"But in fact we did nothing here."))

db/Process
;; WARNING: Starting the hstream server is not idempotent for now.
;; However, the test usually calls [:start :all].
;; So we have to check if the server is already running.
;; FIXME: The checking function 'is-hserver-on-node-dead?' is not
;; well implemented...
;; FIXME: Remove dependency on legacy-nemesis
(start! [this test node]
(if (legacy-nemesis/is-hserver-on-node-dead? node)
(c/su
(apply (partial cu/start-daemon!
{:logfile (f-hornbill-log-file node)
:pidfile (f-hornbill-pid-file node)
:chdir "/"
:make-pidfile? true}
hornbill)
(f-hornbill-args node)))
:skipped-by-us))
(kill! [this test node]
(c/su
(cu/stop-daemon! hornbill (f-hornbill-pid-file node))))

db/Pause
(pause! [this test node]
)
(resume! [this test node]
)

db/LogFiles
(log-files [this test node]
(when tcpdump
(db/log-files (db/tcpdump {:ports []}) test node))
{})

redpanda.db/DB
(node-id [this test node]
0)
(topic-partition-state [this node topic-partition]
:not-implemented)))
7 changes: 5 additions & 2 deletions src/jepsen/hstream/kafka/workload/list_append.clj
@@ -16,10 +16,13 @@
TimeoutException
UnknownTopicOrPartitionException
)))
(def message-prefix-bytes
"How many bytes should we prefix each message with?"
(* 1 1024))

(def partition-count
"How many partitions per topic?"
4)
2)

(defn k->topic
"Turns a logical key into a topic."
@@ -77,7 +80,7 @@
[f k nil]))

:append
(let [record (rc/producer-record topic (k->partition k) nil v)
(let [record (rc/producer-record topic (k->partition k) nil v message-prefix-bytes)
res @(.send producer record)]
mop))))

39 changes: 28 additions & 11 deletions src/jepsen/hstream/kafka/workload/queue.clj
@@ -173,17 +173,21 @@
UnknownServerException
)))

(def message-prefix-bytes
"How many bytes should we prefix each message with?"
(* 1 1024))

(def partition-count
"How many partitions per topic?"
4)
2)

(def replication-factor
"What replication factor should we use for each topic?"
3)

(def poll-ms
"How long should we poll for, in ms?"
100)
3000)

(defn k->topic
"Turns a logical key into a topic."
@@ -249,7 +253,7 @@
(swap! extant-topics conj topic))
; Send message to Redpanda
partition (k->partition k)
record (rc/producer-record topic (k->partition k) nil v)
record (rc/producer-record topic (k->partition k) nil v message-prefix-bytes)
res ^RecordMetadata (-> producer
(.send record)
(deref 10000 nil)
@@ -476,7 +480,10 @@

(catch UnknownServerException e#
(assoc ~op :type :info, :error [:unknown-server-exception
(.getMessage e#)]))
(str (.getMessage e#)
(.getCause e#)
(doall (map #(.toString %) (.getStackTrace e#))))
]))

(catch TimeoutException _#
(assoc ~op :type :info, :error :kafka-timeout))
@@ -648,7 +655,10 @@
topics
(rc/logging-rebalance-listener rebalance-log))
(rc/subscribe! consumer topics))
(assoc op :type :ok))

(let [subs (doall (seq (.subscription consumer)))
subs (into [] subs)]
(assoc op :type :ok :subs subs)))

; Apply poll/send transactions
(:poll, :send, :txn)
@@ -673,10 +683,17 @@
; https://stackoverflow.com/questions/45195010/meaning-of-sendoffsetstotransaction-in-kafka-0-11,
; commitSync is more intended for non-transactional
; workflows.
(when (and (#{:poll :txn} (:f op))
(not (:txn test))
(:subscribe (:sub-via test)))
(try (.commitSync consumer)
(if (and (#{:poll :txn} (:f op))
(not (:txn test))
(:subscribe (:sub-via test)))
;; FIXME: hardcoded timeout for `commitSync` and `.position`
(try (.commitSync consumer (java.time.Duration/ofMillis 2000))
(let [tps (doall (seq (.assignment consumer)))
poss (doall (map #(.position consumer % (java.time.Duration/ofMillis 1000)) tps))
tups (doall (map #(vector (.toString %1) %2) tps poss))
meta (.toString (.groupMetadata consumer))
]
(assoc op :type :ok :cur-offsets tups :group-meta meta))
; If we crash during commitSync *outside* a
; transaction, it might be that we poll()ed some
; values in this txn which Kafka will think we
@@ -685,8 +702,8 @@
; the reads, but note the lack of commit.
(catch RuntimeException e
(assoc op :type :ok
:error [:consumer-commit (.getMessage e)]))))
(assoc op :type :ok)))))))))
:error [:consumer-commit (.getMessage e)])))
(assoc op :type :ok))))))))))

(teardown! [this test])

