Skip to content

Commit

Permalink
add dry-run and don't delete old files if last_used.zst is more recent
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Feb 18, 2025
1 parent b2ea492 commit 6eea385
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
35 changes: 27 additions & 8 deletions cmd/purger/purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func init() {
rootCmd.PersistentFlags().StringSlice("network", []string{"sol-mainnet"}, "specify one or more networks")
rootCmd.PersistentFlags().Bool("force", false, "force pruning (skip confirmation)")
rootCmd.PersistentFlags().Int("workers", 100, "number of parallel workers for delete operations")
rootCmd.PersistentFlags().Bool("dry-run", false, "run in dry-run, just printing files that would be deleted")

oldCmd.Flags().String("database-dsn", "postgres://localhost:5432/postgres?enable_incremental_sort=off&sslmode=disable", "Database DSN")
oldCmd.Flags().Uint64("max-age-days", 31, "max age of module caches to keep, in days")
Expand All @@ -74,20 +75,21 @@ func init() {
poisonedCmd.Flags().StringSlice("module-types", []string{"output", "state", "index"}, "Only modules of these types will be targeted for pruning")
}

func getGlobalParams(cmd *cobra.Command) (project string, networks []string, force bool, workers int, err error) {
func getGlobalParams(cmd *cobra.Command) (project string, networks []string, force bool, workers int, dryRun bool, err error) {
force = sflags.MustGetBool(cmd, "force")
networks = sflags.MustGetStringSlice(cmd, "network")
if networks == nil {
return "", nil, false, 0, fmt.Errorf("network is required (ex: eth-mainnet)")
return "", nil, false, 0, false, fmt.Errorf("network is required (ex: eth-mainnet)")
}
project = sflags.MustGetString(cmd, "project")
workers = sflags.MustGetInt(cmd, "workers")
dryRun = sflags.MustGetBool(cmd, "dry-run")
return
}

func runPrunePoisoned(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
project, networks, force, workers, err := getGlobalParams(cmd)
project, networks, force, workers, dryRun, err := getGlobalParams(cmd)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +143,7 @@ func runPrunePoisoned(cmd *cobra.Command, args []string) error {

for w := 1; w <= workers; w++ {
wg.Add(1)
go worker(ctx, &wg, jobs, logFile)
go worker(ctx, &wg, jobs, logFile, dryRun)
}

var lookupPaths []string
Expand Down Expand Up @@ -232,7 +234,7 @@ func runPrunePoisoned(cmd *cobra.Command, args []string) error {
func pruneOldE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

project, networks, force, workers, err := getGlobalParams(cmd)
project, networks, force, workers, dryRun, err := getGlobalParams(cmd)
if err != nil {
return err
}
Expand All @@ -254,10 +256,12 @@ func pruneOldE(cmd *cobra.Command, args []string) error {
}
daemon := sflags.MustGetBool(cmd, "daemon")

cmd.SilenceUsage = true

for {
started := time.Now()
for _, network := range networks {
if err := runPruneOld(ctx, db, network, maxAgeDays, project, force, workers); err != nil {
if err := runPruneOld(ctx, db, network, maxAgeDays, project, force, workers, dryRun); err != nil {
return fmt.Errorf("pruning old files in %q: %w", network, err)
}
}
Expand All @@ -275,7 +279,7 @@ func pruneOldE(cmd *cobra.Command, args []string) error {
return nil
}

func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays uint64, project string, force bool, workers int) error {
func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays uint64, project string, force bool, workers int, dryRun bool) error {
zlog.Info("getting modules to purge... (this will take a few minutes)", zap.String("network", network), zap.Uint64("max_age_days", maxAgeDays))
modulesCache, err := datastore.ModulesToPurge(db, network, maxAgeDays)
if err != nil {
Expand Down Expand Up @@ -303,6 +307,21 @@ func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays ui
}
relpath := fmt.Sprintf("%s/%s", m.Network, m.Subfolder)

// in case a substreams is "broken" and the module cannot proceed forward, but the old data is still used, we use the 'last_used.zst' file
// to determine that there are still active queries on it.
if strings.HasSuffix(relpath, "/outputs") ||
strings.HasSuffix(relpath, "/index") ||
strings.HasSuffix(relpath, "/states") {

last_used_file := strings.TrimSuffix(strings.TrimSuffix(strings.TrimSuffix(relpath, "/outputs"), "/index"), "/states") + "/last_used.zst"
if attrs, err := bucket.Object(last_used_file).Attrs(ctx); err == nil { // we IGNORE on error
if attrs.Updated.After(youngestDate) {
continue
}
}

}

fileCount := 0
filesToPurge := make([]string, 0)
var totalFileSize int64
Expand Down Expand Up @@ -350,7 +369,7 @@ func runPruneOld(ctx context.Context, db *sqlx.DB, network string, maxAgeDays ui
start := time.Now()
for w := 1; w <= workers; w++ {
wg.Add(1)
go worker(ctx, &wg, jobs, nil)
go worker(ctx, &wg, jobs, nil, dryRun)
}

for _, filePath := range filesToPurge {
Expand Down
5 changes: 4 additions & 1 deletion cmd/purger/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ func listFiles(ctx context.Context, prefix string, bucket *storage.BucketHandle,
return nil
}

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan job, logFile *os.File) {
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan job, logFile *os.File, dryRun bool) {
defer wg.Done()
for j := range jobs {
if logFile != nil {
logFileLock.Lock()
logFile.WriteString(fmt.Sprintf("Deleting file %s\n", j.filePath))
logFileLock.Unlock()
}
if dryRun {
fmt.Println("dry run: skipping file", j.bucket, j.filePath)
}
err := deleteFile(ctx, j.filePath, j.bucket)
if err != nil {
zlog.Info("skipping failed file", zap.String("file", j.filePath), zap.Error(err))
Expand Down

0 comments on commit 6eea385

Please sign in to comment.