ingest/ledgerbackend: Add ledger backend which replays ledgers #5584

Open · wants to merge 6 commits into master
296 changes: 296 additions & 0 deletions ingest/replay_backend.go
@@ -0,0 +1,296 @@
package ingest

import (
"bytes"
"context"
"encoding"
"errors"
"fmt"
"io"
"os"
"time"

"github.com/klauspost/compress/zstd"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/xdr"
)

type ReplayBackend struct {
Contributor:
I would probably call this LocalFileBackend or FsBackend to indicate it comes from a local file.

ledgerBackend ledgerbackend.LedgerBackend
Contributor:
This ledger backend has a strange role. It composes with the actual backend in an odd way, implementing only certain operations. Also, for it to work, the passphrase of the passed backend needs to be the same, so it's duplicated.

I wonder if we can simply mock some parameters so that passing the backend isn't necessary.

Contributor Author:
The ledger backend is necessary because we want to merge the synthetic ledgers with real ledgers in order to have a mix of Soroban activity and Stellar classic activity. We could do the merging offline and supply a file of ledgers that have already been merged with real ledgers, but I think there's an advantage to merging on the fly: we can ingest any ledger range at runtime. If we merge the ledgers offline, we have to predetermine the ledger range the benchmarks will ingest ahead of time.

mergedLedgers []xdr.LedgerCloseMeta
generatedLedgers []xdr.LedgerCloseMeta
generatedLedgerEntries []xdr.LedgerEntry
// ledgerCloseDuration is the time in between ledgers
ledgerCloseDuration time.Duration
startTime time.Time
startLedger uint32
networkPassphrase string
}

type ReplayBackendConfig struct {
NetworkPassphrase string
LedgersFilePath string
LedgerEntriesFilePath string
LedgerCloseDuration time.Duration
}
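
For orientation (not part of the diff): a rough sketch of how a caller might compose this backend with a real backend, illustrating the on-the-fly merge flow discussed in the review thread above. The file paths, passphrase, close duration, and ledger range below are placeholders.

// Sketch only — assumes the usual imports (context, time, ingest, ledgerbackend).
func newBenchmarkBackend(ctx context.Context, real ledgerbackend.LedgerBackend) (*ingest.ReplayBackend, error) {
	replay, err := ingest.NewReplayBackend(ingest.ReplayBackendConfig{
		NetworkPassphrase:     "Test SDF Network ; September 2015", // must match the wrapped backend
		LedgersFilePath:       "generated-ledgers.xdr.zst",         // hypothetical path
		LedgerEntriesFilePath: "generated-entries.xdr.zst",         // hypothetical path
		LedgerCloseDuration:   time.Second,
	}, real)
	if err != nil {
		return nil, err
	}
	// The synthetic ledgers are merged into the real ledgers of whatever range
	// is prepared here, so the range can be chosen at runtime.
	if err := replay.PrepareRange(ctx, ledgerbackend.BoundedRange(1000, 1100)); err != nil {
		replay.Close()
		return nil, err
	}
	return replay, nil
}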

func unmarshallCompressedXDRFile(path string, dst any) error {
file, err := os.Open(path)
if err != nil {
return err
}
reader, err := zstd.NewReader(file)
if err != nil {
return err
}
if _, err = xdr.Unmarshal(reader, dst); err != nil {
return err
}
reader.Close()
if err = file.Close(); err != nil {
return err
}
return nil
}

func NewReplayBackend(config ReplayBackendConfig, ledgerBackend ledgerbackend.LedgerBackend) (*ReplayBackend, error) {
var generatedLedgers []xdr.LedgerCloseMeta
var generatedLedgerEntries []xdr.LedgerEntry

if err := unmarshallCompressedXDRFile(config.LedgerEntriesFilePath, &generatedLedgerEntries); err != nil {
return nil, err
}
if err := unmarshallCompressedXDRFile(config.LedgersFilePath, &generatedLedgers); err != nil {
return nil, err
}
Comment on lines +61 to +66
Contributor:
It may be better to stream the decoding instead of doing it upfront. That allows for larger files that don't fit in memory, and it lets the decompression and decoding be parallelized in the future.

return &ReplayBackend{
ledgerBackend: ledgerBackend,
ledgerCloseDuration: config.LedgerCloseDuration,
generatedLedgers: generatedLedgers,
generatedLedgerEntries: generatedLedgerEntries,
networkPassphrase: config.NetworkPassphrase,
}, nil
}
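
Regarding the streaming suggestion in the thread above, here is a minimal sketch of what it could look like. Two assumptions here are not established by this PR: the file would have to be written as a zstd-compressed concatenation of individually marshalled xdr.LedgerEntry values (rather than a single XDR-encoded slice), and xdr.Unmarshal would have to surface io.EOF once the stream is exhausted.

// streamLedgerEntries decodes one entry at a time, so the whole file never has
// to fit in memory, and decompression and handling could later run in parallel.
func streamLedgerEntries(path string, handle func(xdr.LedgerEntry) error) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	reader, err := zstd.NewReader(file)
	if err != nil {
		return err
	}
	defer reader.Close()

	for {
		var entry xdr.LedgerEntry
		if _, err := xdr.Unmarshal(reader, &entry); errors.Is(err, io.EOF) {
			return nil
		} else if err != nil {
			return err
		}
		if err := handle(entry); err != nil {
			return err
		}
	}
}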

func (r *ReplayBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return r.ledgerBackend.GetLatestLedgerSequence(ctx)
}

func (r *ReplayBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error {
err := r.ledgerBackend.PrepareRange(ctx, ledgerRange)
if err != nil {
return err
}
cur := ledgerRange.From()
ledger, err := r.ledgerBackend.GetLedger(ctx, cur)
if err != nil {
return err
}
var changes xdr.LedgerEntryChanges
for i := 0; i < len(r.generatedLedgerEntries); i++ {
changes = append(changes, xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryCreated,
Created: &r.generatedLedgerEntries[i],
})
}
var flag xdr.Uint32 = 1
ledger.V1.UpgradesProcessing = append(ledger.V1.UpgradesProcessing, xdr.UpgradeEntryMeta{
Upgrade: xdr.LedgerUpgrade{
Type: xdr.LedgerUpgradeTypeLedgerUpgradeFlags,
NewFlags: &flag,
},
Changes: changes,
})
r.mergedLedgers = append(r.mergedLedgers, ledger)
end := ledgerRange.From() + uint32(len(r.generatedLedgers))
if ledgerRange.Bounded() && end > ledgerRange.To() {
end = ledgerRange.To()
}
for cur = cur + 1; cur <= end; cur++ {
ledger, err = r.ledgerBackend.GetLedger(ctx, cur)
if err != nil {
return err
}
if err = MergeLedgers(r.networkPassphrase, &ledger, r.generatedLedgers[0]); err != nil {
return err
}
r.mergedLedgers = append(r.mergedLedgers, ledger)
r.generatedLedgers = r.generatedLedgers[1:]
}
// from this point on, ledgers become available at a rate of one per
// r.ledgerCloseDuration
r.startTime = time.Now()
r.startLedger = ledgerRange.From()
return nil
}

func (r *ReplayBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) {
return r.ledgerBackend.IsPrepared(ctx, ledgerRange)
}

func (r *ReplayBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) {
if r.startLedger == 0 {
return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()")
}
if sequence < r.startLedger {
return xdr.LedgerCloseMeta{}, fmt.Errorf(
"sequence number %v is less than the lower bound of the prepared range: %v",
sequence,
r.startLedger,
)
}
i := int(sequence - r.startLedger)
if i >= len(r.mergedLedgers) {
return xdr.LedgerCloseMeta{}, fmt.Errorf(
"sequence number %v is greater than the latest ledger available",
sequence,
)
}
// the i'th ledger will only be available after (i+1) * r.ledgerCloseDuration time has elapsed
closeTime := r.startTime.Add(time.Duration(i+1) * r.ledgerCloseDuration)
time.Sleep(time.Until(closeTime))
return r.mergedLedgers[i], nil
}
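
For concreteness: with ledgerCloseDuration set to 5s and a prepared range starting at ledger 100, GetLedger(100) returns roughly 5s after PrepareRange completes and GetLedger(102) roughly 15s after, emulating the pacing of a live network.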

func (r *ReplayBackend) Close() error {
return r.ledgerBackend.Close()
}

func validLedger(ledger xdr.LedgerCloseMeta) error {
if _, ok := ledger.GetV1(); !ok {
return fmt.Errorf("ledger version %v is not supported", ledger.V)
}
if _, ok := ledger.MustV1().TxSet.GetV1TxSet(); !ok {
return fmt.Errorf("ledger txset %v is not supported", ledger.MustV1().TxSet.V)
}
return nil
}

func extractChanges(networkPassphrase string, changeMap map[string][]Change, ledger xdr.LedgerCloseMeta) error {
reader, err := NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase, ledger)
if err != nil {
return err
}
for {
var change Change
var ledgerKey xdr.LedgerKey
var b64 string
change, err = reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
ledgerKey, err = change.LedgerKey()
if err != nil {
return err
}
b64, err = ledgerKey.MarshalBinaryBase64()
if err != nil {
return err
}
changeMap[b64] = append(changeMap[b64], change)
}
return nil
}

func xdrEquals(a, b encoding.BinaryMarshaler) (bool, error) {
Contributor:
Shouldn't this live in the XDR package?

serialized, err := a.MarshalBinary()
if err != nil {
return false, err
}
otherSerialized, err := b.MarshalBinary()
if err != nil {
return false, err
}
return bytes.Equal(serialized, otherSerialized), nil
}

func changeIsEqual(a, b Change) (bool, error) {
Contributor:
IMO this should be moved from here into a receiver function in ingest/change.go, something like func (a Change) IsEqual(b Change). I remember needing such a compare function when I was working on some change-related issues.

Contributor Author:
This function deliberately doesn't compare all the fields in the Change struct, so in that sense it is tailored specifically to the replay backend's needs. That's why I did not expose it as a public utility function.

changeIsEqual compares the XDR values of the pre and post entries, but it does not compare the transaction, operation, or ledger, because it is only used to verify that the changes produced by merging two input ledgers equal the changes from the first input ledger followed by the changes from the second.

if a.Type != b.Type || a.Reason != b.Reason {
return false, nil
}
if a.Pre == nil {
if b.Pre != nil {
return false, nil
}
} else {
if ok, err := xdrEquals(a.Pre, b.Pre); err != nil || !ok {
return ok, err
}
}
if a.Post == nil {
if b.Post != nil {
return false, nil
}
} else {
if ok, err := xdrEquals(a.Post, b.Post); err != nil || !ok {
return ok, err
}
}
return true, nil
}
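
If the exported receiver form suggested in the review thread above were adopted, it could be a thin wrapper over the comparison just shown; this is only a sketch of the API shape, not something in this PR.

// IsEqual reports whether two changes have the same Type, Reason, and pre/post
// XDR values. Like changeIsEqual, it deliberately ignores the transaction,
// operation, and ledger fields.
func (a Change) IsEqual(b Change) (bool, error) {
	return changeIsEqual(a, b)
}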

func changesAreEqual(a, b map[string][]Change) (bool, error) {
Contributor:
Shouldn't this live where Change is defined?

if len(a) != len(b) {
return false, nil
}
for key, aChanges := range a {
bChanges := b[key]
if len(aChanges) != len(bChanges) {
return false, nil
}
for i, aChange := range aChanges {
bChange := bChanges[i]
if ok, err := changeIsEqual(aChange, bChange); !ok || err != nil {
return ok, err
}
}
}
return true, nil
}

// MergeLedgers merges two xdr.LedgerCloseMeta instances.
func MergeLedgers(networkPassphrase string, dst *xdr.LedgerCloseMeta, src xdr.LedgerCloseMeta) error {
Contributor:
Shouldn't this live in the XDR package?

if err := validLedger(*dst); err != nil {
return err
}
if err := validLedger(src); err != nil {
return err
}

combinedChangesByKey := map[string][]Change{}
if err := extractChanges(networkPassphrase, combinedChangesByKey, *dst); err != nil {
return err
}
if err := extractChanges(networkPassphrase, combinedChangesByKey, src); err != nil {
return err
}

// src is merged into dst by appending all the transactions from src into dst,
// appending all the upgrades from src into dst, and appending all the evictions
// from src into dst
dst.V1.TxSet.V1TxSet.Phases = append(dst.V1.TxSet.V1TxSet.Phases, src.V1.TxSet.V1TxSet.Phases...)
dst.V1.TxProcessing = append(dst.V1.TxProcessing, src.V1.TxProcessing...)
dst.V1.UpgradesProcessing = append(dst.V1.UpgradesProcessing, src.V1.UpgradesProcessing...)
dst.V1.EvictedTemporaryLedgerKeys = append(dst.V1.EvictedTemporaryLedgerKeys, src.V1.EvictedTemporaryLedgerKeys...)
dst.V1.EvictedPersistentLedgerEntries = append(dst.V1.EvictedPersistentLedgerEntries, src.V1.EvictedPersistentLedgerEntries...)

mergedChangesByKey := map[string][]Change{}
if err := extractChanges(networkPassphrase, mergedChangesByKey, *dst); err != nil {
return err
}

// a merge is valid if the ordered list of changes emitted by the merged ledger is equal to
// the list of changes emitted by dst concatenated with the list of changes emitted by src, or
// in other words:
// extractChanges(merge(dst, src)) == concat(extractChanges(dst), extractChanges(src))
if ok, err := changesAreEqual(combinedChangesByKey, mergedChangesByKey); err != nil {
return err
} else if !ok {
return errors.New("order of changes is not preserved")
}

return nil
}
13 changes: 5 additions & 8 deletions services/horizon/internal/integration/change_test.go
@@ -8,6 +8,7 @@ import (
 	"time"

 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"

 	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/ingest/ledgerbackend"
@@ -150,15 +151,10 @@ func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32
 	t := itest.CurrentTest()

 	ccConfig, err := itest.CreateCaptiveCoreConfig()
-	if err != nil {
-		t.Fatalf("unable to create captive core config: %v", err)
-	}
+	require.NoError(t, err)

-	captiveCore, err := ledgerbackend.NewCaptive(*ccConfig)
-	if err != nil {
-		t.Fatalf("unable to create captive core: %v", err)
-	}
-	defer captiveCore.Close()
+	captiveCore, err := ledgerbackend.NewCaptive(ccConfig)
+	require.NoError(t, err)

 	ctx := context.Background()
 	err = captiveCore.PrepareRange(ctx, ledgerbackend.BoundedRange(startingLedger, endLedger))
@@ -175,6 +171,7 @@ func getLedgers(itest *integration.Test, startingLedger uint32, endLedger uint32
 		seqToLedgersMap[ledgerSeq] = ledger
 	}

+	require.NoError(t, captiveCore.Close())
 	return seqToLedgersMap
 }
