Skip to content

Commit

Permalink
fix(eth): check events are indexed within in requested range
Browse files Browse the repository at this point in the history
Complete work from #12728
  • Loading branch information
rvagg committed Jan 6, 2025
1 parent 4c32069 commit ef5b7b7
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 65 deletions.
145 changes: 90 additions & 55 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/chain/types"
)

var (
ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = fmt.Errorf("range end is in the future")
ErrMaxResultsReached = xerrors.New("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = xerrors.New("range end is in the future")
)

const maxLookBackForWait = 120 // one hour of tipsets
Expand Down Expand Up @@ -238,101 +239,135 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
return ems, nil
}

// checkRangeIndexedStatus verifies if a range of heights is indexed.
// It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
minHeight := f.MinHeight
maxHeight := f.MaxHeight
// checkFilterTipsetsIndexed verifies if a tipset, or a range of tipsets, specified by a given
// filter is indexed. It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkFilterTipsetsIndexed(ctx context.Context, f *EventFilter) error {
// Three cases to consider:
// 1. Specific tipset is provided
// 2. Single tipset is specified by the height range (min=max)
// 3. Range of tipsets is specified by the height range (min!=max)
// We'll handle the first two cases here and the third case in checkRangeIndexedStatus

var tipsetKeyCid []byte
var err error

switch {
case f.TipsetCid != cid.Undef:
tipsetKeyCid = f.TipsetCid.Bytes()
case f.MinHeight >= 0 && f.MinHeight == f.MaxHeight:
tipsetKeyCid, err = si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
if err != nil {
if err == ErrNotFound {
// this means that this is a null round and there exist no events for this epoch
return nil
}
return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
}
default:
return si.checkRangeIndexedStatus(ctx, f.MinHeight, f.MaxHeight)
}

// If we couldn't determine a specific tipset, return ErrNotFound
if tipsetKeyCid == nil {
return ErrNotFound
}

// Check if the determined tipset is indexed
if exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid); err != nil {
return xerrors.Errorf("failed to check if tipset is indexed: %w", err)
} else if exists {
return nil // Tipset is indexed
}

return ErrNotFound // Tipset is not indexed
}

// checkRangeIndexedStatus verifies if a range of tipsets specified by the given height range is
// indexed. It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch) error {
head := si.cs.GetHeaviestTipSet()
if minHeight > head.Height() || maxHeight > head.Height() {
return ErrRangeInFuture
}

