Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headers fetching via NeoFS BlockFetcher service #3789

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

AliceInHunterland
Copy link
Contributor

Close #3574

@AliceInHunterland AliceInHunterland force-pushed the headers branch 2 times, most recently from ee083c5 to 6b9d29e Compare January 20, 2025 15:52
Copy link
Member

@AnnaShaleva AnnaShaleva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HeadersFetcher is started in the wrong place. Ensure that nothing prevents StateSync module to request blocks/headers during the node start.

pkg/network/bqueue/queue.go Outdated Show resolved Hide resolved
pkg/network/server.go Outdated Show resolved Hide resolved
pkg/network/server.go Outdated Show resolved Hide resolved
pkg/network/server.go Outdated Show resolved Hide resolved
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
s.headerFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide s.stateSync instead of chain as the first argument to blockfetcher.New.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The height is different for s.stateSync. It starts from 3 mln, so the block/header can persist, and the queue becomes locked. That's why I rolled it back to the chain. Why can't it be the chain? Blockfetcher uses it only for config and BlockHeight(). will recheck why its s.stateSync starts from the middle of the sequence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need BlockHeight and HeaderHeight to be aligned with statesync module vision, that's the main idea.

pkg/core/statesync/module.go Outdated Show resolved Hide resolved
Copy link
Member

@AnnaShaleva AnnaShaleva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the service start work as expected?

@@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need more sophisticated check. If BlockFetcher is on, then it's OK, but if Blockfetcher is off and StateRootInHeader is also off, then it's an error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix error message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, when we have NeoFSStateSyncExtensions this conversation is not relevant. Untie NeoFSStateSyncExtensions from P2PStateExchangeExtensions, these are two different settings. Implement esparate verification logic for NeoFSStateSyncExtensions.

Comment on lines 822 to 886
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if check must be adjusted. Right now it's related to peers only, but if blockfetcher is used, then we don't care about peer height because another source of data is used (NeoFS storage).

if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
}
return nil
}
s.headerFetcher.Shutdown()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blockfetcher should fetch exactly the range of blocks that is required by statesymc module if statesync module is enabled. If statesync module is disabled, then blockfetcher should fetch all blocks starting from 0. Check that this logic is present in the blockfetcher.

