-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
control_record_test.go
69 lines (57 loc) · 1.71 KB
/
control_record_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//go:build !functional
package sarama
import (
"testing"
)
// Raw key/value fixtures for the control record decoding test. Each key is a
// 2-byte version followed by a 2-byte record type; each non-empty value is a
// 2-byte version followed by a 4-byte coordinator epoch.
var (
	// Abort marker key.
	abortTxCtrlRecKey = []byte{
		0x00, 0x00, // version
		0x00, 0x00, // TX_ABORT = 0
	}
	// Abort marker value.
	abortTxCtrlRecValue = []byte{
		0x00, 0x00,             // version
		0x00, 0x00, 0x00, 0x0a, // coordinator epoch = 10
	}

	// Commit marker key.
	commitTxCtrlRecKey = []byte{
		0x00, 0x00, // version
		0x00, 0x01, // TX_COMMIT = 1
	}
	// Commit marker value.
	commitTxCtrlRecValue = []byte{
		0x00, 0x00,             // version
		0x00, 0x00, 0x00, 0x0f, // coordinator epoch = 15
	}

	// Key carrying a type id that is neither abort nor commit; the test
	// expects it to decode as ControlRecordUnknown.
	unknownCtrlRecKey = []byte{
		0x00, 0x00, // version
		0x00, 0x80, // type id 128: not a known control record type
	}
	// The unknown record is paired with an empty (but non-nil) value.
	unknownCtrlRecValue = []byte{}
)
// testDecode decodes the given raw key and value bytes into a ControlRecord,
// wrapping each slice in a realDecoder. tp is a human-readable label that is
// only used in the failure message.
//
// If decoding fails, the test is marked as failed (execution continues) and a
// zero-value ControlRecord is returned.
func testDecode(t *testing.T, tp string, key []byte, value []byte) ControlRecord {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	controlRecord := ControlRecord{}
	if err := controlRecord.decode(&realDecoder{raw: key}, &realDecoder{raw: value}); err != nil {
		// Surface the underlying decode error so a failure is diagnosable.
		t.Errorf("Decoding control record of type %s failed: %v", tp, err)
		return ControlRecord{}
	}
	return controlRecord
}
// assertRecordType marks the test as failed (without stopping it) when the
// Type field of r differs from expected.
func assertRecordType(t *testing.T, r *ControlRecord, expected ControlRecordType) {
	// Report a mismatch at the caller's line rather than inside this helper.
	t.Helper()

	if r.Type != expected {
		t.Errorf("control record type mismatch, expected: %v, have %v", expected, r.Type)
	}
}
// TestDecodingControlRecords decodes the abort, commit and unknown fixtures
// and checks the resulting record type; for abort and commit it also checks
// the decoded coordinator epoch (10 and 15 respectively).
func TestDecodingControlRecords(t *testing.T) {
	abortTx := testDecode(t, "abort transaction", abortTxCtrlRecKey, abortTxCtrlRecValue)
	assertRecordType(t, &abortTx, ControlRecordAbort)
	if abortTx.CoordinatorEpoch != 10 {
		t.Errorf("abort tx control record coordinator epoch mismatch, expected: %v, have %v",
			10, abortTx.CoordinatorEpoch)
	}

	commitTx := testDecode(t, "commit transaction", commitTxCtrlRecKey, commitTxCtrlRecValue)
	assertRecordType(t, &commitTx, ControlRecordCommit)
	if commitTx.CoordinatorEpoch != 15 {
		t.Errorf("commit tx control record coordinator epoch mismatch, expected: %v, have %v",
			15, commitTx.CoordinatorEpoch)
	}

	// The unknown fixture has an empty value, so only the type is checked.
	unknown := testDecode(t, "unknown", unknownCtrlRecKey, unknownCtrlRecValue)
	assertRecordType(t, &unknown, ControlRecordUnknown)
}