Skip to content

Commit

Permalink
blockfetcher: add getRange for headers processing
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 22, 2025
1 parent e62e4d2 commit 2112fe0
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 21 deletions.
5 changes: 5 additions & 0 deletions pkg/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (b *Block) GetExpectedBlockSizeWithoutTransactions(txCount int) int {
return size
}

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Block) GetExpectedHeaderSize() int {
return expectedHeaderSizeWithEmptyWitness
}

// ToStackItem converts Block to stackitem.Item.
func (b *Block) ToStackItem() stackitem.Item {
items := []stackitem.Item{
Expand Down
109 changes: 88 additions & 21 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
Expand Down Expand Up @@ -59,12 +62,12 @@ type Service struct {
log *zap.Logger
cfg config.NeoFSBlockFetcher
stateRootInHeader bool

chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
enqueueHeader func(*block.Header) error
account *wallet.Account
headerSize int
chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
enqueueHeader func(*block.Header) error
account *wallet.Account

oidsCh chan oid.ID
// wg is a wait group for block downloaders.
Expand Down Expand Up @@ -140,6 +143,18 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
if err != nil {
return nil, err
}

getCfg := chain.GetConfig()
m := smartcontract.GetDefaultHonestNodeCount(int(getCfg.ValidatorsCount))
baseHeaderSize := block.New(chain.GetConfig().StateRootInHeader).GetExpectedHeaderSize() - 2
vs, _ := keys.NewPublicKeysFromStrings(getCfg.StandbyCommittee)
verification, _ := smartcontract.CreateDefaultMultiSigRedeemScript(vs[:getCfg.GetNumOfCNs(0)])
defaultWitness := transaction.Witness{
InvocationScript: make([]byte, 66*m),
VerificationScript: verification,
}
w := gio.NewBufBinWriter()
defaultWitness.EncodeBinary(w.BinWriter)
return &Service{
chain: chain,
pool: poolWrapper{Pool: p},
Expand All @@ -150,6 +165,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
enqueueHeader: putHeader,
account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
headerSize: w.Len() + baseHeaderSize,
shutdownCallback: shutdownCallback,

quit: make(chan bool),
Expand Down Expand Up @@ -243,32 +259,59 @@ func (bfs *Service) blockDownloader() {
for blkOid := range bfs.oidsCh {
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

rc, err := bfs.objectGet(ctx, blkOid.String())
if err != nil {
if isContextCanceledErr(err) {
var (
h *block.Header
b *block.Block
err error
rc io.ReadCloser
)

if bfs.cfg.BlocksOnly {
rc, err = bfs.objectGet(ctx, blkOid.String())
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to objectGet block", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
return
}
bfs.log.Error("failed to objectGet block", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
return
}

b, err := bfs.readBlock(rc)
if err != nil {
if isContextCanceledErr(err) {
b, err = bfs.readBlock(rc)
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to decode block from stream", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
return
}
bfs.log.Error("failed to decode block from stream", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
return
} else {
rc, err = bfs.objectGetRange(ctx, blkOid.String(), 0, uint64(bfs.headerSize))
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to objectGetRange block", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
return
}

h, err = bfs.readHeader(rc)
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to decode block from stream", zap.String("oid", blkOid.String()), zap.Error(err))
bfs.stopService(true)
}
}
select {
case <-bfs.ctx.Done():
return
default:
if !bfs.cfg.BlocksOnly {
err = bfs.enqueueHeader(&b.Header)
err = bfs.enqueueHeader(h)
} else {
err = bfs.enqueueBlock(b)
}
Expand Down Expand Up @@ -433,6 +476,15 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
return b, r.Err
}

// readHeader decodes the header from the read closer and prepares it for adding to the blockchain.
func (bfs *Service) readHeader(rc io.ReadCloser) (*block.Header, error) {
b := block.New(bfs.stateRootInHeader)
r := gio.NewBinReaderFromIO(rc)
b.Header.DecodeBinary(r)
rc.Close()
return &b.Header, r.Err
}

// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new
// block OIDs search, cancels all in-progress downloading operations and waits
// until all service routines finish their work.
Expand Down Expand Up @@ -555,6 +607,21 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
return rc, err
}

func (bfs *Service) objectGetRange(ctx context.Context, oid string, offset, length uint64) (io.ReadCloser, error) {
rangeParam := fmt.Sprintf("%d|%d", offset, length)
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", rangeParam))
if err != nil {
return nil, err
}

var rc io.ReadCloser
err = bfs.retry(func() error {
rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
return err
})
return rc, err
}

func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
var (
oids []oid.ID
Expand Down

0 comments on commit 2112fe0

Please sign in to comment.