This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
/
Copy pathtailer_test.go
144 lines (131 loc) · 4.59 KB
/
tailer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package mysql
import (
"database/sql"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/compose/transporter/client"
)
// checkBinLogReadable issues SHOW MASTER STATUS on s and scans the returned
// row into five throwaway locals. The values themselves are discarded; only
// the error from running the query and scanning the row is reported, so a nil
// return means the statement produced a row with the five expected columns.
func checkBinLogReadable(s *sql.DB) error {
	var (
		file           string
		position       int
		binlogDoDB     string
		binlogIgnoreDB string
		executedGtids  string
	)

	row := s.QueryRow(`SHOW MASTER STATUS;`)
	return row.Scan(&file, &position, &binlogDoDB, &binlogIgnoreDB, &executedGtids)
}
// tailerTestData is the fixture used by TestTailer. TestTailer reads its DB,
// Table and InsertCount fields; the literal is positional, so the values
// presumably map to database name, table name, schema and initial row count
// in that order (verify against the TestData definition).
var tailerTestData = &TestData{"tailer_test", "tailer_test_table", basicSchema, 10}
// TestTailer is an integration test for the binlog tailer. It connects to a
// MySQL server on localhost:3306 as root, confirms SHOW MASTER STATUS can be
// queried, starts a tailer filtered to the tailerTestData table, and then
// checks the number of messages produced by:
//
//  1. the initial drain (tailerTestData.InsertCount messages),
//  2. ten inserts (ids 10..19),
//  3. ten updates of those rows,
//  4. ten deletes of those rows.
//
// The test is skipped in -short mode.
func TestTailer(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping Tailer in short mode")
	}

	// NOTE(review): the client receives the formatted URI while newTailer
	// below receives this template with the literal "%s" still in it. That
	// looks unintentional, but newTailer is not visible here, so the
	// arguments are left exactly as they were — confirm and align.
	dsn := "mysql://root@localhost:3306?%s"
	c, err := NewClient(WithURI(fmt.Sprintf(dsn, tailerTestData.DB)))
	if err != nil {
		t.Fatalf("unable to initialize connection to mysql, %s", err)
	}
	defer c.Close()

	s, err := c.Connect()
	if err != nil {
		t.Fatalf("unable to obtain session to mysql, %s", err)
	}
	// Assert the concrete session type once, with a check, instead of
	// repeating an unchecked assertion (which would panic) at every use.
	sess, ok := s.(*Session)
	if !ok {
		t.Fatalf("unexpected session type %T", s)
	}
	db := sess.mysqlSession

	if err := checkBinLogReadable(db); err != nil {
		t.Fatalf("unable to query binlog, %s", err)
	}
	time.Sleep(1 * time.Second)

	// mustExec runs a statement and aborts the test on failure. Without this
	// a failed write would only surface later as an unexplained message
	// count mismatch.
	mustExec := func(what string, id int, query string) {
		if _, err := db.Exec(query); err != nil {
			t.Fatalf("unable to %s row %d, %s", what, id, err)
		}
	}

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Starting tailer...")
	r := newTailer(dsn)

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Tailer running")
	wantTable := fmt.Sprintf("%s.%s", tailerTestData.DB, tailerTestData.Table)
	readFunc := r.Read(map[string]client.MessageSet{}, func(table string) bool {
		// Explicitly reject the server's own schemas before matching the
		// fixture table.
		if strings.HasPrefix(table, "information_schema.") ||
			strings.HasPrefix(table, "performance_schema.") ||
			strings.HasPrefix(table, "mysql.") ||
			strings.HasPrefix(table, "sys.") {
			return false
		}
		return table == wantTable
	})

	done := make(chan struct{})
	// Deferred so the reader is signalled to stop even when the test aborts
	// early via t.Fatalf. Runs before the deferred c.Close() above.
	defer close(done)

	msgChan, err := readFunc(s, done)
	if err != nil {
		t.Fatalf("unexpected Read error, %s\n", err)
	}

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Checking count for initial drain")
	checkCount("initial drain", tailerTestData.InsertCount, msgChan, t)

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Inserting some stuff")
	for i := 10; i < 20; i++ {
		mustExec("insert", i, fmt.Sprintf(`INSERT INTO %s VALUES (
%d, -- id
'%s', -- colvar VARCHAR(255),
now() -- coltimestamp TIMESTAMP,
);`, tailerTestData.Table, i, randomHeros[i%len(randomHeros)]))
	}

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Checking count for tailed data")
	checkCount("tailed data", 10, msgChan, t)

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Updating data")
	for i := 10; i < 20; i++ {
		mustExec("update", i, fmt.Sprintf("UPDATE %s SET colvar = 'hello' WHERE id = %d;", tailerTestData.Table, i))
	}

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Checking count for updated data")
	// Note: During development this was found to return 20 messages,
	// because the binlog returns a before and an after image for each update.
	// That is handled in processEvent; see the comments in that function.
	checkCount("updated data", 10, msgChan, t)

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Deleting data")
	for i := 10; i < 20; i++ {
		mustExec("delete", i, fmt.Sprintf(`DELETE FROM %v WHERE id = %d; `, tailerTestData.Table, i))
	}

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Checking count for deleted data")
	checkCount("deleted data", 10, msgChan, t)
}
// checkCount counts the messages received on msgChan and compares the total
// with expected, reporting the outcome on t under the label desc.
//
// Counting stops when one of the following happens:
//   - no message has arrived for one second and the running total equals
//     expected;
//   - msgChan is closed;
//   - twenty seconds have elapsed since counting started.
//
// A mismatch is reported with t.Errorf, so the calling test keeps running.
func checkCount(desc string, expected int, msgChan <-chan client.MessageSet, t *testing.T) {
	t.Helper()

	const (
		quietPeriod = 1 * time.Second
		maxWait     = 20 * time.Second
	)

	// There is no t.Debug unfortunately so retaining below but commented out
	//t.Log("DEBUG: Running checkCount")
	var numMsgs int
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		// The overall deadline must be created once, outside the loop. When
		// it was re-created on every iteration the one-second case always
		// fired first, so the deadline never triggered and a count mismatch
		// made this loop run forever.
		deadline := time.After(maxWait)
		for {
			select {
			case _, ok := <-msgChan:
				if !ok {
					// A closed channel would otherwise yield zero values
					// in a tight loop and inflate the count without end.
					return
				}
				numMsgs++
			case <-time.After(quietPeriod):
				// The stream has gone quiet; finish only once the expected
				// total has been reached, otherwise keep waiting.
				if numMsgs == expected {
					return
				}
			case <-deadline:
				return
			}
			// There is no t.Debug unfortunately so retaining below, but commented out
			//t.Logf("DEBUG: %d messages so far", numMsgs)
		}
	}()
	// wg.Wait orders the goroutine's writes to numMsgs before the read below.
	wg.Wait()

	if numMsgs != expected {
		t.Errorf("[%s] bad message count, expected %d, got %d\n", desc, expected, numMsgs)
		return
	}
	t.Logf("[%s] message count ok", desc)
}