diff --git a/historyarchive/archive.go b/historyarchive/archive.go index d97471b42f..5819aa8f5a 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -79,8 +79,8 @@ type ArchiveInterface interface { ListAllBuckets() (chan string, chan error) ListAllBucketHashes() (chan Hash, chan error) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) - GetXdrStreamForHash(hash Hash) (*XdrStream, error) - GetXdrStream(pth string) (*XdrStream, error) + GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) + GetXdrStream(pth string) (*xdr.Stream, error) GetCheckpointManager() CheckpointManager GetStats() []ArchiveStats } @@ -391,11 +391,11 @@ func (a *Archive) GetBucketPathForHash(hash Hash) string { ) } -func (a *Archive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { +func (a *Archive) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) { return a.GetXdrStream(a.GetBucketPathForHash(hash)) } -func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) { +func (a *Archive) GetXdrStream(pth string) (*xdr.Stream, error) { if !strings.HasSuffix(pth, ".xdr.gz") { return nil, errors.New("File has non-.xdr.gz suffix: " + pth) } @@ -403,7 +403,7 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) { if err != nil { return nil, err } - return NewXdrGzStream(rdr) + return xdr.NewGzStream(rdr) } func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) { diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index 28967d8aa6..784cbba36b 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -226,8 +226,8 @@ func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) }) } -func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { - var stream *XdrStream +func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) { + var stream *xdr.Stream return stream, pa.runRoundRobin(func(ai ArchiveInterface) error { var err error stream, err = ai.GetXdrStreamForHash(hash) @@ -235,8 +235,8 @@ func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { }) } -func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) { - var stream *XdrStream +func (pa *ArchivePool) GetXdrStream(pth string) (*xdr.Stream, error) { + var stream *xdr.Stream return stream, pa.runRoundRobin(func(ai ArchiveInterface) error { var err error stream, err = ai.GetXdrStream(pth) diff --git a/historyarchive/hash.go b/historyarchive/hash.go index ef4727ab1b..868d741d61 100644 --- a/historyarchive/hash.go +++ b/historyarchive/hash.go @@ -8,9 +8,11 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + + "github.com/stellar/go/xdr" ) -type Hash [sha256.Size]byte +type Hash xdr.Hash func DecodeHash(s string) (Hash, error) { var h Hash diff --git a/historyarchive/json.go b/historyarchive/json.go index 73bcfa11f1..734b776f5e 100644 --- a/historyarchive/json.go +++ b/historyarchive/json.go @@ -35,7 +35,7 @@ func DumpXdrAsJson(args []string) error { } base := path.Base(arg) - xr := NewXdrStream(rdr) + xr := xdr.NewStream(rdr) n := 0 for { var lhe xdr.LedgerHeaderHistoryEntry diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index efe333cd33..4224506cc0 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -1,8 +1,9 @@ package historyarchive import ( - "github.com/stellar/go/xdr" "github.com/stretchr/testify/mock" + + "github.com/stellar/go/xdr" ) // MockArchive is a mockable archive. @@ -99,14 +100,14 @@ func (m *MockArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint return make(chan uint32), make(chan error) } -func (m *MockArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { +func (m *MockArchive) GetXdrStreamForHash(hash Hash) (*xdr.Stream, error) { a := m.Called(hash) - return a.Get(0).(*XdrStream), a.Error(1) + return a.Get(0).(*xdr.Stream), a.Error(1) } -func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) { +func (m *MockArchive) GetXdrStream(pth string) (*xdr.Stream, error) { a := m.Called(pth) - return a.Get(0).(*XdrStream), a.Error(1) + return a.Get(0).(*xdr.Stream), a.Error(1) } func (m *MockArchive) GetStats() []ArchiveStats { diff --git a/historyarchive/verify.go b/historyarchive/verify.go index b7171c69c6..4cfdd10b23 100644 --- a/historyarchive/verify.go +++ b/historyarchive/verify.go @@ -51,11 +51,11 @@ func SortTxsForHash(txset *xdr.TransactionSet) error { hsh: make([]Hash, len(txset.Txs)), } for i, tx := range txset.Txs { - h, err := HashXdr(&tx) + h, err := xdr.HashXdr(&tx) if err != nil { return err } - bh.hsh[i] = h + bh.hsh[i] = Hash(h) } sort.Sort(bh) return nil @@ -86,18 +86,18 @@ func HashEmptyTxSet(previousLedgerHash Hash) Hash { } func (arch *Archive) VerifyLedgerHeaderHistoryEntry(entry *xdr.LedgerHeaderHistoryEntry) error { - h, err := HashXdr(&entry.Header) + h, err := xdr.HashXdr(&entry.Header) if err != nil { return err } - if h != Hash(entry.Hash) { + if h != entry.Hash { return fmt.Errorf("Ledger %d expected hash %s, got %s", entry.Header.LedgerSeq, Hash(entry.Hash), Hash(h)) } arch.mutex.Lock() defer arch.mutex.Unlock() seq := uint32(entry.Header.LedgerSeq) - arch.actualLedgerHashes[seq] = h + arch.actualLedgerHashes[seq] = Hash(h) arch.expectLedgerHashes[seq-1] = Hash(entry.Header.PreviousLedgerHash) arch.expectTxSetHashes[seq] = Hash(entry.Header.ScpValue.TxSetHash) arch.expectTxResultSetHashes[seq] = Hash(entry.Header.TxSetResultHash) @@ -117,13 +117,13 @@ func (arch *Archive) VerifyTransactionHistoryEntry(entry *xdr.TransactionHistory } func (arch *Archive) VerifyTransactionHistoryResultEntry(entry *xdr.TransactionHistoryResultEntry) error { - h, err := HashXdr(&entry.TxResultSet) + h, err := xdr.HashXdr(&entry.TxResultSet) if err != nil { return err } arch.mutex.Lock() defer arch.mutex.Unlock() - arch.actualTxResultSetHashes[uint32(entry.LedgerSeq)] = h + arch.actualTxResultSetHashes[uint32(entry.LedgerSeq)] = Hash(h) return nil } diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index e84e7631cf..6d26ec7f3b 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -103,7 +103,7 @@ func NewCheckpointChangeReader( // associated with the CheckpointChangeReader matches the expectedHash. // Assuming expectedHash comes from a trusted source (captive-core running in unbounded mode), this // check will give you full security that the data returned by the CheckpointChangeReader can be trusted. -// Note that XdrStream will verify all the ledger entries from an individual bucket and +// Note that Stream will verify all the ledger entries from an individual bucket and // VerifyBucketList() verifies the entire list of bucket hashes. func (r *CheckpointChangeReader) VerifyBucketList(expectedHash xdr.Hash) error { historyBucketListHash, err := r.has.BucketListHash() @@ -216,14 +216,14 @@ func (r *CheckpointChangeReader) streamBuckets() { // If any errors are encountered while reading from `stream`, readBucketEntry will // retry the operation using a new *historyarchive.XdrStream. // The total number of retries will not exceed `maxStreamRetries`. -func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStream, hash historyarchive.Hash) ( +func (r *CheckpointChangeReader) readBucketEntry(stream *xdr.Stream, hash historyarchive.Hash) ( xdr.BucketEntry, error, ) { var entry xdr.BucketEntry var err error currentPosition := stream.BytesRead() - gzipCurrentPosition := stream.GzipBytesRead() + gzipCurrentPosition := stream.CompressedBytesRead() for attempts := 0; ; attempts++ { if r.ctx.Err() != nil { @@ -234,7 +234,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea err = stream.ReadOne(&entry) if err == nil || err == io.EOF { r.readBytesMutex.Lock() - r.totalRead += stream.GzipBytesRead() - gzipCurrentPosition + r.totalRead += stream.CompressedBytesRead() - gzipCurrentPosition r.readBytesMutex.Unlock() break } @@ -245,7 +245,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea stream.Close() - var retryStream *historyarchive.XdrStream + var retryStream *xdr.Stream retryStream, err = r.newXDRStream(hash) if err != nil { err = errors.Wrap(err, "Error creating new xdr stream") @@ -265,7 +265,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea } func (r *CheckpointChangeReader) newXDRStream(hash historyarchive.Hash) ( - *historyarchive.XdrStream, + *xdr.Stream, error, ) { rdr, e := r.archive.GetXdrStreamForHash(hash) diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go index 35ec2bb076..872367ef17 100644 --- a/ingest/checkpoint_change_reader_test.go +++ b/ingest/checkpoint_change_reader_test.go @@ -67,7 +67,7 @@ func (s *SingleLedgerStateReaderTestSuite) SetupTest() { s.Require().NoError(err) s.Assert().Equal(ledgerSeq, s.reader.sequence) - // Disable hash validation. We trust historyarchive.XdrStream tests here. + // Disable hash validation. We trust historyarchive.Stream tests here. s.reader.disableBucketListHashValidation = true } @@ -630,7 +630,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsToCreateNewStream() { On("GetXdrStreamForHash", emptyHash). Return(createInvalidXdrStream(nil), nil).Once() - var nilStream *historyarchive.XdrStream + var nilStream *xdr.Stream s.mockArchive. On("GetXdrStreamForHash", emptyHash). Return(nilStream, errors.New("cannot create new stream")).Times(3) @@ -649,7 +649,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterFailsToCreateN On("GetXdrStreamForHash", emptyHash). Return(createInvalidXdrStream(nil), nil).Once() - var nilStream *historyarchive.XdrStream + var nilStream *xdr.Stream s.mockArchive. On("GetXdrStreamForHash", emptyHash). Return(nilStream, errors.New("cannot create new stream")).Once() @@ -948,11 +948,11 @@ type errCloser struct { func (e errCloser) Close() error { return e.err } -func createInvalidXdrStream(closeError error) *historyarchive.XdrStream { +func createInvalidXdrStream(closeError error) *xdr.Stream { b := &bytes.Buffer{} writeInvalidFrame(b) - return historyarchive.NewXdrStream(errCloser{b, closeError}) + return xdr.NewStream(errCloser{b, closeError}) } func writeInvalidFrame(b *bytes.Buffer) { @@ -965,7 +965,7 @@ func writeInvalidFrame(b *bytes.Buffer) { b.Truncate(bufferSize + frameSize/2) } -func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream { +func createXdrStream(entries ...xdr.BucketEntry) *xdr.Stream { b := &bytes.Buffer{} for _, e := range entries { err := xdr.MarshalFramed(b, e) @@ -977,8 +977,8 @@ func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream { return xdrStreamFromBuffer(b) } -func xdrStreamFromBuffer(b *bytes.Buffer) *historyarchive.XdrStream { - return historyarchive.NewXdrStream(ioutil.NopCloser(b)) +func xdrStreamFromBuffer(b *bytes.Buffer) *xdr.Stream { + return xdr.NewStream(ioutil.NopCloser(b)) } // getNextBucket is a helper that returns next bucket hash in the order of processing. diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go new file mode 100644 index 0000000000..b02188edb6 --- /dev/null +++ b/ingest/loadtest/ledger_backend.go @@ -0,0 +1,405 @@ +package loadtest + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/klauspost/compress/zstd" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/xdr" +) + +// LedgerBackend is used to load test ingestion. +// LedgerBackend will take a file of synthetically generated ledgers (see +// services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers +// with real ledgers from the Stellar network. The merged ledgers will then be replayed to +// the ingesting down stream system at a configurable rate. +type LedgerBackend struct { + config LedgerBackendConfig + mergedLedgersFilePath string + mergedLedgersStream *xdr.Stream + startTime time.Time + startLedgerSeq uint32 + nextLedgerSeq uint32 + latestLedgerSeq uint32 + preparedRange ledgerbackend.Range + cachedLedger xdr.LedgerCloseMeta +} + +// LedgerBackendConfig configures LedgerBackend +type LedgerBackendConfig struct { + // NetworkPassphrase is the passphrase of the Stellar network from where the real ledgers + // will be obtained + NetworkPassphrase string + // LedgerBackend is the source of the real ledgers + LedgerBackend ledgerbackend.LedgerBackend + // LedgersFilePath is a file containing the synthetic ledgers that will be combined with the + // real ledgers and then replayed by LedgerBackend + LedgersFilePath string + // LedgerEntriesFilePath is a file containing the ledger entry fixtures for the synthetic ledgers + LedgerEntriesFilePath string + // LedgerCloseDuration is the rate at which ledgers will be replayed from LedgerBackend + LedgerCloseDuration time.Duration +} + +// NewLedgerBackend constructs an LedgerBackend instance +func NewLedgerBackend(config LedgerBackendConfig) *LedgerBackend { + return &LedgerBackend{ + config: config, + } +} + +func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + if r.nextLedgerSeq == 0 { + return 0, fmt.Errorf("PrepareRange() must be called before GetLatestLedgerSequence()") + } + + return r.latestLedgerSeq, nil +} + +func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) { + file, err := os.Open(path) + if err != nil { + return nil, err + } + stream, err := xdr.NewZstdStream(file) + if err != nil { + return nil, err + } + + var entries []xdr.LedgerEntry + for { + var entry xdr.LedgerEntry + err = stream.ReadOne(&entry) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + entries = append(entries, entry) + } + + if err = stream.Close(); err != nil { + return nil, err + } + return entries, nil +} + +func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error { + if r.nextLedgerSeq != 0 { + if r.isPrepared(ledgerRange) { + return nil + } + return fmt.Errorf("PrepareRange() already called") + } + generatedLedgerEntries, err := readLedgerEntries(r.config.LedgerEntriesFilePath) + if err != nil { + return err + } + generatedLedgersFile, err := os.Open(r.config.LedgersFilePath) + if err != nil { + return err + } + generatedLedgers, err := xdr.NewZstdStream(generatedLedgersFile) + if err != nil { + return err + } + + err = r.config.LedgerBackend.PrepareRange(ctx, ledgerRange) + if err != nil { + return err + } + cur := ledgerRange.From() + firstLedger, err := r.config.LedgerBackend.GetLedger(ctx, cur) + if err != nil { + return err + } + var changes xdr.LedgerEntryChanges + for i := 0; i < len(generatedLedgerEntries); i++ { + changes = append(changes, xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryCreated, + Created: &generatedLedgerEntries[i], + }) + } + var flag xdr.Uint32 = 1 + firstLedger.V1.UpgradesProcessing = append(firstLedger.V1.UpgradesProcessing, xdr.UpgradeEntryMeta{ + Upgrade: xdr.LedgerUpgrade{ + Type: xdr.LedgerUpgradeTypeLedgerUpgradeFlags, + NewFlags: &flag, + }, + Changes: changes, + }) + + mergedLedgersFile, err := os.CreateTemp("", "merged-ledgers") + if err != nil { + return err + } + cleanup := true + defer func() { + if cleanup { + os.Remove(mergedLedgersFile.Name()) + } + }() + writer, err := zstd.NewWriter(mergedLedgersFile) + if err != nil { + return err + } + + var latestLedgerSeq uint32 + for cur = cur + 1; !ledgerRange.Bounded() || cur <= ledgerRange.To(); cur++ { + var ledger xdr.LedgerCloseMeta + ledger, err = r.config.LedgerBackend.GetLedger(ctx, cur) + if err != nil { + return err + } + var generatedLedger xdr.LedgerCloseMeta + if err = generatedLedgers.ReadOne(&generatedLedger); err == io.EOF { + break + } else if err != nil { + return err + } + if err = MergeLedgers(r.config.NetworkPassphrase, &ledger, generatedLedger); err != nil { + return err + } + if err = xdr.MarshalFramed(writer, ledger); err != nil { + return err + } + latestLedgerSeq = cur + } + if err = generatedLedgers.Close(); err != nil { + return err + } + if err = writer.Close(); err != nil { + return err + } + if err = mergedLedgersFile.Sync(); err != nil { + return err + } + + if _, err = mergedLedgersFile.Seek(0, 0); err != nil { + return err + } + mergedLedgersStream, err := xdr.NewZstdStream(mergedLedgersFile) + if err != nil { + return err + } + cleanup = false + + r.mergedLedgersFilePath = mergedLedgersFile.Name() + r.mergedLedgersStream = mergedLedgersStream + // from this point, ledgers will be available at a rate of once + // every r.ledgerCloseDuration time has elapsed + r.startTime = time.Now() + r.startLedgerSeq = ledgerRange.From() + r.nextLedgerSeq = r.startLedgerSeq + 1 + r.latestLedgerSeq = latestLedgerSeq + r.cachedLedger = firstLedger + r.preparedRange = ledgerRange + return nil +} + +func (r *LedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) { + return r.isPrepared(ledgerRange), nil +} + +func (r *LedgerBackend) isPrepared(ledgerRange ledgerbackend.Range) bool { + if r.nextLedgerSeq == 0 { + return false + } + + if r.preparedRange.Bounded() != ledgerRange.Bounded() { + return false + } + + if ledgerRange.From() < r.cachedLedger.LedgerSequence() { + return false + } + + return ledgerRange.From() >= r.cachedLedger.LedgerSequence() && ledgerRange.To() <= r.preparedRange.To() +} + +func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + if r.nextLedgerSeq == 0 { + return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()") + } + if sequence < r.cachedLedger.LedgerSequence() { + return xdr.LedgerCloseMeta{}, fmt.Errorf( + "sequence number %v is behind the ledger stream sequence %d", + sequence, + r.cachedLedger.LedgerSequence(), + ) + } + for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ { + var ledger xdr.LedgerCloseMeta + if err := r.mergedLedgersStream.ReadOne(&ledger); err == io.EOF { + return ledger, fmt.Errorf( + "sequence number %v is greater than the latest ledger available", + sequence, + ) + } else if err != nil { + return ledger, err + } + if ledger.LedgerSequence() != r.nextLedgerSeq { + return ledger, fmt.Errorf( + "unexpected ledger sequence (expected=%d actual=%d)", + r.nextLedgerSeq, + ledger.LedgerSequence(), + ) + } + r.cachedLedger = ledger + } + i := int(sequence - r.startLedgerSeq) + // the i'th ledger will only be available after (i+1) * r.ledgerCloseDuration time has elapsed + closeTime := r.startTime.Add(time.Duration(i+1) * r.config.LedgerCloseDuration) + time.Sleep(time.Until(closeTime)) + return r.cachedLedger, nil +} + +func (r *LedgerBackend) Close() error { + if err := r.config.LedgerBackend.Close(); err != nil { + return err + } + if r.mergedLedgersStream != nil { + // closing the stream will also close the ledgers file + if err := r.mergedLedgersStream.Close(); err != nil { + return err + } + if err := os.Remove(r.mergedLedgersFilePath); err != nil { + return err + } + } + return nil +} + +func validLedger(ledger xdr.LedgerCloseMeta) error { + if _, ok := ledger.GetV1(); !ok { + return fmt.Errorf("ledger version %v is not supported", ledger.V) + } + if _, ok := ledger.MustV1().TxSet.GetV1TxSet(); !ok { + return fmt.Errorf("ledger txset %v is not supported", ledger.MustV1().TxSet.V) + } + return nil +} + +func extractChanges(networkPassphrase string, changeMap map[string][]ingest.Change, ledger xdr.LedgerCloseMeta) error { + reader, err := ingest.NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase, ledger) + if err != nil { + return err + } + for { + var change ingest.Change + var ledgerKey xdr.LedgerKey + var b64 string + change, err = reader.Read() + if err == io.EOF { + break + } else if err != nil { + return err + } + ledgerKey, err = change.LedgerKey() + if err != nil { + return err + } + b64, err = ledgerKey.MarshalBinaryBase64() + if err != nil { + return err + } + changeMap[b64] = append(changeMap[b64], change) + } + return nil +} + +func changeIsEqual(a, b ingest.Change) (bool, error) { + if a.Type != b.Type || a.Reason != b.Reason { + return false, nil + } + if a.Pre == nil { + if b.Pre != nil { + return false, nil + } + } else { + if ok, err := xdr.Equals(a.Pre, b.Pre); err != nil || !ok { + return ok, err + } + } + if a.Post == nil { + if b.Post != nil { + return false, nil + } + } else { + if ok, err := xdr.Equals(a.Post, b.Post); err != nil || !ok { + return ok, err + } + } + return true, nil +} + +func changesAreEqual(a, b map[string][]ingest.Change) (bool, error) { + if len(a) != len(b) { + return false, nil + } + for key, aChanges := range a { + bChanges := b[key] + if len(aChanges) != len(bChanges) { + return false, nil + } + for i, aChange := range aChanges { + bChange := bChanges[i] + if ok, err := changeIsEqual(aChange, bChange); !ok || err != nil { + return ok, err + } + } + } + return true, nil +} + +// MergeLedgers merges two xdr.LedgerCloseMeta instances. +func MergeLedgers(networkPassphrase string, dst *xdr.LedgerCloseMeta, src xdr.LedgerCloseMeta) error { + if err := validLedger(*dst); err != nil { + return err + } + if err := validLedger(src); err != nil { + return err + } + + combinedChangesByKey := map[string][]ingest.Change{} + if err := extractChanges(networkPassphrase, combinedChangesByKey, *dst); err != nil { + return err + } + if err := extractChanges(networkPassphrase, combinedChangesByKey, src); err != nil { + return err + } + + // src is merged into dst by appending all the transactions from src into dst, + // appending all the upgrades from src into dst, and appending all the evictions + // from src into dst + dst.V1.TxSet.V1TxSet.Phases = append(dst.V1.TxSet.V1TxSet.Phases, src.V1.TxSet.V1TxSet.Phases...) + dst.V1.TxProcessing = append(dst.V1.TxProcessing, src.V1.TxProcessing...) + dst.V1.UpgradesProcessing = append(dst.V1.UpgradesProcessing, src.V1.UpgradesProcessing...) + dst.V1.EvictedTemporaryLedgerKeys = append(dst.V1.EvictedTemporaryLedgerKeys, src.V1.EvictedTemporaryLedgerKeys...) + dst.V1.EvictedPersistentLedgerEntries = append(dst.V1.EvictedPersistentLedgerEntries, src.V1.EvictedPersistentLedgerEntries...) + + mergedChangesByKey := map[string][]ingest.Change{} + if err := extractChanges(networkPassphrase, mergedChangesByKey, *dst); err != nil { + return err + } + + // a merge is valid if the ordered list of changes emitted by the merged ledger is equal to + // the list of changes emitted by dst concatenated by the list of changes emitted by src, or + // in other words: + // extractChanges(merge(dst, src)) == concat(extractChanges(dst), extractChanges(src)) + if ok, err := changesAreEqual(combinedChangesByKey, mergedChangesByKey); err != nil { + return err + } else if !ok { + return errors.New("order of changes are not preserved") + } + + return nil +} diff --git a/services/horizon/internal/ingest/history_archive_adapter_test.go b/services/horizon/internal/ingest/history_archive_adapter_test.go index 168c812d5d..2f82ed3a5a 100644 --- a/services/horizon/internal/ingest/history_archive_adapter_test.go +++ b/services/horizon/internal/ingest/history_archive_adapter_test.go @@ -197,6 +197,6 @@ func getTestArchive() (historyarchive.ArchiveInterface, error) { Return(int64(100), nil) mockArchive. On("GetXdrStreamForHash", mock.AnythingOfType("historyarchive.Hash")). - Return(historyarchive.CreateXdrStream(bucketEntry), nil) + Return(xdr.CreateXdrStream(bucketEntry), nil) return mockArchive, nil } diff --git a/services/horizon/internal/integration/change_test.go b/services/horizon/internal/integration/change_test.go index 0f4a19908d..b80ee6770b 100644 --- a/services/horizon/internal/integration/change_test.go +++ b/services/horizon/internal/integration/change_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" @@ -150,15 +151,10 @@ func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32 t := itest.CurrentTest() ccConfig, err := itest.CreateCaptiveCoreConfig() - if err != nil { - t.Fatalf("unable to create captive core config: %v", err) - } + require.NoError(t, err) - captiveCore, err := ledgerbackend.NewCaptive(*ccConfig) - if err != nil { - t.Fatalf("unable to create captive core: %v", err) - } - defer captiveCore.Close() + captiveCore, err := ledgerbackend.NewCaptive(ccConfig) + require.NoError(t, err) ctx := context.Background() err = captiveCore.PrepareRange(ctx, ledgerbackend.BoundedRange(startingLedger, endLedger)) @@ -175,6 +171,7 @@ func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32 seqToLedgersMap[ledgerSeq] = ledger } + require.NoError(t, captiveCore.Close()) return seqToLedgersMap } diff --git a/services/horizon/internal/integration/load_test.go b/services/horizon/internal/integration/generate_ledgers_test.go similarity index 90% rename from services/horizon/internal/integration/load_test.go rename to services/horizon/internal/integration/generate_ledgers_test.go index 3e97be721d..04d48bead5 100644 --- a/services/horizon/internal/integration/load_test.go +++ b/services/horizon/internal/integration/generate_ledgers_test.go @@ -13,12 +13,14 @@ import ( "testing" "time" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" "github.com/stellar/go/amount" "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/loadtest" "github.com/stellar/go/keypair" "github.com/stellar/go/protocols/horizon" proto "github.com/stellar/go/protocols/stellarcore" @@ -34,11 +36,13 @@ type sorobanTransaction struct { sequenceNumber int64 } -func TestLoad(t *testing.T) { +func TestGenerateLedgers(t *testing.T) { var transactionsPerLedger, ledgers, transfersPerTx int + var output bool flag.IntVar(&transactionsPerLedger, "transactions-per-ledger", 100, "number of transactions per ledger") flag.IntVar(&transfersPerTx, "transfers-per-tx", 10, "number of asset transfers for each transaction") flag.IntVar(&ledgers, "ledgers", 2, "number of ledgers to generate") + flag.BoolVar(&output, "output", false, "overwrite the generated output files") flag.Parse() if integration.GetCoreMaxSupportedProtocol() < 22 { @@ -205,12 +209,6 @@ func TestLoad(t *testing.T) { return sortedLegers[i].LedgerSequence() < sortedLegers[j].LedgerSequence() }) - output, err := os.Create(filepath.Join("testdata", "load-test-ledgers.xdr")) - require.NoError(t, err) - _, err = xdr.Marshal(output, sortedLegers) - require.NoError(t, err) - require.NoError(t, output.Close()) - ledgersForAccounts := getLedgers(itest, accountLedgers[0], accountLedgers[len(accountLedgers)-1]) var accountLedgerEntries []xdr.LedgerEntry accountSet := map[string]bool{} @@ -225,13 +223,11 @@ func TestLoad(t *testing.T) { } require.Len(t, accountLedgerEntries, 2*transactionsPerLedger) - output, err = os.Create(filepath.Join("testdata", "load-test-accounts.xdr")) - require.NoError(t, err) - _, err = xdr.Marshal(output, accountLedgerEntries) - require.NoError(t, err) - require.NoError(t, output.Close()) + if output { + writeFile(t, filepath.Join("testdata", "load-test-accounts.xdr.zstd"), accountLedgerEntries) + } - merged := mergeLedgers(t, sortedLegers, transactionsPerLedger) + merged := merge(t, sortedLegers, transactionsPerLedger) changes := extractChanges(t, sortedLegers) for _, change := range changes { if change.Type != xdr.LedgerEntryTypeAccount { @@ -250,6 +246,38 @@ func TestLoad(t *testing.T) { for i, original := range orignalTransactions { requireTransactionsMatch(t, original, mergedTransactions[i]) } + + if output { + writeFile(t, filepath.Join("testdata", "load-test-ledgers.xdr.zstd"), merged) + } +} + +func writeFile[T any](t *testing.T, path string, data []T) { + file, err := os.Create(path) + require.NoError(t, err) + writer, err := zstd.NewWriter(file) + require.NoError(t, err) + for _, entry := range data { + require.NoError(t, xdr.MarshalFramed(writer, entry)) + } + require.NoError(t, writer.Close()) + require.NoError(t, file.Close()) +} + +func readFile[T xdr.DecoderFrom](t *testing.T, path string, constructor func() T, consume func(T)) { + file, err := os.Open(path) + require.NoError(t, err) + stream, err := xdr.NewZstdStream(file) + require.NoError(t, err) + for { + entry := constructor() + if err = stream.ReadOne(entry); err == io.EOF { + break + } + require.NoError(t, err) + consume(entry) + } + require.NoError(t, stream.Close()) } func bulkTransfer( @@ -332,6 +360,7 @@ func requireChangesAreEqual(t *testing.T, a, b []ingest.Change) { aByLedgerKey := groupChangesByLedgerKey(t, a) bByLedgerKey := groupChangesByLedgerKey(t, b) + require.Equal(t, len(aByLedgerKey), len(bByLedgerKey)) for key, aChanges := range aByLedgerKey { bChanges := bByLedgerKey[key] require.Equal(t, len(aChanges), len(bChanges)) @@ -363,11 +392,9 @@ func requireTransactionsMatch(t *testing.T, a, b ingest.LedgerTransaction) { } func requireXDREquals(t *testing.T, a, b encoding.BinaryMarshaler) { - serialized, err := a.MarshalBinary() - require.NoError(t, err) - otherSerialized, err := b.MarshalBinary() + ok, err := xdr.Equals(a, b) require.NoError(t, err) - require.Equal(t, serialized, otherSerialized) + require.True(t, ok) } func txSubWorker( @@ -437,7 +464,7 @@ func waitForTransactions( }, time.Second*90, time.Millisecond*100) } -func mergeLedgers(t *testing.T, ledgers []xdr.LedgerCloseMeta, transactionsPerLedger int) []xdr.LedgerCloseMeta { +func merge(t *testing.T, ledgers []xdr.LedgerCloseMeta, transactionsPerLedger int) []xdr.LedgerCloseMeta { var merged []xdr.LedgerCloseMeta if len(ledgers) == 0 { return merged @@ -455,13 +482,11 @@ func mergeLedgers(t *testing.T, ledgers []xdr.LedgerCloseMeta, transactionsPerLe if curCount == 0 { cur = copyLedger(t, ledger) - cur.V1.TxProcessing = nil } else { - cur.V1.TxSet.V1TxSet.Phases = append(cur.V1.TxSet.V1TxSet.Phases, ledger.V1.TxSet.V1TxSet.Phases...) + require.NoError(t, loadtest.MergeLedgers(integration.StandaloneNetworkPassphrase, &cur, ledger)) } require.LessOrEqual(t, curCount+transactionCount, transactionsPerLedger) - cur.V1.TxProcessing = append(cur.V1.TxProcessing, ledger.V1.TxProcessing...) curCount += transactionCount if curCount == transactionsPerLedger { merged = append(merged, cur) diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go new file mode 100644 index 0000000000..29c00998ac --- /dev/null +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -0,0 +1,169 @@ +package integration + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/ingest/loadtest" + "github.com/stellar/go/services/horizon/internal/test/integration" + "github.com/stellar/go/txnbuild" + "github.com/stellar/go/xdr" +) + +func TestLoadTestLedgerBackend(t *testing.T) { + itest := integration.NewTest(t, integration.Config{}) + senderKP, senderAccount := itest.CreateAccount("10000000") + recipientKP, _ := itest.CreateAccount("10000000") + + tx := itest.MustSubmitOperations( + senderAccount, + senderKP, + &txnbuild.Payment{ + SourceAccount: senderKP.Address(), + Destination: recipientKP.Address(), + Asset: txnbuild.NativeAsset{}, + Amount: "1000", + }, + ) + require.True(t, tx.Successful) + + ccConfig, err := itest.CreateCaptiveCoreConfig() + require.NoError(t, err) + + captiveCore, err := ledgerbackend.NewCaptive(ccConfig) + require.NoError(t, err) + + replayConfig := loadtest.LedgerBackendConfig{ + NetworkPassphrase: integration.StandaloneNetworkPassphrase, + LedgersFilePath: filepath.Join("testdata", "load-test-ledgers.xdr.zstd"), + LedgerEntriesFilePath: filepath.Join("testdata", "load-test-accounts.xdr.zstd"), + LedgerCloseDuration: 3 * time.Second / 2, + LedgerBackend: captiveCore, + } + loadTestBackend := loadtest.NewLedgerBackend(replayConfig) + + var generatedLedgers []xdr.LedgerCloseMeta + var generatedLedgerEntries []xdr.LedgerEntry + + readFile(t, replayConfig.LedgersFilePath, + func() *xdr.LedgerCloseMeta { return &xdr.LedgerCloseMeta{} }, + func(ledger *xdr.LedgerCloseMeta) { + generatedLedgers = append(generatedLedgers, *ledger) + }, + ) + readFile(t, replayConfig.LedgerEntriesFilePath, + func() *xdr.LedgerEntry { return &xdr.LedgerEntry{} }, + func(ledgerEntry *xdr.LedgerEntry) { + generatedLedgerEntries = append(generatedLedgerEntries, *ledgerEntry) + }, + ) + + startLedger := uint32(tx.Ledger - 1) + endLedger := startLedger + uint32(len(generatedLedgers)) + + _, err = loadTestBackend.GetLatestLedgerSequence(context.Background()) + require.EqualError(t, err, "PrepareRange() must be called before GetLatestLedgerSequence()") + + prepared, err := loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(startLedger, endLedger)) + require.NoError(t, err) + require.False(t, prepared) + + waitForLedgerInArchive(t, 6*time.Minute, endLedger) + require.NoError(t, loadTestBackend.PrepareRange(context.Background(), ledgerbackend.BoundedRange(startLedger, endLedger))) + + latest, err := loadTestBackend.GetLatestLedgerSequence(context.Background()) + require.NoError(t, err) + require.Equal(t, endLedger, latest) + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(startLedger, endLedger)) + require.NoError(t, err) + require.True(t, prepared) + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(startLedger+1, endLedger)) + require.NoError(t, err) + require.True(t, prepared) + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(startLedger-1, endLedger)) + require.NoError(t, err) + require.False(t, prepared) + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(endLedger+1, endLedger+2)) + require.NoError(t, err) + require.False(t, prepared) + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.UnboundedRange(startLedger)) + require.NoError(t, err) + require.False(t, prepared) + + require.NoError( + t, + loadTestBackend.PrepareRange(context.Background(), ledgerbackend.BoundedRange(startLedger, endLedger)), + ) + require.NoError( + t, + loadTestBackend.PrepareRange(context.Background(), ledgerbackend.BoundedRange(startLedger+1, endLedger)), + ) + require.EqualError( + t, + loadTestBackend.PrepareRange(context.Background(), ledgerbackend.UnboundedRange(startLedger)), + "PrepareRange() already called", + ) + + _, err = loadTestBackend.GetLedger(context.Background(), startLedger-1) + require.EqualError(t, err, + fmt.Sprintf( + "sequence number %v is behind the ledger stream sequence %v", + startLedger-1, + startLedger, + ), + ) + + var ledgers []xdr.LedgerCloseMeta + for cur := startLedger; cur <= endLedger; cur++ { + startTime := time.Now() + var ledger xdr.LedgerCloseMeta + ledger, err = loadTestBackend.GetLedger(context.Background(), cur) + duration := time.Since(startTime) + require.NoError(t, err) + ledgers = append(ledgers, ledger) + require.WithinDuration(t, startTime.Add(replayConfig.LedgerCloseDuration), startTime.Add(duration), time.Millisecond*100) + } + + prepared, err = loadTestBackend.IsPrepared(context.Background(), ledgerbackend.BoundedRange(startLedger, endLedger)) + require.NoError(t, err) + require.False(t, prepared) + + _, err = loadTestBackend.GetLedger(context.Background(), endLedger+1) + require.EqualError(t, err, + fmt.Sprintf("sequence number %v is greater than the latest ledger available", endLedger+1), + ) + + require.NoError(t, loadTestBackend.Close()) + + originalLedgers := getLedgers(itest, startLedger, endLedger) + + changes := extractChanges(t, ledgers[0:1]) + expectedChanges := extractChanges(t, []xdr.LedgerCloseMeta{originalLedgers[startLedger]}) + for i := range generatedLedgerEntries { + expectedChanges = append(expectedChanges, ingest.Change{ + Type: generatedLedgerEntries[i].Data.Type, + Post: &generatedLedgerEntries[i], + Reason: ingest.LedgerEntryChangeReasonUpgrade, + }) + } + requireChangesAreEqual(t, expectedChanges, changes) + + for cur := startLedger + 1; cur <= endLedger; cur++ { + i := int(cur - startLedger) + changes = extractChanges(t, ledgers[i:i+1]) + expectedChanges = extractChanges(t, []xdr.LedgerCloseMeta{originalLedgers[cur], generatedLedgers[i-1]}) + requireChangesAreEqual(t, expectedChanges, changes) + } +} diff --git a/services/horizon/internal/integration/testdata/load-test-accounts.xdr.zstd b/services/horizon/internal/integration/testdata/load-test-accounts.xdr.zstd new file mode 100644 index 0000000000..7da222c8f9 Binary files /dev/null and b/services/horizon/internal/integration/testdata/load-test-accounts.xdr.zstd differ diff --git a/services/horizon/internal/integration/testdata/load-test-ledgers.xdr.zstd b/services/horizon/internal/integration/testdata/load-test-ledgers.xdr.zstd new file mode 100644 index 0000000000..008a8d698d Binary files /dev/null and b/services/horizon/internal/integration/testdata/load-test-ledgers.xdr.zstd differ diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 99aa8af719..a6c8bf98ef 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -649,7 +649,7 @@ func (i *Test) setupHorizonClient(webArgs map[string]string) { } } -func (i *Test) CreateCaptiveCoreConfig() (*ledgerbackend.CaptiveCoreConfig, error) { +func (i *Test) CreateCaptiveCoreConfig() (ledgerbackend.CaptiveCoreConfig, error) { captiveCoreConfig := ledgerbackend.CaptiveCoreConfig{ BinaryPath: i.CoreBinaryPath(), HistoryArchiveURLs: []string{HistoryArchiveUrl}, @@ -667,11 +667,11 @@ func (i *Test) CreateCaptiveCoreConfig() (*ledgerbackend.CaptiveCoreConfig, erro toml, err := ledgerbackend.NewCaptiveCoreTomlFromFile(i.coreConfig.configPath, tomlParams) if err != nil { - return nil, err + return ledgerbackend.CaptiveCoreConfig{}, err } captiveCoreConfig.Toml = toml - return &captiveCoreConfig, nil + return captiveCoreConfig, nil } const maxWaitForCoreStartup = 30 * time.Second diff --git a/xdr/equals.go b/xdr/equals.go new file mode 100644 index 0000000000..4645242702 --- /dev/null +++ b/xdr/equals.go @@ -0,0 +1,18 @@ +package xdr + +import ( + "bytes" + "encoding" +) + +func Equals(a, b encoding.BinaryMarshaler) (bool, error) { + serialized, err := a.MarshalBinary() + if err != nil { + return false, err + } + otherSerialized, err := b.MarshalBinary() + if err != nil { + return false, err + } + return bytes.Equal(serialized, otherSerialized), nil +} diff --git a/historyarchive/xdrstream.go b/xdr/xdrstream.go similarity index 56% rename from historyarchive/xdrstream.go rename to xdr/xdrstream.go index 313c600f8b..2903950e35 100644 --- a/historyarchive/xdrstream.go +++ b/xdr/xdrstream.go @@ -2,7 +2,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 -package historyarchive +package xdr import ( "bufio" @@ -15,20 +15,20 @@ import ( "io" "io/ioutil" + "github.com/klauspost/compress/zstd" + "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" ) -type XdrStream struct { - buf bytes.Buffer - gzipReader *countReader - rdr *countReader - rdr2 io.ReadCloser - sha256Hash hash.Hash +type Stream struct { + buf bytes.Buffer + compressedReader *countReader + reader *countReader + sha256Hash hash.Hash validateHash bool expectedHash [sha256.Size]byte - xdrDecoder *xdr.BytesDecoder + xdrDecoder *BytesDecoder } type countReader struct { @@ -48,70 +48,91 @@ func newCountReader(r io.ReadCloser) *countReader { } } -func NewXdrStream(in io.ReadCloser) *XdrStream { +func NewStream(in io.ReadCloser) *Stream { // We write all we read from in to sha256Hash that can be later // compared with `expectedHash` using SetExpectedHash and Close. sha256Hash := sha256.New() teeReader := io.TeeReader(in, sha256Hash) - return &XdrStream{ - rdr: newCountReader( + return &Stream{ + reader: newCountReader( struct { io.Reader io.Closer }{bufio.NewReader(teeReader), in}, ), sha256Hash: sha256Hash, - xdrDecoder: xdr.NewBytesDecoder(), + xdrDecoder: NewBytesDecoder(), } } -func NewXdrGzStream(in io.ReadCloser) (*XdrStream, error) { +func newCompressedXdrStream(in io.ReadCloser, decompressor func(r io.Reader) (io.ReadCloser, error)) (*Stream, error) { gzipCountReader := newCountReader(in) - rdr, err := gzip.NewReader(bufReadCloser(gzipCountReader)) + rdr, err := decompressor(bufio.NewReader(gzipCountReader)) if err != nil { in.Close() return nil, err } - stream := NewXdrStream(rdr) - stream.rdr2 = in - stream.gzipReader = gzipCountReader + stream := NewStream(rdr) + stream.compressedReader = gzipCountReader return stream, nil } +func NewGzStream(in io.ReadCloser) (*Stream, error) { + return newCompressedXdrStream(in, func(r io.Reader) (io.ReadCloser, error) { + return gzip.NewReader(r) + }) +} + +type zstdReader struct { + *zstd.Decoder +} + +func (z zstdReader) Close() error { + z.Decoder.Close() + return nil +} + +func NewZstdStream(in io.ReadCloser) (*Stream, error) { + return newCompressedXdrStream(in, func(r io.Reader) (io.ReadCloser, error) { + decoder, err := zstd.NewReader(r) + return zstdReader{decoder}, err + }) +} + func HashXdr(x interface{}) (Hash, error) { var msg bytes.Buffer - _, err := xdr.Marshal(&msg, x) + _, err := Marshal(&msg, x) if err != nil { var zero Hash return zero, err } - return Hash(sha256.Sum256(msg.Bytes())), nil + return sha256.Sum256(msg.Bytes()), nil } // SetExpectedHash sets expected hash that will be checked in Close(). // This (obviously) needs to be set before Close() is called. -func (x *XdrStream) SetExpectedHash(hash [sha256.Size]byte) { +func (x *Stream) SetExpectedHash(hash [sha256.Size]byte) { x.validateHash = true x.expectedHash = hash } // ExpectedHash returns the expected hash and a boolean indicating if the // expected hash was set -func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) { +func (x *Stream) ExpectedHash() ([sha256.Size]byte, bool) { return x.expectedHash, x.validateHash } // Close closes all internal readers and checks if the expected hash // (if set by SetExpectedHash) matches the actual hash of the stream. -func (x *XdrStream) Close() error { +func (x *Stream) Close() error { if x.validateHash { - // Read all remaining data from rdr - _, err := io.Copy(io.Discard, x.rdr) + // Read all remaining data from reader + _, err := io.Copy(io.Discard, x.reader) if err != nil { // close the internal readers to avoid memory leaks x.closeReaders() - return errors.Wrap(err, "Error reading remaining bytes from rdr") + return errors.Wrap(err, "Error reading remaining bytes from reader") } actualHash := x.sha256Hash.Sum([]byte{}) @@ -126,17 +147,17 @@ func (x *XdrStream) Close() error { return x.closeReaders() } -func (x *XdrStream) closeReaders() error { +func (x *Stream) closeReaders() error { var err error - if x.rdr != nil { - if err2 := x.rdr.Close(); err2 != nil { + if x.reader != nil { + if err2 := x.reader.Close(); err2 != nil { err = err2 } } - if x.gzipReader != nil { - if err2 := x.gzipReader.Close(); err2 != nil { + if x.compressedReader != nil { + if err2 := x.compressedReader.Close(); err2 != nil { err = err2 } } @@ -144,11 +165,11 @@ func (x *XdrStream) closeReaders() error { return err } -func (x *XdrStream) ReadOne(in xdr.DecoderFrom) error { +func (x *Stream) ReadOne(in DecoderFrom) error { var nbytes uint32 - err := binary.Read(x.rdr, binary.BigEndian, &nbytes) + err := binary.Read(x.reader, binary.BigEndian, &nbytes) if err != nil { - x.rdr.Close() + x.reader.Close() if err == io.EOF { // Do not wrap io.EOF return err @@ -158,23 +179,23 @@ func (x *XdrStream) ReadOne(in xdr.DecoderFrom) error { nbytes &= 0x7fffffff x.buf.Reset() if nbytes == 0 { - x.rdr.Close() + x.reader.Close() return io.EOF } x.buf.Grow(int(nbytes)) - read, err := x.buf.ReadFrom(io.LimitReader(x.rdr, int64(nbytes))) + read, err := x.buf.ReadFrom(io.LimitReader(x.reader, int64(nbytes))) if err != nil { - x.rdr.Close() + x.reader.Close() return err } if read != int64(nbytes) { - x.rdr.Close() + x.reader.Close() return errors.New("Read wrong number of bytes from XDR") } readi, err := x.xdrDecoder.DecodeBytes(in, x.buf.Bytes()) if err != nil { - x.rdr.Close() + x.reader.Close() return err } if int64(readi) != int64(nbytes) { @@ -185,32 +206,32 @@ func (x *XdrStream) ReadOne(in xdr.DecoderFrom) error { } // BytesRead returns the number of bytes read in the stream -func (x *XdrStream) BytesRead() int64 { - return x.rdr.bytesRead +func (x *Stream) BytesRead() int64 { + return x.reader.bytesRead } -// GzipBytesRead returns the number of gzip bytes read in the stream. -// Returns -1 if underlying reader is not gzipped. -func (x *XdrStream) GzipBytesRead() int64 { - if x.gzipReader == nil { +// CompressedBytesRead returns the number of compressed bytes read in the stream. +// Returns -1 if underlying reader is not compressed. +func (x *Stream) CompressedBytesRead() int64 { + if x.compressedReader == nil { return -1 } - return x.gzipReader.bytesRead + return x.compressedReader.bytesRead } // Discard removes n bytes from the stream -func (x *XdrStream) Discard(n int64) (int64, error) { - return io.CopyN(ioutil.Discard, x.rdr, n) +func (x *Stream) Discard(n int64) (int64, error) { + return io.CopyN(ioutil.Discard, x.reader, n) } -func CreateXdrStream(entries ...xdr.BucketEntry) *XdrStream { +func CreateXdrStream(entries ...BucketEntry) *Stream { b := &bytes.Buffer{} for _, e := range entries { - err := xdr.MarshalFramed(b, e) + err := MarshalFramed(b, e) if err != nil { panic(err) } } - return NewXdrStream(ioutil.NopCloser(b)) + return NewStream(ioutil.NopCloser(b)) } diff --git a/historyarchive/xdrstream_test.go b/xdr/xdrstream_test.go similarity index 64% rename from historyarchive/xdrstream_test.go rename to xdr/xdrstream_test.go index 76323edb34..2622e53ab4 100644 --- a/historyarchive/xdrstream_test.go +++ b/xdr/xdrstream_test.go @@ -2,7 +2,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 -package historyarchive +package xdr import ( "bytes" @@ -10,20 +10,19 @@ import ( "io" "testing" - "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestXdrStreamHash(t *testing.T) { - bucketEntry := xdr.BucketEntry{ - Type: xdr.BucketEntryTypeLiveentry, - LiveEntry: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Balance: xdr.Int64(200000000), + bucketEntry := BucketEntry{ + Type: BucketEntryTypeLiveentry, + LiveEntry: &LedgerEntry{ + Data: LedgerEntryData{ + Type: LedgerEntryTypeAccount, + Account: &AccountEntry{ + AccountId: MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: Int64(200000000), }, }, }, @@ -34,13 +33,13 @@ func TestXdrStreamHash(t *testing.T) { // - uint32 representing the number of bytes of a structure, // - xdr-encoded `BucketEntry` above. b := &bytes.Buffer{} - err := xdr.MarshalFramed(b, bucketEntry) + err := MarshalFramed(b, bucketEntry) require.NoError(t, err) expectedHash := sha256.Sum256(b.Bytes()) stream.SetExpectedHash(expectedHash) - var readBucketEntry xdr.BucketEntry + var readBucketEntry BucketEntry err = stream.ReadOne(&readBucketEntry) require.NoError(t, err) assert.Equal(t, bucketEntry, readBucketEntry) @@ -54,26 +53,26 @@ func TestXdrStreamHash(t *testing.T) { } func TestXdrStreamDiscard(t *testing.T) { - firstEntry := xdr.BucketEntry{ - Type: xdr.BucketEntryTypeLiveentry, - LiveEntry: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Balance: xdr.Int64(200000000), + firstEntry := BucketEntry{ + Type: BucketEntryTypeLiveentry, + LiveEntry: &LedgerEntry{ + Data: LedgerEntryData{ + Type: LedgerEntryTypeAccount, + Account: &AccountEntry{ + AccountId: MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: Int64(200000000), }, }, }, } - secondEntry := xdr.BucketEntry{ - Type: xdr.BucketEntryTypeLiveentry, - LiveEntry: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: xdr.MustAddress("GC23QF2HUE52AMXUFUH3AYJAXXGXXV2VHXYYR6EYXETPKDXZSAW67XO4"), - Balance: xdr.Int64(100000000), + secondEntry := BucketEntry{ + Type: BucketEntryTypeLiveentry, + LiveEntry: &LedgerEntry{ + Data: LedgerEntryData{ + Type: LedgerEntryTypeAccount, + Account: &AccountEntry{ + AccountId: MustAddress("GC23QF2HUE52AMXUFUH3AYJAXXGXXV2VHXYYR6EYXETPKDXZSAW67XO4"), + Balance: Int64(100000000), }, }, }, @@ -81,15 +80,15 @@ func TestXdrStreamDiscard(t *testing.T) { fullStream := CreateXdrStream(firstEntry, secondEntry) b := &bytes.Buffer{} - require.NoError(t, xdr.MarshalFramed(b, firstEntry)) - require.NoError(t, xdr.MarshalFramed(b, secondEntry)) + require.NoError(t, MarshalFramed(b, firstEntry)) + require.NoError(t, MarshalFramed(b, secondEntry)) expectedHash := sha256.Sum256(b.Bytes()) fullStream.SetExpectedHash(expectedHash) discardStream := CreateXdrStream(firstEntry, secondEntry) discardStream.SetExpectedHash(expectedHash) - var readBucketEntry xdr.BucketEntry + var readBucketEntry BucketEntry require.NoError(t, fullStream.ReadOne(&readBucketEntry)) assert.Equal(t, firstEntry, readBucketEntry)