diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 0b712e69e..bd80df8a8 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -18,7 +18,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
- go-version: 1.19
+ go-version: 1.21.6
- name: Checkout code
uses: actions/checkout@v2
diff --git a/core/vm/interface.go b/core/vm/interface.go
index 6ce7ca4be..9b82449b1 100644
--- a/core/vm/interface.go
+++ b/core/vm/interface.go
@@ -79,7 +79,6 @@ type StateDB interface {
AddLog(*types.Log)
AddPreimage(common.Hash, []byte)
- ForEachStorage(common.Address, func(common.Hash, common.Hash) bool) error
GetStateSpecimen() *types.StateSpecimen
}
diff --git a/internal/ethapi/multicall_api.go b/internal/ethapi/multicall_api.go
index 82bdc9bbc..530d76ba0 100644
--- a/internal/ethapi/multicall_api.go
+++ b/internal/ethapi/multicall_api.go
@@ -59,15 +59,7 @@ func (s *BlockChainAPI) Multicall(ctx context.Context, commonCallArgs Transactio
// get a new instance of the EVM to be used once
// ethapi's vmError callback always returns nil, so it is dropped here
//GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) (*vm.EVM, func() error)
- evm, getVmErr := s.b.GetEVM(ctx, msg, state, header, nil, nil)
- vmErr := getVmErr()
-
- if vmErr != nil {
- // if we cannot retrieve the an EVM for any message, that failure
- // implies a fault in the node as a whole, so we should give up on
- // processing the entire request
- return nil, vmErr
- }
+ evm := s.b.GetEVM(ctx, msg, state, header, nil, nil)
execResult, applyMsgErr := core.ApplyMessage(evm, msg, gp)
diff --git a/les/api_backend.go b/les/api_backend.go
deleted file mode 100644
index 8d70695e4..000000000
--- a/les/api_backend.go
+++ /dev/null
@@ -1,347 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package les
-
-import (
- "context"
- "errors"
- "math/big"
- "time"
-
- "github.com/ethereum/go-ethereum"
- "github.com/ethereum/go-ethereum/accounts"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/state"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
- "github.com/ethereum/go-ethereum/eth/gasprice"
- "github.com/ethereum/go-ethereum/eth/tracers"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/rpc"
-)
-
-type LesApiBackend struct {
- extRPCEnabled bool
- allowUnprotectedTxs bool
- eth *LightEthereum
- gpo *gasprice.Oracle
-}
-
-func (b *LesApiBackend) ChainConfig() *params.ChainConfig {
- return b.eth.chainConfig
-}
-
-func (b *LesApiBackend) CurrentBlock() *types.Header {
- return b.eth.BlockChain().CurrentHeader()
-}
-
-func (b *LesApiBackend) SetHead(number uint64) {
- b.eth.blockchain.SetHead(number)
-}
-
-func (b *LesApiBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) {
- // Return the latest current as the pending one since there
- // is no pending notion in the light client. TODO(rjl493456442)
- // unify the behavior of `HeaderByNumber` and `PendingBlockAndReceipts`.
- if number == rpc.PendingBlockNumber {
- return b.eth.blockchain.CurrentHeader(), nil
- }
- if number == rpc.LatestBlockNumber {
- return b.eth.blockchain.CurrentHeader(), nil
- }
- return b.eth.blockchain.GetHeaderByNumberOdr(ctx, uint64(number))
-}
-
-func (b *LesApiBackend) HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) {
- if blockNr, ok := blockNrOrHash.Number(); ok {
- return b.HeaderByNumber(ctx, blockNr)
- }
- if hash, ok := blockNrOrHash.Hash(); ok {
- header, err := b.HeaderByHash(ctx, hash)
- if err != nil {
- return nil, err
- }
- if header == nil {
- return nil, errors.New("header for hash not found")
- }
- if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash {
- return nil, errors.New("hash is not currently canonical")
- }
- return header, nil
- }
- return nil, errors.New("invalid arguments; neither block nor hash specified")
-}
-
-func (b *LesApiBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
- return b.eth.blockchain.GetHeaderByHash(hash), nil
-}
-
-func (b *LesApiBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) {
- header, err := b.HeaderByNumber(ctx, number)
- if header == nil || err != nil {
- return nil, err
- }
- return b.BlockByHash(ctx, header.Hash())
-}
-
-func (b *LesApiBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
- return b.eth.blockchain.GetBlockByHash(ctx, hash)
-}
-
-func (b *LesApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) {
- if blockNr, ok := blockNrOrHash.Number(); ok {
- return b.BlockByNumber(ctx, blockNr)
- }
- if hash, ok := blockNrOrHash.Hash(); ok {
- block, err := b.BlockByHash(ctx, hash)
- if err != nil {
- return nil, err
- }
- if block == nil {
- return nil, errors.New("header found, but block body is missing")
- }
- if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(block.NumberU64()) != hash {
- return nil, errors.New("hash is not currently canonical")
- }
- return block, nil
- }
- return nil, errors.New("invalid arguments; neither block nor hash specified")
-}
-
-func (b *LesApiBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
- return light.GetBody(ctx, b.eth.odr, hash, uint64(number))
-}
-
-func (b *LesApiBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
- return nil, nil
-}
-
-func (b *LesApiBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) {
- header, err := b.HeaderByNumber(ctx, number)
- if err != nil {
- return nil, nil, err
- }
- if header == nil {
- return nil, nil, errors.New("header not found")
- }
- return light.NewState(ctx, header, b.eth.odr), header, nil
-}
-
-func (b *LesApiBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
- if blockNr, ok := blockNrOrHash.Number(); ok {
- return b.StateAndHeaderByNumber(ctx, blockNr)
- }
- if hash, ok := blockNrOrHash.Hash(); ok {
- header := b.eth.blockchain.GetHeaderByHash(hash)
- if header == nil {
- return nil, nil, errors.New("header for hash not found")
- }
- if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash {
- return nil, nil, errors.New("hash is not currently canonical")
- }
- return light.NewState(ctx, header, b.eth.odr), header, nil
- }
- return nil, nil, errors.New("invalid arguments; neither block nor hash specified")
-}
-
-func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
- if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil {
- return light.GetBlockReceipts(ctx, b.eth.odr, hash, *number)
- }
- return nil, nil
-}
-
-func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
- return light.GetBlockLogs(ctx, b.eth.odr, hash, number)
-}
-
-func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
- if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil {
- return b.eth.blockchain.GetTdOdr(ctx, hash, *number)
- }
- return nil
-}
-
-func (b *LesApiBackend) GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) (*vm.EVM, func() error) {
- if vmConfig == nil {
- vmConfig = new(vm.Config)
- }
- txContext := core.NewEVMTxContext(msg)
- context := core.NewEVMBlockContext(header, b.eth.blockchain, nil)
- if blockCtx != nil {
- context = *blockCtx
- }
- return vm.NewEVM(context, txContext, state, b.eth.chainConfig, *vmConfig), state.Error
-}
-
-func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
- return b.eth.txPool.Add(ctx, signedTx)
-}
-
-func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
- b.eth.txPool.RemoveTx(txHash)
-}
-
-func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
- return b.eth.txPool.GetTransactions()
-}
-
-func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction {
- return b.eth.txPool.GetTransaction(txHash)
-}
-
-func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
- return light.GetTransaction(ctx, b.eth.odr, txHash)
-}
-
-func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
- return b.eth.txPool.GetNonce(ctx, addr)
-}
-
-func (b *LesApiBackend) Stats() (pending int, queued int) {
- return b.eth.txPool.Stats(), 0
-}
-
-func (b *LesApiBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
- return b.eth.txPool.Content()
-}
-
-func (b *LesApiBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
- return b.eth.txPool.ContentFrom(addr)
-}
-
-func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
- return b.eth.txPool.SubscribeNewTxsEvent(ch)
-}
-
-func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
- return b.eth.blockchain.SubscribeChainEvent(ch)
-}
-
-func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
- return b.eth.blockchain.SubscribeChainHeadEvent(ch)
-}
-
-func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
- return b.eth.blockchain.SubscribeChainSideEvent(ch)
-}
-
-func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
- return b.eth.blockchain.SubscribeLogsEvent(ch)
-}
-
-func (b *LesApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
- return event.NewSubscription(func(quit <-chan struct{}) error {
- <-quit
- return nil
- })
-}
-
-func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
- return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
-}
-
-func (b *LesApiBackend) SyncProgress() ethereum.SyncProgress {
- return ethereum.SyncProgress{}
-}
-
-func (b *LesApiBackend) ProtocolVersion() int {
- return b.eth.LesVersion() + 10000
-}
-
-func (b *LesApiBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
- return b.gpo.SuggestTipCap(ctx)
-}
-
-func (b *LesApiBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) {
- return b.gpo.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles)
-}
-
-func (b *LesApiBackend) ChainDb() ethdb.Database {
- return b.eth.chainDb
-}
-
-func (b *LesApiBackend) AccountManager() *accounts.Manager {
- return b.eth.accountManager
-}
-
-func (b *LesApiBackend) ExtRPCEnabled() bool {
- return b.extRPCEnabled
-}
-
-func (b *LesApiBackend) UnprotectedAllowed() bool {
- return b.allowUnprotectedTxs
-}
-
-func (b *LesApiBackend) RPCGasCap() uint64 {
- return b.eth.config.RPCGasCap
-}
-
-func (b *LesApiBackend) RPCEVMTimeout() time.Duration {
- return b.eth.config.RPCEVMTimeout
-}
-
-func (b *LesApiBackend) RPCTxFeeCap() float64 {
- return b.eth.config.RPCTxFeeCap
-}
-
-func (b *LesApiBackend) BloomStatus() (uint64, uint64) {
- if b.eth.bloomIndexer == nil {
- return 0, 0
- }
- sections, _, _ := b.eth.bloomIndexer.Sections()
- return params.BloomBitsBlocksClient, sections
-}
-
-func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
- for i := 0; i < bloomFilterThreads; i++ {
- go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
- }
-}
-
-func (b *LesApiBackend) Engine() consensus.Engine {
- return b.eth.engine
-}
-
-func (b *LesApiBackend) CurrentHeader() *types.Header {
- return b.eth.blockchain.CurrentHeader()
-}
-
-func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) {
- return b.eth.stateAtBlock(ctx, block, reexec)
-}
-
-func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (*core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
- return b.eth.stateAtTransaction(ctx, block, txIndex, reexec)
-}
-
-func (b *LesApiBackend) SetHistoricalBlocksSynced() bool {
- if b.SyncProgress().CurrentBlock < b.SyncProgress().HighestBlock {
- log.Crit("Light Client not fully synced yet, block specimen producer not supported in light client mode")
- } else {
- log.Crit("Fully Synced but block specimen producer not supported in light client mode")
- }
-
- return false
-}
diff --git a/les/servingqueue.go b/les/servingqueue.go
deleted file mode 100644
index 68cad9cb5..000000000
--- a/les/servingqueue.go
+++ /dev/null
@@ -1,364 +0,0 @@
-// Copyright 2019 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package les
-
-import (
- "sync"
- "sync/atomic"
-
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/common/prque"
- "golang.org/x/exp/slices"
-)
-
-// servingQueue allows running tasks in a limited number of threads and puts the
-// waiting tasks in a priority queue
-type servingQueue struct {
- recentTime, queuedTime, servingTimeDiff uint64
- burstLimit, burstDropLimit uint64
- burstDecRate float64
- lastUpdate mclock.AbsTime
-
- queueAddCh, queueBestCh chan *servingTask
- stopThreadCh, quit chan struct{}
- setThreadsCh chan int
-
- wg sync.WaitGroup
- threadCount int // number of currently running threads
- queue *prque.Prque[int64, *servingTask] // priority queue for waiting or suspended tasks
- best *servingTask // the highest priority task (not included in the queue)
- suspendBias int64 // priority bias against suspending an already running task
-}
-
-// servingTask represents a request serving task. Tasks can be implemented to
-// run in multiple steps, allowing the serving queue to suspend execution between
-// steps if higher priority tasks are entered. The creator of the task should
-// set the following fields:
-//
-// - priority: greater value means higher priority; values can wrap around the int64 range
-// - run: execute a single step; return true if finished
-// - after: executed after run finishes or returns an error, receives the total serving time
-type servingTask struct {
- sq *servingQueue
- servingTime, timeAdded, maxTime, expTime uint64
- peer *clientPeer
- priority int64
- biasAdded bool
- token runToken
- tokenCh chan runToken
-}
-
-// runToken received by servingTask.start allows the task to run. Closing the
-// channel by servingTask.stop signals the thread controller to allow a new task
-// to start running.
-type runToken chan struct{}
-
-// start blocks until the task can start and returns true if it is allowed to run.
-// Returning false means that the task should be cancelled.
-func (t *servingTask) start() bool {
- if t.peer.isFrozen() {
- return false
- }
- t.tokenCh = make(chan runToken, 1)
- select {
- case t.sq.queueAddCh <- t:
- case <-t.sq.quit:
- return false
- }
- select {
- case t.token = <-t.tokenCh:
- case <-t.sq.quit:
- return false
- }
- if t.token == nil {
- return false
- }
- t.servingTime -= uint64(mclock.Now())
- return true
-}
-
-// done signals the thread controller about the task being finished and returns
-// the total serving time of the task in nanoseconds.
-func (t *servingTask) done() uint64 {
- t.servingTime += uint64(mclock.Now())
- close(t.token)
- diff := t.servingTime - t.timeAdded
- t.timeAdded = t.servingTime
- if t.expTime > diff {
- t.expTime -= diff
- atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
- } else {
- t.expTime = 0
- }
- return t.servingTime
-}
-
-// waitOrStop can be called during the execution of the task. It blocks if there
-// is a higher priority task waiting (a bias is applied in favor of the currently
-// running task). Returning true means that the execution can be resumed. False
-// means the task should be cancelled.
-func (t *servingTask) waitOrStop() bool {
- t.done()
- if !t.biasAdded {
- t.priority += t.sq.suspendBias
- t.biasAdded = true
- }
- return t.start()
-}
-
-// newServingQueue returns a new servingQueue
-func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
- sq := &servingQueue{
- queue: prque.New[int64, *servingTask](nil),
- suspendBias: suspendBias,
- queueAddCh: make(chan *servingTask, 100),
- queueBestCh: make(chan *servingTask),
- stopThreadCh: make(chan struct{}),
- quit: make(chan struct{}),
- setThreadsCh: make(chan int, 10),
- burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
- burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
- burstDecRate: utilTarget,
- lastUpdate: mclock.Now(),
- }
- sq.wg.Add(2)
- go sq.queueLoop()
- go sq.threadCountLoop()
- return sq
-}
-
-// newTask creates a new task with the given priority
-func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask {
- return &servingTask{
- sq: sq,
- peer: peer,
- maxTime: maxTime,
- expTime: maxTime,
- priority: priority,
- }
-}
-
-// threadController is started in multiple goroutines and controls the execution
-// of tasks. The number of active thread controllers equals the allowed number of
-// concurrently running threads. It tries to fetch the highest priority queued
-// task first. If there are no queued tasks waiting then it can directly catch
-// run tokens from the token channel and allow the corresponding tasks to run
-// without entering the priority queue.
-func (sq *servingQueue) threadController() {
- defer sq.wg.Done()
- for {
- token := make(runToken)
- select {
- case best := <-sq.queueBestCh:
- best.tokenCh <- token
- case <-sq.stopThreadCh:
- return
- case <-sq.quit:
- return
- }
- select {
- case <-sq.stopThreadCh:
- return
- case <-sq.quit:
- return
- case <-token:
- }
- }
-}
-
-// peerTasks lists the tasks received from a given peer when selecting peers to freeze
-type peerTasks struct {
- peer *clientPeer
- list []*servingTask
- sumTime uint64
- priority float64
-}
-
-// freezePeers selects the peers with the worst priority queued tasks and freezes
-// them until burstTime goes under burstDropLimit or all peers are frozen
-func (sq *servingQueue) freezePeers() {
- peerMap := make(map[*clientPeer]*peerTasks)
- var peerList []*peerTasks
- if sq.best != nil {
- sq.queue.Push(sq.best, sq.best.priority)
- }
- sq.best = nil
- for sq.queue.Size() > 0 {
- task := sq.queue.PopItem()
- tasks := peerMap[task.peer]
- if tasks == nil {
- bufValue, bufLimit := task.peer.fcClient.BufferStatus()
- if bufLimit < 1 {
- bufLimit = 1
- }
- tasks = &peerTasks{
- peer: task.peer,
- priority: float64(bufValue) / float64(bufLimit), // lower value comes first
- }
- peerMap[task.peer] = tasks
- peerList = append(peerList, tasks)
- }
- tasks.list = append(tasks.list, task)
- tasks.sumTime += task.expTime
- }
- slices.SortFunc(peerList, func(a, b *peerTasks) int {
- if a.priority < b.priority {
- return -1
- }
- if a.priority > b.priority {
- return 1
- }
- return 0
- })
- drop := true
- for _, tasks := range peerList {
- if drop {
- tasks.peer.freeze()
- tasks.peer.fcClient.Freeze()
- sq.queuedTime -= tasks.sumTime
- sqQueuedGauge.Update(int64(sq.queuedTime))
- clientFreezeMeter.Mark(1)
- drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
- for _, task := range tasks.list {
- task.tokenCh <- nil
- }
- } else {
- for _, task := range tasks.list {
- sq.queue.Push(task, task.priority)
- }
- }
- }
- if sq.queue.Size() > 0 {
- sq.best = sq.queue.PopItem()
- }
-}
-
-// updateRecentTime recalculates the recent serving time value
-func (sq *servingQueue) updateRecentTime() {
- subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
- now := mclock.Now()
- dt := now - sq.lastUpdate
- sq.lastUpdate = now
- if dt > 0 {
- subTime += uint64(float64(dt) * sq.burstDecRate)
- }
- if sq.recentTime > subTime {
- sq.recentTime -= subTime
- } else {
- sq.recentTime = 0
- }
-}
-
-// addTask inserts a task into the priority queue
-func (sq *servingQueue) addTask(task *servingTask) {
- if sq.best == nil {
- sq.best = task
- } else if task.priority-sq.best.priority > 0 {
- sq.queue.Push(sq.best, sq.best.priority)
- sq.best = task
- } else {
- sq.queue.Push(task, task.priority)
- }
- sq.updateRecentTime()
- sq.queuedTime += task.expTime
- sqServedGauge.Update(int64(sq.recentTime))
- sqQueuedGauge.Update(int64(sq.queuedTime))
- if sq.recentTime+sq.queuedTime > sq.burstLimit {
- sq.freezePeers()
- }
-}
-
-// queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
-// and always tries to send the highest priority task to queueBestCh. Successfully sent
-// tasks are removed from the queue.
-func (sq *servingQueue) queueLoop() {
- defer sq.wg.Done()
- for {
- if sq.best != nil {
- expTime := sq.best.expTime
- select {
- case task := <-sq.queueAddCh:
- sq.addTask(task)
- case sq.queueBestCh <- sq.best:
- sq.updateRecentTime()
- sq.queuedTime -= expTime
- sq.recentTime += expTime
- sqServedGauge.Update(int64(sq.recentTime))
- sqQueuedGauge.Update(int64(sq.queuedTime))
- if sq.queue.Size() == 0 {
- sq.best = nil
- } else {
- sq.best = sq.queue.PopItem()
- }
- case <-sq.quit:
- return
- }
- } else {
- select {
- case task := <-sq.queueAddCh:
- sq.addTask(task)
- case <-sq.quit:
- return
- }
- }
- }
-}
-
-// threadCountLoop is an event loop running in a goroutine. It adjusts the number
-// of active thread controller goroutines.
-func (sq *servingQueue) threadCountLoop() {
- var threadCountTarget int
- defer sq.wg.Done()
- for {
- for threadCountTarget > sq.threadCount {
- sq.wg.Add(1)
- go sq.threadController()
- sq.threadCount++
- }
- if threadCountTarget < sq.threadCount {
- select {
- case threadCountTarget = <-sq.setThreadsCh:
- case sq.stopThreadCh <- struct{}{}:
- sq.threadCount--
- case <-sq.quit:
- return
- }
- } else {
- select {
- case threadCountTarget = <-sq.setThreadsCh:
- case <-sq.quit:
- return
- }
- }
- }
-}
-
-// setThreads sets the allowed processing thread count, suspending tasks as soon as
-// possible if necessary.
-func (sq *servingQueue) setThreads(threadCount int) {
- select {
- case sq.setThreadsCh <- threadCount:
- case <-sq.quit:
- return
- }
-}
-
-// stop stops task processing as soon as possible and shuts down the serving queue.
-func (sq *servingQueue) stop() {
- close(sq.quit)
- sq.wg.Wait()
-}
diff --git a/les/utils/limiter.go b/les/utils/limiter.go
deleted file mode 100644
index 70b7ff64f..000000000
--- a/les/utils/limiter.go
+++ /dev/null
@@ -1,398 +0,0 @@
-// Copyright 2021 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package utils
-
-import (
- "sync"
-
- "github.com/ethereum/go-ethereum/p2p/enode"
- "golang.org/x/exp/slices"
-)
-
-const maxSelectionWeight = 1000000000 // maximum selection weight of each individual node/address group
-
-// Limiter protects a network request serving mechanism from denial-of-service attacks.
-// It limits the total amount of resources used for serving requests while ensuring that
-// the most valuable connections always have a reasonable chance of being served.
-type Limiter struct {
- lock sync.Mutex
- cond *sync.Cond
- quit bool
-
- nodes map[enode.ID]*nodeQueue
- addresses map[string]*addressGroup
- addressSelect, valueSelect *WeightedRandomSelect
- maxValue float64
- maxCost, sumCost, sumCostLimit uint
- selectAddressNext bool
-}
-
-// nodeQueue represents queued requests coming from a single node ID
-type nodeQueue struct {
- queue []request // always nil if penaltyCost != 0
- id enode.ID
- address string
- value float64
- flatWeight, valueWeight uint64 // current selection weights in the address/value selectors
- sumCost uint // summed cost of requests queued by the node
- penaltyCost uint // cumulative cost of dropped requests since last processed request
- groupIndex int
-}
-
-// addressGroup is a group of node IDs that have sent their last requests from the same
-// network address
-type addressGroup struct {
- nodes []*nodeQueue
- nodeSelect *WeightedRandomSelect
- sumFlatWeight, groupWeight uint64
-}
-
-// request represents an incoming request scheduled for processing
-type request struct {
- process chan chan struct{}
- cost uint
-}
-
-// flatWeight distributes weights equally between each active network address
-func flatWeight(item interface{}) uint64 { return item.(*nodeQueue).flatWeight }
-
-// add adds the node queue to the address group. It is the caller's responsibility to
-// add the address group to the address map and the address selector if it wasn't
-// there before.
-func (ag *addressGroup) add(nq *nodeQueue) {
- if nq.groupIndex != -1 {
- panic("added node queue is already in an address group")
- }
- l := len(ag.nodes)
- nq.groupIndex = l
- ag.nodes = append(ag.nodes, nq)
- ag.sumFlatWeight += nq.flatWeight
- ag.groupWeight = ag.sumFlatWeight / uint64(l+1)
- ag.nodeSelect.Update(ag.nodes[l])
-}
-
-// update updates the selection weight of the node queue inside the address group.
-// It is the caller's responsibility to update the group's selection weight in the
-// address selector.
-func (ag *addressGroup) update(nq *nodeQueue, weight uint64) {
- if nq.groupIndex == -1 || nq.groupIndex >= len(ag.nodes) || ag.nodes[nq.groupIndex] != nq {
- panic("updated node queue is not in this address group")
- }
- ag.sumFlatWeight += weight - nq.flatWeight
- nq.flatWeight = weight
- ag.groupWeight = ag.sumFlatWeight / uint64(len(ag.nodes))
- ag.nodeSelect.Update(nq)
-}
-
-// remove removes the node queue from the address group. It is the caller's responsibility
-// to remove the address group from the address map if it is empty.
-func (ag *addressGroup) remove(nq *nodeQueue) {
- if nq.groupIndex == -1 || nq.groupIndex >= len(ag.nodes) || ag.nodes[nq.groupIndex] != nq {
- panic("removed node queue is not in this address group")
- }
-
- l := len(ag.nodes) - 1
- if nq.groupIndex != l {
- ag.nodes[nq.groupIndex] = ag.nodes[l]
- ag.nodes[nq.groupIndex].groupIndex = nq.groupIndex
- }
- nq.groupIndex = -1
- ag.nodes = ag.nodes[:l]
- ag.sumFlatWeight -= nq.flatWeight
- if l >= 1 {
- ag.groupWeight = ag.sumFlatWeight / uint64(l)
- } else {
- ag.groupWeight = 0
- }
- ag.nodeSelect.Remove(nq)
-}
-
-// choose selects one of the node queues belonging to the address group
-func (ag *addressGroup) choose() *nodeQueue {
- return ag.nodeSelect.Choose().(*nodeQueue)
-}
-
-// NewLimiter creates a new Limiter
-func NewLimiter(sumCostLimit uint) *Limiter {
- l := &Limiter{
- addressSelect: NewWeightedRandomSelect(func(item interface{}) uint64 { return item.(*addressGroup).groupWeight }),
- valueSelect: NewWeightedRandomSelect(func(item interface{}) uint64 { return item.(*nodeQueue).valueWeight }),
- nodes: make(map[enode.ID]*nodeQueue),
- addresses: make(map[string]*addressGroup),
- sumCostLimit: sumCostLimit,
- }
- l.cond = sync.NewCond(&l.lock)
- go l.processLoop()
- return l
-}
-
-// selectionWeights calculates the selection weights of a node for both the address and
-// the value selector. The selection weight depends on the next request cost or the
-// summed cost of recently dropped requests.
-func (l *Limiter) selectionWeights(reqCost uint, value float64) (flatWeight, valueWeight uint64) {
- if value > l.maxValue {
- l.maxValue = value
- }
- if value > 0 {
- // normalize value to <= 1
- value /= l.maxValue
- }
- if reqCost > l.maxCost {
- l.maxCost = reqCost
- }
- relCost := float64(reqCost) / float64(l.maxCost)
- var f float64
- if relCost <= 0.001 {
- f = 1
- } else {
- f = 0.001 / relCost
- }
- f *= maxSelectionWeight
- flatWeight, valueWeight = uint64(f), uint64(f*value)
- if flatWeight == 0 {
- flatWeight = 1
- }
- return
-}
-
-// Add adds a new request to the node queue belonging to the given id. Value belongs
-// to the requesting node. A higher value gives the request a higher chance of being
-// served quickly in case of heavy load or a DDoS attack. Cost is a rough estimate
-// of the serving cost of the request. A lower cost also gives the request a
-// better chance.
-func (l *Limiter) Add(id enode.ID, address string, value float64, reqCost uint) chan chan struct{} {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- process := make(chan chan struct{}, 1)
- if l.quit {
- close(process)
- return process
- }
- if reqCost == 0 {
- reqCost = 1
- }
- if nq, ok := l.nodes[id]; ok {
- if nq.queue != nil {
- nq.queue = append(nq.queue, request{process, reqCost})
- nq.sumCost += reqCost
- nq.value = value
- if address != nq.address {
- // known id sending request from a new address, move to different address group
- l.removeFromGroup(nq)
- l.addToGroup(nq, address)
- }
- } else {
- // already waiting on a penalty, just add to the penalty cost and drop the request
- nq.penaltyCost += reqCost
- l.update(nq)
- close(process)
- return process
- }
- } else {
- nq := &nodeQueue{
- queue: []request{{process, reqCost}},
- id: id,
- value: value,
- sumCost: reqCost,
- groupIndex: -1,
- }
- nq.flatWeight, nq.valueWeight = l.selectionWeights(reqCost, value)
- if len(l.nodes) == 0 {
- l.cond.Signal()
- }
- l.nodes[id] = nq
- if nq.valueWeight != 0 {
- l.valueSelect.Update(nq)
- }
- l.addToGroup(nq, address)
- }
- l.sumCost += reqCost
- if l.sumCost > l.sumCostLimit {
- l.dropRequests()
- }
- return process
-}
-
-// update updates the selection weights of the node queue
-func (l *Limiter) update(nq *nodeQueue) {
- var cost uint
- if nq.queue != nil {
- cost = nq.queue[0].cost
- } else {
- cost = nq.penaltyCost
- }
- flatWeight, valueWeight := l.selectionWeights(cost, nq.value)
- ag := l.addresses[nq.address]
- ag.update(nq, flatWeight)
- l.addressSelect.Update(ag)
- nq.valueWeight = valueWeight
- l.valueSelect.Update(nq)
-}
-
-// addToGroup adds the node queue to the given address group. The group is created if
-// it does not exist yet.
-func (l *Limiter) addToGroup(nq *nodeQueue, address string) {
- nq.address = address
- ag := l.addresses[address]
- if ag == nil {
- ag = &addressGroup{nodeSelect: NewWeightedRandomSelect(flatWeight)}
- l.addresses[address] = ag
- }
- ag.add(nq)
- l.addressSelect.Update(ag)
-}
-
-// removeFromGroup removes the node queue from its address group
-func (l *Limiter) removeFromGroup(nq *nodeQueue) {
- ag := l.addresses[nq.address]
- ag.remove(nq)
- if len(ag.nodes) == 0 {
- delete(l.addresses, nq.address)
- }
- l.addressSelect.Update(ag)
-}
-
-// remove removes the node queue from its address group, the nodes map and the value
-// selector
-func (l *Limiter) remove(nq *nodeQueue) {
- l.removeFromGroup(nq)
- if nq.valueWeight != 0 {
- l.valueSelect.Remove(nq)
- }
- delete(l.nodes, nq.id)
-}
-
-// choose selects the next node queue to process.
-func (l *Limiter) choose() *nodeQueue {
- if l.valueSelect.IsEmpty() || l.selectAddressNext {
- if ag, ok := l.addressSelect.Choose().(*addressGroup); ok {
- l.selectAddressNext = false
- return ag.choose()
- }
- }
- nq, _ := l.valueSelect.Choose().(*nodeQueue)
- l.selectAddressNext = true
- return nq
-}
-
-// processLoop processes requests sequentially
-func (l *Limiter) processLoop() {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- for {
- if l.quit {
- for _, nq := range l.nodes {
- for _, request := range nq.queue {
- close(request.process)
- }
- }
- return
- }
- nq := l.choose()
- if nq == nil {
- l.cond.Wait()
- continue
- }
- if nq.queue != nil {
- request := nq.queue[0]
- nq.queue = nq.queue[1:]
- nq.sumCost -= request.cost
- l.sumCost -= request.cost
- l.lock.Unlock()
- ch := make(chan struct{})
- request.process <- ch
- <-ch
- l.lock.Lock()
- if len(nq.queue) > 0 {
- l.update(nq)
- } else {
- l.remove(nq)
- }
- } else {
- // penalized queue removed, next request will be added to a clean queue
- l.remove(nq)
- }
- }
-}
-
-// Stop stops the processing loop. All queued and future requests are rejected.
-func (l *Limiter) Stop() {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- l.quit = true
- l.cond.Signal()
-}
-
-type dropListItem struct {
- nq *nodeQueue
- priority float64
-}
-
-// dropRequests selects the nodes with the highest queued request cost to selection
-// weight ratio and drops their queued request. The empty node queues stay in the
-// selectors with a low selection weight in order to penalize these nodes.
-func (l *Limiter) dropRequests() {
- var (
- sumValue float64
- list []dropListItem
- )
- for _, nq := range l.nodes {
- sumValue += nq.value
- }
- for _, nq := range l.nodes {
- if nq.sumCost == 0 {
- continue
- }
- w := 1 / float64(len(l.addresses)*len(l.addresses[nq.address].nodes))
- if sumValue > 0 {
- w += nq.value / sumValue
- }
- list = append(list, dropListItem{
- nq: nq,
- priority: w / float64(nq.sumCost),
- })
- }
- slices.SortFunc(list, func(a, b dropListItem) int {
- if a.priority < b.priority {
- return -1
- }
- if a.priority < b.priority {
- return 1
- }
- return 0
- })
- for _, item := range list {
- for _, request := range item.nq.queue {
- close(request.process)
- }
- // make the queue penalized; no more requests are accepted until the node is
- // selected based on the penalty cost which is the cumulative cost of all dropped
- // requests. This ensures that sending excess requests is always penalized
- // and incentivizes the sender to stop for a while if no replies are received.
- item.nq.queue = nil
- item.nq.penaltyCost = item.nq.sumCost
- l.sumCost -= item.nq.sumCost // penalty costs are not counted in sumCost
- item.nq.sumCost = 0
- l.update(item.nq)
- if l.sumCost <= l.sumCostLimit/2 {
- return
- }
- }
-}
diff --git a/params/version.go b/params/version.go
index 6d6dc1702..e92ad50fa 100644
--- a/params/version.go
+++ b/params/version.go
@@ -29,8 +29,8 @@ const (
const (
BspVersionMajor = 1 // Major version component of the current release
- BspVersionMinor = 5 // Minor version component of the current release
- BspVersionPatch = 2 // Patch version component of the current release
+ BspVersionMinor = 6 // Minor version component of the current release
+ BspVersionPatch = 0 // Patch version component of the current release
)
// Version holds the textual version string.
diff --git a/trie/proof_test.go b/trie/proof_test.go
index 93262775b..a185c154e 100644
--- a/trie/proof_test.go
+++ b/trie/proof_test.go
@@ -438,7 +438,6 @@ func TestSingleSideRangeProof(t *testing.T) {
k = append(k, entries[i].k)
v = append(v, entries[i].v)
}
-<<<<<<< HEAD
_, err := VerifyRangeProof(trie.Hash(), common.Hash{}.Bytes(), k[len(k)-1], k, v, proof)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
@@ -476,9 +475,6 @@ func TestReverseSingleSideRangeProof(t *testing.T) {
v = append(v, entries[i].v)
}
_, err := VerifyRangeProof(trie.Hash(), k[0], last.Bytes(), k, v, proof)
-=======
- _, err := VerifyRangeProof(trie.Hash(), common.Hash{}.Bytes(), k, v, proof)
->>>>>>> bc0be1b1060b51b802b88a1a528d36021f21c324
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}