Comment on lines 858 to 922
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.headerFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
if s.headerFetcher.IsActive() {
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition may lead to deadlock, consider the situation: NeoFS storage is missing the latest headers. Once started, headerfetcher will fetch all available headers and shutdown. However, if some latest headers are still required by statesync, then we need to switch to P2P. Right now this code will restart headersfetcher anyway. Setting s.headerFetcher to nil after the first headerfetcher shutdown (and checking for nil in dependent places) may do the trick.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with nil we cant use isActive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without nil we can't distinguish finished service from not started. So additional nil checks is a trade-off.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added isShutdown to the blockfetcher. if you don't like it, will look more precisely to the nil.

Comment on lines 265 to 271
err = bfs.enqueueBlock(b)
if !bfs.cfg.BlocksOnly {
err = bfs.enqueueHeader(&b.Header)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the code that will read header from stream without transactions reading.

Comment on lines 230 to 231
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need a switch here. The main idea is that blockfetcher should work both with statesync module enabled/disabled. If statesync module is disabled then we need a usual blockfetcher operation flow that will fetch blocks, put them directly into chain-backed queue and shutdown. If statesync module is enabled then blockfetcher should use statesync-backed queue and headersfetcher should also be started.

Copy link

codecov bot commented Jan 21, 2025

Codecov Report

Attention: Patch coverage is 41.21864% with 164 lines in your changes missing coverage. Please review.

Project coverage is 82.82%. Comparing base (72e0cff) to head (d94eb81).

Files with missing lines Patch % Lines
pkg/services/blockfetcher/blockfetcher.go 21.25% 61 Missing and 2 partials ⚠️
pkg/network/server.go 27.53% 41 Missing and 9 partials ⚠️
pkg/core/statesync/module.go 67.44% 13 Missing and 1 partial ⚠️
pkg/core/blockchain.go 0.00% 10 Missing and 2 partials ⚠️
pkg/network/bqueue_adapters.go 44.44% 10 Missing ⚠️
pkg/network/bqueue/queue.go 73.52% 8 Missing and 1 partial ⚠️
pkg/core/block/header.go 73.68% 4 Missing and 1 partial ⚠️
pkg/consensus/consensus.go 0.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3789      +/-   ##
==========================================
- Coverage   83.02%   82.82%   -0.20%     
==========================================
  Files         336      337       +1     
  Lines       47090    47286     +196     
==========================================
+ Hits        39095    39164      +69     
- Misses       6395     6510     +115     
- Partials     1600     1612      +12     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@AliceInHunterland
Copy link
Contributor Author

sometimes:

2025-01-21T23:53:55.597+0300    INFO    persisted to disk       {"blocks": 0, "keys": 1543, "headerHeight": 625633, "blockHeight": 0, "took": "15.490041ms"}
^C2025-01-21T23:53:56.155+0300  INFO    shutting down server    {"peers": 18}
2025-01-21T23:53:56.155+0300    INFO    shutting down NeoFS BlockFetcher service        {"force": true}
panic: send on closed channel

goroutine 533 [running]:
github.com/nspcc-dev/neo-go/pkg/network/bqueue.(*Queue).PutHeader(0x140000a5d40, 0x140050b9d40)
        github.com/nspcc-dev/neo-go/pkg/network/bqueue/queue.go:301 +0x228
github.com/nspcc-dev/neo-go/pkg/services/blockfetcher.(*Service).blockDownloader(0x140018cdce0)
        github.com/nspcc-dev/neo-go/pkg/services/blockfetcher/blockfetcher.go:314 +0x408
created by github.com/nspcc-dev/neo-go/pkg/services/blockfetcher.(*Service).Start in goroutine 171
        github.com/nspcc-dev/neo-go/pkg/services/blockfetcher/blockfetcher.go:229 +0x62c

pkg/network/server.go Outdated Show resolved Hide resolved
@AliceInHunterland AliceInHunterland force-pushed the headers branch 2 times, most recently from 2112fe0 to f3629c4 Compare January 22, 2025 20:10
@AliceInHunterland AliceInHunterland marked this pull request as ready for review January 23, 2025 09:22
Copy link
Member

@AnnaShaleva AnnaShaleva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better, but still needs polishing.

@@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix error message.

@@ -131,6 +136,9 @@ func (s *Module) Init(currChainHeight uint32) error {
if p < 2*s.syncInterval {
// chain is too low to start state exchange process, use the standard sync mechanism
s.syncStage = inactive
if s.syncStage != oldStage {
s.notifyStageChanged()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it to the defered statement.


// stageCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageCallback func()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stageChangedCallback?

// notifyStageChanged triggers stage callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageCallback != nil {
go s.stageCallback()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NACK, not a proper place for goroutine, make it synchronious on the module side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it fixed in the current implementation? moved it to go notifyStageChanged()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.

if s.syncStage != oldStage {
s.notifyStageChanged()
oldStage = s.syncStage
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defineSyncStage is an atomic operation, you can't notify in the middle of it.

@@ -140,6 +143,18 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
if err != nil {
return nil, err
}

getCfg := chain.GetConfig()
m := smartcontract.GetDefaultHonestNodeCount(int(getCfg.ValidatorsCount))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to make it adaptive. Validators count depends on the node's setting, we have pre-defined list of heights where validators count may be updated. See CommitteeUpdateHistory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not yet implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we do Get on oid we do not know the index, so we can't check the relevant for this index Validation count.

Copy link
Contributor Author

@AliceInHunterland AliceInHunterland Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do something like this: pass to the oidsCh oid and Index, but its not really cool

Copy link
Member

@AnnaShaleva AnnaShaleva Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass to the oidsCh oid and Index

Do it. Implement GETRANGE-based solution for index-files-based headers search and for single-object header search, keep the old solution for SEARCH-based headers search (just set the index to -1 to reuse the same oidsCh). Create an issue for the latter case.

if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to decode block from stream", zap.String("oid", blkOid.String()), zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust error text, it's not block anymore.

pkg/services/blockfetcher/blockfetcher.go Outdated Show resolved Hide resolved
pkg/services/blockfetcher/blockfetcher.go Outdated Show resolved Hide resolved
pkg/core/block/block.go Outdated Show resolved Hide resolved
@@ -509,7 +568,7 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest header.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollback

// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue {
// New creates an instance of Queue that handles both blocks and headers.
func New(bc Blockqueuer, log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode, objectMode TypeMode) *Queue {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relayer is not any, it's a function with pre-defined signature.

Comment on lines 155 to 163
func (bl *Queue) add(item Queueble) error {
if b, ok := item.(*block.Block); ok {
return bl.chain.AddBlock(b)
}
if h, ok := item.(*block.Header); ok {
return bl.chain.AddHeaders(h)
}
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NACK. I literally mean make the queue generic:

var bQueue := New[Block](blockAdapter{chain}, ...)
var bSyncQueue := New[Block](blockAdapter{stateSyncModule}, ...)
var hSyncQueue := New[Header](headerAdapter{stateSyncModule}, ...)

With func New[Q Queueable](bc Blockqueuer[Q], ...). Then implement Blockqueuer over blockAdapter and headerAdapter. And then inside the queue use bq.chain.AddItem(item) for a single block/header or bq.chain.AddItems(item) for a multiple headers, that's it.

You will need to modify Blockqueuer:

type Blockqueuer[Q Queueable] interface {
	AddItem(Q) error
	AddItems(...Q) error
	Height() uint32
}

Comment on lines 54 to 60
// GetIndex returns the index of the block.
func (b *Block) GetIndex() uint32 {
return b.Header.Index
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need that, Header already has this definition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed.

pkg/core/block/header.go Outdated Show resolved Hide resolved
pkg/consensus/consensus.go Outdated Show resolved Hide resolved
s.stageChangedCallback = cb
}

// notifyStageChanged triggers stage callback if it's set.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not "stage callback". It notifies about state changes.

@@ -351,6 +399,9 @@ func (s *Module) AddBlock(block *block.Block) error {
s.syncStage |= blocksSynced
s.log.Info("blocks are in sync",
zap.Uint32("blockHeight", s.blockHeight))
if s.syncStage != oldStage {
s.notifyStageChanged()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that you need to move notification to the checkSyncIsCompleted and pass the old stage as an argument.

@@ -351,6 +399,9 @@ func (s *Module) AddBlock(block *block.Block) error {
s.syncStage |= blocksSynced
s.log.Info("blocks are in sync",
zap.Uint32("blockHeight", s.blockHeight))
if s.syncStage != oldStage {
s.notifyStageChanged()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it actually won't work properly because we need to notify only after mutex is removed. Hence, just move this to the defered statement to the start of the function.

@@ -1136,7 +1209,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error

// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
if s.syncHeaderFetcher.IsActive() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed.

@@ -16,6 +16,9 @@ const (
// MaxVerificationScript is the maximum allowed length of verification
// script. It should be appropriate for 11/21 multisignature committee.
MaxVerificationScript = 1024

// DefaultInvocationScriptSize is the default length of invocation script with one signature.
DefaultInvocationScriptSize = 66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line width. And it's relevant not only for Transaction, think about better place for it. Comment must be more specific, it's not about "default", tell something about Crypto.System.CheckSig. VM has definition of this constant, may be we can export.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put func(any) error, shutdownCallback func(), typeMode bqueue.TypeMode) (*Service, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to define this mode for blockfetcher, not for queue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why typeMode when it's an operationMode?

account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
headerSize: getHeaderSize(chain.GetConfig()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It must be a mapping from height to header size, it's not a single value.

@AliceInHunterland AliceInHunterland force-pushed the headers branch 2 times, most recently from 231f7d7 to c3af507 Compare January 28, 2025 16:51
@@ -59,7 +59,7 @@ type (
Ledger interface {
extpool.Ledger
mempool.Feer
bqueue.Blockqueuer
bqueue.Blockqueuer[bqueue.Queueable]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need new interface for that. Starting from this commit bqueue.Blockqueuer serves only queue needs.


s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bSyncQueue = bqueue.New[*block.Block](blockAdapter{s.stateSync}, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will have two adapters for statesync module. So it's more like stateSyncBlockQueueAdapter and stateSyncHeaderQueueAdapter for statesync module and chainBlockQueueAdapter.

@@ -282,6 +279,48 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
return s, nil
}

type blockAdapter struct {
bc StateSync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not bc here, it's stateSync.

@@ -282,6 +279,48 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
return s, nil
}

type blockAdapter struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move all adapters to a separate file. Declare a variable to ensure it implements proper interface, like `var _ = (queue.Blockqueuer[*block.Block])(&blockAdapter{})

}

func (a blockAdapter) AddItem(b *block.Block) error {
return a.bc.AddItem(b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statesync module itself must not implement any Item-related things. That's exactly the purpose of adapters. Use AddBlock method here.

Comment on lines 290 to 296
func (a blockAdapter) AddItems(bs ...*block.Block) error {
for _, b := range bs {
if err := a.bc.AddItem(b); err != nil {
return err
}
}
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must not be supported for blocks, it's only designated for headers. Panic.

}

func (a chainAdapter) AddItem(b *block.Block) error {
return a.chain.AddItem(b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for the blockchain, no AddItem is defined on the blockchain.

}

func (a chainAdapter) Height() uint32 {
return a.chain.Height()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for height. It must be BlockHeight() for blocks adapter and HeaderHeight() for headers adapter.

Comment on lines 1181 to 1182
for _, header := range h.Hdrs {
err := s.stateSync.AddItems(header)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must use AddItems(h.Hrs...), otherwise it's useless. This operation must be batched.

@@ -9,12 +10,15 @@ import (
// StateSync represents state sync module.
type StateSync interface {
AddMPTNodes([][]byte) error
bqueue.Blockqueuer
bqueue.Blockqueuer[bqueue.Queueable]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for statesync, bqueue.Blockqueuer[bqueue.Queueable] is now queue-related. Create one more interface with specific methods and reuse it for statesync and for Ledger.


for height := range chain.CommitteeHistory {
m = smartcontract.GetDefaultHonestNodeCount(chain.GetNumOfCNs(height))
verification, _ = smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(height)])
Copy link
Contributor Author

@AliceInHunterland AliceInHunterland Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be new vs here? in CommitteeHistory we do not have public keys. or it should be ValidatorsHistory?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really care about particular keys, size estimators doesn't need it.

}
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

full get for this mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or headers as an object(s) in the container

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or adapt batchSize to headerSizeMap

Copy link
Member

@AnnaShaleva AnnaShaleva left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Process queue feedback first, then update PR and I'll review the rest, there's a lot of things that need to be fixed.

@@ -49,6 +49,10 @@ type FakeStateSync struct {
AddMPTNodesFunc func(nodes [][]byte) error
}

func (s *FakeStateSync) HeaderHeight() uint32 {
return s.HeaderHeight()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is s.HeaderHeight() in this context? You're implementing interface, you can't use methods of this interface in the implementation, it just won't work.

internal/fakechain/fakechain.go Show resolved Hide resolved
@@ -58,7 +59,7 @@ type Ledger interface {

// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
Put(block bqueue.Queueable) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not block anymore. It's something queueable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*coreb.Block can be saved?

Comment on lines 532 to 537
func (bq testBlockQueuer) Put(b bqueue.Queueable) error {
blk, ok := b.(*coreb.Block)
if !ok {
return nil
}
return bq.bc.AddBlock(blk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use unsafe cast here, because nothing else may be used here except the *coreb.Block.

Comment on lines 54 to 60
// GetIndex returns the index of the block.
func (b *Block) GetIndex() uint32 {
return b.Header.Index
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed.

s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bQueue = bqueue.New[*block.Block](chainBlockQueueAdapter{chain}, log, func(b bqueue.Queueable) { s.tryStartServices() }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func(b bqueue.Queueable)

It's not. You need to practice with generics and learn the specification. It's func(b *block.Block)

pkg/network/server.go Outdated Show resolved Hide resolved
pkg/network/server.go Outdated Show resolved Hide resolved
pkg/network/server.go Outdated Show resolved Hide resolved
pkg/network/state_sync.go Outdated Show resolved Hide resolved
@@ -51,6 +51,11 @@ type auxBlockIn struct {
Transactions []json.RawMessage `json:"tx"`
}

// GetIndex returns the index of the block.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some additional clarification on why it's needed would be helpful.

}

// Queue is the queue of queueable elements.
type Queue[Q queueable] struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't use non-exported type for an exported thing. Why queueable at all? Queueable should be sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because then i can't use put func(item bqueue.Queueable) as Cannot use type comparable outside a type constraint: interface is (or embeds) comparable. Made it any for blockfetcher.

zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", b.Index))
zap.Uint32("Height", bq.chain.Height()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Height/height/, I doubt we start with capitals anywhere.

@@ -87,6 +87,13 @@ type (
Shutdown()
}

// ledger is a minimal subset of Ledger interface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ledger is too vague for it. blockHeaderQueuer maybe? At least we could recognize queuer there to match with its real use.

@@ -85,6 +85,16 @@ func (b *Header) GetIndex() uint32 {
return b.Index
}

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Header) GetExpectedHeaderSize() int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's very useful, you rather need some GetExpectedHeaderSize(stateRootInHeader bool, numOfValidators int) that is a function, not a method. stateRootInHeader will be replaced by version after #3566.

pkg/core/blockchain.go Show resolved Hide resolved
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("NeoFSStateSyncExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is similar to P2PStateExchangeExtensions, shouldn't it be like that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2P state exchange requires nodes to have some old MPTs to answer GetMPTData requests, it won't work if every node is doing KeepOnlyLatestState. Quite the contrary, NeoFS state sync is possible if node is stripping the state completely, we don't need data from other nodes. And this doesn't require any special support, just drop this check.

defer s.lock.Unlock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

// Start runs the NeoFS BlockFetcher service.
func (bfs *Service) Start() error {
if !bfs.isActive.CompareAndSwap(false, true) {
if !bfs.isActive.CompareAndSwap(false, true) || bfs.IsShutdown() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can't reuse this instance, Start() should return an error. If you can it should just start.


for height := range chain.CommitteeHistory {
m = smartcontract.GetDefaultHonestNodeCount(chain.GetNumOfCNs(height))
verification, _ = smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(height)])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really care about particular keys, size estimators doesn't need it.

The queue can operate with any item that implements proper interface.

Signed-off-by: Ekaterina Pavlova <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement headers fetching via NeoFS BlockFetcher service
3 participants