Skip to content

Commit

Permalink
add TestRepliStreamEmitterRequest/current_fileID
Browse files Browse the repository at this point in the history
  • Loading branch information
octu0 committed Jul 19, 2022
1 parent daffc01 commit 9253c97
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions repli/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,103 @@ func TestRapliStreamEmitterStartStop(t *testing.T) {
})
}

type testCurrentFileIDSource struct {
id datafile.FileID
}

func (t *testCurrentFileIDSource) CurrentFileID() datafile.FileID {
return t.id
}

func (t *testCurrentFileIDSource) FileIds() []datafile.FileID {
return nil
}

func (t *testCurrentFileIDSource) LastIndex(datafile.FileID) int64 {
return 0
}

func (t *testCurrentFileIDSource) Header(datafile.FileID, int64) (*datafile.Header, datafile.EOFType, error) {
return nil, true, errors.Errorf("no header")
}

func (t *testCurrentFileIDSource) Read(datafile.FileID, int64, int64) (*datafile.Entry, error) {
return nil, errors.Errorf("no entry")
}

func testRepliStreamEmitterRequestCurrentFileID(t *testing.T) {
e := NewStreamEmitter(runtime.DefaultContext(), log.Default(), "", 0)
defer e.Stop()

id := datafile.NextFileID()
s := &testCurrentFileIDSource{
id: id,
}
if err := e.Start(s, "127.0.0.1", -1); err != nil {
t.Errorf("no error ephemeral port %+v", err)
}

natsUrl := fmt.Sprintf("nats://%s", e.server.Addr().String())
nc, err := conn(natsUrl, t.Name())
if err != nil {
t.Fatalf("no error %+v", err)
}
defer nc.Close()

t.Run("empty_request_data", func(tt *testing.T) {
msg, err := nc.Request(SubjectCurrentFileID, []byte{}, 1*time.Second)
if err != nil {
tt.Fatalf("no error %+v", err)
}
res := ResponseCurrentFileID{}
if err := gob.NewDecoder(bytes.NewReader(msg.Data)).Decode(&res); err != nil {
tt.Fatalf("no error %+v", err)
}
if res.Err == "" {
tt.Errorf("not empty error")
}
tt.Logf("decoder empty byte read: %s", res.Err)
})
t.Run("mismatch_request_data", func(tt *testing.T) {
out := bytes.NewBuffer(nil)
gob.NewEncoder(out).Encode(time.Time{})

msg, err := nc.Request(SubjectCurrentFileID, out.Bytes(), 1*time.Second)
if err != nil {
tt.Fatalf("no error %+v", err)
}
res := ResponseCurrentFileID{}
if err := gob.NewDecoder(bytes.NewReader(msg.Data)).Decode(&res); err != nil {
tt.Fatalf("no error %+v", err)
}
if res.Err == "" {
tt.Errorf("not empty error")
}
tt.Logf("decoder mismatch type read: %s", res.Err)
})
t.Run("data_read", func(tt *testing.T) {
out := bytes.NewBuffer(nil)
if err := gob.NewEncoder(out).Encode(RequestCurrentFileID{}); err != nil {
tt.Fatalf("no error %+v", err)
}

msg, err := nc.Request(SubjectCurrentFileID, out.Bytes(), 1*time.Second)
if err != nil {
tt.Fatalf("no error %+v", err)
}
res := ResponseCurrentFileID{}
if err := gob.NewDecoder(bytes.NewReader(msg.Data)).Decode(&res); err != nil {
tt.Fatalf("no error %+v", err)
}
if res.Err != "" {
tt.Errorf("no response err: actual'%s'", res.Err)
}
if id.Equal(res.FileID) != true {
tt.Errorf("mismatch fileID actual:%v", res.FileID)
}
})
}

type testFileIdsSource struct {
ids []datafile.FileID
}
Expand Down Expand Up @@ -752,6 +849,7 @@ func testRepliStreamEmitterRequestFetchData(t *testing.T) {
}

func TestRepliStreamEmitterRequest(t *testing.T) {
t.Run("current_fileID", testRepliStreamEmitterRequestCurrentFileID)
t.Run("current_file_ids", testRepliStreamEmitterRequestCurrentFileIds)
t.Run("current_index", testRepliStreamEmitterRequestCurrentIndex)
t.Run("fetch_size", testRepliStreamEmitterRequestFetchSize)
Expand Down Expand Up @@ -2226,3 +2324,6 @@ func TestRepliTemporaryRepliData(t *testing.T) {
}
})
}

func TestRepliStreamReconnect(t *testing.T) {
}

0 comments on commit 9253c97

Please sign in to comment.