Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(splitstore): merry christmas lotus! Remove ~120 G from lotus datastore #12803

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# UNRELEASED

- Sync snapshots directly to hotstore. Save 120 G of lotus disk space. ([filecoin-project/lotus#12800](https://github.com/filecoin-project/lotus/pull/12803))
- Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691))
- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768)
- During a network upgrade, log migration progress every 2 seconds so they are more helpful and informative. The `LOTUS_MIGRATE_PROGRESS_LOG_SECONDS` environment variable can be used to change this if needed. ([filecoin-project/lotus#12732](https://github.com/filecoin-project/lotus/pull/12732))
Expand Down
9 changes: 9 additions & 0 deletions blockstore/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,12 @@ func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
func (m MemBlockstore) HashOnRead(enabled bool) {
// no-op
}

func (m MemBlockstore) ForEachKey(f func(c cid.Cid) error) error {
for _, b := range m {
if err := f(b.Cid()); err != nil {
return err
}
}
return nil
}
5 changes: 5 additions & 0 deletions blockstore/splitstore/markset_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {

func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
// Limit map size to 1k if sizeHint exceeds this value
// This prevents accidental hanging when sizeHint is too large
if sizeHint > 1000 {
sizeHint = 1000
}
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
path: path,
Expand Down
4 changes: 2 additions & 2 deletions blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func testSplitStore(t *testing.T, cfg *Config) {
hotCnt := countBlocks(hot)

if coldCnt != 2 {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
t.Fatalf("expected %d blocks, but got %d", 2, coldCnt)
}

if hotCnt != 12 {
t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
t.Fatalf("expected %d blocks, but got %d", 12, hotCnt)
}

// trigger a compaction
Expand Down
9 changes: 6 additions & 3 deletions blockstore/splitstore/splitstore_warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
start := time.Now()

err := s.doWarmup(curTs)

if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
Expand All @@ -47,9 +48,11 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
return nil
}

// the actual warmup procedure; it walks the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
// the full warmup procedure
// Most of this is unnecessary in the common case where we are starting from a snapshot
// since snapshots are now loaded directly to the hotstore. However this is safe and robust,
// handling all cases where we are transition from a universal setup to a splitstore setup.
// And warmup costs are only paid on init so it is not
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
var boundaryEpoch abi.ChainEpoch
epoch := curTs.Height()
Expand Down
6 changes: 6 additions & 0 deletions blockstore/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,9 @@ func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
func (m *SyncBlockstore) HashOnRead(enabled bool) {
// noop
}

func (m *SyncBlockstore) ForEachKey(f func(c cid.Cid) error) error {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.ForEachKey(f)
}
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ var chainBalanceStateCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -721,7 +721,7 @@ var chainPledgeCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/deal-label.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var dealLabelCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var diffMinerStates = &cli.Command{
_ = lkrepo.Close()
}(lkrepo)

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var exportCarCmd = &cli.Command{

lr, err := r.Lock(repo.FullNode)
if err == nil {
bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var exportChainCmd = &cli.Command{

defer fi.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/gas-estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var gasTraceCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ var replayOfflineCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/import-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err)
}

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{
}
defer lr.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/invariants.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var invariantsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/lotus-shed/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,15 @@ var migrationsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, closer, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open universal blockstore %w", err)
}
defer func() {
if err := closer(); err != nil {
log.Warnf("failed to close universal blockstore: %s", err)
}
}()

path, err := lkrepo.SplitstorePath()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-peerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var minerPeeridCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var minerTypesCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/msig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var multisigGetAllCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
69 changes: 69 additions & 0 deletions cmd/lotus-shed/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -31,6 +32,7 @@ var splitstoreCmd = &cli.Command{
splitstoreClearCmd,
splitstoreCheckCmd,
splitstoreInfoCmd,
splitstoreRepoInfoCmd,
},
}

Expand Down Expand Up @@ -353,6 +355,40 @@ func deleteSplitstoreKeys(lr repo.LockedRepo) error {
return nil
}

func printSplitstoreKeys(lr repo.LockedRepo) error {
ds, err := lr.Datastore(context.TODO(), "/metadata")
if err != nil {
return xerrors.Errorf("error opening datastore: %w", err)
}
if closer, ok := ds.(io.Closer); ok {
defer closer.Close() //nolint
}

res, err := ds.Query(context.Background(), query.Query{Prefix: "/splitstore"})
if err != nil {
return xerrors.Errorf("error querying datastore for splitstore keys: %w", err)
}

fmt.Println("Splitstore keys and values:")
for r := range res.Next() {
if r.Error != nil {
return xerrors.Errorf("datastore query error: %w", r.Error)
}

// Get the value for this key
value, err := ds.Get(context.Background(), datastore.NewKey(r.Key))
if err != nil {
return xerrors.Errorf("error getting value for key %s: %w", r.Key, err)
}

// Decode the value as a uvarint
decoded, _ := binary.Uvarint(value)
fmt.Printf(" %s: %d\n", r.Key, decoded)
}

return nil
}

// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger
Expand All @@ -378,6 +414,39 @@ var splitstoreCheckCmd = &cli.Command{
},
}

var splitstoreRepoInfoCmd = &cli.Command{
Name: "splitstore-info",
Usage: "Display splitstore metadata information",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
Usage: "lotus repo path",
},
},
Action: func(cctx *cli.Context) error {
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("error opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.FullNode)
if err != nil {
return xerrors.Errorf("error locking repo: %w", err)
}

return printSplitstoreKeys(lr)
},
}

var splitstoreInfoCmd = &cli.Command{
Name: "info",
Description: "prints some basic splitstore information",
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error)
return nil, err
}

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, xerrors.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/terminations.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var terminationsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-sim/simulation/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) {
}
}()

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading