Skip to content

Commit

Permalink
Add test using aws client
Browse files Browse the repository at this point in the history
  • Loading branch information
egs33 committed Jun 14, 2024
1 parent 0bfdbdf commit 7ab1949
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 117 deletions.
355 changes: 240 additions & 115 deletions test/gluttony/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
(gluttony.record.consumer
Consumer)
(java.util
UUID)))
UUID)
(software.amazon.awssdk.services.sqs.model
Message)))

(use-fixtures :each th/read-config-fixture th/test-client-fixture)

Expand Down Expand Up @@ -68,126 +70,249 @@

(deftest verify-work-of-receiver-and-worker
(when (:queue-name th/config)
(let [queue-url (th/get-queue-url)]
;; Make queue empty
(th/purge-queue queue-url)
(testing "cognitect-client"
(let [queue-url (th/get-queue-url)]
;; Make queue empty
(th/purge-queue queue-url)

(testing "Gather every data in order"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(swap! collected
conj (:id (edn/read-string (:body message))))
(respond))
consumer (start-consumer queue-url consume th/client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (vec (range 1 21))
@collected))
(stop-consumer consumer)))

(testing "Concurrent gathering"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(swap! collected
conj (:id (edn/read-string (:body message))))
(respond))
consumer (start-consumer queue-url consume th/client
{:num-workers 3
:num-receivers 2
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (set (range 1 21))
(set @collected)))
(stop-consumer consumer)))

(testing "Heartbeat work"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 1]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(a/go
;; wait 3 seconds
(a/<! (a/timeout 3000))
(testing "Gather every data in order"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(swap! collected
conj (:id (edn/read-string (:body message))))
(respond)))
consumer (start-consumer queue-url consume th/client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10
:heartbeat 1
:heartbeat-timeout 5})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 1))))
(is (= [1]
@collected))
(stop-consumer consumer)))

(testing "Check consume-limit"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 3]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(a/go
;(is (instance? gluttony.record.message.SQSMessage message))
(respond))
consumer (start-consumer queue-url consume th/client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (vec (range 1 21))
@collected))
(stop-consumer consumer)))

(testing "Concurrent gathering"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(swap! collected
conj (:id (edn/read-string (:body message))))
(a/<! (a/timeout 10)) ; Make a point of park
(respond))
consumer (start-consumer queue-url consume th/client
{:num-workers 3
:num-receivers 2
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (set (range 1 21))
(set @collected)))
(stop-consumer consumer)))

(testing "Heartbeat work"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 1]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(a/go
;; wait 3 seconds
(a/<! (a/timeout 3000))
(swap! collected
conj (:id (edn/read-string (:body message))))
(respond)))
consumer (start-consumer queue-url consume th/client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10
:heartbeat 1
:heartbeat-timeout 5})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 1))))
(is (= [1]
@collected))
(stop-consumer consumer)))

(testing "Check consume-limit"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 3]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [message respond _]
(log/infof "start to consume:%s" (:body message))
(a/go
;(is (instance? gluttony.record.message.SQSMessage message))
(swap! collected
conj (:id (edn/read-string (:body message))))
(a/<! (a/timeout 10)) ; Make a point of park
(swap! collected
conj Integer/MIN_VALUE)
(respond)
;; Respond twice on purpose
(respond)))
consumer (start-consumer queue-url consume th/client
{:num-workers 3
:num-receivers 1
:long-polling-duration 10
:consume-limit 1})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 6))))
(is (= (set (range 1 4))
(set (keep-indexed (fn [i v]
(when (even? i)
v))
@collected))))
(is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE]
(keep-indexed (fn [i v]
(when (odd? i)
v))
@collected)))
(stop-consumer consumer)))))

(testing "aws-client"
(let [queue-url (th/get-queue-url)]
;; Make queue empty
(th/purge-queue queue-url)

(testing "Gather every data in order"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [^Message message respond _]
(log/infof "start to consume:%s" (.body message))
(swap! collected
conj (:id (edn/read-string (.body message))))
(respond))
consumer (start-consumer queue-url consume th/aws-client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (vec (range 1 21))
@collected))
(stop-consumer consumer)))

(testing "Concurrent gathering"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 20]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [^Message message respond _]
(log/infof "start to consume:%s" (.body message))
(swap! collected
conj Integer/MIN_VALUE)
(respond)
;; Respond twice on purpose
(respond)))
consumer (start-consumer queue-url consume th/client
{:num-workers 3
:num-receivers 1
:long-polling-duration 10
:consume-limit 1})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 6))))
(is (= (set (range 1 4))
(set (keep-indexed (fn [i v]
(when (even? i)
v))
@collected))))
(is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE]
(keep-indexed (fn [i v]
(when (odd? i)
v))
@collected)))
(stop-consumer consumer))))))
conj (:id (edn/read-string (.body message))))
(respond))
consumer (start-consumer queue-url consume th/aws-client
{:num-workers 3
:num-receivers 2
:long-polling-duration 10})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 20))))
(is (= (set (range 1 21))
(set @collected)))
(stop-consumer consumer)))

(testing "Heartbeat work"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 1]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [^Message message respond _]
(log/infof "start to consume:%s" (.body message))
(a/go
;; wait 3 seconds
(a/<! (a/timeout 3000))
(swap! collected
conj (:id (edn/read-string (.body message))))
(respond)))
consumer (start-consumer queue-url consume th/aws-client
{:num-workers 1
:num-receivers 1
:long-polling-duration 10
:heartbeat 1
:heartbeat-timeout 5})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 1))))
(is (= [1]
@collected))
(stop-consumer consumer)))

(testing "Check consume-limit"
;; Add test data
(let [uuid (UUID/randomUUID)]
(dotimes [i 3]
(th/send-message {:QueueUrl queue-url
:MessageBody (pr-str {:id (inc i)})
:MessageDeduplicationId (str uuid ":" i)
:MessageGroupId (str uuid)})))

(let [collected (atom [])
consume (fn [^Message message respond _]
(log/infof "start to consume:%s" (.body message))
(a/go
;(is (instance? gluttony.record.message.SQSMessage message))
(swap! collected
conj (:id (edn/read-string (.body message))))
(a/<! (a/timeout 10)) ; Make a point of park
(swap! collected
conj Integer/MIN_VALUE)
(respond)
;; Respond twice on purpose
(respond)))
consumer (start-consumer queue-url consume th/aws-client
{:num-workers 3
:num-receivers 1
:long-polling-duration 10
:consume-limit 1})]
(a/<!! (th/wait-chan (* 1000 45) (fn [] (>= (count @collected) 6))))
(is (= (set (range 1 4))
(set (keep-indexed (fn [i v]
(when (even? i)
v))
@collected))))
(is (= [Integer/MIN_VALUE Integer/MIN_VALUE Integer/MIN_VALUE]
(keep-indexed (fn [i v]
(when (odd? i)
v))
@collected)))
(stop-consumer consumer)))))))

(deftest disable-and-enable-receivers-test
(when (:queue-name th/config)
Expand Down
Loading

0 comments on commit 7ab1949

Please sign in to comment.