diff --git a/docker/docker-compose-hornbill.yml b/docker/docker-compose-hornbill.yml
index db20a48..c9d733a 100644
--- a/docker/docker-compose-hornbill.yml
+++ b/docker/docker-compose-hornbill.yml
@@ -101,8 +101,8 @@ services:
       jepsen-hornbill-network:
         ipv4_address: 172.20.0.8
     volumes:
+      - "../:/home/Work"
       - "jepsen-hornbill-shared:/var/jepsen/shared"
-      - "/home/commelina/.m2/:/root/.m2"
 
   hserver-1:
     << : *hserver-node
diff --git a/docker/docker-compose-kafka.yml b/docker/docker-compose-kafka.yml
index 7441ca4..930586b 100644
--- a/docker/docker-compose-kafka.yml
+++ b/docker/docker-compose-kafka.yml
@@ -134,7 +134,6 @@ services:
     volumes:
       - "../:/home/Work"
       - "jepsen-kafka-shared:/var/jepsen/shared"
-      - "/home/commelina/.m2/:/root/.m2"
 
   hstore-1:
     << : *logdevice-node
diff --git a/scripts/build_hornbill.sh b/scripts/build_hornbill.sh
index 0c63d1f..45b47ad 100755
--- a/scripts/build_hornbill.sh
+++ b/scripts/build_hornbill.sh
@@ -6,5 +6,4 @@ docker compose --file ./docker/docker-compose-hornbill.yml \
   --build-arg USE_CHINA_MIRROR=false \
   --build-arg arg_http_proxy="" \
   --build-arg arg_https_proxy="" \
-  --build-arg BASE_IMAGE=jepsen-hornbill:base \
-  --build-arg HORNBILL_IMAGE=hstreamdb/hornbill:latest
+  --build-arg BASE_IMAGE=jepsen-hornbill:base
diff --git a/scripts/build_hornbill_base.sh b/scripts/build_hornbill_base.sh
index 9a1d854..594fe65 100755
--- a/scripts/build_hornbill_base.sh
+++ b/scripts/build_hornbill_base.sh
@@ -1,5 +1,5 @@
 #!/usr/bin/bash
 docker build -t jepsen-hornbill:base \
   --build-arg "USE_CHINA_MIRROR=false" \
-  --build-arg "HORNBILL_IMAGE=hornbill:dev" \
+  --build-arg "HORNBILL_IMAGE=ghcr.io/hstreamdb/hornbill:v1.0.0-m0" \
   ./docker/base-hornbill
diff --git a/src/jepsen/hstream/kafka/client.clj b/src/jepsen/hstream/kafka/client.clj
index 382ae3f..56d5ef9 100644
--- a/src/jepsen/hstream/kafka/client.clj
+++ b/src/jepsen/hstream/kafka/client.clj
@@ -6,7 +6,8 @@
                             map-vals pprint-str]]
             [slingshot.slingshot :refer [try+ throw+]])
-  (:import (java.time Duration)
+  (:import (java.nio ByteBuffer)
+           (java.time Duration)
            (java.util Properties)
            (java.util.concurrent ExecutionException)
            (org.apache.kafka.clients.admin Admin
@@ -67,7 +68,8 @@
      "org.apache.kafka.common.serialization.LongDeserializer"
 
      ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG
-     "org.apache.kafka.common.serialization.LongDeserializer"
+     ;"org.apache.kafka.common.serialization.LongDeserializer"
+     "org.apache.kafka.common.serialization.ByteArrayDeserializer"
 
      ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG
      (str node ":" port)
@@ -128,7 +130,8 @@
      ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG
      ;"org.apache.kafka.common.serialization.StringSerializer"
-     "org.apache.kafka.common.serialization.LongSerializer"
+     ;"org.apache.kafka.common.serialization.LongSerializer"
+     "org.apache.kafka.common.serialization.ByteArraySerializer"
 
      ProducerConfig/DELIVERY_TIMEOUT_MS_CONFIG
      10000
      ; We choose this lower than DELIVERY_TIMEOUT_MS so that we have a
@@ -145,6 +148,8 @@
      ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG 500
      ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG 1000
+     ;; Let the producer batch records for up to a second before sending.
+     ProducerConfig/LINGER_MS_CONFIG 1000
      }
 
     (not= nil (:acks opts))
     (assoc ProducerConfig/ACKS_CONFIG (:acks opts))
@@ -253,10 +258,68 @@
   [topic partition]
   (TopicPartition. topic partition))
 
+;;;;;; Helpers for adding/removing extra bytes ;;;;;;
+(defn long-to-bytes [n]
+  (let [buffer (ByteBuffer/allocate Long/BYTES)]
+    (.putLong buffer n)
+    (.array buffer)))
+
+(defn bytes-to-long [bs]
+  (let [buffer (ByteBuffer/wrap bs)]
+    (.getLong buffer)))
+
+;; FIXME: rewrite as a cleaner functional map over Java lists
+(defn modify-list-elements [java-list f]
+  (if (nil? java-list)
+    (java.util.ArrayList.)
+    (if (.isEmpty java-list)
+      java-list
+      (let [new-list (java.util.ArrayList. (.size java-list))]
+        (dotimes [i (.size java-list)]
+          (let [element (.get java-list i)
+                new-element (f element)]
+            (.add new-list new-element)))
+        new-list))))
+
+;; remove extra bytes from a 'ConsumerRecord'
+(defn extract-value-from-consumer-record
+  [consumer-record]
+  (let [topic (.topic consumer-record)
+        partition (.partition consumer-record)
+        offset (.offset consumer-record)
+        timestamp (.timestamp consumer-record)
+        timestampType (.timestampType consumer-record)
+        serializedKeySize (.serializedKeySize consumer-record)
+        serializedValueSize (.serializedValueSize consumer-record)
+        key (.key consumer-record)
+        value (.value consumer-record)
+        headers (.headers consumer-record)
+        leaderEpoch (.leaderEpoch consumer-record)
+        actual-value (bytes-to-long (byte-array (take Long/BYTES value)))]
+    (ConsumerRecord. topic partition offset timestamp timestampType serializedKeySize serializedValueSize key actual-value headers leaderEpoch)))
+
+;; remove extra bytes from a 'ConsumerRecords'
+(defn recover-consumer-records
+  [consumer-records]
+  (let [new-map (java.util.HashMap.)
+        tp-set (.partitions consumer-records)
+        _ (doseq [tp (seq tp-set)]
+            (let [lst (.records consumer-records tp)
+                  lst (modify-list-elements lst
+                                            extract-value-from-consumer-record)]
+              (.put new-map tp lst)))]
+    (ConsumerRecords. new-map)))
+;;;
+
+;; FIXME: make the extra-bytes length runtime-configurable
 (defn ^ProducerRecord producer-record
   "Constructs a ProducerRecord from a topic, partition, key, and value."
   [topic partition key value]
-  (ProducerRecord. topic (int partition) key value))
+  (let [extra-bytes-len (* 1 1024)
+        value-bytes (long-to-bytes value)
+        extra-bytes (byte-array extra-bytes-len (byte \A))
+        value-to-write (byte-array (+ extra-bytes-len Long/BYTES) (concat value-bytes extra-bytes))]
+    (ProducerRecord. topic (int partition) key value-to-write)))
 
 (defn ^OffsetAndMetadata offset+metadata
   "Constructs an OffsetAndMetadata."
@@ -295,6 +358,7 @@
       ; immediately.
       (when (pos? offset)
         (let [records (.poll consumer duration)
+              records (recover-consumer-records records)
               records (vec records)
               last-record ^ConsumerRecord (peek records)]
           ;(info :poll-through-records offset records)
diff --git a/src/jepsen/hstream/kafka/workload/queue.clj b/src/jepsen/hstream/kafka/workload/queue.clj
index c494169..f4aa7d5 100644
--- a/src/jepsen/hstream/kafka/workload/queue.clj
+++ b/src/jepsen/hstream/kafka/workload/queue.clj
@@ -220,7 +220,8 @@
         (case (first mop)
           :poll (try
                   (rc/unwrap-errors
-                    (let [records (.poll consumer (rc/ms->duration poll-ms))]
+                    (let [records (.poll consumer (rc/ms->duration poll-ms))
+                          records (rc/recover-consumer-records records)]
                       (->> (.partitions records)
                            (map (fn per-topic-partition [topic-partition]
                                   ; Return a key, messages pair
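
Reviewer note: with the value serializers switched to raw byte arrays, producer-record now pads each long value with 1 KiB of filler bytes on produce, and recover-consumer-records strips the padding on poll before the workload sees the records. Below is a minimal, self-contained REPL sketch of that round-trip; the long->bytes/bytes->long names are illustrative stand-ins for the long-to-bytes/bytes-to-long helpers this diff adds to jepsen.hstream.kafka.client, and the 1024-byte pad mirrors the hardcoded extra-bytes-len.

;; Sketch only: same scheme as client.clj, an 8-byte big-endian long
;; followed by extra-bytes-len bytes of \A. Names here are hypothetical.
(import '(java.nio ByteBuffer))

(defn long->bytes [n]
  (.array (doto (ByteBuffer/allocate Long/BYTES) (.putLong n))))

(defn bytes->long [bs]
  (.getLong (ByteBuffer/wrap bs)))

(let [extra-bytes-len 1024
      ;; pad the encoded long out to 8 + 1024 bytes, as producer-record does
      padded    (byte-array (+ Long/BYTES extra-bytes-len)
                            (concat (long->bytes 42)
                                    (byte-array extra-bytes-len (byte \A))))
      ;; recovery reads back only the first Long/BYTES bytes
      recovered (bytes->long (byte-array (take Long/BYTES padded)))]
  (assert (= 42 recovered))
  recovered)
; => 42

Setting linger.ms to 1000 alongside the padding suggests the intent is larger, batched produce requests on the wire; and since recovery only ever reads the first Long/BYTES bytes of each value, the pad length can change without touching the queue workload's checker.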