Skip to content

Commit

Permalink
blockfetcher: add headerSizeMap to GetRange headers accordingly
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Feb 4, 2025
1 parent b08166f commit 0165688
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (p poolWrapper) Close() error {
return nil
}

type indexedOID struct {
Index int
OID oid.ID
}

// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
Expand All @@ -71,15 +76,15 @@ type Service struct {
operationMode OperationMode

stateRootInHeader bool
// headerSize is the size of the header in bytes.
headerSize int
// headerSizeMap is a map of height to expected header size.
headerSizeMap map[int]int

chain Ledger
pool poolWrapper
enqueue func(obj any) error
account *wallet.Account

oidsCh chan oid.ID
oidsCh chan indexedOID
// wg is a wait group for block downloaders.
wg sync.WaitGroup

Expand All @@ -97,7 +102,7 @@ type Service struct {
shutdownCallback func()

// Depends on the OperationMode, the following functions are set to the appropriate functions.
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
readFunc func(rc io.ReadCloser) (any, error)
heightFunc func() uint32
}
Expand Down Expand Up @@ -164,7 +169,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
log: logger,
cfg: cfg,
operationMode: opt,
headerSize: getHeaderSize(chain.GetConfig()),
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),

enqueue: put,
account: account,
Expand All @@ -180,12 +185,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
}, nil
}

func getHeaderSize(chain config.Blockchain) int {
return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
headerSizeMap := make(map[int]int)
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
for height := range chain.CommitteeHistory {
headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height))
}
return headerSizeMap
}

// Start runs the NeoFS BlockFetcher service.
Expand Down Expand Up @@ -276,11 +286,13 @@ func (bfs *Service) oidDownloader() {
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()

for blkOid := range bfs.oidsCh {
for indexedOid := range bfs.oidsCh {
index := indexedOid.Index
blkOid := indexedOid.OID
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

rc, err := bfs.getFunc(ctx, blkOid.String())
rc, err := bfs.getFunc(ctx, blkOid.String(), index)
if err != nil {
if isContextCanceledErr(err) {
return
Expand Down Expand Up @@ -346,15 +358,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {

blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}

err = bfs.streamBlockOIDs(oidsRC, int(skip))
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))
if err != nil {
if isContextCanceledErr(err) {
return nil
Expand All @@ -369,7 +381,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
}

// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {
defer rc.Close()
oidBytes := make([]byte, oid.Size)
oidsProcessed := 0
Expand All @@ -396,7 +408,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:
}

oidsProcessed++
Expand Down Expand Up @@ -441,12 +453,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
index := int(startIndex)
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:
}
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.
}
startIndex += batchSize
}
Expand Down Expand Up @@ -580,7 +594,7 @@ func (bfs *Service) retry(action func() error) error {
return err
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
Expand All @@ -593,8 +607,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
return rc, err
}

func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
nearestHeight := 0
for h := range bfs.headerSizeMap {
if h <= height && h > nearestHeight {
nearestHeight = h
}
if nearestHeight >= height {
break
}
}

size := bfs.headerSizeMap[nearestHeight]
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0165688

Please sign in to comment.