From c1ed28c45a68ce7c3059706c3fda349dd13a6080 Mon Sep 17 00:00:00 2001 From: Pranay Valson Date: Mon, 22 Jan 2024 12:01:38 -0800 Subject: [PATCH] resolve conflicts; bump bsp minor version (geth minor upgrade) Signed-off-by: Pranay Valson --- .github/workflows/build-test.yml | 2 +- core/vm/interface.go | 1 - internal/ethapi/multicall_api.go | 10 +- les/api_backend.go | 347 --------------------------- les/servingqueue.go | 364 ---------------------------- les/utils/limiter.go | 398 ------------------------------- params/version.go | 4 +- trie/proof_test.go | 4 - 8 files changed, 4 insertions(+), 1126 deletions(-) delete mode 100644 les/api_backend.go delete mode 100644 les/servingqueue.go delete mode 100644 les/utils/limiter.go diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 0b712e69e..bd80df8a8 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -18,7 +18,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.19 + go-version: 1.21.6 - name: Checkout code uses: actions/checkout@v2 diff --git a/core/vm/interface.go b/core/vm/interface.go index 6ce7ca4be..9b82449b1 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -79,7 +79,6 @@ type StateDB interface { AddLog(*types.Log) AddPreimage(common.Hash, []byte) - ForEachStorage(common.Address, func(common.Hash, common.Hash) bool) error GetStateSpecimen() *types.StateSpecimen } diff --git a/internal/ethapi/multicall_api.go b/internal/ethapi/multicall_api.go index 82bdc9bbc..530d76ba0 100644 --- a/internal/ethapi/multicall_api.go +++ b/internal/ethapi/multicall_api.go @@ -59,15 +59,7 @@ func (s *BlockChainAPI) Multicall(ctx context.Context, commonCallArgs Transactio // get a new instance of the EVM to be used once // ethapi's vmError callback always returns nil, so it is dropped here //GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) (*vm.EVM, func() error) - evm, getVmErr := s.b.GetEVM(ctx, msg, state, header, nil, nil) - vmErr := getVmErr() - - if vmErr != nil { - // if we cannot retrieve the an EVM for any message, that failure - // implies a fault in the node as a whole, so we should give up on - // processing the entire request - return nil, vmErr - } + evm := s.b.GetEVM(ctx, msg, state, header, nil, nil) execResult, applyMsgErr := core.ApplyMessage(evm, msg, gp) diff --git a/les/api_backend.go b/les/api_backend.go deleted file mode 100644 index 8d70695e4..000000000 --- a/les/api_backend.go +++ /dev/null @@ -1,347 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package les - -import ( - "context" - "errors" - "math/big" - "time" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/eth/gasprice" - "github.com/ethereum/go-ethereum/eth/tracers" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rpc" -) - -type LesApiBackend struct { - extRPCEnabled bool - allowUnprotectedTxs bool - eth *LightEthereum - gpo *gasprice.Oracle -} - -func (b *LesApiBackend) ChainConfig() *params.ChainConfig { - return b.eth.chainConfig -} - -func (b *LesApiBackend) CurrentBlock() *types.Header { - return b.eth.BlockChain().CurrentHeader() -} - -func (b *LesApiBackend) SetHead(number uint64) { - b.eth.blockchain.SetHead(number) -} - -func (b *LesApiBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) { - // Return the latest current as the pending one since there - // is no pending notion in the light client. TODO(rjl493456442) - // unify the behavior of `HeaderByNumber` and `PendingBlockAndReceipts`. - if number == rpc.PendingBlockNumber { - return b.eth.blockchain.CurrentHeader(), nil - } - if number == rpc.LatestBlockNumber { - return b.eth.blockchain.CurrentHeader(), nil - } - return b.eth.blockchain.GetHeaderByNumberOdr(ctx, uint64(number)) -} - -func (b *LesApiBackend) HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) { - if blockNr, ok := blockNrOrHash.Number(); ok { - return b.HeaderByNumber(ctx, blockNr) - } - if hash, ok := blockNrOrHash.Hash(); ok { - header, err := b.HeaderByHash(ctx, hash) - if err != nil { - return nil, err - } - if header == nil { - return nil, errors.New("header for hash not found") - } - if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { - return nil, errors.New("hash is not currently canonical") - } - return header, nil - } - return nil, errors.New("invalid arguments; neither block nor hash specified") -} - -func (b *LesApiBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - return b.eth.blockchain.GetHeaderByHash(hash), nil -} - -func (b *LesApiBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) { - header, err := b.HeaderByNumber(ctx, number) - if header == nil || err != nil { - return nil, err - } - return b.BlockByHash(ctx, header.Hash()) -} - -func (b *LesApiBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - return b.eth.blockchain.GetBlockByHash(ctx, hash) -} - -func (b *LesApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) { - if blockNr, ok := blockNrOrHash.Number(); ok { - return b.BlockByNumber(ctx, blockNr) - } - if hash, ok := blockNrOrHash.Hash(); ok { - block, err := b.BlockByHash(ctx, hash) - if err != nil { - return nil, err - } - if block == nil { - return nil, errors.New("header found, but block body is missing") - } - if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(block.NumberU64()) != hash { - return nil, errors.New("hash is not currently canonical") - } - return block, nil - } - return nil, errors.New("invalid arguments; neither block nor hash specified") -} - -func (b *LesApiBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) { - return light.GetBody(ctx, b.eth.odr, hash, uint64(number)) -} - -func (b *LesApiBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return nil, nil -} - -func (b *LesApiBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) { - header, err := b.HeaderByNumber(ctx, number) - if err != nil { - return nil, nil, err - } - if header == nil { - return nil, nil, errors.New("header not found") - } - return light.NewState(ctx, header, b.eth.odr), header, nil -} - -func (b *LesApiBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { - if blockNr, ok := blockNrOrHash.Number(); ok { - return b.StateAndHeaderByNumber(ctx, blockNr) - } - if hash, ok := blockNrOrHash.Hash(); ok { - header := b.eth.blockchain.GetHeaderByHash(hash) - if header == nil { - return nil, nil, errors.New("header for hash not found") - } - if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { - return nil, nil, errors.New("hash is not currently canonical") - } - return light.NewState(ctx, header, b.eth.odr), header, nil - } - return nil, nil, errors.New("invalid arguments; neither block nor hash specified") -} - -func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return light.GetBlockReceipts(ctx, b.eth.odr, hash, *number) - } - return nil, nil -} - -func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { - return light.GetBlockLogs(ctx, b.eth.odr, hash, number) -} - -func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return b.eth.blockchain.GetTdOdr(ctx, hash, *number) - } - return nil -} - -func (b *LesApiBackend) GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) (*vm.EVM, func() error) { - if vmConfig == nil { - vmConfig = new(vm.Config) - } - txContext := core.NewEVMTxContext(msg) - context := core.NewEVMBlockContext(header, b.eth.blockchain, nil) - if blockCtx != nil { - context = *blockCtx - } - return vm.NewEVM(context, txContext, state, b.eth.chainConfig, *vmConfig), state.Error -} - -func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add(ctx, signedTx) -} - -func (b *LesApiBackend) RemoveTx(txHash common.Hash) { - b.eth.txPool.RemoveTx(txHash) -} - -func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { - return b.eth.txPool.GetTransactions() -} - -func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { - return b.eth.txPool.GetTransaction(txHash) -} - -func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { - return light.GetTransaction(ctx, b.eth.odr, txHash) -} - -func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { - return b.eth.txPool.GetNonce(ctx, addr) -} - -func (b *LesApiBackend) Stats() (pending int, queued int) { - return b.eth.txPool.Stats(), 0 -} - -func (b *LesApiBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { - return b.eth.txPool.Content() -} - -func (b *LesApiBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { - return b.eth.txPool.ContentFrom(addr) -} - -func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return b.eth.txPool.SubscribeNewTxsEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainHeadEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainSideEvent(ch) -} - -func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { - return b.eth.blockchain.SubscribeLogsEvent(ch) -} - -func (b *LesApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - return event.NewSubscription(func(quit <-chan struct{}) error { - <-quit - return nil - }) -} - -func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { - return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) -} - -func (b *LesApiBackend) SyncProgress() ethereum.SyncProgress { - return ethereum.SyncProgress{} -} - -func (b *LesApiBackend) ProtocolVersion() int { - return b.eth.LesVersion() + 10000 -} - -func (b *LesApiBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { - return b.gpo.SuggestTipCap(ctx) -} - -func (b *LesApiBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) { - return b.gpo.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) -} - -func (b *LesApiBackend) ChainDb() ethdb.Database { - return b.eth.chainDb -} - -func (b *LesApiBackend) AccountManager() *accounts.Manager { - return b.eth.accountManager -} - -func (b *LesApiBackend) ExtRPCEnabled() bool { - return b.extRPCEnabled -} - -func (b *LesApiBackend) UnprotectedAllowed() bool { - return b.allowUnprotectedTxs -} - -func (b *LesApiBackend) RPCGasCap() uint64 { - return b.eth.config.RPCGasCap -} - -func (b *LesApiBackend) RPCEVMTimeout() time.Duration { - return b.eth.config.RPCEVMTimeout -} - -func (b *LesApiBackend) RPCTxFeeCap() float64 { - return b.eth.config.RPCTxFeeCap -} - -func (b *LesApiBackend) BloomStatus() (uint64, uint64) { - if b.eth.bloomIndexer == nil { - return 0, 0 - } - sections, _, _ := b.eth.bloomIndexer.Sections() - return params.BloomBitsBlocksClient, sections -} - -func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - for i := 0; i < bloomFilterThreads; i++ { - go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) - } -} - -func (b *LesApiBackend) Engine() consensus.Engine { - return b.eth.engine -} - -func (b *LesApiBackend) CurrentHeader() *types.Header { - return b.eth.blockchain.CurrentHeader() -} - -func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { - return b.eth.stateAtBlock(ctx, block, reexec) -} - -func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (*core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { - return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) -} - -func (b *LesApiBackend) SetHistoricalBlocksSynced() bool { - if b.SyncProgress().CurrentBlock < b.SyncProgress().HighestBlock { - log.Crit("Light Client not fully synced yet, block specimen producer not supported in light client mode") - } else { - log.Crit("Fully Synced but block specimen producer not supported in light client mode") - } - - return false -} diff --git a/les/servingqueue.go b/les/servingqueue.go deleted file mode 100644 index 68cad9cb5..000000000 --- a/les/servingqueue.go +++ /dev/null @@ -1,364 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package les - -import ( - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/common/prque" - "golang.org/x/exp/slices" -) - -// servingQueue allows running tasks in a limited number of threads and puts the -// waiting tasks in a priority queue -type servingQueue struct { - recentTime, queuedTime, servingTimeDiff uint64 - burstLimit, burstDropLimit uint64 - burstDecRate float64 - lastUpdate mclock.AbsTime - - queueAddCh, queueBestCh chan *servingTask - stopThreadCh, quit chan struct{} - setThreadsCh chan int - - wg sync.WaitGroup - threadCount int // number of currently running threads - queue *prque.Prque[int64, *servingTask] // priority queue for waiting or suspended tasks - best *servingTask // the highest priority task (not included in the queue) - suspendBias int64 // priority bias against suspending an already running task -} - -// servingTask represents a request serving task. Tasks can be implemented to -// run in multiple steps, allowing the serving queue to suspend execution between -// steps if higher priority tasks are entered. The creator of the task should -// set the following fields: -// -// - priority: greater value means higher priority; values can wrap around the int64 range -// - run: execute a single step; return true if finished -// - after: executed after run finishes or returns an error, receives the total serving time -type servingTask struct { - sq *servingQueue - servingTime, timeAdded, maxTime, expTime uint64 - peer *clientPeer - priority int64 - biasAdded bool - token runToken - tokenCh chan runToken -} - -// runToken received by servingTask.start allows the task to run. Closing the -// channel by servingTask.stop signals the thread controller to allow a new task -// to start running. -type runToken chan struct{} - -// start blocks until the task can start and returns true if it is allowed to run. -// Returning false means that the task should be cancelled. -func (t *servingTask) start() bool { - if t.peer.isFrozen() { - return false - } - t.tokenCh = make(chan runToken, 1) - select { - case t.sq.queueAddCh <- t: - case <-t.sq.quit: - return false - } - select { - case t.token = <-t.tokenCh: - case <-t.sq.quit: - return false - } - if t.token == nil { - return false - } - t.servingTime -= uint64(mclock.Now()) - return true -} - -// done signals the thread controller about the task being finished and returns -// the total serving time of the task in nanoseconds. -func (t *servingTask) done() uint64 { - t.servingTime += uint64(mclock.Now()) - close(t.token) - diff := t.servingTime - t.timeAdded - t.timeAdded = t.servingTime - if t.expTime > diff { - t.expTime -= diff - atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime) - } else { - t.expTime = 0 - } - return t.servingTime -} - -// waitOrStop can be called during the execution of the task. It blocks if there -// is a higher priority task waiting (a bias is applied in favor of the currently -// running task). Returning true means that the execution can be resumed. False -// means the task should be cancelled. -func (t *servingTask) waitOrStop() bool { - t.done() - if !t.biasAdded { - t.priority += t.sq.suspendBias - t.biasAdded = true - } - return t.start() -} - -// newServingQueue returns a new servingQueue -func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue { - sq := &servingQueue{ - queue: prque.New[int64, *servingTask](nil), - suspendBias: suspendBias, - queueAddCh: make(chan *servingTask, 100), - queueBestCh: make(chan *servingTask), - stopThreadCh: make(chan struct{}), - quit: make(chan struct{}), - setThreadsCh: make(chan int, 10), - burstLimit: uint64(utilTarget * bufLimitRatio * 1200000), - burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000), - burstDecRate: utilTarget, - lastUpdate: mclock.Now(), - } - sq.wg.Add(2) - go sq.queueLoop() - go sq.threadCountLoop() - return sq -} - -// newTask creates a new task with the given priority -func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask { - return &servingTask{ - sq: sq, - peer: peer, - maxTime: maxTime, - expTime: maxTime, - priority: priority, - } -} - -// threadController is started in multiple goroutines and controls the execution -// of tasks. The number of active thread controllers equals the allowed number of -// concurrently running threads. It tries to fetch the highest priority queued -// task first. If there are no queued tasks waiting then it can directly catch -// run tokens from the token channel and allow the corresponding tasks to run -// without entering the priority queue. -func (sq *servingQueue) threadController() { - defer sq.wg.Done() - for { - token := make(runToken) - select { - case best := <-sq.queueBestCh: - best.tokenCh <- token - case <-sq.stopThreadCh: - return - case <-sq.quit: - return - } - select { - case <-sq.stopThreadCh: - return - case <-sq.quit: - return - case <-token: - } - } -} - -// peerTasks lists the tasks received from a given peer when selecting peers to freeze -type peerTasks struct { - peer *clientPeer - list []*servingTask - sumTime uint64 - priority float64 -} - -// freezePeers selects the peers with the worst priority queued tasks and freezes -// them until burstTime goes under burstDropLimit or all peers are frozen -func (sq *servingQueue) freezePeers() { - peerMap := make(map[*clientPeer]*peerTasks) - var peerList []*peerTasks - if sq.best != nil { - sq.queue.Push(sq.best, sq.best.priority) - } - sq.best = nil - for sq.queue.Size() > 0 { - task := sq.queue.PopItem() - tasks := peerMap[task.peer] - if tasks == nil { - bufValue, bufLimit := task.peer.fcClient.BufferStatus() - if bufLimit < 1 { - bufLimit = 1 - } - tasks = &peerTasks{ - peer: task.peer, - priority: float64(bufValue) / float64(bufLimit), // lower value comes first - } - peerMap[task.peer] = tasks - peerList = append(peerList, tasks) - } - tasks.list = append(tasks.list, task) - tasks.sumTime += task.expTime - } - slices.SortFunc(peerList, func(a, b *peerTasks) int { - if a.priority < b.priority { - return -1 - } - if a.priority > b.priority { - return 1 - } - return 0 - }) - drop := true - for _, tasks := range peerList { - if drop { - tasks.peer.freeze() - tasks.peer.fcClient.Freeze() - sq.queuedTime -= tasks.sumTime - sqQueuedGauge.Update(int64(sq.queuedTime)) - clientFreezeMeter.Mark(1) - drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit - for _, task := range tasks.list { - task.tokenCh <- nil - } - } else { - for _, task := range tasks.list { - sq.queue.Push(task, task.priority) - } - } - } - if sq.queue.Size() > 0 { - sq.best = sq.queue.PopItem() - } -} - -// updateRecentTime recalculates the recent serving time value -func (sq *servingQueue) updateRecentTime() { - subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0) - now := mclock.Now() - dt := now - sq.lastUpdate - sq.lastUpdate = now - if dt > 0 { - subTime += uint64(float64(dt) * sq.burstDecRate) - } - if sq.recentTime > subTime { - sq.recentTime -= subTime - } else { - sq.recentTime = 0 - } -} - -// addTask inserts a task into the priority queue -func (sq *servingQueue) addTask(task *servingTask) { - if sq.best == nil { - sq.best = task - } else if task.priority-sq.best.priority > 0 { - sq.queue.Push(sq.best, sq.best.priority) - sq.best = task - } else { - sq.queue.Push(task, task.priority) - } - sq.updateRecentTime() - sq.queuedTime += task.expTime - sqServedGauge.Update(int64(sq.recentTime)) - sqQueuedGauge.Update(int64(sq.queuedTime)) - if sq.recentTime+sq.queuedTime > sq.burstLimit { - sq.freezePeers() - } -} - -// queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh -// and always tries to send the highest priority task to queueBestCh. Successfully sent -// tasks are removed from the queue. -func (sq *servingQueue) queueLoop() { - defer sq.wg.Done() - for { - if sq.best != nil { - expTime := sq.best.expTime - select { - case task := <-sq.queueAddCh: - sq.addTask(task) - case sq.queueBestCh <- sq.best: - sq.updateRecentTime() - sq.queuedTime -= expTime - sq.recentTime += expTime - sqServedGauge.Update(int64(sq.recentTime)) - sqQueuedGauge.Update(int64(sq.queuedTime)) - if sq.queue.Size() == 0 { - sq.best = nil - } else { - sq.best = sq.queue.PopItem() - } - case <-sq.quit: - return - } - } else { - select { - case task := <-sq.queueAddCh: - sq.addTask(task) - case <-sq.quit: - return - } - } - } -} - -// threadCountLoop is an event loop running in a goroutine. It adjusts the number -// of active thread controller goroutines. -func (sq *servingQueue) threadCountLoop() { - var threadCountTarget int - defer sq.wg.Done() - for { - for threadCountTarget > sq.threadCount { - sq.wg.Add(1) - go sq.threadController() - sq.threadCount++ - } - if threadCountTarget < sq.threadCount { - select { - case threadCountTarget = <-sq.setThreadsCh: - case sq.stopThreadCh <- struct{}{}: - sq.threadCount-- - case <-sq.quit: - return - } - } else { - select { - case threadCountTarget = <-sq.setThreadsCh: - case <-sq.quit: - return - } - } - } -} - -// setThreads sets the allowed processing thread count, suspending tasks as soon as -// possible if necessary. -func (sq *servingQueue) setThreads(threadCount int) { - select { - case sq.setThreadsCh <- threadCount: - case <-sq.quit: - return - } -} - -// stop stops task processing as soon as possible and shuts down the serving queue. -func (sq *servingQueue) stop() { - close(sq.quit) - sq.wg.Wait() -} diff --git a/les/utils/limiter.go b/les/utils/limiter.go deleted file mode 100644 index 70b7ff64f..000000000 --- a/les/utils/limiter.go +++ /dev/null @@ -1,398 +0,0 @@ -// Copyright 2021 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package utils - -import ( - "sync" - - "github.com/ethereum/go-ethereum/p2p/enode" - "golang.org/x/exp/slices" -) - -const maxSelectionWeight = 1000000000 // maximum selection weight of each individual node/address group - -// Limiter protects a network request serving mechanism from denial-of-service attacks. -// It limits the total amount of resources used for serving requests while ensuring that -// the most valuable connections always have a reasonable chance of being served. -type Limiter struct { - lock sync.Mutex - cond *sync.Cond - quit bool - - nodes map[enode.ID]*nodeQueue - addresses map[string]*addressGroup - addressSelect, valueSelect *WeightedRandomSelect - maxValue float64 - maxCost, sumCost, sumCostLimit uint - selectAddressNext bool -} - -// nodeQueue represents queued requests coming from a single node ID -type nodeQueue struct { - queue []request // always nil if penaltyCost != 0 - id enode.ID - address string - value float64 - flatWeight, valueWeight uint64 // current selection weights in the address/value selectors - sumCost uint // summed cost of requests queued by the node - penaltyCost uint // cumulative cost of dropped requests since last processed request - groupIndex int -} - -// addressGroup is a group of node IDs that have sent their last requests from the same -// network address -type addressGroup struct { - nodes []*nodeQueue - nodeSelect *WeightedRandomSelect - sumFlatWeight, groupWeight uint64 -} - -// request represents an incoming request scheduled for processing -type request struct { - process chan chan struct{} - cost uint -} - -// flatWeight distributes weights equally between each active network address -func flatWeight(item interface{}) uint64 { return item.(*nodeQueue).flatWeight } - -// add adds the node queue to the address group. It is the caller's responsibility to -// add the address group to the address map and the address selector if it wasn't -// there before. -func (ag *addressGroup) add(nq *nodeQueue) { - if nq.groupIndex != -1 { - panic("added node queue is already in an address group") - } - l := len(ag.nodes) - nq.groupIndex = l - ag.nodes = append(ag.nodes, nq) - ag.sumFlatWeight += nq.flatWeight - ag.groupWeight = ag.sumFlatWeight / uint64(l+1) - ag.nodeSelect.Update(ag.nodes[l]) -} - -// update updates the selection weight of the node queue inside the address group. -// It is the caller's responsibility to update the group's selection weight in the -// address selector. -func (ag *addressGroup) update(nq *nodeQueue, weight uint64) { - if nq.groupIndex == -1 || nq.groupIndex >= len(ag.nodes) || ag.nodes[nq.groupIndex] != nq { - panic("updated node queue is not in this address group") - } - ag.sumFlatWeight += weight - nq.flatWeight - nq.flatWeight = weight - ag.groupWeight = ag.sumFlatWeight / uint64(len(ag.nodes)) - ag.nodeSelect.Update(nq) -} - -// remove removes the node queue from the address group. It is the caller's responsibility -// to remove the address group from the address map if it is empty. -func (ag *addressGroup) remove(nq *nodeQueue) { - if nq.groupIndex == -1 || nq.groupIndex >= len(ag.nodes) || ag.nodes[nq.groupIndex] != nq { - panic("removed node queue is not in this address group") - } - - l := len(ag.nodes) - 1 - if nq.groupIndex != l { - ag.nodes[nq.groupIndex] = ag.nodes[l] - ag.nodes[nq.groupIndex].groupIndex = nq.groupIndex - } - nq.groupIndex = -1 - ag.nodes = ag.nodes[:l] - ag.sumFlatWeight -= nq.flatWeight - if l >= 1 { - ag.groupWeight = ag.sumFlatWeight / uint64(l) - } else { - ag.groupWeight = 0 - } - ag.nodeSelect.Remove(nq) -} - -// choose selects one of the node queues belonging to the address group -func (ag *addressGroup) choose() *nodeQueue { - return ag.nodeSelect.Choose().(*nodeQueue) -} - -// NewLimiter creates a new Limiter -func NewLimiter(sumCostLimit uint) *Limiter { - l := &Limiter{ - addressSelect: NewWeightedRandomSelect(func(item interface{}) uint64 { return item.(*addressGroup).groupWeight }), - valueSelect: NewWeightedRandomSelect(func(item interface{}) uint64 { return item.(*nodeQueue).valueWeight }), - nodes: make(map[enode.ID]*nodeQueue), - addresses: make(map[string]*addressGroup), - sumCostLimit: sumCostLimit, - } - l.cond = sync.NewCond(&l.lock) - go l.processLoop() - return l -} - -// selectionWeights calculates the selection weights of a node for both the address and -// the value selector. The selection weight depends on the next request cost or the -// summed cost of recently dropped requests. -func (l *Limiter) selectionWeights(reqCost uint, value float64) (flatWeight, valueWeight uint64) { - if value > l.maxValue { - l.maxValue = value - } - if value > 0 { - // normalize value to <= 1 - value /= l.maxValue - } - if reqCost > l.maxCost { - l.maxCost = reqCost - } - relCost := float64(reqCost) / float64(l.maxCost) - var f float64 - if relCost <= 0.001 { - f = 1 - } else { - f = 0.001 / relCost - } - f *= maxSelectionWeight - flatWeight, valueWeight = uint64(f), uint64(f*value) - if flatWeight == 0 { - flatWeight = 1 - } - return -} - -// Add adds a new request to the node queue belonging to the given id. Value belongs -// to the requesting node. A higher value gives the request a higher chance of being -// served quickly in case of heavy load or a DDoS attack. Cost is a rough estimate -// of the serving cost of the request. A lower cost also gives the request a -// better chance. -func (l *Limiter) Add(id enode.ID, address string, value float64, reqCost uint) chan chan struct{} { - l.lock.Lock() - defer l.lock.Unlock() - - process := make(chan chan struct{}, 1) - if l.quit { - close(process) - return process - } - if reqCost == 0 { - reqCost = 1 - } - if nq, ok := l.nodes[id]; ok { - if nq.queue != nil { - nq.queue = append(nq.queue, request{process, reqCost}) - nq.sumCost += reqCost - nq.value = value - if address != nq.address { - // known id sending request from a new address, move to different address group - l.removeFromGroup(nq) - l.addToGroup(nq, address) - } - } else { - // already waiting on a penalty, just add to the penalty cost and drop the request - nq.penaltyCost += reqCost - l.update(nq) - close(process) - return process - } - } else { - nq := &nodeQueue{ - queue: []request{{process, reqCost}}, - id: id, - value: value, - sumCost: reqCost, - groupIndex: -1, - } - nq.flatWeight, nq.valueWeight = l.selectionWeights(reqCost, value) - if len(l.nodes) == 0 { - l.cond.Signal() - } - l.nodes[id] = nq - if nq.valueWeight != 0 { - l.valueSelect.Update(nq) - } - l.addToGroup(nq, address) - } - l.sumCost += reqCost - if l.sumCost > l.sumCostLimit { - l.dropRequests() - } - return process -} - -// update updates the selection weights of the node queue -func (l *Limiter) update(nq *nodeQueue) { - var cost uint - if nq.queue != nil { - cost = nq.queue[0].cost - } else { - cost = nq.penaltyCost - } - flatWeight, valueWeight := l.selectionWeights(cost, nq.value) - ag := l.addresses[nq.address] - ag.update(nq, flatWeight) - l.addressSelect.Update(ag) - nq.valueWeight = valueWeight - l.valueSelect.Update(nq) -} - -// addToGroup adds the node queue to the given address group. The group is created if -// it does not exist yet. -func (l *Limiter) addToGroup(nq *nodeQueue, address string) { - nq.address = address - ag := l.addresses[address] - if ag == nil { - ag = &addressGroup{nodeSelect: NewWeightedRandomSelect(flatWeight)} - l.addresses[address] = ag - } - ag.add(nq) - l.addressSelect.Update(ag) -} - -// removeFromGroup removes the node queue from its address group -func (l *Limiter) removeFromGroup(nq *nodeQueue) { - ag := l.addresses[nq.address] - ag.remove(nq) - if len(ag.nodes) == 0 { - delete(l.addresses, nq.address) - } - l.addressSelect.Update(ag) -} - -// remove removes the node queue from its address group, the nodes map and the value -// selector -func (l *Limiter) remove(nq *nodeQueue) { - l.removeFromGroup(nq) - if nq.valueWeight != 0 { - l.valueSelect.Remove(nq) - } - delete(l.nodes, nq.id) -} - -// choose selects the next node queue to process. -func (l *Limiter) choose() *nodeQueue { - if l.valueSelect.IsEmpty() || l.selectAddressNext { - if ag, ok := l.addressSelect.Choose().(*addressGroup); ok { - l.selectAddressNext = false - return ag.choose() - } - } - nq, _ := l.valueSelect.Choose().(*nodeQueue) - l.selectAddressNext = true - return nq -} - -// processLoop processes requests sequentially -func (l *Limiter) processLoop() { - l.lock.Lock() - defer l.lock.Unlock() - - for { - if l.quit { - for _, nq := range l.nodes { - for _, request := range nq.queue { - close(request.process) - } - } - return - } - nq := l.choose() - if nq == nil { - l.cond.Wait() - continue - } - if nq.queue != nil { - request := nq.queue[0] - nq.queue = nq.queue[1:] - nq.sumCost -= request.cost - l.sumCost -= request.cost - l.lock.Unlock() - ch := make(chan struct{}) - request.process <- ch - <-ch - l.lock.Lock() - if len(nq.queue) > 0 { - l.update(nq) - } else { - l.remove(nq) - } - } else { - // penalized queue removed, next request will be added to a clean queue - l.remove(nq) - } - } -} - -// Stop stops the processing loop. All queued and future requests are rejected. -func (l *Limiter) Stop() { - l.lock.Lock() - defer l.lock.Unlock() - - l.quit = true - l.cond.Signal() -} - -type dropListItem struct { - nq *nodeQueue - priority float64 -} - -// dropRequests selects the nodes with the highest queued request cost to selection -// weight ratio and drops their queued request. The empty node queues stay in the -// selectors with a low selection weight in order to penalize these nodes. -func (l *Limiter) dropRequests() { - var ( - sumValue float64 - list []dropListItem - ) - for _, nq := range l.nodes { - sumValue += nq.value - } - for _, nq := range l.nodes { - if nq.sumCost == 0 { - continue - } - w := 1 / float64(len(l.addresses)*len(l.addresses[nq.address].nodes)) - if sumValue > 0 { - w += nq.value / sumValue - } - list = append(list, dropListItem{ - nq: nq, - priority: w / float64(nq.sumCost), - }) - } - slices.SortFunc(list, func(a, b dropListItem) int { - if a.priority < b.priority { - return -1 - } - if a.priority < b.priority { - return 1 - } - return 0 - }) - for _, item := range list { - for _, request := range item.nq.queue { - close(request.process) - } - // make the queue penalized; no more requests are accepted until the node is - // selected based on the penalty cost which is the cumulative cost of all dropped - // requests. This ensures that sending excess requests is always penalized - // and incentivizes the sender to stop for a while if no replies are received. - item.nq.queue = nil - item.nq.penaltyCost = item.nq.sumCost - l.sumCost -= item.nq.sumCost // penalty costs are not counted in sumCost - item.nq.sumCost = 0 - l.update(item.nq) - if l.sumCost <= l.sumCostLimit/2 { - return - } - } -} diff --git a/params/version.go b/params/version.go index 6d6dc1702..e92ad50fa 100644 --- a/params/version.go +++ b/params/version.go @@ -29,8 +29,8 @@ const ( const ( BspVersionMajor = 1 // Major version component of the current release - BspVersionMinor = 5 // Minor version component of the current release - BspVersionPatch = 2 // Patch version component of the current release + BspVersionMinor = 6 // Minor version component of the current release + BspVersionPatch = 0 // Patch version component of the current release ) // Version holds the textual version string. diff --git a/trie/proof_test.go b/trie/proof_test.go index 93262775b..a185c154e 100644 --- a/trie/proof_test.go +++ b/trie/proof_test.go @@ -438,7 +438,6 @@ func TestSingleSideRangeProof(t *testing.T) { k = append(k, entries[i].k) v = append(v, entries[i].v) } -<<<<<<< HEAD _, err := VerifyRangeProof(trie.Hash(), common.Hash{}.Bytes(), k[len(k)-1], k, v, proof) if err != nil { t.Fatalf("Expected no error, got %v", err) @@ -476,9 +475,6 @@ func TestReverseSingleSideRangeProof(t *testing.T) { v = append(v, entries[i].v) } _, err := VerifyRangeProof(trie.Hash(), k[0], last.Bytes(), k, v, proof) -======= - _, err := VerifyRangeProof(trie.Hash(), common.Hash{}.Bytes(), k, v, proof) ->>>>>>> bc0be1b1060b51b802b88a1a528d36021f21c324 if err != nil { t.Fatalf("Expected no error, got %v", err) }