From 75acfdb7da3857aa4289136bf3a5a7c4fdf37b6d Mon Sep 17 00:00:00 2001 From: kourin Date: Fri, 21 Jan 2022 18:49:27 +0900 Subject: [PATCH] Add progress of restore in eth_syncing (#306) * Add initial logic for the sync progression * Add backup command * Export the JSON response correctly * Add import chain function * Remove debug code * Fix json tag in BackupResult * Add comments for blockStream * Properly close the listener channels, add simple test for the progression * Remove mock event channel limits * Add Initialize method in consensus * Add restore command * Add cancel by ctrl + c in restore command * Remove import flag from server command Remove import flag from server command * Simplify fetchAndSaveBackup * Remove restore command and add --restore flag in server * Revert setupConsensus method * Add comment for backup/restore function * Revert the auto generated codes that didn't need to update * Revert the auto generated codes that didn't need to update * Create helper/progress package * Add progression for restore * Add missing error handling in backup.go * Add missing error handling in backup.go Display log for file closing error * Fix error message * Remove redundant statement * Remove unused codes * Add comments * Improve codes based review and add comments for go-doc * Fix tests * Rename targetFrom and targetTo * Fix build error * Add Test_consumeCommonBlocks * Remove unused field * Fix wrong SystemService name * Fix block number for current progress in ProgressionWrapper * Change chunk size from num blocks to blocks size * Add unit test for Test_importBlocks * Add error check in system service * Add error check in CreateBackup * Use WriteBlock instead of WriteBlocks in restore * Fix lint error * Fix faild test * Fix build error * Fix error handling in CreateBackup * Remove batch writing blocks in restore due the interface change of blockchain * Fix lint error * Fix lint error Co-authored-by: Milos Zivkovic --- archive/backup.go | 6 +- archive/restore.go | 21 ++++-- archive/restore_test.go | 10 ++- archive/types.go | 5 ++ consensus/consensus.go | 4 +- consensus/dev/dev.go | 5 +- consensus/dummy/dummy.go | 4 +- consensus/ibft/ibft.go | 5 +- consensus/ibft/ibft_test.go | 3 +- helper/progress/chain.go | 129 ++++++++++++++++++++++++++++++++++++ jsonrpc/eth_endpoint.go | 8 ++- jsonrpc/types.go | 1 + protocol/syncer.go | 127 ++++------------------------------- protocol/syncer_test.go | 12 ++-- protocol/testing.go | 2 +- server/server.go | 73 +++++++++++++------- 16 files changed, 251 insertions(+), 164 deletions(-) create mode 100644 helper/progress/chain.go diff --git a/archive/backup.go b/archive/backup.go index 1cba28bd49..ec6050aa04 100644 --- a/archive/backup.go +++ b/archive/backup.go @@ -122,10 +122,12 @@ func determineTo(ctx context.Context, clt proto.SystemClient, to *uint64) (uint6 } } } + // otherwise use latest block number as to return uint64(status.Current.Number), types.StringToHash(status.Current.Hash), nil } +// writeMetadata writes the latest block height and the block hash to the writer func writeMetadata(writer io.Writer, logger hclog.Logger, to uint64, toHash types.Hash) error { metadata := Metadata{ Latest: to, @@ -153,9 +155,9 @@ func processExportStream( getResult := func() (*uint64, *uint64, error) { if from == nil || to == nil { return nil, nil, errors.New("couldn't get any blocks") - } else { - return from, to, nil } + + return from, to, nil } var total uint64 diff --git a/archive/restore.go b/archive/restore.go index 6e5823e078..10cd6bf0c2 100644 --- a/archive/restore.go +++ b/archive/restore.go @@ -7,19 +7,22 @@ import ( "math/big" "os" + "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/types" ) type blockchainInterface interface { + SubscribeEvents() blockchain.Subscription Genesis() types.Hash GetBlockByNumber(uint64, bool) (*types.Block, bool) GetHashByNumber(uint64) types.Hash WriteBlock(*types.Block) error } -// RestoreChain loads blockchain archive from file and write blocks to the chain -func RestoreChain(chain blockchainInterface, filePath string) error { +// RestoreChain reads blocks from the archive and write to the chain +func RestoreChain(chain blockchainInterface, filePath string, progression *progress.ProgressionWrapper) error { fp, err := os.Open(filePath) if err != nil { return err @@ -27,11 +30,11 @@ func RestoreChain(chain blockchainInterface, filePath string) error { blockStream := newBlockStream(fp) - return importBlocks(chain, blockStream) + return importBlocks(chain, blockStream, progression) } // import blocks scans all blocks from stream and write them to chain -func importBlocks(chain blockchainInterface, blockStream *blockStream) error { +func importBlocks(chain blockchainInterface, blockStream *blockStream, progression *progress.ProgressionWrapper) error { shutdownCh := common.GetTerminationSignalCh() metadata, err := blockStream.getMetadata() @@ -59,6 +62,14 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream) error { return nil } + // Create a blockchain subscription for the sync progression and start tracking + progression.StartProgression(firstBlock.Number(), chain.SubscribeEvents()) + // Stop monitoring the sync progression upon exit + defer progression.StopProgression() + + // Set the goal + progression.UpdateHighestProgression(metadata.Latest) + nextBlock := firstBlock for { @@ -66,6 +77,8 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream) error { return err } + progression.UpdateCurrentProgression(nextBlock.Number()) + nextBlock, err = blockStream.nextBlock() if err != nil { return err diff --git a/archive/restore_test.go b/archive/restore_test.go index ea1699935a..eb3a8da61c 100644 --- a/archive/restore_test.go +++ b/archive/restore_test.go @@ -8,6 +8,9 @@ import ( "os" "testing" + "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/helper/progress" + "github.com/0xPolygon/polygon-edge/protocol" "github.com/0xPolygon/polygon-edge/types" "github.com/stretchr/testify/assert" ) @@ -53,6 +56,10 @@ func (m *mockChain) WriteBlock(block *types.Block) error { return nil } +func (m *mockChain) SubscribeEvents() blockchain.Subscription { + return protocol.NewMockSubscription() +} + func getLatestBlockFromMockChain(m *mockChain) *types.Block { if l := len(m.blocks); l != 0 { return m.blocks[l-1] @@ -103,8 +110,9 @@ func Test_importBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + progression := progress.NewProgressionWrapper(progress.ChainSyncRestore) blockStream := newTestBlockStream(tt.metadata, tt.archiveBlocks...) - err := importBlocks(tt.chain, blockStream) + err := importBlocks(tt.chain, blockStream, progression) assert.Equal(t, tt.err, err) latestBlock := getLatestBlockFromMockChain(tt.chain) diff --git a/archive/types.go b/archive/types.go index 9d876a1712..2fa6e38c56 100644 --- a/archive/types.go +++ b/archive/types.go @@ -13,14 +13,17 @@ type Metadata struct { LatestHash types.Hash } +// MarshalRLP returns RLP encoded bytes func (m *Metadata) MarshalRLP() []byte { return m.MarshalRLPTo(nil) } +// MarshalRLPTo sets RLP encoded bytes to given byte slice func (m *Metadata) MarshalRLPTo(dst []byte) []byte { return types.MarshalRLPTo(m.MarshalRLPWith, dst) } +// MarshalRLPWith appends own field into arena for encode func (m *Metadata) MarshalRLPWith(arena *fastrlp.Arena) *fastrlp.Value { vv := arena.NewArray() @@ -30,10 +33,12 @@ func (m *Metadata) MarshalRLPWith(arena *fastrlp.Arena) *fastrlp.Value { return vv } +// UnmarshalRLP unmarshals and sets the fields from RLP encoded bytes func (m *Metadata) UnmarshalRLP(input []byte) error { return types.UnmarshalRlp(m.UnmarshalRLPFrom, input) } +// UnmarshalRLPFrom sets the fields from parsed RLP encoded value func (m *Metadata) UnmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error { elems, err := v.GetElems() if err != nil { diff --git a/consensus/consensus.go b/consensus/consensus.go index 63c4b0e8b9..d65afd1c2e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -6,8 +6,8 @@ import ( "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/chain" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/network" - "github.com/0xPolygon/polygon-edge/protocol" "github.com/0xPolygon/polygon-edge/secrets" "github.com/0xPolygon/polygon-edge/state" "github.com/0xPolygon/polygon-edge/txpool" @@ -26,7 +26,7 @@ type Consensus interface { GetBlockCreator(header *types.Header) (types.Address, error) // GetSyncProgression retrieves the current sync progression, if any - GetSyncProgression() *protocol.Progression + GetSyncProgression() *progress.Progression // Initialize initializes the consensus (e.g. setup data) Initialize() error diff --git a/consensus/dev/dev.go b/consensus/dev/dev.go index 90cedf82d4..3518287180 100644 --- a/consensus/dev/dev.go +++ b/consensus/dev/dev.go @@ -5,10 +5,9 @@ import ( "fmt" "time" - "github.com/0xPolygon/polygon-edge/protocol" - "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/consensus" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/state" "github.com/0xPolygon/polygon-edge/txpool" "github.com/0xPolygon/polygon-edge/types" @@ -219,7 +218,7 @@ func (d *Dev) GetBlockCreator(header *types.Header) (types.Address, error) { return header.Miner, nil } -func (d *Dev) GetSyncProgression() *protocol.Progression { +func (d *Dev) GetSyncProgression() *progress.Progression { return nil } diff --git a/consensus/dummy/dummy.go b/consensus/dummy/dummy.go index 427735acb5..6699577b2b 100644 --- a/consensus/dummy/dummy.go +++ b/consensus/dummy/dummy.go @@ -3,7 +3,7 @@ package dummy import ( "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/consensus" - "github.com/0xPolygon/polygon-edge/protocol" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/state" "github.com/0xPolygon/polygon-edge/txpool" "github.com/0xPolygon/polygon-edge/types" @@ -56,7 +56,7 @@ func (d *Dummy) GetBlockCreator(header *types.Header) (types.Address, error) { return header.Miner, nil } -func (d *Dummy) GetSyncProgression() *protocol.Progression { +func (d *Dummy) GetSyncProgression() *progress.Progression { return nil } diff --git a/consensus/ibft/ibft.go b/consensus/ibft/ibft.go index b0af665ef7..83a8b8649f 100644 --- a/consensus/ibft/ibft.go +++ b/consensus/ibft/ibft.go @@ -11,6 +11,7 @@ import ( "github.com/0xPolygon/polygon-edge/consensus/ibft/proto" "github.com/0xPolygon/polygon-edge/crypto" "github.com/0xPolygon/polygon-edge/helper/hex" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/network" "github.com/0xPolygon/polygon-edge/protocol" "github.com/0xPolygon/polygon-edge/secrets" @@ -52,7 +53,7 @@ type syncerInterface interface { BestPeer() *protocol.SyncPeer BulkSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(block *types.Block)) error WatchSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(b *types.Block) bool) - GetSyncProgression() *protocol.Progression + GetSyncProgression() *progress.Progression Broadcast(b *types.Block) } @@ -305,7 +306,7 @@ func (i *Ibft) Start() error { } // GetSyncProgression gets the latest sync progression, if any -func (i *Ibft) GetSyncProgression() *protocol.Progression { +func (i *Ibft) GetSyncProgression() *progress.Progression { return i.syncer.GetSyncProgression() } diff --git a/consensus/ibft/ibft_test.go b/consensus/ibft/ibft_test.go index 1ca7c87d20..2deb3e1792 100644 --- a/consensus/ibft/ibft_test.go +++ b/consensus/ibft/ibft_test.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/polygon-edge/consensus" "github.com/0xPolygon/polygon-edge/consensus/ibft/proto" "github.com/0xPolygon/polygon-edge/helper/hex" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/protocol" "github.com/0xPolygon/polygon-edge/state" "github.com/0xPolygon/polygon-edge/types" @@ -692,7 +693,7 @@ func (s *mockSyncer) WatchSyncWithPeer(p *protocol.SyncPeer, handler func(b *typ } } -func (s *mockSyncer) GetSyncProgression() *protocol.Progression { +func (s *mockSyncer) GetSyncProgression() *progress.Progression { return nil } diff --git a/helper/progress/chain.go b/helper/progress/chain.go new file mode 100644 index 0000000000..b69fe68230 --- /dev/null +++ b/helper/progress/chain.go @@ -0,0 +1,129 @@ +package progress + +import ( + "sync" + + "github.com/0xPolygon/polygon-edge/blockchain" +) + +type ChainSyncType string + +const ( + ChainSyncRestore ChainSyncType = "restore" + ChainSyncBulk ChainSyncType = "bulk-sync" +) + +// Progression defines the status of the sync +// progression of the node +type Progression struct { + // SyncType is indicating the sync method + SyncType ChainSyncType + + // StartingBlock is the initial block that the node is starting + // the sync from. It is reset after every sync batch + StartingBlock uint64 + + // CurrentBlock is the last written block from the sync batch + CurrentBlock uint64 + + // HighestBlock is the target block in the sync batch + HighestBlock uint64 +} + +type ProgressionWrapper struct { + // progression is a reference to the ongoing batch sync. + // Nil if no batch sync is currently in progress + progression *Progression + + // stopCh is the channel for receiving stop signals + // in progression tracking + stopCh chan struct{} + + lock sync.RWMutex + + syncType ChainSyncType +} + +func NewProgressionWrapper(syncType ChainSyncType) *ProgressionWrapper { + return &ProgressionWrapper{ + progression: nil, + stopCh: make(chan struct{}), + syncType: syncType, + } +} + +// startProgression initializes the progression tracking +func (pw *ProgressionWrapper) StartProgression( + startingBlock uint64, + subscription blockchain.Subscription, +) { + pw.lock.Lock() + defer pw.lock.Unlock() + + pw.progression = &Progression{ + SyncType: pw.syncType, + StartingBlock: startingBlock, + } + + go pw.RunUpdateLoop(subscription) +} + +// runUpdateLoop starts the blockchain event monitoring loop and +// updates the currently written block in the batch sync +func (pw *ProgressionWrapper) RunUpdateLoop(subscription blockchain.Subscription) { + eventCh := subscription.GetEventCh() + + for { + select { + case event := <-eventCh: + if event.Type == blockchain.EventFork { + continue + } + + if len(event.NewChain) == 0 { + continue + } + + lastBlock := event.NewChain[len(event.NewChain)-1] + pw.UpdateCurrentProgression(lastBlock.Number) + case <-pw.stopCh: + subscription.Close() + + return + } + } +} + +// StopProgression stops the progression tracking +func (pw *ProgressionWrapper) StopProgression() { + pw.stopCh <- struct{}{} + + pw.lock.Lock() + defer pw.lock.Unlock() + + pw.progression = nil +} + +// UpdateCurrentProgression sets the currently written block in the bulk sync +func (pw *ProgressionWrapper) UpdateCurrentProgression(currentBlock uint64) { + pw.lock.Lock() + defer pw.lock.Unlock() + + pw.progression.CurrentBlock = currentBlock +} + +// UpdateHighestProgression sets the highest-known target block in the bulk sync +func (pw *ProgressionWrapper) UpdateHighestProgression(highestBlock uint64) { + pw.lock.Lock() + defer pw.lock.Unlock() + + pw.progression.HighestBlock = highestBlock +} + +// GetProgression returns the latest sync progression +func (pw *ProgressionWrapper) GetProgression() *Progression { + pw.lock.RLock() + defer pw.lock.RUnlock() + + return pw.progression +} diff --git a/jsonrpc/eth_endpoint.go b/jsonrpc/eth_endpoint.go index 7bffd87d73..9c9f7f8e0a 100644 --- a/jsonrpc/eth_endpoint.go +++ b/jsonrpc/eth_endpoint.go @@ -3,15 +3,16 @@ package jsonrpc import ( "errors" "fmt" + "math/big" + "github.com/0xPolygon/polygon-edge/chain" "github.com/0xPolygon/polygon-edge/helper/hex" - "github.com/0xPolygon/polygon-edge/protocol" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/state" "github.com/0xPolygon/polygon-edge/state/runtime" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" "github.com/umbracle/fastrlp" - "math/big" ) type ethTxPoolStore interface { @@ -58,7 +59,7 @@ type ethBlockchainStore interface { ApplyTxn(header *types.Header, txn *types.Transaction) (*runtime.ExecutionResult, error) // GetSyncProgression retrieves the current sync progression, if any - GetSyncProgression() *protocol.Progression + GetSyncProgression() *progress.Progression } // ethStore provides access to the methods needed by eth endpoint @@ -109,6 +110,7 @@ func (e *Eth) Syncing() (interface{}, error) { if syncProgression := e.store.GetSyncProgression(); syncProgression != nil { // Node is bulk syncing, return the status return progression{ + Type: string(syncProgression.SyncType), StartingBlock: hex.EncodeUint64(syncProgression.StartingBlock), CurrentBlock: hex.EncodeUint64(syncProgression.CurrentBlock), HighestBlock: hex.EncodeUint64(syncProgression.HighestBlock), diff --git a/jsonrpc/types.go b/jsonrpc/types.go index d706f0069e..2e3213abe0 100644 --- a/jsonrpc/types.go +++ b/jsonrpc/types.go @@ -305,6 +305,7 @@ type txnArgs struct { } type progression struct { + Type string `json:"type"` StartingBlock string `json:"startingBlock"` CurrentBlock string `json:"currentBlock"` HighestBlock string `json:"highestBlock"` diff --git a/protocol/syncer.go b/protocol/syncer.go index 21b9db20e5..69a3961c02 100644 --- a/protocol/syncer.go +++ b/protocol/syncer.go @@ -10,6 +10,7 @@ import ( "time" "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/network" libp2pGrpc "github.com/0xPolygon/polygon-edge/network/grpc" "github.com/0xPolygon/polygon-edge/protocol/proto" @@ -192,106 +193,6 @@ func statusFromProto(p *proto.V1Status) (*Status, error) { return s, nil } -type progressionWrapper struct { - // progression is a reference to the ongoing batch sync. - // Nil if no batch sync is currently in progress - progression *Progression - - // stopCh is the channel for receiving stop signals - // in progression tracking - stopCh chan struct{} - - lock sync.RWMutex -} - -// startProgression initializes the progression tracking -func (pw *progressionWrapper) startProgression( - startingBlock uint64, - subscription blockchain.Subscription, -) { - pw.lock.Lock() - defer pw.lock.Unlock() - - pw.progression = &Progression{ - StartingBlock: startingBlock, - } - - go pw.runUpdateLoop(subscription) -} - -// runUpdateLoop starts the blockchain event monitoring loop and -// updates the currently written block in the batch sync -func (pw *progressionWrapper) runUpdateLoop(subscription blockchain.Subscription) { - eventCh := subscription.GetEventCh() - - for { - select { - case event := <-eventCh: - if event.Type == blockchain.EventFork { - continue - } - - if len(event.NewChain) == 0 { - continue - } - - pw.updateCurrentProgression(event.NewChain[0].Number) - case <-pw.stopCh: - subscription.Close() - - return - } - } -} - -// stopProgression stops the progression tracking -func (pw *progressionWrapper) stopProgression() { - pw.stopCh <- struct{}{} - - pw.lock.Lock() - defer pw.lock.Unlock() - - pw.progression = nil -} - -// updateCurrentProgression sets the currently written block in the bulk sync -func (pw *progressionWrapper) updateCurrentProgression(currentBlock uint64) { - pw.lock.Lock() - defer pw.lock.Unlock() - - pw.progression.CurrentBlock = currentBlock -} - -// updateHighestProgression sets the highest-known target block in the bulk sync -func (pw *progressionWrapper) updateHighestProgression(highestBlock uint64) { - pw.lock.Lock() - defer pw.lock.Unlock() - - pw.progression.HighestBlock = highestBlock -} - -// getProgression returns the latest sync progression -func (pw *progressionWrapper) getProgression() *Progression { - pw.lock.RLock() - defer pw.lock.RUnlock() - - return pw.progression -} - -// Progression defines the status of the sync -// progression of the node -type Progression struct { - // StartingBlock is the initial block that the node is starting - // the sync from. It is reset after every sync batch - StartingBlock uint64 - - // CurrentBlock is the last written block from the sync batch - CurrentBlock uint64 - - // HighestBlock is the target block in the sync batch - HighestBlock uint64 -} - // Syncer is a sync protocol type Syncer struct { logger hclog.Logger @@ -307,28 +208,25 @@ type Syncer struct { server *network.Server - syncProgression *progressionWrapper + syncProgression *progress.ProgressionWrapper } // NewSyncer creates a new Syncer instance func NewSyncer(logger hclog.Logger, server *network.Server, blockchain blockchainShim) *Syncer { s := &Syncer{ - logger: logger.Named("syncer"), - stopCh: make(chan struct{}), - blockchain: blockchain, - server: server, - syncProgression: &progressionWrapper{ - progression: nil, - stopCh: make(chan struct{}), - }, + logger: logger.Named("syncer"), + stopCh: make(chan struct{}), + blockchain: blockchain, + server: server, + syncProgression: progress.NewProgressionWrapper(progress.ChainSyncBulk), } return s } // GetSyncProgression returns the latest sync progression, if any -func (s *Syncer) GetSyncProgression() *Progression { - return s.syncProgression.getProgression() +func (s *Syncer) GetSyncProgression() *progress.Progression { + return s.syncProgression.GetProgression() } // syncCurrentStatus taps into the blockchain event steam and updates the Syncer.status field @@ -715,16 +613,17 @@ func (s *Syncer) BulkSyncWithPeer(p *SyncPeer, newBlockHandler func(block *types var lastTarget uint64 // Create a blockchain subscription for the sync progression and start tracking - s.syncProgression.startProgression(startBlock.Number, s.blockchain.SubscribeEvents()) + s.syncProgression.StartProgression(startBlock.Number, s.blockchain.SubscribeEvents()) // Stop monitoring the sync progression upon exit - defer s.syncProgression.stopProgression() + defer s.syncProgression.StopProgression() // sync up to the current known header for { // update target target := p.status.Number - s.syncProgression.updateHighestProgression(target) + + s.syncProgression.UpdateHighestProgression(target) if target == lastTarget { // there are no more changes to pull for now diff --git a/protocol/syncer_test.go b/protocol/syncer_test.go index 9b077a023d..729ea7c50f 100644 --- a/protocol/syncer_test.go +++ b/protocol/syncer_test.go @@ -385,26 +385,26 @@ func TestSyncer_GetSyncProgression(t *testing.T) { syncHeaders := blockchain.NewTestHeaderChainWithSeed(nil, targetChainSize, 0) syncBlocks := blockchain.HeadersToBlocks(syncHeaders) - syncer.syncProgression.startProgression(uint64(initialChainSize), syncerChain.SubscribeEvents()) + syncer.syncProgression.StartProgression(uint64(initialChainSize), syncerChain.SubscribeEvents()) if syncer.GetSyncProgression() == nil { t.Fatalf("Unable to start progression") } - assert.Equal(t, uint64(initialChainSize), syncer.syncProgression.getProgression().StartingBlock) + assert.Equal(t, uint64(initialChainSize), syncer.syncProgression.GetProgression().StartingBlock) - syncer.syncProgression.updateHighestProgression(uint64(targetChainSize)) + syncer.syncProgression.UpdateHighestProgression(uint64(targetChainSize)) - assert.Equal(t, uint64(targetChainSize), syncer.syncProgression.getProgression().HighestBlock) + assert.Equal(t, uint64(targetChainSize), syncer.syncProgression.GetProgression().HighestBlock) writeErr := syncerChain.WriteBlocks(syncBlocks[initialChainSize+1:]) assert.NoError(t, writeErr) WaitUntilProgressionUpdated(t, syncer, 15*time.Second, uint64(targetChainSize-1)) - assert.Equal(t, uint64(targetChainSize-1), syncer.syncProgression.getProgression().CurrentBlock) + assert.Equal(t, uint64(targetChainSize-1), syncer.syncProgression.GetProgression().CurrentBlock) - syncer.syncProgression.stopProgression() + syncer.syncProgression.StopProgression() } type mockBlockStore struct { diff --git a/protocol/testing.go b/protocol/testing.go index cdb3b858ee..5d87754db2 100644 --- a/protocol/testing.go +++ b/protocol/testing.go @@ -119,7 +119,7 @@ func WaitUntilProgressionUpdated(t *testing.T, syncer *Syncer, timeout time.Dura }) _, err := tests.RetryUntilTimeout(ctx, func() (interface{}, bool) { - return nil, syncer.syncProgression.getProgression().CurrentBlock < target + return nil, syncer.syncProgression.GetProgression().CurrentBlock < target }) assert.NoError(t, err) } diff --git a/server/server.go b/server/server.go index 6bbf96c73f..2f55279630 100644 --- a/server/server.go +++ b/server/server.go @@ -18,9 +18,9 @@ import ( "github.com/0xPolygon/polygon-edge/crypto" "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/helper/keccak" + "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/jsonrpc" "github.com/0xPolygon/polygon-edge/network" - "github.com/0xPolygon/polygon-edge/protocol" "github.com/0xPolygon/polygon-edge/secrets" "github.com/0xPolygon/polygon-edge/server/proto" "github.com/0xPolygon/polygon-edge/state" @@ -67,8 +67,12 @@ type Server struct { serverMetrics *serverMetrics prometheusServer *http.Server + // secrets manager secretsManager secrets.SecretsManager + + // restore + restoreProgression *progress.ProgressionWrapper } var dirPaths = []string{ @@ -80,10 +84,11 @@ var dirPaths = []string{ // NewServer creates a new Minimal server, using the passed in configuration func NewServer(logger hclog.Logger, config *Config) (*Server, error) { m := &Server{ - logger: logger, - config: config, - chain: config.Chain, - grpcServer: grpc.NewServer(), + logger: logger, + config: config, + chain: config.Chain, + grpcServer: grpc.NewServer(), + restoreProgression: progress.NewProgressionWrapper(progress.ChainSyncRestore), } m.logger.Info("Data dir", "path", config.DataDir) @@ -195,11 +200,14 @@ func NewServer(logger hclog.Logger, config *Config) (*Server, error) { return nil, err } + // setup and start jsonrpc server + if err := m.setupJSONRPC(); err != nil { + return nil, err + } + // restore archive data before starting - if config.RestoreFile != nil { - if err := archive.RestoreChain(m.blockchain, *config.RestoreFile); err != nil { - return nil, err - } + if err := m.restoreChain(); err != nil { + return nil, err } // start consensus @@ -212,11 +220,6 @@ func NewServer(logger hclog.Logger, config *Config) (*Server, error) { return nil, err } - // setup and start jsonrpc server - if err := m.setupJSONRPC(); err != nil { - return nil, err - } - if err := m.network.Start(); err != nil { return nil, err } @@ -226,6 +229,18 @@ func NewServer(logger hclog.Logger, config *Config) (*Server, error) { return m, nil } +func (s *Server) restoreChain() error { + if s.config.RestoreFile == nil { + return nil + } + + if err := archive.RestoreChain(s.blockchain, *s.config.RestoreFile, s.restoreProgression); err != nil { + return err + } + + return nil +} + type txpoolHub struct { state state.State *blockchain.Blockchain @@ -360,7 +375,8 @@ func (s *Server) setupConsensus() error { } type jsonRPCHub struct { - state state.State + state state.State + restoreProgression *progress.ProgressionWrapper *blockchain.Blockchain *txpool.TxPool @@ -458,8 +474,18 @@ func (j *jsonRPCHub) ApplyTxn( return } -func (j *jsonRPCHub) GetSyncProgression() *protocol.Progression { - return j.Consensus.GetSyncProgression() +func (j *jsonRPCHub) GetSyncProgression() *progress.Progression { + // restore progression + if restoreProg := j.restoreProgression.GetProgression(); restoreProg != nil { + return restoreProg + } + + // consensus sync progression + if consensusSyncProg := j.Consensus.GetSyncProgression(); consensusSyncProg != nil { + return consensusSyncProg + } + + return nil } // SETUP // @@ -467,12 +493,13 @@ func (j *jsonRPCHub) GetSyncProgression() *protocol.Progression { // setupJSONRCP sets up the JSONRPC server, using the set configuration func (s *Server) setupJSONRPC() error { hub := &jsonRPCHub{ - state: s.state, - Blockchain: s.blockchain, - TxPool: s.txpool, - Executor: s.executor, - Consensus: s.consensus, - Server: s.network, + state: s.state, + restoreProgression: s.restoreProgression, + Blockchain: s.blockchain, + TxPool: s.txpool, + Executor: s.executor, + Consensus: s.consensus, + Server: s.network, } conf := &jsonrpc.Config{