Skip to content

Commit

Permalink
speed up modules lookup for stores if the last few snapshots before s…
Browse files Browse the repository at this point in the history
…tart-block are present
  • Loading branch information
sduchesneau committed Feb 3, 2025
1 parent ad1f022 commit 709e85f
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 21 deletions.
46 changes: 42 additions & 4 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stage

import (
"context"
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -517,20 +518,57 @@ func (s *Stages) previousUnitComplete(u Unit) bool {
return state == UnitCompleted || state == UnitNoOp
}

// loadedStore carries the outcome of loading a single store module's full KV
// store. FinalStoreMap sends one of these per store module over its result
// channel.
type loadedStore struct {
// name is the name of the store module that was loaded.
name string
// kv is the store returned by getStore for that module.
kv *store.FullKV
// err is the error returned by getStore, nil when the load succeeded.
err error
}

func (s *Stages) FinalStoreMap(exclusiveEndBlock uint64) (store.Map, error) {
out := store.NewMap()

var storeModuleStates []*StoreModuleState
for _, stage := range s.stages {
if stage.kind != KindStore {
continue
}
for _, modState := range stage.storeModuleStates {
storeModuleStates = append(storeModuleStates, modState)
}
}

out := store.NewMap()
if len(storeModuleStates) == 0 {
return out, nil
}

loadingChan := make(chan loadedStore, len(storeModuleStates))
for _, modState := range storeModuleStates {
modState := modState
go func() {
fullKV, err := modState.getStore(s.ctx, exclusiveEndBlock)
if err != nil {
return nil, fmt.Errorf("stores didn't sync up properly, expected store %q to be at block %d but was at %d: %w", modState.name, exclusiveEndBlock, modState.lastBlockInStore, err)
loadingChan <- loadedStore{
name: modState.name,
kv: fullKV,
err: err,
}
out[modState.name] = fullKV
}()
}

var errs error
for loaded := range loadingChan {
if loaded.err != nil {
errs = errors.Join(errs, fmt.Errorf("while loading %s: %w", loaded.name, loaded.err))
continue
}
out[loaded.name] = loaded.kv
if len(out) == len(storeModuleStates) {
close(loadingChan)
}
}
if errs != nil {
return nil, errs
}

return out, nil
}

Expand Down
6 changes: 4 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
_ "github.com/streamingfast/substreams/wasm/wazero"
)

const testSegmentSize = uint64(100)

func TestPipeline_runExecutor(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -187,7 +189,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap {
objStore := dstore.NewMockStore(nil)

for _, conf := range configs {
newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore)
newStore, err := store2.NewConfig(conf.name, conf.initBlock, testSegmentSize, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore)
require.NoError(t, err)
confMap[newStore.Name()] = newStore

Expand Down Expand Up @@ -220,7 +222,7 @@ func withTestRequest(t *testing.T, outputModule string, startBlock uint64) conte
func() (uint64, error) { return 0, nil },
newTestCursorResolver().resolveCursor,
func() (uint64, error) { return 0, nil },
100,
testSegmentSize,
)
require.NoError(t, err)
return reqctx.WithRequest(context.Background(), req)
Expand Down
2 changes: 1 addition & 1 deletion service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
return fmt.Errorf("new config map: %w", err)
}

storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock)
storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock, segmentSize)
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return fmt.Errorf("new config map: %w", err)
}

storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), request.FirstStreamableBlock)
storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), request.FirstStreamableBlock, request.SegmentSize)
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
Expand Down
66 changes: 64 additions & 2 deletions storage/store/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"fmt"
"runtime/trace"

