Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 20, 2025
1 parent 4af6927 commit ee083c5
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 47 deletions.
14 changes: 9 additions & 5 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ ProtocolConfiguration:
- seed5t5.neo.org:20333
VerifyTransactions: false
P2PSigExtensions: false
StateRootInHeader: false
P2PStateExchangeExtensions: true
StateSyncInterval: 10
Hardforks:
Aspidochelone: 210000
Basilisk: 2680000
Cockatrice: 3967000
Domovoi: 4144000

ApplicationConfiguration:
RemoveUntraceableBlocks: true
SkipBlockVerification: false
# LogPath could be set up in case you need stdout logs to some proper file.
# LogPath: "./log/neogo.log"
Expand All @@ -57,14 +61,14 @@ ApplicationConfiguration:
# FilePath: "./chains/testnet.bolt"
P2P:
Addresses:
- ":20333" # in form of "[host]:[port][:announcedPort]"
- ":20335" # in form of "[host]:[port][:announcedPort]"
DialTimeout: 3s
ProtoTickInterval: 2s
PingInterval: 30s
PingTimeout: 90s
MaxPeers: 100
MaxPeers: 1
AttemptConnPeers: 20
MinPeers: 10
MinPeers: 1
Relay: true
Consensus:
Enabled: false
Expand All @@ -83,7 +87,7 @@ ApplicationConfiguration:
RPC:
Enabled: true
Addresses:
- ":20332"
- ":20338"
MaxGasInvoke: 15
EnableCORSWorkaround: false
TLSConfig:
Expand All @@ -93,7 +97,7 @@ ApplicationConfiguration:
CertFile: serv.crt
KeyFile: serv.key
Prometheus:
Enabled: true
Enabled: false
Addresses:
- ":2112"
Pprof:
Expand Down
1 change: 1 addition & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
BlocksOnly bool `yaml:"BlocksOnly"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
Expand Down
5 changes: 1 addition & 4 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
}
Expand Down Expand Up @@ -1605,7 +1602,7 @@ func (bc *Blockchain) GetStateModule() StateRoot {

// GetStateSyncModule returns new state sync service instance.
func (bc *Blockchain) GetStateSyncModule() *statesync.Module {
return statesync.NewModule(bc, bc.stateRoot, bc.log, bc.dao, bc.jumpToState)
return statesync.NewModule(bc, bc.log, bc.dao, bc.jumpToState)
}

// storeBlock performs chain update using the block given, it executes all
Expand Down
27 changes: 14 additions & 13 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/dao"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/stateroot"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand Down Expand Up @@ -85,30 +84,27 @@ type Module struct {
// blockHeight is the index of the latest stored block.
blockHeight uint32

dao *dao.Simple
bc Ledger
stateMod *stateroot.Module
mptpool *Pool
dao *dao.Simple
bc Ledger
mptpool *Pool

billet *mpt.Billet

jumpCallback func(p uint32) error
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
func NewModule(bc Ledger, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
return &Module{
dao: s,
bc: bc,
stateMod: stateMod,
syncStage: inactive,
}
}
return &Module{
dao: s,
bc: bc,
stateMod: stateMod,
log: log,
syncInterval: uint32(bc.GetConfig().StateSyncInterval),
mptpool: NewPool(),
Expand Down Expand Up @@ -160,10 +156,10 @@ func (s *Module) Init(currChainHeight uint32) error {
// current chain's state until new state is completely fetched, outdated state-related data
// will be removed from storage during (*Blockchain).jumpToState(...) execution.
// All we need to do right now is to remove genesis-related MPT nodes.
err = s.stateMod.CleanStorage()
if err != nil {
return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err)
}
//err = s.stateMod.CleanStorage()
//if err != nil {
// return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err)
//}
}

s.syncPoint = p
Expand Down Expand Up @@ -212,7 +208,7 @@ func (s *Module) defineSyncStage() error {
if s.blockHeight > s.syncPoint {
s.syncStage |= mptSynced
s.log.Info("MPT is in sync",
zap.Uint32("stateroot height", s.stateMod.CurrentLocalHeight()))
zap.Uint32("stateroot height", s.syncPoint))
} else if s.syncStage&headersSynced != 0 {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1))
if err != nil {
Expand Down Expand Up @@ -516,3 +512,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {

return s.mptpool.GetBatch(limit)
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()
}
66 changes: 47 additions & 19 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
headerFetcher *blockfetcher.Service
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
Expand Down Expand Up @@ -227,10 +228,15 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
s.headerFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
nil)
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
s.NeoFSBlockFetcherCfg.BlocksOnly = true
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
Expand Down Expand Up @@ -311,12 +317,6 @@ func (s *Server) Start() {
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -333,6 +333,7 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.bFetcherQueue.Discard()
s.headerFetcher.Shutdown()
s.blockFetcher.Shutdown()
}
for _, tr := range s.transports {
Expand Down Expand Up @@ -571,6 +572,12 @@ func (s *Server) run() {
s.discovery.RegisterGood(p)

s.tryInitStateSync()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.headerFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
s.tryStartServices()
case <-s.blockFetcherFin:
if s.started.Load() {
Expand Down Expand Up @@ -732,7 +739,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -792,7 +799,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
Expand All @@ -815,15 +822,25 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
}
return nil
}
fmt.Println(" requestBlocksOrHeaders end Of headers")
if s.headerFetcher.IsActive() {
fmt.Println("s.headerFetcher.Shutdown()")
s.headerFetcher.Shutdown()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
}
//stop bf and start new+ mpt

var (
bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool
Expand All @@ -847,6 +864,10 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {

// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func (s *Server) requestHeaders(p Peer) error {
if s.headerFetcher.IsActive() {
//add by blockfetcher
return nil
}
pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
}
Expand Down Expand Up @@ -1136,7 +1157,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error

// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
Expand Down Expand Up @@ -1335,6 +1356,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
Expand Down Expand Up @@ -1452,6 +1476,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
} else {
switch msg.Command {
case CMDVersion:
fmt.Println("case CMDVersion:")
version := msg.Payload.(*payload.Version)
return s.handleVersionCmd(peer, version)
case CMDVerack:
Expand All @@ -1468,18 +1493,16 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
}

fmt.Println("tryInitStateSync 1")
if s.stateSync.IsInitialized() {
return
}

fmt.Println("tryInitStateSync 2")
s.lock.RLock()
heights := make([]uint32, 0, len(s.peers))
for p := range s.peers {
Expand All @@ -1489,7 +1512,11 @@ func (s *Server) tryInitStateSync() {
}
s.lock.RUnlock()
slices.Sort(heights)
fmt.Println("tryInitStateSync 3")
fmt.Println("heights", heights)
fmt.Println("MinPeers", s.MinPeers)
if len(heights) >= s.MinPeers && len(heights) > 0 {
fmt.Println("tryInitStateSync 4")
// choose the height of the median peer as the current chain's height
h := heights[len(heights)/2]
err := s.stateSync.Init(h)
Expand All @@ -1506,6 +1533,7 @@ func (s *Server) tryInitStateSync() {
s.bSyncQueue.Discard()
}
}
fmt.Println("tryInitStateSync end")
}

// BroadcastExtensible add a locally-generated Extensible payload to the pool
Expand Down
2 changes: 2 additions & 0 deletions pkg/network/state_sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand All @@ -17,4 +18,5 @@ type StateSync interface {
NeedHeaders() bool
NeedMPTNodes() bool
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
GetConfig() config.Blockchain
}
Loading

0 comments on commit ee083c5

Please sign in to comment.