Skip to content

Commit

Permalink
alleviate network congestion
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc committed Nov 21, 2024
1 parent 74a8d5d commit 45f7028
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 1 deletion.
13 changes: 12 additions & 1 deletion actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type ActPool interface {
type Subscriber interface {
OnAdded(*action.SealedEnvelope)
OnRemoved(*action.SealedEnvelope)
OnRejected(context.Context, *action.SealedEnvelope, error)
}

// SortedActions is a slice of actions that implements sort.Interface to sort by Value.
Expand Down Expand Up @@ -281,7 +282,11 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope {
}

func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error {
return ap.add(ctx, act)
err := ap.add(ctx, act)
if err != nil {
ap.onRejected(ctx, act, err)
}
return err
}

func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error {
Expand Down Expand Up @@ -586,6 +591,12 @@ func (ap *actPool) onRemoved(act *action.SealedEnvelope) {
}
}

func (ap *actPool) onRejected(ctx context.Context, act *action.SealedEnvelope, err error) {
for _, sub := range ap.subs {
sub.OnRejected(ctx, act, err)
}
}

type destinationMap struct {
mu sync.Mutex
acts map[string]map[hash.Hash256]*action.SealedEnvelope
Expand Down
2 changes: 2 additions & 0 deletions actpool/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ func (v *blobValidator) OnRemoved(act *action.SealedEnvelope) {
}
v.blobCntPerAcc[sender]--
}

func (v *blobValidator) OnRejected(context.Context, *action.SealedEnvelope, error) {}
21 changes: 21 additions & 0 deletions api/action_radio.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/hex"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -117,6 +118,26 @@ func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) {
delete(ar.unconfirmedActs, hash)
}

func (ar *ActionRadio) OnRejected(ctx context.Context, selp *action.SealedEnvelope, err error) {
if !errors.Is(err, action.ErrExistedInPool) {
return
}
if _, fromAPI := GetAPIContext(ctx); fromAPI {
// ignore action rejected from API
return
}
// retry+1 for action broadcast from other nodes, alleviate the network congestion
hash, _ := selp.Hash()
ar.mutex.Lock()
defer ar.mutex.Unlock()
if radioAct, ok := ar.unconfirmedActs[hash]; ok {
radioAct.retry++
radioAct.lastRadioTime = time.Now()
} else {
log.L().Warn("Found rejected action not in unconfirmedActs", zap.String("actionHash", hex.EncodeToString(hash[:])))
}
}

// autoRadio broadcasts long time pending actions periodically
func (ar *ActionRadio) autoRadio() {
now := time.Now()
Expand Down
14 changes: 14 additions & 0 deletions api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
type (
streamContextKey struct{}

apiContextKey struct{}

StreamContext struct {
listenerIDs map[string]struct{}
mutex sync.Mutex
Expand Down Expand Up @@ -46,3 +48,15 @@ func StreamFromContext(ctx context.Context) (*StreamContext, bool) {
sc, ok := ctx.Value(streamContextKey{}).(*StreamContext)
return sc, ok
}

func WithAPIContext(ctx context.Context) context.Context {
return context.WithValue(ctx, apiContextKey{}, struct{}{})
}

func GetAPIContext(ctx context.Context) (struct{}, bool) {
c := ctx.Value(apiContextKey{})
if c == nil {
return struct{}{}, false
}
return c.(struct{}), true
}
1 change: 1 addition & 0 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action)
return "", err
}
l := log.Logger("api").With(zap.String("actionHash", hex.EncodeToString(hash[:])))
ctx = WithAPIContext(ctx)
if err = core.ap.Add(ctx, selp); err != nil {
txBytes, serErr := proto.Marshal(in)
if serErr != nil {
Expand Down

0 comments on commit 45f7028

Please sign in to comment.