Skip to content

Commit

Permalink
Add progress of restore in eth_syncing (0xPolygon#306)
Browse files Browse the repository at this point in the history
* Add initial logic for the sync progression

* Add backup command

* Export the JSON response correctly

* Add import chain function

* Remove debug code

* Fix json tag in BackupResult

* Add comments for blockStream

* Properly close the listener channels, add simple test for the progression

* Remove mock event channel limits

* Add Initialize method in consensus

* Add restore command

* Add cancel by ctrl + c in restore command

* Remove import flag from server command

Remove import flag from server command

* Simplify fetchAndSaveBackup

* Remove restore command and add --restore flag in server

* Revert setupConsensus method

* Add comment for backup/restore function

* Revert the auto generated codes that didn't need to update

* Revert the auto generated codes that didn't need to update

* Create helper/progress package

* Add progression for restore

* Add missing error handling in backup.go

* Add missing error handling in backup.go

Display log for file closing error

* Fix error message

* Remove redundant statement

* Remove unused codes

* Add comments

* Improve codes based review and add comments for go-doc

* Fix tests

* Rename targetFrom and targetTo

* Fix build error

* Add Test_consumeCommonBlocks

* Remove unused field

* Fix wrong SystemService name

* Fix block number for current progress in ProgressionWrapper

* Change chunk size from num blocks to blocks size

* Add unit test for Test_importBlocks

* Add error check in system service

* Add error check in CreateBackup

* Use WriteBlock instead of WriteBlocks in restore

* Fix lint error

* Fix faild test

* Fix build error

* Fix error handling in CreateBackup

* Remove batch writing blocks in restore due the interface change of blockchain

* Fix lint error

* Fix lint error

Co-authored-by: Milos Zivkovic <[email protected]>
  • Loading branch information
Kourin1996 and zivkovicmilos authored Jan 21, 2022
1 parent 44b9d7f commit 75acfdb
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 164 deletions.
6 changes: 4 additions & 2 deletions archive/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ func determineTo(ctx context.Context, clt proto.SystemClient, to *uint64) (uint6
}
}
}

// otherwise use latest block number as to
return uint64(status.Current.Number), types.StringToHash(status.Current.Hash), nil
}

