Skip to content

Commit

Permalink
LogPoller replay & USDC API fix cherry picks (#679)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Apr 3, 2024
2 parents 3e85789 + 9ed19c6 commit 1f800bd
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/light-suns-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ccip": patch
---

Do not read res if http errors
5 changes: 5 additions & 0 deletions .changeset/small-beers-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Making LogPoller's replay more robust by backfilling up to finalized block and processing rest in the main loop
43 changes: 42 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,13 @@ func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQ
// If ctx is cancelled before the replay request has been initiated, ErrReplayRequestAborted is returned. If the replay
// is already in progress, the replay will continue and ErrReplayInProgress will be returned. If the client needs a
// guarantee that the replay is complete before proceeding, it should either avoid cancelling or retry until nil is returned
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
defer func() {
if errors.Is(err, context.Canceled) {
err = ErrReplayRequestAborted
}
}()

lp.lggr.Debugf("Replaying from block %d", fromBlock)
latest, err := lp.ec.HeadByNumber(ctx, nil)
if err != nil {
Expand All @@ -366,6 +372,27 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
if fromBlock < 1 || fromBlock > latest.Number {
return errors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
}

// Backfill all logs up to the latest saved finalized block outside the LogPoller's main loop.
// This is safe, because chain cannot be rewinded deeper than that, so there must not be any race conditions.
savedFinalizedBlockNumber, err := lp.savedFinalizedBlockNumber(ctx)
if err != nil {
return err
}
if fromBlock <= savedFinalizedBlockNumber {
err = lp.backfill(ctx, fromBlock, savedFinalizedBlockNumber)
if err != nil {
return err
}
}

// Poll everything after latest finalized block in main loop to avoid concurrent writes during reorg
// We assume that number of logs between saved finalized block and current head is small enough to be processed in main loop
fromBlock = mathutil.Max(fromBlock, savedFinalizedBlockNumber+1)
// Don't continue if latest block number is the same as saved finalized block number
if fromBlock > latest.Number {
return nil
}
// Block until replay notification accepted or cancelled.
select {
case lp.replayStart <- fromBlock:
Expand All @@ -384,6 +411,20 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
}
}

// savedFinalizedBlockNumber returns the FinalizedBlockNumber saved with the last processed block in the db
// (latestFinalizedBlock at the time the last processed block was saved)
// If this is the first poll and no blocks are in the db, it returns 0
func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, error) {
latestProcessed, err := lp.LatestBlock(pg.WithParentCtx(ctx))
if err == nil {
return latestProcessed.FinalizedBlockNumber, nil
}
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}

func (lp *logPoller) recvReplayComplete() {
err := <-lp.replayComplete
if err != nil {
Expand Down
49 changes: 37 additions & 12 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
Expand Down Expand Up @@ -241,6 +242,7 @@ func TestLogPoller_Replay(t *testing.T) {
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
ctx := testutils.Context(t)

head := evmtypes.Head{Number: 4}
events := []common.Hash{EmitterABI.Events["Log1"].ID}
Expand All @@ -256,7 +258,7 @@ func TestLogPoller_Replay(t *testing.T) {

ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice()
ec.On("ConfiguredChainID").Return(chainID, nil)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20)

Expand All @@ -265,12 +267,13 @@ func TestLogPoller_Replay(t *testing.T) {
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest.BlockNumber)
require.Equal(t, int64(1), latest.FinalizedBlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
ctx, cancel := context.WithCancel(testutils.Context(t))
cancelCtx, cancel := context.WithCancel(testutils.Context(t))
cancel()
err = lp.Replay(ctx, 3)
err = lp.Replay(cancelCtx, 3)
assert.ErrorIs(t, err, ErrReplayRequestAborted)
})

Expand All @@ -285,12 +288,11 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return error code received from replayComplete
t.Run("returns error code on replay complete", func(t *testing.T) {
ctx := testutils.Context(t)
anyErr := errors.New("any error")
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 1)
recvStartReplay(ctx, 2)
lp.replayComplete <- anyErr
}()
assert.ErrorIs(t, lp.Replay(ctx, 1), anyErr)
Expand All @@ -299,14 +301,14 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return ErrReplayInProgress if caller's context is cancelled after replay has begun
t.Run("late abort returns ErrReplayInProgress", func(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
cancelCtx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 4)
recvStartReplay(cancelCtx, 4)
cancel()
}()
assert.ErrorIs(t, lp.Replay(ctx, 4), ErrReplayInProgress)
assert.ErrorIs(t, lp.Replay(cancelCtx, 4), ErrReplayInProgress)
<-done
lp.replayComplete <- nil
lp.wg.Wait()
Expand All @@ -316,8 +318,6 @@ func TestLogPoller_Replay(t *testing.T) {
t.Run("client abort doesnt hang run loop", func(t *testing.T) {
lp.backupPollerNextBlock = 0

ctx := testutils.Context(t)

pass := make(chan struct{})
cancelled := make(chan struct{})

Expand Down Expand Up @@ -372,7 +372,6 @@ func TestLogPoller_Replay(t *testing.T) {
done := make(chan struct{})
defer func() { <-done }()

ctx := testutils.Context(t)
ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
go func() {
defer close(done)
Expand Down Expand Up @@ -405,7 +404,7 @@ func TestLogPoller_Replay(t *testing.T) {

lp.ReplayAsync(1)

recvStartReplay(testutils.Context(t), 1)
recvStartReplay(testutils.Context(t), 2)
})

t.Run("ReplayAsync error", func(t *testing.T) {
Expand All @@ -427,6 +426,32 @@ func TestLogPoller_Replay(t *testing.T) {
require.Equal(t, 1, observedLogs.Len())
assert.Equal(t, observedLogs.All()[0].Message, anyErr.Error())
})

t.Run("run regular replay when there are not blocks in db", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

lp.ReplayAsync(1)
recvStartReplay(testutils.Context(t), 1)
})

t.Run("run only backfill when everything is finalized", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

err = lp.orm.InsertLogsWithBlock([]Log{}, LogPollerBlock{
EvmChainId: ubig.New(chainID),
BlockHash: head.Hash,
BlockNumber: head.Number,
BlockTimestamp: head.Timestamp,
FinalizedBlockNumber: head.Number,
CreatedAt: time.Time{},
})
require.NoError(t, err)

err = lp.Replay(ctx, 1)
require.NoError(t, err)
})
}

func (lp *logPoller) reset() {
Expand Down
4 changes: 1 addition & 3 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
time.Sleep(100 * time.Millisecond)
require.NoError(t, lp.Start(ctx))
require.Eventually(t, func() bool {
return observedLogs.Len() >= 5
return observedLogs.Len() >= 1
}, 2*time.Second, 20*time.Millisecond)
lp.Close()

Expand All @@ -1401,8 +1401,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {

assert.Contains(t, logMsgs, "SQL ERROR")
assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later")
assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock")
assert.Contains(t, logMsgs, "Backup log poller ran before filters loaded, skipping")
}

type getLogErrData struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration)
return nil, http.StatusRequestTimeout, nil, tokendata.ErrTimeout
}
// On error, res is nil in most cases, do not read res.StatusCode, return BadRequest
return nil, http.StatusBadRequest, res.Header, err
return nil, http.StatusBadRequest, nil, err
}
defer res.Body.Close()

Expand Down

0 comments on commit 1f800bd

Please sign in to comment.