-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartition_test.go
86 lines (77 loc) · 2.47 KB
/
partition_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package MqServer
import (
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
consumerGroup "github.com/YarBor/BorsMqServer/consumer_group"
Log "github.com/YarBor/BorsMqServer/logger"
"github.com/YarBor/BorsMqServer/raft_server"
)
// pullUpRaftServers creates one raft server per entry of NodeINfO by calling
// MakeRaftServer, and launches each server's Serve method on its own goroutine.
// The servers are returned in the same order as the node descriptions.
//
// This helper neither stops nor waits for the goroutines it starts.
func pullUpRaftServers(NodeINfO ...struct {
	ID, Url string
}) []*raft_server.RaftServer {
	servers := make([]*raft_server.RaftServer, 0, len(NodeINfO))
	for i := range NodeINfO {
		srv := MakeRaftServer(NodeINfO[i])
		go srv.Serve()
		servers = append(servers, srv)
	}
	return servers
}
// Notifier is a field-less stub that the tests in this file pass to
// NewPartitionsController.
type Notifier struct{}

// CancelReg2Cluster writes a debug log entry containing the consumer it was
// called with; it does nothing else.
func (n *Notifier) CancelReg2Cluster(consumer *consumerGroup.Consumer) {
	Log.DEBUG("CancelReg2Cluster Get Call", consumer)
}
// buildMQPartitionsController creates one PartitionsController for every raft
// server in ser, preserving order. All controllers are given a pointer to the
// same Notifier value.
func buildMQPartitionsController(ser ...*raft_server.RaftServer) []*PartitionsController {
	notifier := &Notifier{}
	controllers := make([]*PartitionsController, 0, len(ser))
	for i := range ser {
		controllers = append(controllers, NewPartitionsController(ser[i], notifier))
	}
	return controllers
}
// TestPartitions_new is an end-to-end smoke test over the nodes described by
// ThreeNodeInfo. It:
//
//  1. starts a raft server and a PartitionsController per node,
//  2. registers and starts the same topic/partition on every controller,
//  3. attempts a two-entry write on every partition and remembers the last
//     one that succeeded (presumably the raft leader — confirm against
//     Partition.Write),
//  4. registers a consumer group on that partition, and
//  5. performs three reads, logging their results.
//
// Setup failures are reported through t so that they fail this test instead of
// crashing the whole test binary, and the test stops as soon as no writable
// partition is found rather than indexing parts with -1.
func TestPartitions_new(t *testing.T) {
	Log.SetLogLevel(Log.LogLevel_DEBUG)
	ThreeRFserver := pullUpRaftServers(ThreeNodeInfo...)
	PCs := buildMQPartitionsController(ThreeRFserver...)
	MaxEntries, MaxSize := 10, 100000

	parts := make([]*Partition, 0, len(PCs))
	for _, c := range PCs {
		part, err := c.RegisterPart("Topic", "Partitions", uint64(MaxEntries), uint64(MaxSize), ThreeNodeInfo...)
		if err != nil {
			t.Fatalf("RegisterPart failed: %v", err)
		}
		// NOTE(review): the result of Start is discarded as in the original;
		// if it is an error it should probably be checked — confirm its type.
		_ = part.Start()
		parts = append(parts, part)
	}

	// Give the cluster time to settle before writing (presumably leader
	// election — TODO confirm; a fixed sleep may be flaky on slow machines).
	time.Sleep(time.Second)

	// Try the write on every partition; the last one that accepts it wins.
	leaderIndex := -1
	for i := range parts {
		err := parts[i].Write([][]byte{[]byte("topic"), []byte("Partitions")})
		if err == nil {
			leaderIndex = i
		}
	}
	// assert.NotEqual only records the failure; stop here explicitly so that
	// parts[leaderIndex] below is never evaluated with leaderIndex == -1.
	if !assert.NotEqual(t, int(-1), leaderIndex, "no partition accepted the write") {
		t.FailNow()
	}

	cons := consumerGroup.NewConsumer("ConsumerTest", "GroupTest", 1e6, int32(MaxSize), int32(MaxEntries))
	_, err := parts[leaderIndex].registerConsumerGroup("GroupTest", cons, 0)
	if err != nil {
		t.Fatalf("registerConsumerGroup failed: %v", err)
	}

	time.Sleep(time.Second)
	for _, p := range parts {
		Log.DEBUG(p)
	}

	// First read starts from offset -1; the following reads continue from the
	// offset returned by the previous call. Results (including errors) are only
	// logged, not asserted.
	Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1 := parts[leaderIndex].Read(cons.SelfId, cons.GroupId, -1, 1)
	Log.DEBUG(Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1)
	Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1 = parts[leaderIndex].Read(cons.SelfId, cons.GroupId, ReadBeginOffset, 1)
	Log.DEBUG(Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1)
	Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1 = parts[leaderIndex].Read(cons.SelfId, cons.GroupId, ReadBeginOffset, 100)
	Log.DEBUG(Data, ReadBeginOffset, ReadEntriesNum, IsAllow2Del, err1)
}