diff --git a/p2p/database/database.go b/p2p/database/database.go index 2b7e248f..9f44be00 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -18,6 +18,7 @@ type Database interface { WriteBlockHashes(context.Context, *enode.Node, []common.Hash) WriteBlockBody(context.Context, *eth.BlockBody, common.Hash) WriteTransactions(context.Context, *enode.Node, []*types.Transaction) + HasParentBlock(context.Context, common.Hash) bool MaxConcurrentWrites() int ShouldWriteBlocks() bool diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 84731be7..2c16b718 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -214,6 +214,14 @@ func (d *datastoreWrapper) ShouldWriteTransactions() bool { return d.shouldWriteTransactions } +func (d *datastoreWrapper) HasParentBlock(ctx context.Context, hash common.Hash) bool { + key := datastore.NameKey(blocksKind, hash.Hex(), nil) + var block DatastoreBlock + err := d.client.Get(ctx, key, &block) + + return err == nil && block.DatastoreHeader != nil +} + // newDatastoreHeader creates a DatastoreHeader from a types.Header. Some // values are converted into strings to prevent a loss of precision. func newDatastoreHeader(header *types.Header) *DatastoreHeader { diff --git a/p2p/rlpx.go b/p2p/rlpx.go index 64312450..1cdaf30c 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -4,6 +4,7 @@ import ( "container/list" "context" "fmt" + "math/big" "math/rand" "net" "strings" @@ -11,6 +12,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/p2p" @@ -33,9 +35,11 @@ func Dial(n *enode.Node) (*Conn, error) { } conn := Conn{ - Conn: rlpx.NewConn(fd, n.Pubkey()), - node: n, - logger: log.With().Str("peer", n.URLv4()).Logger(), + Conn: rlpx.NewConn(fd, n.Pubkey()), + node: n, + logger: log.With().Str("peer", n.URLv4()).Logger(), + requests: list.New(), + requestNum: 0, } if conn.ourKey, err = crypto.GenerateKey(); err != nil { @@ -148,12 +152,6 @@ type request struct { // ReadAndServe reads messages from peers and writes it to a database. func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { - // requests is used to store the request ID and the block hash. This is used - // when fetching block bodies because the eth protocol block bodies do not - // contain information about the block hash. - requests := list.New() - var requestNum uint64 = 0 - // dbCh is used to limit the number of database goroutines running at one // time with a buffered channel. Without this, a large influx of messages can // bog down the system and leak memory. @@ -185,6 +183,12 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { c.logger.Trace().Msgf("Received %v BlockHeaders", len(msg.BlockHeadersPacket)) if db != nil { + for _, header := range msg.BlockHeadersPacket { + if err := c.getParentBlock(ctx, db, header); err != nil { + return err + } + } + dbCh <- struct{}{} go func() { db.WriteBlockHeaders(ctx, msg.BlockHeadersPacket) @@ -207,7 +211,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { c.logger.Trace().Msgf("Received %v BlockBodies", len(msg.BlockBodiesPacket)) var hash *common.Hash - for e := requests.Front(); e != nil; e = e.Next() { + for e := c.requests.Front(); e != nil; e = e.Next() { r, ok := e.Value.(request) if !ok { log.Error().Msg("Request type assertion failed") @@ -216,7 +220,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { if r.requestID == msg.ReqID() { hash = &r.hash - requests.Remove(e) + c.requests.Remove(e) break } } @@ -250,37 +254,12 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { hashes := make([]common.Hash, 0, len(*msg)) for _, hash := range *msg { hashes = append(hashes, hash.Hash) - - headersRequest := &GetBlockHeaders{ - GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ - // Providing both the hash and number will result in a `both origin - // hash and number` error. - Origin: eth.HashOrNumber{Hash: hash.Hash}, - Amount: 1, - }, - } - - if err := c.Write(headersRequest); err != nil { - c.logger.Error().Err(err).Msg("Failed to write GetBlockHeaders request") - return err - } - - requestNum++ - requests.PushBack(request{ - requestID: requestNum, - hash: hash.Hash, - }) - bodiesRequest := &GetBlockBodies{ - RequestId: requestNum, - GetBlockBodiesPacket: []common.Hash{hash.Hash}, - } - if err := c.Write(bodiesRequest); err != nil { - c.logger.Error().Err(err).Msg("Failed to write GetBlockBodies request") + if err := c.getBlockData(hash.Hash); err != nil { return err } } - if db != nil && db.ShouldWriteBlocks() { + if db != nil && db.ShouldWriteBlocks() && len(hashes) > 0 { dbCh <- struct{}{} go func() { db.WriteBlockHashes(ctx, c.node, hashes) @@ -292,6 +271,10 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error { c.logger.Trace().Str("hash", msg.Block.Hash().Hex()).Msg("Received NewBlock") if db != nil && db.ShouldWriteBlocks() { + if err := c.getParentBlock(ctx, db, msg.Block.Header()); err != nil { + return err + } + dbCh <- struct{}{} go func() { db.WriteBlock(ctx, c.node, msg.Block) @@ -375,3 +358,58 @@ func (c *Conn) processNewPooledTransactionHashes(count *MessageCount, hashes []c return nil } + +// getBlockData will send a GetBlockHeaders and GetBlockBodies request to the +// peer. It will return an error if the sending either of the requests failed. +func (c *Conn) getBlockData(hash common.Hash) error { + headersRequest := &GetBlockHeaders{ + GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ + // Providing both the hash and number will result in a `both origin + // hash and number` error. + Origin: eth.HashOrNumber{Hash: hash}, + Amount: 1, + }, + } + + if err := c.Write(headersRequest); err != nil { + c.logger.Error().Err(err).Msg("Failed to write GetBlockHeaders request") + return err + } + + c.requestNum++ + c.requests.PushBack(request{ + requestID: c.requestNum, + hash: hash, + }) + bodiesRequest := &GetBlockBodies{ + RequestId: c.requestNum, + GetBlockBodiesPacket: []common.Hash{hash}, + } + if err := c.Write(bodiesRequest); err != nil { + c.logger.Error().Err(err).Msg("Failed to write GetBlockBodies request") + return err + } + + return nil +} + +// getParentBlock will send a request to the peer if the parent of the header +// does not exist in the database. +func (c *Conn) getParentBlock(ctx context.Context, db database.Database, header *types.Header) error { + if c.oldestBlock == nil { + c.logger.Info().Interface("block", header).Msg("Setting oldest block") + c.oldestBlock = header + return nil + } + + if !db.HasParentBlock(ctx, header.ParentHash) && header.Number.Cmp(c.oldestBlock.Number) == 1 { + c.logger.Info(). + Str("hash", header.ParentHash.Hex()). + Str("number", new(big.Int).Sub(header.Number, big.NewInt(1)).String()). + Msg("Fetching missing parent block") + + return c.getBlockData(header.ParentHash) + } + + return nil +} diff --git a/p2p/types.go b/p2p/types.go index d454626e..99445c10 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -1,11 +1,13 @@ package p2p import ( + "container/list" "crypto/ecdsa" "fmt" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -155,6 +157,16 @@ type Conn struct { caps []p2p.Cap node *enode.Node logger zerolog.Logger + + // requests is used to store the request ID and the block hash. This is used + // when fetching block bodies because the eth protocol block bodies do not + // contain information about the block hash. + requests *list.List + requestNum uint64 + + // oldestBlock stores the first block the sensor has seen so when fetching + // parent blocks, it does not request blocks older than this. + oldestBlock *types.Header } // Read reads an eth66 packet from the connection.