ingest/ledgerbackend: Add ledger backend which replays ledgers #5584

Merged 9 commits on Feb 6, 2025
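For orientation, backends in ingest/ledgerbackend implement the LedgerBackend interface (GetLatestLedgerSequence, GetLedger, PrepareRange, IsPrepared, Close). The sketch below is only a hypothetical illustration of a backend that replays pre-recorded ledgers; the type name, fields, and behavior are assumptions for illustration, not the implementation added by this PR. The diff excerpts that follow cover the supporting refactor that moves historyarchive's XdrStream into the xdr package as xdr.Stream.

```go
package ledgerbackend

import (
	"context"
	"fmt"

	"github.com/stellar/go/xdr"
)

// inMemoryReplayBackend is a hypothetical LedgerBackend that serves ledgers
// from a pre-recorded map of LedgerCloseMeta instead of a live core process.
type inMemoryReplayBackend struct {
	ledgers map[uint32]xdr.LedgerCloseMeta // keyed by ledger sequence
	latest  uint32                         // highest sequence available
}

func (b *inMemoryReplayBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
	return b.latest, nil
}

func (b *inMemoryReplayBackend) PrepareRange(ctx context.Context, ledgerRange Range) error {
	// Everything is already in memory, so there is nothing to prepare.
	return nil
}

func (b *inMemoryReplayBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) {
	return true, nil
}

func (b *inMemoryReplayBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) {
	lcm, ok := b.ledgers[sequence]
	if !ok {
		return xdr.LedgerCloseMeta{}, fmt.Errorf("ledger %d is not in the replay set", sequence)
	}
	return lcm, nil
}

func (b *inMemoryReplayBackend) Close() error { return nil }
```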
10 changes: 5 additions & 5 deletions historyarchive/archive.go
@@ -79,8 +79,8 @@ type ArchiveInterface interface {
ListAllBuckets() (chan string, chan error)
ListAllBucketHashes() (chan Hash, chan error)
ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error)
-GetXdrStreamForHash(hash Hash) (*XdrStream, error)
-GetXdrStream(pth string) (*XdrStream, error)
+GetXdrStreamForHash(hash Hash) (*xdr.Stream, error)
+GetXdrStream(pth string) (*xdr.Stream, error)
GetCheckpointManager() CheckpointManager
GetStats() []ArchiveStats
}
@@ -391,19 +391,19 @@ func (a *Archive) GetBucketPathForHash(hash Hash) string {
)
}

-func (a *Archive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
+func (a *Archive) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) {
return a.GetXdrStream(a.GetBucketPathForHash(hash))
}

-func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
+func (a *Archive) GetXdrStream(pth string) (*xdr.Stream, error) {
if !strings.HasSuffix(pth, ".xdr.gz") {
return nil, errors.New("File has non-.xdr.gz suffix: " + pth)
}
rdr, err := a.cachedGet(pth)
if err != nil {
return nil, err
}
-return NewXdrGzStream(rdr)
+return xdr.NewGzStream(rdr)
}

func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
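With XdrStream renamed to xdr.Stream (and NewXdrGzStream to xdr.NewGzStream), callers keep the same ReadOne/Close pattern, now through the xdr package. A minimal consumption sketch, assuming an already-connected archive and a valid .xdr.gz path; only methods that appear in this diff are used:

```go
package example

import (
	"fmt"
	"io"

	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/xdr"
)

// readLedgerHeaders streams LedgerHeaderHistoryEntry records from an archive
// file using the renamed *xdr.Stream returned by GetXdrStream.
func readLedgerHeaders(archive historyarchive.ArchiveInterface, path string) error {
	stream, err := archive.GetXdrStream(path) // *xdr.Stream after this change
	if err != nil {
		return err
	}
	defer stream.Close()

	for {
		var entry xdr.LedgerHeaderHistoryEntry
		if err := stream.ReadOne(&entry); err != nil {
			if err == io.EOF {
				return nil // end of the framed XDR stream
			}
			return err
		}
		fmt.Println("read ledger header", uint32(entry.Header.LedgerSeq))
	}
}
```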
8 changes: 4 additions & 4 deletions historyarchive/archive_pool.go
@@ -226,17 +226,17 @@ func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions)
})
}

-func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
-var stream *XdrStream
+func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) {
+var stream *xdr.Stream
return stream, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
stream, err = ai.GetXdrStreamForHash(hash)
return err
})
}

-func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
-var stream *XdrStream
+func (pa *ArchivePool) GetXdrStream(pth string) (*xdr.Stream, error) {
+var stream *xdr.Stream
return stream, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
stream, err = ai.GetXdrStream(pth)
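Each ArchivePool wrapper delegates to runRoundRobin with a closure that captures the result, as the two methods above show. The helper below is only a simplified sketch of that dispatch pattern, under the assumption that the pool tries members in turn until one succeeds; the real runRoundRobin may differ in details such as rotation and error bookkeeping:

```go
// tryEachArchive is a simplified stand-in for ArchivePool.runRoundRobin:
// invoke fn against each backend in order and stop at the first success.
func tryEachArchive(backends []historyarchive.ArchiveInterface, fn func(historyarchive.ArchiveInterface) error) error {
	var lastErr error
	for _, ai := range backends {
		if err := fn(ai); err != nil {
			lastErr = err // remember the failure and move on to the next archive
			continue
		}
		return nil
	}
	return lastErr
}
```

A GetXdrStream-style wrapper would then capture the stream inside the closure, exactly as `stream, err = ai.GetXdrStream(pth)` does above.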
4 changes: 3 additions & 1 deletion historyarchive/hash.go
@@ -8,9 +8,11 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
+
+"github.com/stellar/go/xdr"
)

-type Hash [sha256.Size]byte
+type Hash xdr.Hash

func DecodeHash(s string) (Hash, error) {
var h Hash
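Because Hash is now defined over xdr.Hash (a 32-byte array) instead of [sha256.Size]byte, the two types share an underlying representation but remain distinct, so conversions stay explicit. That is why verify.go below wraps xdr.HashXdr results in Hash(...) when storing into the archive's maps, while comparisons against xdr.Hash fields such as entry.Hash no longer need a cast. A tiny sketch of the conversion rules, assuming only the type definitions shown:

```go
package example

import (
	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/xdr"
)

// toArchiveHash: converting between the two hash types is an explicit cast;
// implicit assignment would not compile because the types are distinct.
func toArchiveHash(h xdr.Hash) historyarchive.Hash {
	return historyarchive.Hash(h)
}

// toXdrHash converts in the other direction the same way.
func toXdrHash(h historyarchive.Hash) xdr.Hash {
	return xdr.Hash(h)
}
```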
2 changes: 1 addition & 1 deletion historyarchive/json.go
@@ -35,7 +35,7 @@ func DumpXdrAsJson(args []string) error {
}

base := path.Base(arg)
-xr := NewXdrStream(rdr)
+xr := xdr.NewStream(rdr)
n := 0
for {
var lhe xdr.LedgerHeaderHistoryEntry
11 changes: 6 additions & 5 deletions historyarchive/mocks.go
@@ -1,8 +1,9 @@
package historyarchive

import (
-"github.com/stellar/go/xdr"
"github.com/stretchr/testify/mock"
+
+"github.com/stellar/go/xdr"
)

// MockArchive is a mockable archive.
@@ -99,14 +100,14 @@ func (m *MockArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint
return make(chan uint32), make(chan error)
}

-func (m *MockArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
+func (m *MockArchive) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) {
a := m.Called(hash)
-return a.Get(0).(*XdrStream), a.Error(1)
+return a.Get(0).(*xdr.Stream), a.Error(1)
}

-func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) {
+func (m *MockArchive) GetXdrStream(pth string) (*xdr.Stream, error) {
a := m.Called(pth)
-return a.Get(0).(*XdrStream), a.Error(1)
+return a.Get(0).(*xdr.Stream), a.Error(1)
}

func (m *MockArchive) GetStats() []ArchiveStats {
14 changes: 7 additions & 7 deletions historyarchive/verify.go
@@ -51,11 +51,11 @@ func SortTxsForHash(txset *xdr.TransactionSet) error {
hsh: make([]Hash, len(txset.Txs)),
}
for i, tx := range txset.Txs {
-h, err := HashXdr(&tx)
+h, err := xdr.HashXdr(&tx)
if err != nil {
return err
}
-bh.hsh[i] = h
+bh.hsh[i] = Hash(h)
}
sort.Sort(bh)
return nil
@@ -86,18 +86,18 @@ func HashEmptyTxSet(previousLedgerHash Hash) Hash {
}

func (arch *Archive) VerifyLedgerHeaderHistoryEntry(entry *xdr.LedgerHeaderHistoryEntry) error {
-h, err := HashXdr(&entry.Header)
+h, err := xdr.HashXdr(&entry.Header)
if err != nil {
return err
}
-if h != Hash(entry.Hash) {
+if h != entry.Hash {
return fmt.Errorf("Ledger %d expected hash %s, got %s",
entry.Header.LedgerSeq, Hash(entry.Hash), Hash(h))
}
arch.mutex.Lock()
defer arch.mutex.Unlock()
seq := uint32(entry.Header.LedgerSeq)
-arch.actualLedgerHashes[seq] = h
+arch.actualLedgerHashes[seq] = Hash(h)
arch.expectLedgerHashes[seq-1] = Hash(entry.Header.PreviousLedgerHash)
arch.expectTxSetHashes[seq] = Hash(entry.Header.ScpValue.TxSetHash)
arch.expectTxResultSetHashes[seq] = Hash(entry.Header.TxSetResultHash)
@@ -117,13 +117,13 @@ func (arch *Archive) VerifyTransactionHistoryEntry(entry *xdr.TransactionHistory
}

func (arch *Archive) VerifyTransactionHistoryResultEntry(entry *xdr.TransactionHistoryResultEntry) error {
-h, err := HashXdr(&entry.TxResultSet)
+h, err := xdr.HashXdr(&entry.TxResultSet)
if err != nil {
return err
}
arch.mutex.Lock()
defer arch.mutex.Unlock()
-arch.actualTxResultSetHashes[uint32(entry.LedgerSeq)] = h
+arch.actualTxResultSetHashes[uint32(entry.LedgerSeq)] = Hash(h)
return nil
}

12 changes: 6 additions & 6 deletions ingest/checkpoint_change_reader.go
@@ -103,7 +103,7 @@ func NewCheckpointChangeReader(
// associated with the CheckpointChangeReader matches the expectedHash.
// Assuming expectedHash comes from a trusted source (captive-core running in unbounded mode), this
// check will give you full security that the data returned by the CheckpointChangeReader can be trusted.
-// Note that XdrStream will verify all the ledger entries from an individual bucket and
+// Note that Stream will verify all the ledger entries from an individual bucket and
// VerifyBucketList() verifies the entire list of bucket hashes.
func (r *CheckpointChangeReader) VerifyBucketList(expectedHash xdr.Hash) error {
historyBucketListHash, err := r.has.BucketListHash()
Expand Down Expand Up @@ -216,14 +216,14 @@ func (r *CheckpointChangeReader) streamBuckets() {
// If any errors are encountered while reading from `stream`, readBucketEntry will
// retry the operation using a new *historyarchive.XdrStream.
// The total number of retries will not exceed `maxStreamRetries`.
-func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStream, hash historyarchive.Hash) (
+func (r *CheckpointChangeReader) readBucketEntry(stream *xdr.Stream, hash historyarchive.Hash) (
xdr.BucketEntry,
error,
) {
var entry xdr.BucketEntry
var err error
currentPosition := stream.BytesRead()
-gzipCurrentPosition := stream.GzipBytesRead()
+gzipCurrentPosition := stream.CompressedBytesRead()

for attempts := 0; ; attempts++ {
if r.ctx.Err() != nil {
@@ -234,7 +234,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea
err = stream.ReadOne(&entry)
if err == nil || err == io.EOF {
r.readBytesMutex.Lock()
-r.totalRead += stream.GzipBytesRead() - gzipCurrentPosition
+r.totalRead += stream.CompressedBytesRead() - gzipCurrentPosition
r.readBytesMutex.Unlock()
break
}
@@ -245,7 +245,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea

stream.Close()

-var retryStream *historyarchive.XdrStream
+var retryStream *xdr.Stream
retryStream, err = r.newXDRStream(hash)
if err != nil {
err = errors.Wrap(err, "Error creating new xdr stream")
@@ -265,7 +265,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea
}

func (r *CheckpointChangeReader) newXDRStream(hash historyarchive.Hash) (
-*historyarchive.XdrStream,
+*xdr.Stream,
error,
) {
rdr, e := r.archive.GetXdrStreamForHash(hash)
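The doc comment near the top of this file describes the trust model: VerifyBucketList checks the full bucket list hash against a hash obtained from a trusted source, while each bucket's entries are verified by the stream itself. The sketch below illustrates that usage; it assumes the NewCheckpointChangeReader, Read, and Close signatures commonly exposed by the ingest package, so treat it as an illustration rather than code from this PR:

```go
package example

import (
	"context"
	"io"

	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/ingest"
	"github.com/stellar/go/xdr"
)

// readCheckpoint walks all ledger entry changes at a checkpoint and verifies
// the bucket list against a hash obtained from a trusted source.
func readCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface, checkpoint uint32, trustedHash xdr.Hash) error {
	reader, err := ingest.NewCheckpointChangeReader(ctx, archive, checkpoint)
	if err != nil {
		return err
	}
	defer reader.Close()

	// Full-security check: the expected hash should come from a trusted
	// source such as captive core running in unbounded mode.
	if err := reader.VerifyBucketList(trustedHash); err != nil {
		return err
	}

	for {
		change, err := reader.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		_ = change // inspect the ledger entry change here
	}
}
```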
16 changes: 8 additions & 8 deletions ingest/checkpoint_change_reader_test.go
@@ -67,7 +67,7 @@ func (s *SingleLedgerStateReaderTestSuite) SetupTest() {
s.Require().NoError(err)
s.Assert().Equal(ledgerSeq, s.reader.sequence)

-// Disable hash validation. We trust historyarchive.XdrStream tests here.
+// Disable hash validation. We trust historyarchive.Stream tests here.
s.reader.disableBucketListHashValidation = true
}

@@ -630,7 +630,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsToCreateNewStream() {
On("GetXdrStreamForHash", emptyHash).
Return(createInvalidXdrStream(nil), nil).Once()

-var nilStream *historyarchive.XdrStream
+var nilStream *xdr.Stream
s.mockArchive.
On("GetXdrStreamForHash", emptyHash).
Return(nilStream, errors.New("cannot create new stream")).Times(3)
@@ -649,7 +649,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterFailsToCreateN
On("GetXdrStreamForHash", emptyHash).
Return(createInvalidXdrStream(nil), nil).Once()

-var nilStream *historyarchive.XdrStream
+var nilStream *xdr.Stream
s.mockArchive.
On("GetXdrStreamForHash", emptyHash).
Return(nilStream, errors.New("cannot create new stream")).Once()
@@ -948,11 +948,11 @@ type errCloser struct {

func (e errCloser) Close() error { return e.err }

-func createInvalidXdrStream(closeError error) *historyarchive.XdrStream {
+func createInvalidXdrStream(closeError error) *xdr.Stream {
b := &bytes.Buffer{}
writeInvalidFrame(b)

-return historyarchive.NewXdrStream(errCloser{b, closeError})
+return xdr.NewStream(errCloser{b, closeError})
}

func writeInvalidFrame(b *bytes.Buffer) {
@@ -965,7 +965,7 @@ func writeInvalidFrame(b *bytes.Buffer) {
b.Truncate(bufferSize + frameSize/2)
}

-func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream {
+func createXdrStream(entries ...xdr.BucketEntry) *xdr.Stream {
b := &bytes.Buffer{}
for _, e := range entries {
err := xdr.MarshalFramed(b, e)
@@ -977,8 +977,8 @@ func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream {
return xdrStreamFromBuffer(b)
}

-func xdrStreamFromBuffer(b *bytes.Buffer) *historyarchive.XdrStream {
-return historyarchive.NewXdrStream(ioutil.NopCloser(b))
+func xdrStreamFromBuffer(b *bytes.Buffer) *xdr.Stream {
+return xdr.NewStream(ioutil.NopCloser(b))
}

// getNextBucket is a helper that returns next bucket hash in the order of processing.
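These helpers build an in-memory *xdr.Stream from framed bucket entries, which the suite then hands to MockArchive expectations (as in the GetXdrStreamForHash setups above). A short sketch of that wiring inside the same test package; the metaEntry value is a placeholder:

```go
// setupMockBucket is a sketch of pairing createXdrStream with a MockArchive
// expectation now that the mocked method returns *xdr.Stream.
func setupMockBucket(m *historyarchive.MockArchive, hash historyarchive.Hash) {
	metaEntry := xdr.BucketEntry{
		Type: xdr.BucketEntryTypeMetaentry,
		MetaEntry: &xdr.BucketMetadata{
			LedgerVersion: 20, // placeholder protocol version
		},
	}

	m.On("GetXdrStreamForHash", hash).
		Return(createXdrStream(metaEntry), nil).
		Once()
}
```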