// Find the first non-null round in the range
startCid, err := si.findFirstNonNullRound(ctx, &minHeight, maxHeight)
startCid, startHeight, err := si.findFirstNonNullRound(ctx, minHeight, maxHeight)
if err != nil {
return xerrors.Errorf("failed to find first non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if startCid == nil {
return nil
}

// Find the last non-null round in the range
endCid, err := si.findLastNonNullRound(ctx, &maxHeight, minHeight)
endCid, endHeight, err := si.findLastNonNullRound(ctx, maxHeight, minHeight)
if err != nil {
if errors.Is(err, ErrRangeInFuture) {
return xerrors.Errorf("range end is in the future: %w", err)
}
return xerrors.Errorf("failed to find last non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if endCid == nil {
return nil
return xerrors.Errorf("unexpected error finding last non-null round: all rounds are null but start round is not (%d to %d)", minHeight, maxHeight)
}

// Check indexing for start and end tipsets
if err := si.checkTipsetByKeyCid(ctx, startCid, minHeight); err != nil {
// Check indexing status for start and end tipsets
if err := si.checkTipsetIndexedStatus(ctx, startCid, startHeight); err != nil {
return err
}

if err := si.checkTipsetByKeyCid(ctx, endCid, maxHeight); err != nil {
if err := si.checkTipsetIndexedStatus(ctx, endCid, endHeight); err != nil {
return err
}
// Assume (not necessarily correctly, but likely) that all tipsets within the range are indexed

return nil
}

// checkTipsetByKeyCid checks if a tipset identified by its key CID is indexed.
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
return xerrors.Errorf("failed to check if tipset at epoch %d is indexed: %w", height, err)
} else if exists {
return nil // has been indexed
}

if exists {
return nil // null round
}

return ErrNotFound // tipset is not indexed
return ErrNotFound
}

// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight *abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, error) {
for height := *minHeight; height <= maxHeight; height++ {
// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight.
// It updates the minHeight to the found height and returns the tipset key CID.
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, abi.ChainEpoch, error) {
for height := minHeight; height <= maxHeight; height++ {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*minHeight = height // Update the minHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
if err != nil {
if !errors.Is(err, ErrNotFound) {
return nil, 0, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
// else null round, keep searching
continue
}
minHeight = height // Update the minHeight to the found height
return cid, minHeight, nil
}

return nil, nil
// All rounds are null
return nil, 0, nil
}

// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight *abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, error) {
head := si.cs.GetHeaviestTipSet()
if head == nil || *maxHeight > head.Height() {
return nil, ErrRangeInFuture
}

for height := *maxHeight; height >= minHeight; height-- {
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, abi.ChainEpoch, error) {
for height := maxHeight; height >= minHeight; height-- {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*maxHeight = height // Update the maxHeight to the found height
return cid, nil
maxHeight = height // Update the maxHeight to the found height
return cid, maxHeight, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
return nil, 0, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return nil, nil
return nil, 0, nil
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height from the ChainStore
func (si *SqliteIndexer) getTipsetKeyCidByHeight(ctx context.Context, height abi.ChainEpoch) ([]byte, error) {
ts, err := si.cs.GetTipsetByHeight(ctx, height, nil, false)
if err != nil {
Expand Down Expand Up @@ -504,15 +539,15 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
if height > 0 {
head := si.cs.GetHeaviestTipSet()
if head == nil {
return nil, errors.New("failed to get head: head is nil")
return nil, xerrors.New("failed to get head: head is nil")
}
headHeight := head.Height()
maxLookBackHeight := headHeight - maxLookBackForWait

// if the height is old enough, we'll assume the index is caught up to it and not bother
// waiting for it to be indexed
if height <= maxLookBackHeight {
return nil, si.checkRangeIndexedStatus(ctx, f)
return nil, si.checkFilterTipsetsIndexed(ctx, f)
}
}

Expand All @@ -526,7 +561,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
}

if len(ces) == 0 {
return nil, si.checkRangeIndexedStatus(ctx, f)
return nil, si.checkFilterTipsetsIndexed(ctx, f)
}
}

Expand Down
39 changes: 29 additions & 10 deletions chain/index/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
t.Cleanup(func() { _ = si.Close() })

// Create a fake tipset at height 1
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)

// Set the dummy chainstore to return this tipset for height 1
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // empty DB
cs.SetTipSetByCid(t, fakeTipSet1)
// Create a fake tipset at various heights used in the test
fakeTipsets := make(map[abi.ChainEpoch]*types.TipSet)
for _, ts := range []abi.ChainEpoch{1, 10, 20} {
fakeTipsets[ts] = fakeTipSet(t, rng, ts, nil)
cs.SetTipsetByHeightAndKey(ts, fakeTipsets[ts].Key(), fakeTipsets[ts])
cs.SetTipSetByCid(t, fakeTipsets[ts])
}

// tipset is not indexed
f := &EventFilter{
Expand All @@ -46,7 +47,7 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
require.True(t, errors.Is(err, ErrNotFound))
require.Equal(t, 0, len(ces))

tsCid, err := fakeTipSet1.Key().Cid()
tsCid, err := fakeTipsets[1].Key().Cid()
require.NoError(t, err)
f = &EventFilter{
TipsetCid: tsCid,
Expand All @@ -58,7 +59,7 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {

// tipset is indexed but has no events
err = withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexTipset(ctx, tx, fakeTipSet1)
return si.indexTipset(ctx, tx, fakeTipsets[1])
})
require.NoError(t, err)

Expand All @@ -73,13 +74,31 @@ func TestGetEventsForFilterNoEvents(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(ces))

// search for a range that is absent
// search for a range that is not indexed
f = &EventFilter{
MinHeight: 10,
MaxHeight: 20,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.ErrorIs(t, err, ErrNotFound)
require.Equal(t, 0, len(ces))

// search for a range (end) that is in the future
f = &EventFilter{
MinHeight: 10,
MaxHeight: 200,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.ErrorIs(t, err, ErrRangeInFuture)
require.Equal(t, 0, len(ces))

// search for a range (start too) that is in the future
f = &EventFilter{
MinHeight: 100,
MaxHeight: 200,
}
ces, err = si.GetEventsForFilter(ctx, f)
require.NoError(t, err)
require.ErrorIs(t, err, ErrRangeInFuture)
require.Equal(t, 0, len(ces))
}

Expand Down

0 comments on commit ef5b7b7

Please sign in to comment.