diff --git a/actpool/actpool.go b/actpool/actpool.go index 5716435a2b..32bfaeb947 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -89,6 +89,7 @@ type ActPool interface { type Subscriber interface { OnAdded(*action.SealedEnvelope) OnRemoved(*action.SealedEnvelope) + OnRejected(context.Context, *action.SealedEnvelope, error) } // SortedActions is a slice of actions that implements sort.Interface to sort by Value. @@ -281,7 +282,11 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope { } func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error { - return ap.add(ctx, act) + err := ap.add(ctx, act) + if err != nil { + ap.onRejected(ctx, act, err) + } + return err } func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error { @@ -586,6 +591,12 @@ func (ap *actPool) onRemoved(act *action.SealedEnvelope) { } } +func (ap *actPool) onRejected(ctx context.Context, act *action.SealedEnvelope, err error) { + for _, sub := range ap.subs { + sub.OnRejected(ctx, act, err) + } +} + type destinationMap struct { mu sync.Mutex acts map[string]map[hash.Hash256]*action.SealedEnvelope diff --git a/actpool/validator.go b/actpool/validator.go index 201af354c8..813b854108 100644 --- a/actpool/validator.go +++ b/actpool/validator.go @@ -61,3 +61,5 @@ func (v *blobValidator) OnRemoved(act *action.SealedEnvelope) { } v.blobCntPerAcc[sender]-- } + +func (v *blobValidator) OnRejected(context.Context, *action.SealedEnvelope, error) {} diff --git a/api/action_radio.go b/api/action_radio.go index c015bc46bf..2e421fb52f 100644 --- a/api/action_radio.go +++ b/api/action_radio.go @@ -3,6 +3,7 @@ package api import ( "context" "encoding/hex" + "errors" "sync" "time" @@ -117,6 +118,26 @@ func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) { delete(ar.unconfirmedActs, hash) } +func (ar *ActionRadio) OnRejected(ctx context.Context, selp *action.SealedEnvelope, err error) { + if !errors.Is(err, action.ErrExistedInPool) { + return + } + if _, fromAPI := GetAPIContext(ctx); fromAPI { + // ignore action rejected from API + return + } + // retry+1 for action broadcast from other nodes, alleviate the network congestion + hash, _ := selp.Hash() + ar.mutex.Lock() + defer ar.mutex.Unlock() + if radioAct, ok := ar.unconfirmedActs[hash]; ok { + radioAct.retry++ + radioAct.lastRadioTime = time.Now() + } else { + log.L().Warn("Found rejected action not in unconfirmedActs", zap.String("actionHash", hex.EncodeToString(hash[:]))) + } +} + // autoRadio broadcasts long time pending actions periodically func (ar *ActionRadio) autoRadio() { now := time.Now() diff --git a/api/context.go b/api/context.go index d772ad6644..185009394b 100644 --- a/api/context.go +++ b/api/context.go @@ -8,6 +8,8 @@ import ( type ( streamContextKey struct{} + apiContextKey struct{} + StreamContext struct { listenerIDs map[string]struct{} mutex sync.Mutex @@ -46,3 +48,15 @@ func StreamFromContext(ctx context.Context) (*StreamContext, bool) { sc, ok := ctx.Value(streamContextKey{}).(*StreamContext) return sc, ok } + +func WithAPIContext(ctx context.Context) context.Context { + return context.WithValue(ctx, apiContextKey{}, struct{}{}) +} + +func GetAPIContext(ctx context.Context) (struct{}, bool) { + c := ctx.Value(apiContextKey{}) + if c == nil { + return struct{}{}, false + } + return c.(struct{}), true +} diff --git a/api/coreservice.go b/api/coreservice.go index 28817bbade..9e63cf2c04 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -483,6 +483,7 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action) return "", err } l := log.Logger("api").With(zap.String("actionHash", hex.EncodeToString(hash[:]))) + ctx = WithAPIContext(ctx) if err = core.ap.Add(ctx, selp); err != nil { txBytes, serErr := proto.Marshal(in) if serErr != nil {