This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
/
tailer_test.go
113 lines (99 loc) · 2.87 KB
/
tailer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package postgres
import (
"database/sql"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/compose/transporter/client"
)
func addTestReplicationSlot(s *sql.DB) error {
_, err := s.Exec(`
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
`)
return err
}
func dropTestReplicationSlot(s *sql.DB) error {
_, err := s.Exec(`
SELECT * FROM pg_drop_replication_slot('test_slot');
`)
return err
}
var (
tailerTestData = &TestData{"tailer_test", "tailer_test_table", basicSchema, 10}
)
func TestTailer(t *testing.T) {
if testing.Short() {
t.Skip("skipping Tailer in short mode")
}
c, err := NewClient(WithURI(fmt.Sprintf("postgres://postgres@transporter-db:5432/%s?sslmode=disable", tailerTestData.DB)))
if err != nil {
t.Fatalf("unable to initialize connection to postgres, %s", err)
}
defer c.Close()
s, err := c.Connect()
if err != nil {
t.Fatalf("unable to obtain session to postgres, %s", err)
}
dropTestReplicationSlot(s.(*Session).pqSession)
if err := addTestReplicationSlot(s.(*Session).pqSession); err != nil {
t.Fatalf("unable to create replication slot, %s", err)
}
time.Sleep(1 * time.Second)
r := newTailer("test_slot")
readFunc := r.Read(map[string]client.MessageSet{}, func(table string) bool {
if strings.HasPrefix(table, "information_schema.") || strings.HasPrefix(table, "pg_catalog.") {
return false
}
return table == fmt.Sprintf("public.%s", tailerTestData.Table)
})
done := make(chan struct{})
msgChan, err := readFunc(s, done)
if err != nil {
t.Fatalf("unexpected Read error, %s\n", err)
}
checkCount("initial drain", tailerTestData.InsertCount, msgChan, t)
for i := 10; i < 20; i++ {
s.(*Session).pqSession.Exec(fmt.Sprintf(`INSERT INTO %s VALUES (
%d, -- id
'%s', -- colvar VARCHAR(255),
now() at time zone 'utc' -- coltimestamp TIMESTAMP,
);`, tailerTestData.Table, i, randomHeros[i%len(randomHeros)]))
}
checkCount("tailed data", 10, msgChan, t)
for i := 10; i < 20; i++ {
s.(*Session).pqSession.Exec(fmt.Sprintf("UPDATE %s SET colvar = 'hello' WHERE id = %d;", tailerTestData.Table, i))
}
checkCount("updated data", 10, msgChan, t)
for i := 10; i < 20; i++ {
s.(*Session).pqSession.Exec(fmt.Sprintf(`DELETE FROM %v WHERE id = %d; `, tailerTestData.Table, i))
}
checkCount("deleted data", 10, msgChan, t)
close(done)
}
func checkCount(desc string, expected int, msgChan <-chan client.MessageSet, t *testing.T) {
var numMsgs int
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
for {
select {
case <-msgChan:
numMsgs++
case <-time.After(1 * time.Second):
if numMsgs == expected {
wg.Done()
return
}
case <-time.After(20 * time.Second):
wg.Done()
return
}
}
}(&wg)
wg.Wait()
if numMsgs != expected {
t.Errorf("[%s] bad message count, expected %d, got %d\n", desc, expected, numMsgs)
}
}