"github.com/streamingfast/derr"
"github.com/streamingfast/dstore"
Expand All @@ -25,6 +26,7 @@ type Config struct {
updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy
valueType string

segmentSize uint64
appendLimit uint64
totalSizeLimit uint64
itemSizeLimit uint64
Expand All @@ -33,6 +35,7 @@ type Config struct {
func NewConfig(
name string,
moduleInitialBlock uint64,
segmentSize uint64,
moduleHash string,
updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy,
valueType string,
Expand All @@ -55,6 +58,7 @@ func NewConfig(
outputsStore: outputsStore,
moduleInitialBlock: moduleInitialBlock,
moduleHash: moduleHash,
segmentSize: segmentSize,
appendLimit: 8_388_608, // 8MiB = 8 * 1024 * 1024,
totalSizeLimit: 1_073_741_824, // 1GiB
itemSizeLimit: 10_485_760, // 10MiB
Expand Down Expand Up @@ -130,12 +134,67 @@ func (c *Config) FileSize(ctx context.Context, fileInfo *FileInfo) (int64, error
return size, nil
}

// lowestAlignedBoundary returns the smallest multiple of segmentSize that is
// greater than or equal to moduleInitialBlock. An initial block that already
// sits on a segment boundary is returned unchanged.
func (c *Config) lowestAlignedBoundary() uint64 {
	pastBoundary := c.moduleInitialBlock % c.segmentSize
	if pastBoundary == 0 {
		// Already aligned on a segment boundary.
		return c.moduleInitialBlock
	}
	// Drop back to the boundary below, then step one full segment up.
	return c.moduleInitialBlock - pastBoundary + c.segmentSize
}

// optimisticGetHighestFullSnapshotFile does a short, bounded walk of the
// object store to find a full (non-partial) snapshot file whose exclusive end
// block is at or below upTo, and returns the last such file seen, or nil.
//
// The walk starts at the zero-padded position of upTo minus 10 segments, so
// only the tail of the listing is visited instead of every file.
//
// It returns nil without touching the object store when upTo is within 1000
// segments of the module's lowest aligned boundary. It also returns nil when
// the walk still fails after the retries: this is a best-effort optimisation
// and callers are expected to fall back to the full listing.
func (c *Config) optimisticGetHighestFullSnapshotFile(ctx context.Context, upTo uint64) *FileInfo {
	lowestAlignedBoundary := c.lowestAlignedBoundary()
	if upTo <= (lowestAlignedBoundary + (c.segmentSize * 1000)) {
		// below 1000 files we don't bother with this optimisation
		return nil
	}

	// look for an existing 'fullKV snapshot' in the last 10 segments to skip the full walk
	lowestLookupBlock := upTo - c.segmentSize*10
	startingPoint := fmt.Sprintf("%010d", lowestLookupBlock)

	var highest *FileInfo
	err := derr.RetryContext(ctx, 3, func(ctx context.Context) error {
		// Clear on every attempt: a previous attempt may have recorded a file
		// before failing, and that stale value must not leak into the result
		// of a later attempt.
		highest = nil

		return c.objStore.WalkFrom(ctx, "", startingPoint, func(filename string) error {
			fileInfo, ok := parseFileName(c.Name(), filename)
			if !ok || fileInfo.Partial {
				return nil
			}

			if fileInfo.Range.ExclusiveEndBlock > upTo {
				return dstore.StopIteration
			}
			// Walk is always in ascending order
			highest = fileInfo
			return nil
		})
	})
	if err != nil {
		return nil
	}

	return highest
}

func (c *Config) ListSnapshotFiles(ctx context.Context, below uint64) (files []*FileInfo, err error) {
logger := logging.Logger(ctx, zlog)
if below == 0 {
if trace.IsEnabled() {
logger.Debug("no files to list", zap.String("module_hash", c.moduleHash))
}
return nil, nil
}

logger := logging.Logger(ctx, zlog)
if highestFile := c.optimisticGetHighestFullSnapshotFile(ctx, below); highestFile != nil {
if trace.IsEnabled() {
logger.Debug("found a store fullKV file close to head, optimistically assuming existence of previous segments", zap.String("module_hash", c.moduleHash), zap.String("filename", highestFile.Filename))
}
lowestAlignedBoundary := c.lowestAlignedBoundary()
var files []*FileInfo
for i := lowestAlignedBoundary; i <= below; i += c.segmentSize {
fileInfo := NewCompleteFileInfo(c.Name(), c.ModuleInitialBlock(), i)
files = append(files, fileInfo)
}
return files, nil
}

err = derr.RetryContext(ctx, 3, func(ctx context.Context) error {
// We need to clear each time we start because a previous retry could have accumulated a partial state
files = nil
Expand All @@ -160,7 +219,10 @@ func (c *Config) ListSnapshotFiles(ctx context.Context, below uint64) (files []*
return nil
}

if fileInfo.Range.StartBlock >= below {
if fileInfo.Partial && fileInfo.Range.StartBlock > below {
return dstore.StopIteration
}
if !fileInfo.Partial && fileInfo.Range.ExclusiveEndBlock > below {
return dstore.StopIteration
}

Expand Down
50 changes: 49 additions & 1 deletion storage/store/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestConfig_ListSnapshotFiles(t *testing.T) {
return nil
}

c := &Config{objStore: testStore}
c := &Config{objStore: testStore, segmentSize: 1000}

files, err := c.ListSnapshotFiles(context.Background(), 10000)
require.NoError(t, err)
Expand All @@ -51,3 +51,51 @@ func TestConfig_ListSnapshotFiles(t *testing.T) {

assert.Equal(t, expectedFiles, actualFiles)
}

// TestLowestAlignedBoundary checks lowestAlignedBoundary against a table of
// initial-block / segment-size pairs: already aligned, unaligned, zero, and a
// segment size larger than the initial block.
func TestLowestAlignedBoundary(t *testing.T) {
	type scenario struct {
		name               string
		moduleInitialBlock uint64
		segmentSize        uint64
		expected           uint64
	}

	scenarios := []scenario{
		{name: "aligned initial block", moduleInitialBlock: 1000, segmentSize: 100, expected: 1000},
		{name: "unaligned initial block", moduleInitialBlock: 1234, segmentSize: 100, expected: 1300},
		{name: "initial block zero", moduleInitialBlock: 0, segmentSize: 100, expected: 0},
		{name: "large segment size", moduleInitialBlock: 5000, segmentSize: 10000, expected: 10000},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			conf := &Config{
				moduleInitialBlock: sc.moduleInitialBlock,
				segmentSize:        sc.segmentSize,
			}

			if actual := conf.lowestAlignedBoundary(); actual != sc.expected {
				t.Errorf("lowestAlignedBoundary() = %v, want %v", actual, sc.expected)
			}
		})
	}
}
3 changes: 2 additions & 1 deletion storage/store/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type ConfigMap map[string]*Config

func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, firstStreamableBlock uint64) (out ConfigMap, err error) {
func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, firstStreamableBlock uint64, segmentSize uint64) (out ConfigMap, err error) {
out = make(ConfigMap)
for _, storeModule := range storeModules {
initialBlock := storeModule.InitialBlock
Expand All @@ -20,6 +20,7 @@ func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Mod
c, err := NewConfig(
storeModule.Name,
initialBlock,
segmentSize,
moduleHashes.Get(storeModule.Name),
storeModule.GetKindStore().UpdatePolicy,
storeModule.GetKindStore().ValueType,
Expand Down
4 changes: 3 additions & 1 deletion storage/store/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/stretchr/testify/require"
)

const testSegmentSize = uint64(100)

func newTestBaseStore(
t require.TestingT,
updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy,
Expand All @@ -25,7 +27,7 @@ func newTestBaseStore(
appendLimit = 10
}

config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store)
config, err := NewConfig("test", 0, testSegmentSize, "test.module.hash", updatePolicy, valueType, store)
config.appendLimit = appendLimit
config.totalSizeLimit = 9999
config.itemSizeLimit = 10_485_760
Expand Down
3 changes: 3 additions & 0 deletions tools/analytics_store_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/manifest"
Expand All @@ -29,6 +30,7 @@ var analyticsStoreStatsCmd = &cobra.Command{

// init registers analyticsStoreStatsCmd as a subcommand of analyticsCmd and
// declares its "segment-size" uint64 flag, which defaults to 1000.
func init() {
analyticsCmd.AddCommand(analyticsStoreStatsCmd)
analyticsStoreStatsCmd.Flags().Uint64("segment-size", 1000, "number of blocks in each state file")
}

var ErrEmptyStore = errors.New("store is empty")
Expand Down Expand Up @@ -111,6 +113,7 @@ func StoreStatsE(cmd *cobra.Command, args []string) error {
conf, err := store.NewConfig(
module.Name,
module.InitialBlock,
sflags.MustGetUint64(cmd, "segment-size"),
hash,
module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.UpdatePolicy,
module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.ValueType,
Expand Down
8 changes: 5 additions & 3 deletions tools/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"

"github.com/streamingfast/cli/sflags"
store2 "github.com/streamingfast/substreams/storage/store"
"go.uber.org/zap"

Expand All @@ -22,12 +23,13 @@ var checkCmd = &cobra.Command{

// init registers checkCmd as a subcommand of Cmd and declares its
// "segment-size" uint64 flag, which defaults to 1000.
func init() {
Cmd.AddCommand(checkCmd)
checkCmd.Flags().Uint64("segment-size", 1000, "number of blocks in each state file")
}

func checkE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

stateStore, _, err := newStore(args[0])
stateStore, _, err := newStore(args[0], sflags.MustGetUint64(cmd, "segment-size"))
if err != nil {
return fmt.Errorf("failed to create store: %w", err)
}
Expand Down Expand Up @@ -58,13 +60,13 @@ func checkE(cmd *cobra.Command, args []string) error {
return err
}

func newStore(storeURL string) (*store2.FullKV, dstore.Store, error) {
func newStore(storeURL string, segmentSize uint64) (*store2.FullKV, dstore.Store, error) {
remoteStore, err := dstore.NewStore(storeURL, "zst", "zstd", false)
if err != nil {
return nil, nil, fmt.Errorf("could not create store from %s: %w", storeURL, err)
}

config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore)
config, err := store2.NewConfig("", 0, segmentSize, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 709e85f

Please sign in to comment.