Skip to content

Commit

Permalink
[factory] add parent to workingset
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jan 9, 2025
1 parent d70ffd4 commit 884ab14
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 29 deletions.
53 changes: 37 additions & 16 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,11 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
var parent *workingSet
if height > 0 {
parent = getWorkingSetByHeight(sf.workingsets, height-1)
}
return sf.createSfWorkingSet(ctx, height, store, parent)
}

func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) {
Expand All @@ -290,10 +294,10 @@ func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*w
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
return sf.createSfWorkingSet(ctx, height, store, nil)
}

func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) {
func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore, parent *workingSet) (*workingSet, error) {
if err := store.Start(ctx); err != nil {
return nil, err
}
Expand All @@ -308,7 +312,7 @@ func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store
}
}
}
return newWorkingSet(height, store), nil
return newWorkingSet(height, store, parent), nil
}

func (sf *factory) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
Expand Down Expand Up @@ -357,6 +361,7 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error {
return errors.Wrap(err, "failed to validate block with workingset in factory")
}
sf.workingsets.Add(key, ws)
sf.workingsets.Add(ws.height, ws)
}
receipts, err := ws.Receipts()
if err != nil {
Expand Down Expand Up @@ -399,6 +404,7 @@ func (sf *factory) NewBlockBuilder(
blkCtx := protocol.MustGetBlockCtx(ctx)
key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
sf.workingsets.Add(key, ws)
sf.workingsets.Add(ws.height, ws)
return blkBuilder, nil
}

Expand Down Expand Up @@ -434,19 +440,35 @@ func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64, preact
}

// PutBlock persists all changes in RunActions() into the DB
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) (err error) {
timer := sf.timerFactory.NewTimer("Commit")
defer timer.End()
var (
ws *workingSet
isExist bool
)
defer func() {
timer.End()
if err != nil {
// abandon current workingset, and all pending workingsets beyond current height
ws.abandon()
h, _ := ws.Height()
abandonWorkingSets(sf.workingsets, h)
}
}()
producer := blk.PublicKey().Address()
if producer == nil {
return errors.New("failed to get address")
}
ctx = protocol.WithRegistry(ctx, sf.registry)
key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
ws, isExist, err := sf.getFromWorkingSets(ctx, key)
ws, isExist, err = sf.getFromWorkingSets(ctx, key)
if err != nil {
return err
return
}
if err = ws.verifyParent(); err != nil {
return
}
ws.detachParent()
if !isExist {
// regenerate workingset
if !sf.skipBlockValidationOnPut {
Expand All @@ -456,14 +478,14 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
}
if err != nil {
log.L().Error("Failed to update state.", zap.Error(err))
return err
return
}
}
sf.mutex.Lock()
defer sf.mutex.Unlock()
receipts, err := ws.Receipts()
if err != nil {
return err
return
}
blk.Receipts = receipts
h, _ := ws.Height()
Expand All @@ -475,18 +497,17 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
)
}

if err := ws.Commit(ctx); err != nil {
return err
if err = ws.Commit(ctx); err != nil {
return
}
rh, err := sf.dao.Get(ArchiveTrieNamespace, []byte(ArchiveTrieRootKey))
if err != nil {
return err
return
}
if err := sf.twoLayerTrie.SetRootHash(rh); err != nil {
return err
if err = sf.twoLayerTrie.SetRootHash(rh); err != nil {
return
}
sf.currentChainHeight = h

return nil
}

Expand Down
45 changes: 33 additions & 12 deletions state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
if err := store.Start(ctx); err != nil {
return nil, err
}

return newWorkingSet(height, store), nil
var parent *workingSet
if height > 0 {
parent = getWorkingSetByHeight(sdb.workingsets, height-1)
}
return newWorkingSet(height, store, parent), nil
}

func (sdb *stateDB) Register(p protocol.Protocol) error {
Expand All @@ -217,6 +220,7 @@ func (sdb *stateDB) Validate(ctx context.Context, blk *block.Block) error {
return errors.Wrap(err, "failed to validate block with workingset in statedb")
}
sdb.workingsets.Add(key, ws)
sdb.workingsets.Add(ws.height, ws)
}
receipts, err := ws.Receipts()
if err != nil {
Expand Down Expand Up @@ -260,6 +264,7 @@ func (sdb *stateDB) NewBlockBuilder(
blkCtx := protocol.MustGetBlockCtx(ctx)
key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String())
sdb.workingsets.Add(key, ws)
sdb.workingsets.Add(ws.height, ws)
return blkBuilder, nil
}

Expand Down Expand Up @@ -287,21 +292,37 @@ func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64, preac
}

// PutBlock persists all changes in RunActions() into the DB
func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) (err error) {
sdb.mutex.Lock()
timer := sdb.timerFactory.NewTimer("Commit")
sdb.mutex.Unlock()
defer timer.End()
var (
ws *workingSet
isExist bool
)
defer func() {
timer.End()
if err != nil {
// abandon current workingset, and all pending workingsets beyond current height
ws.abandon()
h, _ := ws.Height()
abandonWorkingSets(sdb.workingsets, h)
}
}()
producer := blk.PublicKey().Address()
if producer == nil {
return errors.New("failed to get address")
}
ctx = protocol.WithRegistry(ctx, sdb.registry)
key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress())
ws, isExist, err := sdb.getFromWorkingSets(ctx, key)
ws, isExist, err = sdb.getFromWorkingSets(ctx, key)
if err != nil {
return err
return
}
if err = ws.verifyParent(); err != nil {
return
}
ws.detachParent()
if !isExist {
if !sdb.skipBlockValidationOnPut {
err = ws.ValidateBlock(ctx, blk)
Expand All @@ -310,14 +331,14 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
}
if err != nil {
log.L().Error("Failed to update state.", zap.Error(err))
return err
return
}
}
sdb.mutex.Lock()
defer sdb.mutex.Unlock()
receipts, err := ws.Receipts()
if err != nil {
return err
return
}
blk.Receipts = receipts
h, _ := ws.Height()
Expand All @@ -329,8 +350,8 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
)
}

