This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
/
Copy pathreader_test.go
163 lines (152 loc) · 5.37 KB
/
reader_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package mysql
import (
_ "embed"
"encoding/hex"
"fmt"
"strings"
"testing"
"time"
"github.com/compose/transporter/client"
"github.com/compose/transporter/message"
)
var (
readerTestData = &TestData{"reader_test", "reader_test_table", basicSchema, 10}
// For testing Blob
//go:embed logo-mysql-170x115.png
blobdata string
)
func TestRead(t *testing.T) {
if testing.Short() {
t.Skip("skipping Read in short mode")
}
reader := newReader()
readFunc := reader.Read(map[string]client.MessageSet{}, func(table string) bool {
if strings.HasPrefix(table, "information_schema.") ||
strings.HasPrefix(table, "performance_schema.") ||
strings.HasPrefix(table, "mysql.") ||
strings.HasPrefix(table, "sys.") {
return false
}
return table == readerTestData.DB+"."+readerTestData.Table
})
done := make(chan struct{})
c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", readerTestData.DB)))
if err != nil {
t.Fatalf("unable to initialize connection to mysql, %s", err)
}
defer c.Close()
s, err := c.Connect()
if err != nil {
t.Fatalf("unable to obtain session to mysql, %s", err)
}
msgChan, err := readFunc(s, done)
if err != nil {
t.Fatalf("unexpected Read error, %s\n", err)
}
var numMsgs int
for range msgChan {
numMsgs++
}
if numMsgs != readerTestData.InsertCount {
t.Errorf("bad message count, expected %d, got %d\n", readerTestData.InsertCount, numMsgs)
}
close(done)
}
var (
readerComplexTestData = &TestData{"reader_complex_test", "reader_complex_test_table", complexSchema, 10}
)
func TestReadComplex(t *testing.T) {
if testing.Short() {
t.Skip("skipping Read in short mode")
}
reader := newReader()
readFunc := reader.Read(map[string]client.MessageSet{}, func(table string) bool {
if strings.HasPrefix(table, "information_schema.") ||
strings.HasPrefix(table, "performance_schema.") ||
strings.HasPrefix(table, "mysql.") ||
strings.HasPrefix(table, "sys.") {
return false
}
return table == readerComplexTestData.DB+"."+readerComplexTestData.Table
})
done := make(chan struct{})
c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", readerComplexTestData.DB)))
if err != nil {
t.Fatalf("unable to initialize connection to mysql, %s", err)
}
defer c.Close()
s, err := c.Connect()
if err != nil {
t.Fatalf("unable to obtain session to mysql, %s", err)
}
msgChan, err := readFunc(s, done)
if err != nil {
t.Fatalf("unexpected Read error, %s\n", err)
}
msgs := make([]message.Msg, 0)
for msg := range msgChan {
msgs = append(msgs, msg.Msg)
}
if len(msgs) != readerComplexTestData.InsertCount {
t.Errorf("bad message count, expected %d, got %d\n", readerComplexTestData.InsertCount, len(msgs))
}
for i := 0; i < readerTestData.InsertCount; i++ {
for key, value := range map[string]interface{}{
"id": int64(i) + 1,
"colinteger": int64(i),
"colsmallint": int64(32767),
"coltinyint": int64(127),
"colmediumint": int64(8388607),
"colbigint": int64(21474836471),
"coldecimal": 0.23509838,
"colfloat": 0.31426,
"coldoubleprecision": 0.314259892323,
"colbit": "101",
"coldate": time.Date(2021, 12, 10, 0, 0, 0, 0, time.UTC),
"coltime": "13:45:00",
"colyear": uint64(2021),
"colchar": "a",
"colvar": randomHeros[i%len(randomHeros)],
"colbinary": "deadbeef000000000000",
"colblob": blobdata,
"coltext": "this is extremely important",
"coljson": "{\"name\": \"batman\", \"sidekick\": \"robin\"}",
"colpoint": "POINT (15 15)",
"collinestring": "LINESTRING (0 0, 1 1, 2 2)",
"colpolygon": "POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (5 5, 7 5, 7 7, 5 7, 5 5))",
"colgeometrycollection": "GEOMETRYCOLLECTION (POINT (1 1), LINESTRING (0 0, 1 1, 2 2, 3 3, 4 4))",
} {
switch {
case key == "colbinary":
// NOTE: This is a "hack" for testing purposes.
// True binary data (colblob) works fine and no additional parsing is required
// (i.e. nothing in `casifyValue` for it and the blob comparison works)
// When we insert the Golang value of 0xDEADBEEF into MySQL and just read it we
// get a weird string. I.e. the actual binary data. But I cannot for the life
// of me figure out how in Golang to convert 0xDEADBEEF to the same form. I.e.
// like the blobdata. So...
//
// In a mysql shell you can get a human readable form from:
//
// mysql> select hex(colbinary) from reader_complex_test_table limit 1;
// +----------------------+
// | hex(colbinary) |
// +----------------------+
// | DEADBEEF000000000000 |
// +----------------------+
//
// So that is what we do here just for the ease of testing
binvalue := hex.EncodeToString([]byte(msgs[i].Data().Get(key).(string)))
if binvalue != value {
t.Errorf("Expected %v of row to equal %v (%T), but was %v (%T)", key, value, value, binvalue, binvalue)
}
default:
if msgs[i].Data().Get(key) != value {
// Fatalf here hides other errors because it's a FailNow so use Error instead
t.Errorf("Expected %v of row to equal %v (%T), but was %v (%T)", key, value, value, msgs[i].Data().Get(key), msgs[i].Data().Get(key))
}
}
}
}
close(done)
}