// writeMetadata writes the latest block height and the block hash to the writer
func writeMetadata(writer io.Writer, logger hclog.Logger, to uint64, toHash types.Hash) error {
metadata := Metadata{
Latest: to,
Expand Down Expand Up @@ -153,9 +155,9 @@ func processExportStream(
getResult := func() (*uint64, *uint64, error) {
if from == nil || to == nil {
return nil, nil, errors.New("couldn't get any blocks")
} else {
return from, to, nil
}

return from, to, nil
}

var total uint64
Expand Down
21 changes: 17 additions & 4 deletions archive/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,34 @@ import (
"math/big"
"os"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/types"
)

type blockchainInterface interface {
SubscribeEvents() blockchain.Subscription
Genesis() types.Hash
GetBlockByNumber(uint64, bool) (*types.Block, bool)
GetHashByNumber(uint64) types.Hash
WriteBlock(*types.Block) error
}

// RestoreChain loads blockchain archive from file and write blocks to the chain
func RestoreChain(chain blockchainInterface, filePath string) error {
// RestoreChain reads blocks from the archive and write to the chain
func RestoreChain(chain blockchainInterface, filePath string, progression *progress.ProgressionWrapper) error {
fp, err := os.Open(filePath)
if err != nil {
return err
}

blockStream := newBlockStream(fp)

return importBlocks(chain, blockStream)
return importBlocks(chain, blockStream, progression)
}

// import blocks scans all blocks from stream and write them to chain
func importBlocks(chain blockchainInterface, blockStream *blockStream) error {
func importBlocks(chain blockchainInterface, blockStream *blockStream, progression *progress.ProgressionWrapper) error {
shutdownCh := common.GetTerminationSignalCh()

metadata, err := blockStream.getMetadata()
Expand Down Expand Up @@ -59,13 +62,23 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream) error {
return nil
}

// Create a blockchain subscription for the sync progression and start tracking
progression.StartProgression(firstBlock.Number(), chain.SubscribeEvents())
// Stop monitoring the sync progression upon exit
defer progression.StopProgression()

// Set the goal
progression.UpdateHighestProgression(metadata.Latest)

nextBlock := firstBlock

for {
if err := chain.WriteBlock(nextBlock); err != nil {
return err
}

progression.UpdateCurrentProgression(nextBlock.Number())

nextBlock, err = blockStream.nextBlock()
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion archive/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"os"
"testing"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/protocol"
"github.com/0xPolygon/polygon-edge/types"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -53,6 +56,10 @@ func (m *mockChain) WriteBlock(block *types.Block) error {
return nil
}

func (m *mockChain) SubscribeEvents() blockchain.Subscription {
return protocol.NewMockSubscription()
}

func getLatestBlockFromMockChain(m *mockChain) *types.Block {
if l := len(m.blocks); l != 0 {
return m.blocks[l-1]
Expand Down Expand Up @@ -103,8 +110,9 @@ func Test_importBlocks(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
progression := progress.NewProgressionWrapper(progress.ChainSyncRestore)
blockStream := newTestBlockStream(tt.metadata, tt.archiveBlocks...)
err := importBlocks(tt.chain, blockStream)
err := importBlocks(tt.chain, blockStream, progression)

assert.Equal(t, tt.err, err)
latestBlock := getLatestBlockFromMockChain(tt.chain)
Expand Down
5 changes: 5 additions & 0 deletions archive/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ type Metadata struct {
LatestHash types.Hash
}

// MarshalRLP returns RLP encoded bytes
func (m *Metadata) MarshalRLP() []byte {
return m.MarshalRLPTo(nil)
}

// MarshalRLPTo sets RLP encoded bytes to given byte slice
func (m *Metadata) MarshalRLPTo(dst []byte) []byte {
return types.MarshalRLPTo(m.MarshalRLPWith, dst)
}

// MarshalRLPWith appends own field into arena for encode
func (m *Metadata) MarshalRLPWith(arena *fastrlp.Arena) *fastrlp.Value {
vv := arena.NewArray()

Expand All @@ -30,10 +33,12 @@ func (m *Metadata) MarshalRLPWith(arena *fastrlp.Arena) *fastrlp.Value {
return vv
}

// UnmarshalRLP unmarshals and sets the fields from RLP encoded bytes
func (m *Metadata) UnmarshalRLP(input []byte) error {
return types.UnmarshalRlp(m.UnmarshalRLPFrom, input)
}

// UnmarshalRLPFrom sets the fields from parsed RLP encoded value
func (m *Metadata) UnmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error {
elems, err := v.GetElems()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/chain"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/network"
"github.com/0xPolygon/polygon-edge/protocol"
"github.com/0xPolygon/polygon-edge/secrets"
"github.com/0xPolygon/polygon-edge/state"
"github.com/0xPolygon/polygon-edge/txpool"
Expand All @@ -26,7 +26,7 @@ type Consensus interface {
GetBlockCreator(header *types.Header) (types.Address, error)

// GetSyncProgression retrieves the current sync progression, if any
GetSyncProgression() *protocol.Progression
GetSyncProgression() *progress.Progression

// Initialize initializes the consensus (e.g. setup data)
Initialize() error
Expand Down
5 changes: 2 additions & 3 deletions consensus/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"fmt"
"time"

"github.com/0xPolygon/polygon-edge/protocol"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/state"
"github.com/0xPolygon/polygon-edge/txpool"
"github.com/0xPolygon/polygon-edge/types"
Expand Down Expand Up @@ -219,7 +218,7 @@ func (d *Dev) GetBlockCreator(header *types.Header) (types.Address, error) {
return header.Miner, nil
}

func (d *Dev) GetSyncProgression() *protocol.Progression {
func (d *Dev) GetSyncProgression() *progress.Progression {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/dummy/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package dummy
import (
"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/protocol"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/state"
"github.com/0xPolygon/polygon-edge/txpool"
"github.com/0xPolygon/polygon-edge/types"
Expand Down Expand Up @@ -56,7 +56,7 @@ func (d *Dummy) GetBlockCreator(header *types.Header) (types.Address, error) {
return header.Miner, nil
}

func (d *Dummy) GetSyncProgression() *protocol.Progression {
func (d *Dummy) GetSyncProgression() *progress.Progression {
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/0xPolygon/polygon-edge/consensus/ibft/proto"
"github.com/0xPolygon/polygon-edge/crypto"
"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/network"
"github.com/0xPolygon/polygon-edge/protocol"
"github.com/0xPolygon/polygon-edge/secrets"
Expand Down Expand Up @@ -52,7 +53,7 @@ type syncerInterface interface {
BestPeer() *protocol.SyncPeer
BulkSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(block *types.Block)) error
WatchSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(b *types.Block) bool)
GetSyncProgression() *protocol.Progression
GetSyncProgression() *progress.Progression
Broadcast(b *types.Block)
}

Expand Down Expand Up @@ -305,7 +306,7 @@ func (i *Ibft) Start() error {
}

// GetSyncProgression gets the latest sync progression, if any
func (i *Ibft) GetSyncProgression() *protocol.Progression {
func (i *Ibft) GetSyncProgression() *progress.Progression {
return i.syncer.GetSyncProgression()
}

Expand Down
3 changes: 2 additions & 1 deletion consensus/ibft/ibft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/consensus/ibft/proto"
"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/protocol"
"github.com/0xPolygon/polygon-edge/state"
"github.com/0xPolygon/polygon-edge/types"
Expand Down Expand Up @@ -692,7 +693,7 @@ func (s *mockSyncer) WatchSyncWithPeer(p *protocol.SyncPeer, handler func(b *typ
}
}

func (s *mockSyncer) GetSyncProgression() *protocol.Progression {
func (s *mockSyncer) GetSyncProgression() *progress.Progression {
return nil
}

Expand Down
129 changes: 129 additions & 0 deletions helper/progress/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package progress

import (
"sync"

"github.com/0xPolygon/polygon-edge/blockchain"
)

type ChainSyncType string

const (
ChainSyncRestore ChainSyncType = "restore"
ChainSyncBulk ChainSyncType = "bulk-sync"
)

// Progression defines the status of the sync
// progression of the node
type Progression struct {
// SyncType is indicating the sync method
SyncType ChainSyncType

// StartingBlock is the initial block that the node is starting
// the sync from. It is reset after every sync batch
StartingBlock uint64

// CurrentBlock is the last written block from the sync batch
CurrentBlock uint64

// HighestBlock is the target block in the sync batch
HighestBlock uint64
}

type ProgressionWrapper struct {
// progression is a reference to the ongoing batch sync.
// Nil if no batch sync is currently in progress
progression *Progression

// stopCh is the channel for receiving stop signals
// in progression tracking
stopCh chan struct{}

lock sync.RWMutex

syncType ChainSyncType
}

func NewProgressionWrapper(syncType ChainSyncType) *ProgressionWrapper {
return &ProgressionWrapper{
progression: nil,
stopCh: make(chan struct{}),
syncType: syncType,
}
}

// startProgression initializes the progression tracking
func (pw *ProgressionWrapper) StartProgression(
startingBlock uint64,
subscription blockchain.Subscription,
) {
pw.lock.Lock()
defer pw.lock.Unlock()

pw.progression = &Progression{
SyncType: pw.syncType,
StartingBlock: startingBlock,
}

go pw.RunUpdateLoop(subscription)
}

// runUpdateLoop starts the blockchain event monitoring loop and
// updates the currently written block in the batch sync
func (pw *ProgressionWrapper) RunUpdateLoop(subscription blockchain.Subscription) {
eventCh := subscription.GetEventCh()

for {
select {
case event := <-eventCh:
if event.Type == blockchain.EventFork {
continue
}

if len(event.NewChain) == 0 {
continue
}

lastBlock := event.NewChain[len(event.NewChain)-1]
pw.UpdateCurrentProgression(lastBlock.Number)
case <-pw.stopCh:
subscription.Close()

return
}
}
}

// StopProgression stops the progression tracking
func (pw *ProgressionWrapper) StopProgression() {
pw.stopCh <- struct{}{}

pw.lock.Lock()
defer pw.lock.Unlock()

pw.progression = nil
}

// UpdateCurrentProgression sets the currently written block in the bulk sync
func (pw *ProgressionWrapper) UpdateCurrentProgression(currentBlock uint64) {
pw.lock.Lock()
defer pw.lock.Unlock()

pw.progression.CurrentBlock = currentBlock
}

// UpdateHighestProgression sets the highest-known target block in the bulk sync
func (pw *ProgressionWrapper) UpdateHighestProgression(highestBlock uint64) {
pw.lock.Lock()
defer pw.lock.Unlock()

pw.progression.HighestBlock = highestBlock
}

// GetProgression returns the latest sync progression
func (pw *ProgressionWrapper) GetProgression() *Progression {
pw.lock.RLock()
defer pw.lock.RUnlock()

return pw.progression
}
Loading

0 comments on commit 75acfdb

Please sign in to comment.