if err := ws.Commit(ctx); err != nil {
return err
if err = ws.Commit(ctx); err != nil {
return
}
sdb.currentChainHeight = h
return nil
Expand Down Expand Up @@ -440,6 +461,6 @@ func (sdb *stateDB) getFromWorkingSets(ctx context.Context, key hash.Hash256) (*
sdb.mutex.RLock()
currHeight := sdb.currentChainHeight
sdb.mutex.RUnlock()
tx, err := sdb.newWorkingSet(ctx, currHeight+1)
return tx, false, err
ws, err := sdb.newWorkingSet(ctx, currHeight+1)
return ws, false, err
}
23 changes: 23 additions & 0 deletions state/factory/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"

"github.com/iotexproject/go-pkgs/bloom"
"github.com/iotexproject/go-pkgs/cache"
"github.com/iotexproject/go-pkgs/crypto"
"github.com/iotexproject/go-pkgs/hash"
"github.com/pkg/errors"
Expand Down Expand Up @@ -96,6 +97,28 @@ func generateWorkingSetCacheKey(blkHeader block.Header, producerAddr string) has
return hash.Hash256b(sum)
}

func getWorkingSetByHeight(c cache.LRUCache, h uint64) *workingSet {
if h == 0 {
return nil
}
if data, ok := c.Get(h); ok {
if ws, ok := data.(*workingSet); ok {
return ws
}
}
return nil
}

func abandonWorkingSets(c cache.LRUCache, h uint64) {
for ; ; h++ {
if ws := getWorkingSetByHeight(c, h); ws != nil {
c.Remove(h)
} else {
break
}
}
}

func protocolPreCommit(ctx context.Context, sr protocol.StateManager) error {
if reg, ok := protocol.GetRegistry(ctx); ok {
for _, p := range reg.All() {
Expand Down
44 changes: 43 additions & 1 deletion state/factory/workingset.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"math/big"
"sort"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -64,17 +65,20 @@ type (
height uint64
store workingSetStore
finalized bool
abandoned atomic.Bool
dock protocol.Dock
txValidator *protocol.GenericValidator
receipts []*action.Receipt
parent *workingSet
}
)

func newWorkingSet(height uint64, store workingSetStore) *workingSet {
func newWorkingSet(height uint64, store workingSetStore, parent *workingSet) *workingSet {
ws := &workingSet{
height: height,
store: store,
dock: protocol.NewDock(),
parent: parent,
}
ws.txValidator = protocol.NewGenericValidator(ws, accountutil.AccountState)
return ws
Expand Down Expand Up @@ -114,6 +118,26 @@ func (ws *workingSet) validate(ctx context.Context) error {
return nil
}

func (ws *workingSet) isAbandoned() bool {
return ws.abandoned.Load()
}

func (ws *workingSet) abandon() {
ws.abandoned.Store(true)
}

func (ws *workingSet) verifyParent() error {
if ws.parent != nil && ws.parent.isAbandoned() {
ws.abandon()
return errors.New("workingset abandoned")
}
return nil
}

func (ws *workingSet) detachParent() {
ws.parent = nil
}

func withActionCtx(ctx context.Context, selp *action.SealedEnvelope) (context.Context, error) {
var actionCtx protocol.ActionCtx
var err error
Expand Down Expand Up @@ -294,6 +318,9 @@ func (ws *workingSet) freshAccountConversion(ctx context.Context, actCtx *protoc

// Commit persists all changes in RunActions() into the DB
func (ws *workingSet) Commit(ctx context.Context) error {
if err := ws.verifyParent(); err != nil {
return err
}
if err := protocolPreCommit(ctx, ws); err != nil {
return err
}
Expand All @@ -318,6 +345,11 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if ws.parent != nil {
if value, err := ws.parent.store.Get(cfg.Namespace, cfg.Key); err == nil {
return ws.height, state.Deserialize(s, value)
}
}
value, err := ws.store.Get(cfg.Namespace, cfg.Key)
if err != nil {
return ws.height, err
Expand All @@ -333,6 +365,13 @@ func (ws *workingSet) States(opts ...protocol.StateOption) (uint64, state.Iterat
if cfg.Key != nil {
return 0, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet")
}
if ws.parent != nil {
if keys, values, err := ws.parent.store.States(cfg.Namespace, cfg.Keys); err == nil {
if iter, err := state.NewIterator(keys, values); err == nil {
return ws.height, iter, nil
}
}
}
keys, values, err := ws.store.States(cfg.Namespace, cfg.Keys)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -480,6 +519,9 @@ func (ws *workingSet) Process(ctx context.Context, actions []*action.SealedEnvel
}

func (ws *workingSet) processWithCorrectOrder(ctx context.Context, actions []*action.SealedEnvelope) error {
if err := ws.verifyParent(); err != nil {
return err
}
if err := ws.validate(ctx); err != nil {
return err
}
Expand Down

0 comments on commit 884ab14

Please sign in to comment.