Skip to content

Commit

Permalink
[DVT-789] Fetch missing parent blocks (#74)
Browse files Browse the repository at this point in the history
* add fetch missing parent blocks

* add error handling

* fix missing headers
  • Loading branch information
minhd-vu authored Jun 7, 2023
1 parent 38438d8 commit 19c9659
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 38 deletions.
1 change: 1 addition & 0 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Database interface {
WriteBlockHashes(context.Context, *enode.Node, []common.Hash)
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash)
WriteTransactions(context.Context, *enode.Node, []*types.Transaction)
HasParentBlock(context.Context, common.Hash) bool

MaxConcurrentWrites() int
ShouldWriteBlocks() bool
Expand Down
8 changes: 8 additions & 0 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ func (d *datastoreWrapper) ShouldWriteTransactions() bool {
return d.shouldWriteTransactions
}

func (d *datastoreWrapper) HasParentBlock(ctx context.Context, hash common.Hash) bool {
key := datastore.NameKey(blocksKind, hash.Hex(), nil)
var block DatastoreBlock
err := d.client.Get(ctx, key, &block)

return err == nil && block.DatastoreHeader != nil
}

// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
// values are converted into strings to prevent a loss of precision.
func newDatastoreHeader(header *types.Header) *DatastoreHeader {
Expand Down
114 changes: 76 additions & 38 deletions p2p/rlpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"container/list"
"context"
"fmt"
"math/big"
"math/rand"
"net"
"strings"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p"
Expand All @@ -33,9 +35,11 @@ func Dial(n *enode.Node) (*Conn, error) {
}

conn := Conn{
Conn: rlpx.NewConn(fd, n.Pubkey()),
node: n,
logger: log.With().Str("peer", n.URLv4()).Logger(),
Conn: rlpx.NewConn(fd, n.Pubkey()),
node: n,
logger: log.With().Str("peer", n.URLv4()).Logger(),
requests: list.New(),
requestNum: 0,
}

if conn.ourKey, err = crypto.GenerateKey(); err != nil {
Expand Down Expand Up @@ -148,12 +152,6 @@ type request struct {

// ReadAndServe reads messages from peers and writes it to a database.
func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
// requests is used to store the request ID and the block hash. This is used
// when fetching block bodies because the eth protocol block bodies do not
// contain information about the block hash.
requests := list.New()
var requestNum uint64 = 0

// dbCh is used to limit the number of database goroutines running at one
// time with a buffered channel. Without this, a large influx of messages can
// bog down the system and leak memory.
Expand Down Expand Up @@ -185,6 +183,12 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
c.logger.Trace().Msgf("Received %v BlockHeaders", len(msg.BlockHeadersPacket))

if db != nil {
for _, header := range msg.BlockHeadersPacket {
if err := c.getParentBlock(ctx, db, header); err != nil {
return err
}
}

dbCh <- struct{}{}
go func() {
db.WriteBlockHeaders(ctx, msg.BlockHeadersPacket)
Expand All @@ -207,7 +211,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
c.logger.Trace().Msgf("Received %v BlockBodies", len(msg.BlockBodiesPacket))

var hash *common.Hash
for e := requests.Front(); e != nil; e = e.Next() {
for e := c.requests.Front(); e != nil; e = e.Next() {
r, ok := e.Value.(request)
if !ok {
log.Error().Msg("Request type assertion failed")
Expand All @@ -216,7 +220,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {

if r.requestID == msg.ReqID() {
hash = &r.hash
requests.Remove(e)
c.requests.Remove(e)
break
}
}
Expand Down Expand Up @@ -250,37 +254,12 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
hashes := make([]common.Hash, 0, len(*msg))
for _, hash := range *msg {
hashes = append(hashes, hash.Hash)

headersRequest := &GetBlockHeaders{
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
// Providing both the hash and number will result in a `both origin
// hash and number` error.
Origin: eth.HashOrNumber{Hash: hash.Hash},
Amount: 1,
},
}

if err := c.Write(headersRequest); err != nil {
c.logger.Error().Err(err).Msg("Failed to write GetBlockHeaders request")
return err
}

requestNum++
requests.PushBack(request{
requestID: requestNum,
hash: hash.Hash,
})
bodiesRequest := &GetBlockBodies{
RequestId: requestNum,
GetBlockBodiesPacket: []common.Hash{hash.Hash},
}
if err := c.Write(bodiesRequest); err != nil {
c.logger.Error().Err(err).Msg("Failed to write GetBlockBodies request")
if err := c.getBlockData(hash.Hash); err != nil {
return err
}
}

if db != nil && db.ShouldWriteBlocks() {
if db != nil && db.ShouldWriteBlocks() && len(hashes) > 0 {
dbCh <- struct{}{}
go func() {
db.WriteBlockHashes(ctx, c.node, hashes)
Expand All @@ -292,6 +271,10 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
c.logger.Trace().Str("hash", msg.Block.Hash().Hex()).Msg("Received NewBlock")

if db != nil && db.ShouldWriteBlocks() {
if err := c.getParentBlock(ctx, db, msg.Block.Header()); err != nil {
return err
}

dbCh <- struct{}{}
go func() {
db.WriteBlock(ctx, c.node, msg.Block)
Expand Down Expand Up @@ -375,3 +358,58 @@ func (c *Conn) processNewPooledTransactionHashes(count *MessageCount, hashes []c

return nil
}

// getBlockData will send a GetBlockHeaders and GetBlockBodies request to the
// peer. It will return an error if the sending either of the requests failed.
func (c *Conn) getBlockData(hash common.Hash) error {
headersRequest := &GetBlockHeaders{
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
// Providing both the hash and number will result in a `both origin
// hash and number` error.
Origin: eth.HashOrNumber{Hash: hash},
Amount: 1,
},
}

if err := c.Write(headersRequest); err != nil {
c.logger.Error().Err(err).Msg("Failed to write GetBlockHeaders request")
return err
}

c.requestNum++
c.requests.PushBack(request{
requestID: c.requestNum,
hash: hash,
})
bodiesRequest := &GetBlockBodies{
RequestId: c.requestNum,
GetBlockBodiesPacket: []common.Hash{hash},
}
if err := c.Write(bodiesRequest); err != nil {
c.logger.Error().Err(err).Msg("Failed to write GetBlockBodies request")
return err
}

return nil
}

// getParentBlock will send a request to the peer if the parent of the header
// does not exist in the database.
func (c *Conn) getParentBlock(ctx context.Context, db database.Database, header *types.Header) error {
if c.oldestBlock == nil {
c.logger.Info().Interface("block", header).Msg("Setting oldest block")
c.oldestBlock = header
return nil
}

if !db.HasParentBlock(ctx, header.ParentHash) && header.Number.Cmp(c.oldestBlock.Number) == 1 {
c.logger.Info().
Str("hash", header.ParentHash.Hex()).
Str("number", new(big.Int).Sub(header.Number, big.NewInt(1)).String()).
Msg("Fetching missing parent block")

return c.getBlockData(header.ParentHash)
}

return nil
}
12 changes: 12 additions & 0 deletions p2p/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package p2p

import (
"container/list"
"crypto/ecdsa"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -155,6 +157,16 @@ type Conn struct {
caps []p2p.Cap
node *enode.Node
logger zerolog.Logger

// requests is used to store the request ID and the block hash. This is used
// when fetching block bodies because the eth protocol block bodies do not
// contain information about the block hash.
requests *list.List
requestNum uint64

// oldestBlock stores the first block the sensor has seen so when fetching
// parent blocks, it does not request blocks older than this.
oldestBlock *types.Header
}

// Read reads an eth66 packet from the connection.
Expand Down

0 comments on commit 19c9659

Please sign in to comment.