diff --git a/test/gluttony/core_test.clj b/test/gluttony/core_test.clj index c82332d..4821924 100644 --- a/test/gluttony/core_test.clj +++ b/test/gluttony/core_test.clj @@ -10,7 +10,9 @@ (gluttony.record.consumer Consumer) (java.util - UUID))) + UUID) + (software.amazon.awssdk.services.sqs.model + Message))) (use-fixtures :each th/read-config-fixture th/test-client-fixture) @@ -68,126 +70,249 @@ (deftest verify-work-of-receiver-and-worker (when (:queue-name th/config) - (let [queue-url (th/get-queue-url)] - ;; Make queue empty - (th/purge-queue queue-url) + (testing "cognitect-client" + (let [queue-url (th/get-queue-url)] + ;; Make queue empty + (th/purge-queue queue-url) - (testing "Gather every data in order" - ;; Add test data - (let [uuid (UUID/randomUUID)] - (dotimes [i 20] - (th/send-message {:QueueUrl queue-url - :MessageBody (pr-str {:id (inc i)}) - :MessageDeduplicationId (str uuid ":" i) - :MessageGroupId (str uuid)}))) - - (let [collected (atom []) - consume (fn [message respond _] - (log/infof "start to consume:%s" (:body message)) - (swap! collected - conj (:id (edn/read-string (:body message)))) - (respond)) - consumer (start-consumer queue-url consume th/client - {:num-workers 1 - :num-receivers 1 - :long-polling-duration 10})] - (a/= (count @collected) 20)))) - (is (= (vec (range 1 21)) - @collected)) - (stop-consumer consumer))) - - (testing "Concurrent gathering" - ;; Add test data - (let [uuid (UUID/randomUUID)] - (dotimes [i 20] - (th/send-message {:QueueUrl queue-url - :MessageBody (pr-str {:id (inc i)}) - :MessageDeduplicationId (str uuid ":" i) - :MessageGroupId (str uuid)}))) - - (let [collected (atom []) - consume (fn [message respond _] - (log/infof "start to consume:%s" (:body message)) - (swap! collected - conj (:id (edn/read-string (:body message)))) - (respond)) - consumer (start-consumer queue-url consume th/client - {:num-workers 3 - :num-receivers 2 - :long-polling-duration 10})] - (a/= (count @collected) 20)))) - (is (= (set (range 1 21)) - (set @collected))) - (stop-consumer consumer))) - - (testing "Heartbeat work" - ;; Add test data - (let [uuid (UUID/randomUUID)] - (dotimes [i 1] - (th/send-message {:QueueUrl queue-url - :MessageBody (pr-str {:id (inc i)}) - :MessageDeduplicationId (str uuid ":" i) - :MessageGroupId (str uuid)}))) - - (let [collected (atom []) - consume (fn [message respond _] - (log/infof "start to consume:%s" (:body message)) - (a/go - ;; wait 3 seconds - (a/= (count @collected) 1)))) - (is (= [1] - @collected)) - (stop-consumer consumer))) - - (testing "Check consume-limit" - ;; Add test data - (let [uuid (UUID/randomUUID)] - (dotimes [i 3] - (th/send-message {:QueueUrl queue-url - :MessageBody (pr-str {:id (inc i)}) - :MessageDeduplicationId (str uuid ":" i) - :MessageGroupId (str uuid)}))) - - (let [collected (atom []) - consume (fn [message respond _] - (log/infof "start to consume:%s" (:body message)) - (a/go - ;(is (instance? gluttony.record.message.SQSMessage message)) + (respond)) + consumer (start-consumer queue-url consume th/client + {:num-workers 1 + :num-receivers 1 + :long-polling-duration 10})] + (a/= (count @collected) 20)))) + (is (= (vec (range 1 21)) + @collected)) + (stop-consumer consumer))) + + (testing "Concurrent gathering" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 20] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [message respond _] + (log/infof "start to consume:%s" (:body message)) (swap! collected conj (:id (edn/read-string (:body message)))) - (a/= (count @collected) 20)))) + (is (= (set (range 1 21)) + (set @collected))) + (stop-consumer consumer))) + + (testing "Heartbeat work" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 1] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [message respond _] + (log/infof "start to consume:%s" (:body message)) + (a/go + ;; wait 3 seconds + (a/= (count @collected) 1)))) + (is (= [1] + @collected)) + (stop-consumer consumer))) + + (testing "Check consume-limit" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 3] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [message respond _] + (log/infof "start to consume:%s" (:body message)) + (a/go + ;(is (instance? gluttony.record.message.SQSMessage message)) + (swap! collected + conj (:id (edn/read-string (:body message)))) + (a/= (count @collected) 6)))) + (is (= (set (range 1 4)) + (set (keep-indexed (fn [i v] + (when (even? i) + v)) + @collected)))) + (is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE] + (keep-indexed (fn [i v] + (when (odd? i) + v)) + @collected))) + (stop-consumer consumer))))) + + (testing "aws-client" + (let [queue-url (th/get-queue-url)] + ;; Make queue empty + (th/purge-queue queue-url) + + (testing "Gather every data in order" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 20] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [^Message message respond _] + (log/infof "start to consume:%s" (.body message)) + (swap! collected + conj (:id (edn/read-string (.body message)))) + (respond)) + consumer (start-consumer queue-url consume th/aws-client + {:num-workers 1 + :num-receivers 1 + :long-polling-duration 10})] + (a/= (count @collected) 20)))) + (is (= (vec (range 1 21)) + @collected)) + (stop-consumer consumer))) + + (testing "Concurrent gathering" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 20] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [^Message message respond _] + (log/infof "start to consume:%s" (.body message)) (swap! collected - conj Integer/MIN_VALUE) - (respond) - ;; Respond twice on purpose - (respond))) - consumer (start-consumer queue-url consume th/client - {:num-workers 3 - :num-receivers 1 - :long-polling-duration 10 - :consume-limit 1})] - (a/= (count @collected) 6)))) - (is (= (set (range 1 4)) - (set (keep-indexed (fn [i v] - (when (even? i) - v)) - @collected)))) - (is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE] - (keep-indexed (fn [i v] - (when (odd? i) - v)) - @collected))) - (stop-consumer consumer)))))) + conj (:id (edn/read-string (.body message)))) + (respond)) + consumer (start-consumer queue-url consume th/aws-client + {:num-workers 3 + :num-receivers 2 + :long-polling-duration 10})] + (a/= (count @collected) 20)))) + (is (= (set (range 1 21)) + (set @collected))) + (stop-consumer consumer))) + + (testing "Heartbeat work" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 1] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [^Message message respond _] + (log/infof "start to consume:%s" (.body message)) + (a/go + ;; wait 3 seconds + (a/= (count @collected) 1)))) + (is (= [1] + @collected)) + (stop-consumer consumer))) + + (testing "Check consume-limit" + ;; Add test data + (let [uuid (UUID/randomUUID)] + (dotimes [i 3] + (th/send-message {:QueueUrl queue-url + :MessageBody (pr-str {:id (inc i)}) + :MessageDeduplicationId (str uuid ":" i) + :MessageGroupId (str uuid)}))) + + (let [collected (atom []) + consume (fn [^Message message respond _] + (log/infof "start to consume:%s" (.body message)) + (a/go + ;(is (instance? gluttony.record.message.SQSMessage message)) + (swap! collected + conj (:id (edn/read-string (.body message)))) + (a/= (count @collected) 6)))) + (is (= (set (range 1 4)) + (set (keep-indexed (fn [i v] + (when (even? i) + v)) + @collected)))) + (is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE] + (keep-indexed (fn [i v] + (when (odd? i) + v)) + @collected))) + (stop-consumer consumer))))))) (deftest disable-and-enable-receivers-test (when (:queue-name th/config) diff --git a/test/gluttony/test_helper.clj b/test/gluttony/test_helper.clj index 4f2eb9a..3e84043 100644 --- a/test/gluttony/test_helper.clj +++ b/test/gluttony/test_helper.clj @@ -4,13 +4,23 @@ [clojure.core.async :as a] [clojure.java.io :as io] [cognitect.aws.client.api :as aws] + [gluttony.record.aws-sqs-client :as aws-client] [gluttony.record.cognitect-sqs-client :as cognitect-client] - [unilog.config :as unilog])) + [unilog.config :as unilog]) + (:import + (java.net + URI) + (software.amazon.awssdk.regions + Region) + (software.amazon.awssdk.services.sqs + SqsAsyncClient))) (def config nil) (def client nil) +(def aws-client nil) + (defn read-config-fixture [f] (alter-var-root #'config (constantly (some-> (io/resource "test-config.edn") @@ -25,10 +35,26 @@ endpoint (assoc :endpoint-override endpoint) true (aws/client)))) +(defn- create-aws-client + [] + (let [{:keys [region endpoint]} config] + (cond-> (SqsAsyncClient/builder) + region (.region (Region/of (name region))) + endpoint (.endpointOverride (URI/create (str (name (:protocol endpoint)) + "://" + (:hostname endpoint) + ":" + (:port endpoint) + (:path endpoint)))) + true (.build)))) + (defn test-client-fixture [f] - (let [cognitect-client (create-cognitect-client)] + (let [cognitect-client (create-cognitect-client) + aws-client (create-aws-client)] (alter-var-root #'client (constantly (cognitect-client/make-client cognitect-client))) + (alter-var-root #'aws-client + (constantly (aws-client/make-client aws-client))) (f) (aws/stop cognitect-client)))