diff --git a/docker/control-hornbill/Dockerfile b/docker/control-hornbill/Dockerfile index 681c817..be90066 100644 --- a/docker/control-hornbill/Dockerfile +++ b/docker/control-hornbill/Dockerfile @@ -24,17 +24,17 @@ CMD /init-ssh-control.sh && \ ####################### start test ####################### lein with-profile kafka run test \ #-------------- basic options --------------# - --db hstream \ + --db hornbill \ --workload queue \ --sub-via subscribe \ - --time-limit 180 \ - --final-time-limit 120 \ + --time-limit 360 \ + --final-time-limit 60 \ --key-dist uniform \ - --key-count 32 \ + --key-count 4 \ ##<<< # This value is enough. A larger one can make elle exhausted. --concurrency 10 \ - --rate 10 \ + --rate 20 \ ##>>> ##<<< # When writes of a key exceed the limit, the generator will pick @@ -54,6 +54,16 @@ CMD /init-ssh-control.sh && \ # It may be turned on in the future to perform more tests... --retries 0 \ ##>>> + ##<<< + # We allow the producer to send messages in batches, but the + # batches must not be too large. + --producer-linger-ms 3000 \ + --batch-max-bytes 10240 \ + ##>>> + # auto reset offset to earliest to prevent "offset not start from 0" + ##<<< + --auto-offset-reset earliest \ + ##>>> #-------------- txns options --------------# ##<<< # We do not support txns and idempotent for now. @@ -73,15 +83,14 @@ CMD /init-ssh-control.sh && \ ##<<< # The following opts is client default (3.7.0) but we # specify them explicitly here for clarity. - --auto-offset-reset latest \ --enable-auto-commit \ - --enable-server-auto-create-topics \ --isolation-level read_uncommitted \ --acks all && \ ##>>> ##<<< # For kafka test, the following options are hard-coded, # see workload/queue.clj and workload/list_append.clj. 
+ # - message-prefix-bytes = 1024 # - replication-factor = 3 - # - partition-count = 4 - # - poll-ms = 100 + # - partition-count = 2 + # - poll-ms = 3000 diff --git a/docker/control-kafka/Dockerfile b/docker/control-kafka/Dockerfile index 95b9352..7444caf 100644 --- a/docker/control-kafka/Dockerfile +++ b/docker/control-kafka/Dockerfile @@ -59,14 +59,14 @@ CMD /init-ssh.sh && \ --db hstream \ --workload queue \ --sub-via subscribe \ - --time-limit 180 \ - --final-time-limit 120 \ + --time-limit 360 \ + --final-time-limit 60 \ --key-dist uniform \ - --key-count 32 \ + --key-count 4 \ ##<<< # This value is enough. A larger one can make elle exhausted. --concurrency 10 \ - --rate 10 \ + --rate 20 \ ##>>> ##<<< # When writes of a key exceed the limit, the generator will pick @@ -86,6 +86,16 @@ CMD /init-ssh.sh && \ # It may be turned on in the future to perform more tests... --retries 0 \ ##>>> + ##<<< + # We allow the producer to send messages in batches, but the + # batches must not be too large. + --producer-linger-ms 3000 \ + --batch-max-bytes 10240 \ + ##>>> + # auto reset offset to earliest to prevent "offset not start from 0" + ##<<< + --auto-offset-reset earliest \ + ##>>> #-------------- txns options --------------# ##<<< # We do not support txns and idempotent for now. @@ -105,9 +115,7 @@ CMD /init-ssh.sh && \ ##<<< # The following opts is client default (3.7.0) but we # specify them explicitly here for clarity. 
- --auto-offset-reset latest \ --enable-auto-commit \ - --enable-server-auto-create-topics \ --isolation-level read_uncommitted \ --acks all && \ ##>>> diff --git a/src/jepsen/hstream/kafka/client.clj b/src/jepsen/hstream/kafka/client.clj index 56d5ef9..fe0d4f9 100644 --- a/src/jepsen/hstream/kafka/client.clj +++ b/src/jepsen/hstream/kafka/client.clj @@ -10,6 +10,7 @@ (java.time Duration) (java.util Properties) (java.util.concurrent ExecutionException) + (org.apache.kafka.clients CommonClientConfigs) (org.apache.kafka.clients.admin Admin AdminClientConfig NewTopic) @@ -93,11 +94,17 @@ 300 ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG - 6000 ; Bounded by server + 3000 ; Bounded by server ConsumerConfig/CONNECTIONS_MAX_IDLE_MS_CONFIG 60000 + ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG + 3000 + + CommonClientConfigs/REBALANCE_TIMEOUT_MS_CONFIG + 3000 + ; ConsumerConfig/DEFAULT_ISOLATION_LEVEL ; ??? } @@ -147,10 +154,13 @@ ProducerConfig/RECONNECT_BACKOFF_MAX_MS_CONFIG 1000 ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG 500 ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG 1000 - - ;; - ProducerConfig/LINGER_MS_CONFIG 1000 } + (not= nil (:producer-linger-ms opts)) + (assoc ProducerConfig/LINGER_MS_CONFIG (:producer-linger-ms opts)) + + (not= nil (:batch-max-bytes opts)) + (assoc ProducerConfig/BATCH_SIZE_CONFIG (:batch-max-bytes opts)) + (not= nil (:acks opts)) (assoc ProducerConfig/ACKS_CONFIG (:acks opts)) @@ -313,10 +323,14 @@ ;; FIXME: Runtime configuration for extra bytes (defn ^ProducerRecord producer-record - "Constructs a ProducerRecord from a topic, partition, key, and value." - [topic partition key value] - (let [extra-bytes-len (* 1 1024) - value-bytes (long-to-bytes value) + "Constructs a ProducerRecord from a topic, partition, key, value and + how many filler bytes to pad the value with. Note the actual value is a + long, and the filler bytes are only used to simulate large messages. 
+ The message is encoded as a byte array of size (8 + extra-bytes-len): value bytes first, filler bytes after (appended, not prefixed). + NOTE(review): presumably readers keep only the value bytes and drop the filler; confirm against the consumer code. + " + [topic partition key value extra-bytes-len] + (let [value-bytes (long-to-bytes value) extra-bytes (byte-array extra-bytes-len (byte \A)) value-to-write (byte-array (+ extra-bytes-len 8) (concat value-bytes extra-bytes))] (ProducerRecord. topic (int partition) key value-to-write))) diff --git a/src/jepsen/hstream/kafka/db/hornbill.clj b/src/jepsen/hstream/kafka/db/hornbill.clj new file mode 100644 index 0000000..6bdd77f --- /dev/null +++ b/src/jepsen/hstream/kafka/db/hornbill.clj @@ -0,0 +1,103 @@ +(ns jepsen.hstream.kafka.db.hornbill + (:require [clojure.tools.logging :refer [info]] + [jepsen.db :as db] + [jepsen.control :as c] + [jepsen.control.util :as cu] + [jepsen.hstream.kafka.db :as redpanda.db] + [jepsen.hstream.common.utils :refer [parse-int]] + [jepsen.hstream.legacy.nemesis :as legacy-nemesis])) + +(def hornbill + "Program that launches hornbill server. + WARNING: This module refers to jepsen.hstream.legacy.nemesis, + which hardcodes the server name as 'hstream-server'." + "/usr/local/bin/hstream-server") + +(def node-ips + {:n1 "172.20.0.11" + :n2 "172.20.0.12" + :n3 "172.20.0.13" + :n4 "172.20.0.14" + :n5 "172.20.0.15" + }) + +(defn f-hornbill-log-file + "Generate the name of hornbill log file by node." + [node] + (str "/tmp/" node ".log")) + +(defn f-hornbill-pid-file + "Generate the name of hornbill pid file by node." + [node] + (str "/tmp/" node ".pid")) + +(defn f-hornbill-args + "Generate the arguments for hornbill server." 
+ [node] + [:--config-path "/etc/hstream/config.yaml" + :--bind-address "0.0.0.0" + :--port 9092 + :--metrics-port 6600 + :--advertised-address (node-ips (keyword node)) + :--meta-servers "http://meta:8964" + :--store-config "/etc/fdb.cluster" + :--server-id (parse-int (subs node 1)) + :--log-level "debug" + :--log-with-color + ]) + +(defn db + "Hornbill for a particular version. No action is executed after the DB is ready. When tcpdump is truthy, setup!, teardown! and log-files additionally delegate to (db/tcpdump {:ports []})." + [version tcpdump] + (reify + db/DB + (setup! [this test node] + (when tcpdump + (db/setup! (db/tcpdump {:ports []}) test node)) + (info ">>> Setting up DB: Hornbill" version "on node" node + "But in fact we did nothing here.")) + (teardown! [this test node] + (when tcpdump + (db/teardown! (db/tcpdump {:ports []}) test node)) + (info ">>> Tearing down DB: Hornbill" version "on node" node + "But in fact we did nothing here.")) + + db/Process + ;; WARNING: Starting hstream server is not idempotent now. + ;; However, the test usually call [:start :all]. + ;; So we have to check if the server is already running. + ;; FIXME: The checking function 'is-hserver-on-node-dead?' is not + ;; well implemented... + ;; FIXME: Remove dependency on legacy-nemesis + (start! [this test node] + (if (legacy-nemesis/is-hserver-on-node-dead? node) + (c/su + (apply (partial cu/start-daemon! + {:logfile (f-hornbill-log-file node) + :pidfile (f-hornbill-pid-file node) + :chdir "/" + :make-pidfile? true} + hornbill) + (f-hornbill-args node))) + :skipped-by-us)) + (kill! [this test node] + (c/su + (cu/stop-daemon! hornbill (f-hornbill-pid-file node)))) + + db/Pause + (pause! [this test node] + ) + (resume! 
[this test node] + ) + + db/LogFiles + (log-files [this test node] + (if tcpdump ; return the tcpdump log files instead of discarding them + (db/log-files (db/tcpdump {:ports []}) test node) + {})) + + redpanda.db/DB + (node-id [this test node] + 0) + (topic-partition-state [this node topic-partition] + :not-implemented))) diff --git a/src/jepsen/hstream/kafka/workload/list_append.clj b/src/jepsen/hstream/kafka/workload/list_append.clj index 5c8fade..d542d5d 100644 --- a/src/jepsen/hstream/kafka/workload/list_append.clj +++ b/src/jepsen/hstream/kafka/workload/list_append.clj @@ -16,10 +16,13 @@ TimeoutException UnknownTopicOrPartitionException ))) +(def message-prefix-bytes + "How many bytes should we prefix each message with?" + (* 1 1024)) (def partition-count "How many partitions per topic?" - 4) + 2) (defn k->topic "Turns a logical key into a topic." @@ -77,7 +80,7 @@ [f k nil])) :append - (let [record (rc/producer-record topic (k->partition k) nil v) + (let [record (rc/producer-record topic (k->partition k) nil v message-prefix-bytes) res @(.send producer record)] mop)))) diff --git a/src/jepsen/hstream/kafka/workload/queue.clj b/src/jepsen/hstream/kafka/workload/queue.clj index f4aa7d5..05cce27 100644 --- a/src/jepsen/hstream/kafka/workload/queue.clj +++ b/src/jepsen/hstream/kafka/workload/queue.clj @@ -173,9 +173,13 @@ UnknownServerException ))) +(def message-prefix-bytes + "How many bytes should we prefix each message with?" + (* 1 1024)) + (def partition-count "How many partitions per topic?" - 4) + 2) (def replication-factor "What replication factor should we use for each topic?" @@ -183,7 +187,7 @@ (def poll-ms "How long should we poll for, in ms?" - 100) + 3000) (defn k->topic "Turns a logical key into a topic." @@ -249,7 +253,7 @@ (swap! 
extant-topics conj topic)) ; Send message to Redpanda partition (k->partition k) - record (rc/producer-record topic (k->partition k) nil v) + record (rc/producer-record topic (k->partition k) nil v message-prefix-bytes) res ^RecordMetadata (-> producer (.send record) (deref 10000 nil) @@ -476,7 +480,10 @@ (catch UnknownServerException e# (assoc ~op :type :info, :error [:unknown-server-exception - (.getMessage e#)])) + (str (.getMessage e#) " | cause: " + (.getCause e#) " | stack: " + (pr-str (mapv str (.getStackTrace e#)))) ; str on a lazy seq prints only LazySeq@hash, so render a realized vector with pr-str + ])) (catch TimeoutException _# (assoc ~op :type :info, :error :kafka-timeout)) @@ -648,7 +655,10 @@ topics (rc/logging-rebalance-listener rebalance-log)) (rc/subscribe! consumer topics)) - (assoc op :type :ok)) + + (let [subs (doall (seq (.subscription consumer))) + subs (into [] subs)] + (assoc op :type :ok :subs subs))) ; Apply poll/send transactions (:poll, :send, :txn) @@ -673,10 +683,17 @@ ; https://stackoverflow.com/questions/45195010/meaning-of-sendoffsetstotransaction-in-kafka-0-11, ; commitSync is more intended for non-transactional ; workflows. - (when (and (#{:poll :txn} (:f op)) - (not (:txn test)) - (:subscribe (:sub-via test))) - (try (.commitSync consumer) + (if (and (#{:poll :txn} (:f op)) + (not (:txn test)) + (:subscribe (:sub-via test))) + ;; FIXME: hardcoded timeout for `commitSync` and `.position` + (try (.commitSync consumer (java.time.Duration/ofMillis 2000)) + (let [tps (doall (seq (.assignment consumer))) + poss (doall (map #(.position consumer % (java.time.Duration/ofMillis 1000)) tps)) + tups (doall (map #(vector (.toString %1) %2) tps poss)) + meta (.toString (.groupMetadata consumer)) + ] + (assoc op :type :ok :cur-offsets tups :group-meta meta)) ; If we crash during commitSync *outside* a ; transaction, it might be that we poll()ed some ; values in this txn which Kafka will think we @@ -685,8 +702,8 @@ ; the reads, but note the lack of commit. 
(catch RuntimeException e (assoc op :type :ok - :error [:consumer-commit (.getMessage e)])))) - (assoc op :type :ok))))))))) + :error [:consumer-commit (.getMessage e)]))) + (assoc op :type :ok)))))))))) (teardown! [this test]) diff --git a/src/jepsen/hstream/kafka_test.clj b/src/jepsen/hstream/kafka_test.clj index 0f48ee3..3ac86ac 100644 --- a/src/jepsen/hstream/kafka_test.clj +++ b/src/jepsen/hstream/kafka_test.clj @@ -11,11 +11,11 @@ [jepsen.hstream.kafka [nemesis :as nemesis]] [jepsen.hstream.kafka.db [kafka :as db.kafka] [hstream :as db.hstream] - [redpanda :as db.redpanda]] + [redpanda :as db.redpanda] + [hornbill :as db.hornbill]] [jepsen.hstream.kafka.workload [list-append :as list-append] [queue :as queue]] - [slingshot.slingshot :refer [try+ throw+]] - [jepsen.hstream.kafka.db.hstream :as db.hstream]) + [slingshot.slingshot :refer [try+ throw+]]) (:import (org.apache.http.impl.client InternalHttpClient))) (def workloads @@ -174,7 +174,8 @@ (str (case (:db opts) :kafka "kafka" :redpanda (str "redpanda " (short-version opts)) - :hstream "kafka") + :hstream "kafka" + :hornbill "hornbill") " " (name (:workload opts)) (when (:txn opts) " txn") " " @@ -201,7 +202,8 @@ db (case (:db opts) :redpanda (db.redpanda/db) :kafka (db.kafka/db) - :hstream (db.hstream/db "0.19.0" (:tcpdump opts))) + :hstream (db.hstream/db "0.19.0" (:tcpdump opts)) + :hornbill (db.hornbill/db "1.0.0-M1" (:tcpdump opts))) nemesis (nemesis/package {:db db :nodes (:nodes opts) @@ -260,6 +262,16 @@ [[nil "--acks ACKS" "What level of acknowledgement should our producers use? Default is unset (uses client default); try 1 or 'all'." :default nil] + [nil "--producer-linger-ms INT" "How long should producers wait before sending a batch? Note there is also a --batch-max-bytes option." + :default nil + :parse-fn parse-long + :validate validate-non-neg] + + [nil "--batch-max-bytes INT" "Max bytes of a batch to produce. Note there is also a --producer-linger-ms option." 
+ :default nil + :parse-fn parse-long + :validate validate-non-neg] + [nil "--auto-offset-reset BEHAVIOR" "How should consumers handle it when there's no initial offset in Kafka?" :default nil] @@ -271,10 +283,10 @@ :parse-fn read-string :validate [#(and (number? %) (pos? %)) "must be a positive number"]] - [nil "--db TYPE" "Which DB do we test? Either `hstream` (default), `redpanda` or `kafka`" + [nil "--db TYPE" "Which DB do we test? Either `hstream` (default), `redpanda`, `kafka` or `hornbill`." :default :hstream :parse-fn keyword - :validate [#(some #{%} '(:hstream :kafka :redpanda)) "Must be one of hstream, kafka or redpanda"]] + :validate [#(some #{%} '(:hstream :kafka :redpanda :hornbill)) "Must be one of hstream, kafka, hornbill or redpanda"]] [nil "--db-targets TARGETS" "A comma-separated list of nodes to pause/kill/etc; e.g. one,all" ;:default [:primaries :all]