Skip to content

Commit

Permalink
add workers count to 100 by default
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Dec 6, 2024
1 parent bc18181 commit 80f468d
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions cmd/purger/purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func init() {
rootCmd.PersistentFlags().String("project", "dfuseio-global", "requester-pay project name")
rootCmd.PersistentFlags().StringSlice("network", []string{"sol-mainnet"}, "specify one or more networks")
rootCmd.PersistentFlags().Bool("force", false, "force pruning (skip confirmation)")
rootCmd.PersistentFlags().Int("workers", 100, "number of parallel workers for delete operations")

oldCmd.Flags().String("database-dsn", "postgres://localhost:5432/postgres?enable_incremental_sort=off&sslmode=disable", "Database DSN")
oldCmd.Flags().Uint64("max-age-days", 31, "max age of module caches to keep, in days")
Expand All @@ -68,19 +69,20 @@ func init() {
poisonedCmd.Flags().StringSlice("module-types", []string{"output", "state", "index"}, "Only modules of these types will be targeted for pruning")
}

func getGlobalParams(cmd *cobra.Command) (project string, networks []string, force bool, err error) {
func getGlobalParams(cmd *cobra.Command) (project string, networks []string, force bool, workers int, err error) {
force = sflags.MustGetBool(cmd, "force")
networks = sflags.MustGetStringSlice(cmd, "network")
if networks == nil {
return "", nil, false, fmt.Errorf("network is required (ex: eth-mainnet)")
return "", nil, false, 0, fmt.Errorf("network is required (ex: eth-mainnet)")
}
project = sflags.MustGetString(cmd, "project")
workers = sflags.MustGetInt(cmd, "workers")
return
}

func runPrunePoisoned(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
project, networks, force, err := getGlobalParams(cmd)
project, networks, force, workers, err := getGlobalParams(cmd)
if err != nil {
return err
}
Expand Down Expand Up @@ -125,7 +127,7 @@ func runPrunePoisoned(cmd *cobra.Command, args []string) error {
jobs := make(chan job, 1000)
var wg sync.WaitGroup

for w := 1; w <= 250; w++ {
for w := 1; w <= workers; w++ {
wg.Add(1)
go worker(ctx, &wg, jobs)
}
Expand Down Expand Up @@ -195,7 +197,7 @@ func runPrunePoisoned(cmd *cobra.Command, args []string) error {
func pruneOldE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

project, networks, force, err := getGlobalParams(cmd)
project, networks, force, workers, err := getGlobalParams(cmd)
if err != nil {
return err
}
Expand All @@ -220,7 +222,7 @@ func pruneOldE(cmd *cobra.Command, args []string) error {
for {
started := time.Now()
for _, network := range networks {
if err := runPruneOld(ctx, db, network, maxAgeDays, project, force); err != nil {
if err := runPruneOld(ctx, db, network, maxAgeDays, project, force, workers); err != nil {
return fmt.Errorf("pruning old files in %q: %w", network, err)
}
}
Expand All @@ -238,7 +240,7 @@ func pruneOldE(cmd *cobra.Command, args []string) error {
return nil
}

func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays uint64, project string, force bool) error {
func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays uint64, project string, force bool, workers int) error {
zlog.Info("getting modules to purge... (this will take a few minutes)", zap.String("network", network), zap.Uint64("max_age_days", maxAgeDays))
modulesCache, err := datastore.ModulesToPurge(db, network, maxAgeDays)
if err != nil {
Expand Down Expand Up @@ -310,7 +312,7 @@ func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays ui
var wg sync.WaitGroup

start := time.Now()
for w := 1; w <= 250; w++ {
for w := 1; w <= workers; w++ {
wg.Add(1)
go worker(ctx, &wg, jobs)
}
Expand Down

0 comments on commit 80f468d

Please sign in to comment.