forked from gocelery/gocelery
-
Notifications
You must be signed in to change notification settings - Fork 1
/
broker_test.go
109 lines (102 loc) · 3.3 KB
/
broker_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package gocelery
import (
"encoding/json"
"math/rand"
"reflect"
"testing"
)
// makeCeleryMessage builds a CeleryMessage for tests. It obtains an "add"
// task message, gives it two random integer arguments in [0, 10), encodes it
// and wraps the encoded form via getCeleryMessage. The intermediate task
// message is handed to releaseTaskMessage when the function returns.
// A non-nil error from Encode is returned together with a nil message.
func makeCeleryMessage() (*CeleryMessage, error) {
	task := getTaskMessage("add")
	defer releaseTaskMessage(task)
	task.Args = []interface{}{rand.Intn(10), rand.Intn(10)}

	encoded, encodeErr := task.Encode()
	if encodeErr == nil {
		return getCeleryMessage(encoded), nil
	}
	return nil, encodeErr
}
// getBrokers returns the brokers exercised by the broker-agnostic tests.
// Only the Redis broker at localhost:6379 is enabled; the AMQP broker is
// kept commented out.
func getBrokers() []CeleryBroker {
	brokers := make([]CeleryBroker, 0, 1)
	brokers = append(brokers, NewRedisCeleryBroker("localhost:6379", ""))
	//brokers = append(brokers, NewAMQPCeleryBroker("amqp://"))
	return brokers
}
// TestSend is a Redis-specific test. It sends a CeleryMessage through the
// broker, pops the raw entry off the broker's queue with BLPOP (1 second
// timeout) and checks that the payload decodes back to the original message.
//
// Failures that make the remaining steps meaningless abort the test with
// Fatalf, so the test reports the real cause rather than panicking on a nil
// reply or a failed type assertion further down.
func TestSend(t *testing.T) {
	broker := NewRedisCeleryBroker("localhost:6379", "")
	celeryMessage, err := makeCeleryMessage()
	if err != nil || celeryMessage == nil {
		t.Fatalf("failed to construct celery message: %v", err)
	}
	defer releaseCeleryMessage(celeryMessage)

	if err = broker.SendCeleryMessage(celeryMessage); err != nil {
		t.Fatalf("failed to send celery message to broker: %v", err)
	}

	conn := broker.Get()
	defer conn.Close()
	messageJSON, err := conn.Do("BLPOP", broker.queueName, "1")
	if err != nil || messageJSON == nil {
		// A nil reply means BLPOP timed out: nothing was queued.
		t.Fatalf("failed to get celery message from broker: %v", err)
	}

	// BLPOP replies with a two-element array: the key name and the value.
	messageList, ok := messageJSON.([]interface{})
	if !ok || len(messageList) < 2 {
		t.Fatalf("unexpected BLPOP reply: %v", messageJSON)
	}
	key, ok := messageList[0].([]byte)
	if !ok || string(key) != "celery" {
		t.Errorf("non celery message received")
	}
	payload, ok := messageList[1].([]byte)
	if !ok {
		t.Fatalf("unexpected BLPOP payload type %T", messageList[1])
	}

	// parse celery message
	var message CeleryMessage
	if err := json.Unmarshal(payload, &message); err != nil {
		t.Fatalf("failed to unmarshal celery message: %v", err)
	}
	if !reflect.DeepEqual(celeryMessage, &message) {
		t.Errorf("received message %v different from original message %v", &message, celeryMessage)
	}
}
// TestGet is a Redis-specific test. It marshals a CeleryMessage to JSON,
// pushes it onto the broker's queue with LPUSH and checks that
// broker.GetCeleryMessage returns a message equal to the original.
//
// Setup failures abort the test with Fatalf: without a message on the queue
// the later steps would only produce misleading follow-up failures.
func TestGet(t *testing.T) {
	broker := NewRedisCeleryBroker("localhost:6379", "")
	celeryMessage, err := makeCeleryMessage()
	if err != nil || celeryMessage == nil {
		t.Fatalf("failed to construct celery message: %v", err)
	}
	defer releaseCeleryMessage(celeryMessage)

	jsonBytes, err := json.Marshal(celeryMessage)
	if err != nil {
		t.Fatalf("failed to marshal celery message: %v", err)
	}

	conn := broker.Get()
	defer conn.Close()
	if _, err = conn.Do("LPUSH", broker.queueName, jsonBytes); err != nil {
		t.Fatalf("failed to push celery message to redis: %v", err)
	}

	// test Get
	message, err := broker.GetCeleryMessage()
	if err != nil {
		t.Fatalf("failed to get celery message from broker: %v", err)
	}
	if !reflect.DeepEqual(message, celeryMessage) {
		t.Errorf("received message %v different from original message %v", message, celeryMessage)
	}
}
// TestSendGet tests the send/get round trip for every broker returned by
// getBrokers: a CeleryMessage is sent through the broker, read back with
// GetTaskMessage and compared with the task message of the original.
//
// Each broker runs in its own subtest, named after the broker's concrete
// type. That scopes the deferred releaseCeleryMessage to a single broker
// instead of piling up until the whole test returns, and lets Fatalf abort
// one broker's run without skipping the others.
func TestSendGet(t *testing.T) {
	for _, broker := range getBrokers() {
		// The closure runs synchronously inside t.Run, so capturing the loop
		// variable is safe on every Go version.
		t.Run(reflect.TypeOf(broker).String(), func(t *testing.T) {
			celeryMessage, err := makeCeleryMessage()
			if err != nil || celeryMessage == nil {
				t.Fatalf("failed to construct celery message: %v", err)
			}
			defer releaseCeleryMessage(celeryMessage)

			if err = broker.SendCeleryMessage(celeryMessage); err != nil {
				t.Fatalf("failed to send celery message to broker: %v", err)
			}
			message, err := broker.GetTaskMessage()
			if err != nil {
				t.Fatalf("failed to get celery message from broker: %v", err)
			}
			originalMessage := celeryMessage.GetTaskMessage()
			if !reflect.DeepEqual(message, originalMessage) {
				t.Errorf("received message %v different from original message %v", message, originalMessage)
			}
		})
	}
}