From b81a601a86e0e6e7a1a13a37c1b8e2a5db4af65e Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Fri, 12 Aug 2022 12:10:19 +0200 Subject: [PATCH 1/9] Omit blocked skylinks when rebuilding the skyd client cache. --- database/skylink.go | 10 ++++++++++ skyd/cache.go | 34 +++++++++++++++++++++++++++++++--- skyd/mock.go | 7 ++++++- skyd/skyd.go | 27 ++++++++++++++++++++------- sweeper/sweeper.go | 2 +- workers/scanner.go | 20 +++++++++++++++----- 6 files changed, 83 insertions(+), 17 deletions(-) diff --git a/database/skylink.go b/database/skylink.go index 47a5f28..60dd823 100644 --- a/database/skylink.go +++ b/database/skylink.go @@ -73,6 +73,16 @@ func (db *DB) CreateSkylink(ctx context.Context, skylink skymodules.Skylink, ser return s, nil } +// DeleteSkylink removes the skylink from the database. +func (db *DB) DeleteSkylink(ctx context.Context, skylink skymodules.Skylink) error { + filter := bson.M{"skylink": skylink.String()} + dr, err := db.staticDB.Collection(collSkylinks).DeleteOne(ctx, filter) + if err == mongo.ErrNoDocuments || dr.DeletedCount == 0 { + return ErrSkylinkNotExist + } + return nil +} + // FindSkylink fetches a skylink from the DB. func (db *DB) FindSkylink(ctx context.Context, skylink skymodules.Skylink) (Skylink, error) { sr := db.staticDB.Collection(collSkylinks).FindOne(ctx, bson.M{"skylink": skylink.String()}) diff --git a/skyd/cache.go b/skyd/cache.go index 6bc0ac2..5579dd1 100644 --- a/skyd/cache.go +++ b/skyd/cache.go @@ -1,6 +1,7 @@ package skyd import ( + "bytes" "fmt" "sync" @@ -92,13 +93,13 @@ func (psc *PinnedSkylinksCache) Diff(sls []string) (unknown []string, missing [] // rebuilding happens in a goroutine, allowing the method to return a channel // on which the caller can either wait or select. The caller can check whether // the rebuild was successful by calling Error(). -func (psc *PinnedSkylinksCache) Rebuild(skydClient Client) RebuildCacheResult { +func (psc *PinnedSkylinksCache) Rebuild(skydClient Client, omitBlockedSkylinks bool) RebuildCacheResult { psc.mu.Lock() defer psc.mu.Unlock() if !psc.isRebuildInProgress() { psc.result = NewRebuildCacheResult() // Kick off the actual rebuild in a separate goroutine. - go psc.threadedRebuild(skydClient) + go psc.threadedRebuild(skydClient, omitBlockedSkylinks) } return *psc.result } @@ -121,7 +122,7 @@ func (psc *PinnedSkylinksCache) isRebuildInProgress() bool { // threadedRebuild performs the actual cache rebuild process. It reports any // errors by setting the psc.err variable and it always closes the rebuildCh on // exit. -func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { +func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client, omitBlockedSkylinks bool) { var err error // Ensure that we properly wrap up the rebuild process. defer func() { @@ -163,6 +164,33 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { } } + // If omitBlockedSkylinks is true we'll check all skylinks against the list + // of blocked skylinks in skyd and we'll remove the blocked ones from the + // cache. + if omitBlockedSkylinks { + blocklist, err := skydClient.Blocklist() + if err != nil { + err = errors.AddContext(err, "failed to fetch blocklist") + return + } + var sl skymodules.Skylink + for s := range sls { + err = sl.LoadString(s) + if err != nil { + // This is an invalid skylink, remove it. + delete(sls, s) + } + for _, bl := range blocklist.Blocklist { + mr := sl.MerkleRoot() + if bytes.Equal(bl[:], mr[:]) { + // This skylink is blocked. Remove it from the list. + delete(sls, s) + break + } + } + } + } + // Update the cache. psc.mu.Lock() psc.skylinks = sls diff --git a/skyd/mock.go b/skyd/mock.go index 6877dfb..8fa892e 100644 --- a/skyd/mock.go +++ b/skyd/mock.go @@ -42,6 +42,11 @@ func NewSkydClientMock() *ClientMock { } } +// Blocklist gets the list of blocked skylinks. Noop. +func (c *ClientMock) Blocklist() (blocklist api.SkynetBlocklistGET, err error) { + return api.SkynetBlocklistGET{}, nil +} + // ContractData returns the total data from Active and Passive contracts. func (c *ClientMock) ContractData() (uint64, error) { c.mu.Lock() @@ -128,7 +133,7 @@ func (c *ClientMock) Pin(skylink string) (skymodules.SiaPath, error) { } // RebuildCache is a noop mock that takes at least 100ms. -func (c *ClientMock) RebuildCache() RebuildCacheResult { +func (c *ClientMock) RebuildCache(_ bool) RebuildCacheResult { closedCh := make(chan struct{}) close(closedCh) // Do some work. There are tests which rely on this value being above 50ms. diff --git a/skyd/skyd.go b/skyd/skyd.go index 33256f6..430e587 100644 --- a/skyd/skyd.go +++ b/skyd/skyd.go @@ -17,11 +17,16 @@ var ( // ErrSkylinkAlreadyPinned is returned when the skylink we're trying to pin // is already pinned. ErrSkylinkAlreadyPinned = errors.New("skylink already pinned") + // ErrSkylinkIsBlocked is returned when the skylinks we're trying to pin is + // blocked by skyd. + ErrSkylinkIsBlocked = errors.New("skylink is blocked") ) type ( // Client describes the interface exposed by client. Client interface { + // Blocklist gets the list of blocked skylinks. + Blocklist() (blocklist api.SkynetBlocklistGET, err error) // ContractData returns the total data from Active and Passive contracts. ContractData() (uint64, error) // DiffPinnedSkylinks returns two lists of skylinks - the ones that @@ -36,7 +41,7 @@ type ( // Pin instructs the local skyd to pin the given skylink. Pin(skylink string) (skymodules.SiaPath, error) // RebuildCache rebuilds the cache of skylinks pinned by the local skyd. - RebuildCache() RebuildCacheResult + RebuildCache(omitBlockedSkylinks bool) RebuildCacheResult // RenterDirRootGet is a direct proxy to the skyd client method with the // same name. RenterDirRootGet(siaPath skymodules.SiaPath) (rd api.RenterDirectory, err error) @@ -70,6 +75,11 @@ func NewClient(host, port, password string, cache *PinnedSkylinksCache, logger l } } +// Blocklist gets the list of blocked skylinks. +func (c *client) Blocklist() (blocklist api.SkynetBlocklistGET, err error) { + return c.staticClient.SkynetBlocklistGet() +} + // ContractData returns the total data from Active and Passive contracts. func (c *client) ContractData() (uint64, error) { rcs, err := c.staticClient.RenterContractsGet() @@ -133,20 +143,23 @@ func (c *client) Pin(skylink string) (skymodules.SiaPath, error) { return skymodules.SiaPath{}, ErrSkylinkAlreadyPinned } sp, err := c.staticClient.SkynetSkylinkPinLazyPost(skylink) + if err != nil && strings.Contains(err.Error(), "failed to pin file to skynet: skylink is blocked") { + return skymodules.SiaPath{}, ErrSkylinkIsBlocked + } if err == nil || errors.Contains(err, ErrSkylinkAlreadyPinned) { c.staticSkylinksCache.Add(skylink) } return sp, err } -// RebuildCache rebuilds the cache of skylinks pinned by the local skyd. The -// rebuilding happens in a goroutine, allowing the method to return a channel -// on which the caller can either wait or select. The caller can check whether -// the rebuild was successful by calling Error(). -func (c *client) RebuildCache() RebuildCacheResult { +// RebuildCache rebuilds the cache of skylinks pinned by the local skyd. This +// excludes blocked skylinks. The rebuilding happens in a goroutine, allowing +// the method to return a channel on which the caller can either wait or select. +// The caller can check whether the rebuild was successful by calling Error(). +func (c *client) RebuildCache(omitBlockedSkylinks bool) RebuildCacheResult { c.staticLogger.Trace("Entering RebuildCache") defer c.staticLogger.Trace("Exiting RebuildCache") - return c.staticSkylinksCache.Rebuild(c) + return c.staticSkylinksCache.Rebuild(c, omitBlockedSkylinks) } // RenterDirRootGet is a direct proxy to skyd client's method. diff --git a/sweeper/sweeper.go b/sweeper/sweeper.go index 00e22d2..407ae80 100644 --- a/sweeper/sweeper.go +++ b/sweeper/sweeper.go @@ -83,7 +83,7 @@ func (s *Sweeper) threadedPerformSweep() { // Kick off a skyd client cache rebuild. That happens in a separate // goroutine. We'll block on the result channel only after we're done with // the other tasks we can do while waiting. - res := s.staticSkydClient.RebuildCache() + res := s.staticSkydClient.RebuildCache(true) // We use an independent context because we are not strictly bound to a // specific API call. Also, this operation can take a significant amount of diff --git a/workers/scanner.go b/workers/scanner.go index 19967d2..d8bce5c 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -213,7 +213,7 @@ func (s *Scanner) threadedScanAndPin() { // Perform a scan: // Rebuild the cache and watch for service shutdown while doing that. - res := s.staticSkydClient.RebuildCache() + res := s.staticSkydClient.RebuildCache(true) select { case <-s.staticTG.StopChan(): return @@ -301,6 +301,7 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { s.staticWaitUntilHealthy(skylink, sp) continue } + s.staticLogger.Trace(err) // In case of error we still want to sleep for a moment in order to // avoid a tight(ish) loop of errors when we either fail to pin or // fail to mark as pinned. Note that this only happens when we want @@ -327,7 +328,9 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S minPinners := s.minPinners s.mu.Unlock() - sl, err := s.staticDB.FindAndLockUnderpinned(context.TODO(), s.staticServerName, minPinners) + ctx := context.TODO() + + sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, minPinners) if database.IsNoSkylinksNeedPinning(err) { return skymodules.Skylink{}, skymodules.SiaPath{}, false, err } @@ -336,7 +339,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S return skymodules.Skylink{}, skymodules.SiaPath{}, false, err } defer func() { - err = s.staticDB.UnlockSkylink(context.TODO(), sl, s.staticServerName) + err = s.staticDB.UnlockSkylink(ctx, sl, s.staticServerName) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to unlock skylink after trying to pin it")) } @@ -352,12 +355,19 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S if errors.Contains(err, skyd.ErrSkylinkAlreadyPinned) { s.staticLogger.Info(err) // The skylink is already pinned locally but it's not marked as such. - err = s.staticDB.AddServerForSkylinks(context.TODO(), []string{sl.String()}, s.staticServerName, false) + err = s.staticDB.AddServerForSkylinks(ctx, []string{sl.String()}, s.staticServerName, false) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to mark as pinned by this server")) } return skymodules.Skylink{}, skymodules.SiaPath{}, true, err } + if errors.Contains(err, skyd.ErrSkylinkIsBlocked) { + s.staticLogger.Info(err) + // The skylink is blocked by skyd. We'll remove it from the database, so + // no other server will try to repin it. + err = s.staticDB.DeleteSkylink(ctx, sl) + return skymodules.Skylink{}, skymodules.SiaPath{}, true, err + } if err != nil && (strings.Contains(err.Error(), "API authentication failed.") || strings.Contains(err.Error(), "connect: connection refused")) { err = errors.AddContext(err, fmt.Sprintf("unrecoverable error while pinning '%s'", sl)) @@ -371,7 +381,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S return skymodules.Skylink{}, skymodules.SiaPath{}, true, err } s.staticLogger.Infof("Successfully pinned '%s'", sl) - err = s.staticDB.AddServerForSkylinks(context.TODO(), []string{sl.String()}, s.staticServerName, false) + err = s.staticDB.AddServerForSkylinks(ctx, []string{sl.String()}, s.staticServerName, false) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to mark as pinned by this server")) } From 555125fa2d11c9db6fb2142328dbe733e634edd1 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Fri, 12 Aug 2022 13:07:36 +0200 Subject: [PATCH 2/9] Add a mechanism for skipping certain skylinks (blocked, causing errors, etc.) during the current scan. We'll retry them on the next scan. Add a test for the new feature. --- database/skylink.go | 4 +++- skyd/cache_test.go | 2 +- test/database/skylink_test.go | 27 ++++++++++++++++----------- workers/scanner.go | 11 ++++++++--- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/database/skylink.go b/database/skylink.go index 60dd823..ef654a2 100644 --- a/database/skylink.go +++ b/database/skylink.go @@ -196,7 +196,7 @@ func (db *DB) RemoveServerFromSkylinks(ctx context.Context, skylinks []string, s // { "lock_expires" : { "$lt": new Date() }} // ] // }) -func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, minPinners int) (skymodules.Skylink, error) { +func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, skipSkylinks []string, minPinners int) (skymodules.Skylink, error) { filter := bson.M{ // We use pinned != false because pinned == true is the default but it's // possible that we've missed setting that somewhere. @@ -205,6 +205,8 @@ func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, minPinn "$expr": bson.M{"$lt": bson.A{bson.M{"$size": "$servers"}, minPinners}}, // Not pinned by the given server. "servers": bson.M{"$nin": bson.A{server}}, + // Skip all skylinks in the skipSkylinks list. + "skylink": bson.M{"$nin": bson.A{skipSkylinks}}, // Unlocked. "$or": bson.A{ bson.M{"lock_expires": bson.M{"$exists": false}}, diff --git a/skyd/cache_test.go b/skyd/cache_test.go index 257ee04..59f61c9 100644 --- a/skyd/cache_test.go +++ b/skyd/cache_test.go @@ -55,7 +55,7 @@ func TestCacheRebuild(t *testing.T) { c.Add(sl) skyd := NewSkydClientMock() sls := skyd.MockFilesystem() - rr := c.Rebuild(skyd) + rr := c.Rebuild(skyd, true) // Wait for the rebuild to finish. <-rr.ErrAvail if rr.ExternErr != nil { diff --git a/test/database/skylink_test.go b/test/database/skylink_test.go index 122b6bc..a078b69 100644 --- a/test/database/skylink_test.go +++ b/test/database/skylink_test.go @@ -209,7 +209,7 @@ func TestFindAndLock(t *testing.T) { cfg.MinPinners = 1 // Try to fetch an underpinned skylink, expect none to be found. - _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } @@ -219,7 +219,7 @@ func TestFindAndLock(t *testing.T) { t.Fatal(err) } // Try to fetch an underpinned skylink, expect none to be found. - _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } @@ -228,8 +228,13 @@ func TestFindAndLock(t *testing.T) { if err != nil { t.Fatal(err) } + // Try to fetch underpinned skylinks but skip sl. Expect none to be found. + _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{sl.String()}, cfg.MinPinners) + if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { + t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) + } // Try to fetch an underpinned skylink, expect to find one. - underpinned, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + underpinned, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if err != nil { t.Fatal(err) } @@ -239,7 +244,7 @@ func TestFindAndLock(t *testing.T) { // Try to fetch an underpinned skylink from the name of a different server. // Expect to find none because the one we got before is now locked and // shouldn't be returned. - _, err = db.FindAndLockUnderpinned(ctx, "different server", cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, "different server", []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } @@ -253,7 +258,7 @@ func TestFindAndLock(t *testing.T) { t.Fatal(err) } // Try to fetch an underpinned skylink, expect none to be found. - _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } @@ -267,13 +272,13 @@ func TestFindAndLock(t *testing.T) { // Try to fetch an underpinned skylink, expect none to be found. // Out test skylink is underpinned but it's pinned by the given server, so // we expect it not to be returned. - _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } // Try to fetch an underpinned skylink from the name of a different server. // Expect one to be found. - _, err = db.FindAndLockUnderpinned(ctx, anotherServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, anotherServerName, []string{}, cfg.MinPinners) if err != nil { t.Fatal(err) } @@ -294,7 +299,7 @@ func TestFindAndLock(t *testing.T) { } // Try to fetch an underpinned skylink with a third server name, expect none // to be found because our skylink is now properly pinned. - _, err = db.FindAndLockUnderpinned(ctx, thirdServerName, cfg.MinPinners) + _, err = db.FindAndLockUnderpinned(ctx, thirdServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } @@ -336,7 +341,7 @@ func TestFindAndLockOwnFirst(t *testing.T) { t.Fatal(err) } // Fetch and lock one of those. - locked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + locked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if err != nil { t.Fatal(err) } @@ -349,7 +354,7 @@ func TestFindAndLockOwnFirst(t *testing.T) { } // Try fetching another underpinned skylink before unlocking this one. // Expect to get a different one. - newLocked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + newLocked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if err != nil { t.Fatal(err) } @@ -363,7 +368,7 @@ func TestFindAndLockOwnFirst(t *testing.T) { } // Fetch a new underpinned skylink. Expect it to fail because we've run out // of underpinned skylinks. - newLocked, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners) + newLocked, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners) if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) { t.Fatalf("Expected '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err) } diff --git a/workers/scanner.go b/workers/scanner.go index d8bce5c..367aa5f 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -329,8 +329,11 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S s.mu.Unlock() ctx := context.TODO() + // skipSkylinks is a list of skylinks which we want to skip during this + // scan. These might be skylinks which errored out or blocked skylinks. + skipSkylinks := make([]string, 0) - sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, minPinners) + sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, skipSkylinks, minPinners) if database.IsNoSkylinksNeedPinning(err) { return skymodules.Skylink{}, skymodules.SiaPath{}, false, err } @@ -355,6 +358,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S if errors.Contains(err, skyd.ErrSkylinkAlreadyPinned) { s.staticLogger.Info(err) // The skylink is already pinned locally but it's not marked as such. + skipSkylinks = append(skipSkylinks, sl.String()) err = s.staticDB.AddServerForSkylinks(ctx, []string{sl.String()}, s.staticServerName, false) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to mark as pinned by this server")) @@ -363,19 +367,20 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S } if errors.Contains(err, skyd.ErrSkylinkIsBlocked) { s.staticLogger.Info(err) + skipSkylinks = append(skipSkylinks, sl.String()) // The skylink is blocked by skyd. We'll remove it from the database, so // no other server will try to repin it. err = s.staticDB.DeleteSkylink(ctx, sl) return skymodules.Skylink{}, skymodules.SiaPath{}, true, err } - if err != nil && (strings.Contains(err.Error(), "API authentication failed.") || - strings.Contains(err.Error(), "connect: connection refused")) { + if err != nil && (strings.Contains(err.Error(), "API authentication failed.") || strings.Contains(err.Error(), "connect: connection refused")) { err = errors.AddContext(err, fmt.Sprintf("unrecoverable error while pinning '%s'", sl)) s.staticLogger.Error(err) return skymodules.Skylink{}, skymodules.SiaPath{}, false, err } if err != nil { s.staticLogger.Warn(errors.AddContext(err, fmt.Sprintf("failed to pin '%s'", sl))) + skipSkylinks = append(skipSkylinks, sl.String()) // Since this is not an unrecoverable error, we'll signal the caller to // continue trying to pin other skylinks. return skymodules.Skylink{}, skymodules.SiaPath{}, true, err From 698f0cf3e162acb32e657b010d75176650491cd7 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Fri, 12 Aug 2022 13:42:04 +0200 Subject: [PATCH 3/9] Test deleting a skylink. Test rebuilding the cache and skipping the blocked skylinks. --- skyd/cache_test.go | 22 +++++++++++++++++++++- skyd/mock.go | 12 +++++++++++- test/database/skylink_test.go | 9 +++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/skyd/cache_test.go b/skyd/cache_test.go index 59f61c9..776a595 100644 --- a/skyd/cache_test.go +++ b/skyd/cache_test.go @@ -1,6 +1,9 @@ package skyd import ( + "gitlab.com/SkynetLabs/skyd/node/api" + "gitlab.com/SkynetLabs/skyd/skymodules" + "go.sia.tech/siad/crypto" "testing" ) @@ -50,10 +53,23 @@ func TestCacheRebuild(t *testing.T) { sl := "XX_uSb3BpGxmSbRAg1xj5T8SdB4hiSFiEW2sEEzxt5MNkg" + // This skylink exists on the mock filesystem. + slBlocked := "CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q__B" + var slbl skymodules.Skylink + err := slbl.LoadString(slBlocked) + if err != nil { + t.Fatal(err) + } + // Add the blocked skylink to a blocklist. + blocklist := api.SkynetBlocklistGET{ + Blocklist: []crypto.Hash{slbl.MerkleRoot()}, + } + c := NewCache() // Add a skylink to the cache. Expect this to be gone after the rebuild. c.Add(sl) skyd := NewSkydClientMock() + skyd.SetBlocklist(blocklist) sls := skyd.MockFilesystem() rr := c.Rebuild(skyd, true) // Wait for the rebuild to finish. @@ -61,9 +77,13 @@ func TestCacheRebuild(t *testing.T) { if rr.ExternErr != nil { t.Fatal(rr.ExternErr) } + // Ensure that the blocked skylink is not in the cache. + if c.Contains(slBlocked) { + t.Fatalf("Expected blocked skylink '%s' to not be present after the rebuild.", sl) + } // Ensure that all expected skylinks are in the cache now. for _, s := range sls { - if !c.Contains(s) { + if s != slBlocked && !c.Contains(s) { t.Fatalf("Expected skylink '%s' to be in the cache.", s) } } diff --git a/skyd/mock.go b/skyd/mock.go index 8fa892e..fc8de45 100644 --- a/skyd/mock.go +++ b/skyd/mock.go @@ -12,6 +12,7 @@ import ( type ( // ClientMock is a mock of skyd.Client ClientMock struct { + blocklist api.SkynetBlocklistGET contractData uint64 fileHealth map[skymodules.SiaPath]float64 filesystemMock map[skymodules.SiaPath]rdReturnType @@ -44,7 +45,9 @@ func NewSkydClientMock() *ClientMock { // Blocklist gets the list of blocked skylinks. Noop. func (c *ClientMock) Blocklist() (blocklist api.SkynetBlocklistGET, err error) { - return api.SkynetBlocklistGET{}, nil + c.mu.Lock() + defer c.mu.Unlock() + return c.blocklist, nil } // ContractData returns the total data from Active and Passive contracts. @@ -156,6 +159,13 @@ func (c *ClientMock) RenterDirRootGet(siaPath skymodules.SiaPath) (rd api.Renter return r.RD, r.Err } +// SetBlocklist allows us to set the blocklist. +func (c *ClientMock) SetBlocklist(bl api.SkynetBlocklistGET) { + c.mu.Lock() + c.blocklist = bl + c.mu.Unlock() +} + // SetHealth allows us to set the health of a sia file. func (c *ClientMock) SetHealth(sp skymodules.SiaPath, h float64) { c.mu.Lock() diff --git a/test/database/skylink_test.go b/test/database/skylink_test.go index a078b69..46c40fe 100644 --- a/test/database/skylink_test.go +++ b/test/database/skylink_test.go @@ -181,6 +181,15 @@ func TestSkylink(t *testing.T) { if !s1.Pinned { t.Fatal("Expected the skylink to be pinned.") } + // Delete the skylink. + err = db.DeleteSkylink(ctx, sl1) + if err != nil { + t.Fatal(err) + } + s1, err = db.FindSkylink(ctx, sl1) + if !errors.Contains(err, database.ErrSkylinkNotExist) { + t.Fatalf("Expected error %v, got %v.", database.ErrSkylinkNotExist, err) + } } // TestFindAndLock tests the functionality of FindAndLockUnderpinned and From 96f9884d30743c2b45d96c5e57caef606e523c6c Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Fri, 12 Aug 2022 15:35:08 +0200 Subject: [PATCH 4/9] Remove the optional skip of blocked skylinks and make it obligatory. Use a map to check if a skylink is blocked. Fix a bug in the mongo query which skips blocked skylinks. --- database/skylink.go | 5 ++++- skyd/cache.go | 51 ++++++++++++++++++++++----------------------- skyd/cache_test.go | 2 +- skyd/mock.go | 2 +- skyd/skyd.go | 6 +++--- sweeper/sweeper.go | 2 +- workers/scanner.go | 6 +++--- 7 files changed, 38 insertions(+), 36 deletions(-) diff --git a/database/skylink.go b/database/skylink.go index ef654a2..8b6d490 100644 --- a/database/skylink.go +++ b/database/skylink.go @@ -197,6 +197,9 @@ func (db *DB) RemoveServerFromSkylinks(ctx context.Context, skylinks []string, s // ] // }) func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, skipSkylinks []string, minPinners int) (skymodules.Skylink, error) { + if skipSkylinks == nil { + skipSkylinks = make([]string, 0) + } filter := bson.M{ // We use pinned != false because pinned == true is the default but it's // possible that we've missed setting that somewhere. @@ -206,7 +209,7 @@ func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, skipSky // Not pinned by the given server. "servers": bson.M{"$nin": bson.A{server}}, // Skip all skylinks in the skipSkylinks list. - "skylink": bson.M{"$nin": bson.A{skipSkylinks}}, + "skylink": bson.M{"$nin": skipSkylinks}, // Unlocked. "$or": bson.A{ bson.M{"lock_expires": bson.M{"$exists": false}}, diff --git a/skyd/cache.go b/skyd/cache.go index 5579dd1..a1ff0c3 100644 --- a/skyd/cache.go +++ b/skyd/cache.go @@ -1,8 +1,8 @@ package skyd import ( - "bytes" "fmt" + "go.sia.tech/siad/crypto" "sync" "gitlab.com/NebulousLabs/errors" @@ -93,13 +93,13 @@ func (psc *PinnedSkylinksCache) Diff(sls []string) (unknown []string, missing [] // rebuilding happens in a goroutine, allowing the method to return a channel // on which the caller can either wait or select. The caller can check whether // the rebuild was successful by calling Error(). -func (psc *PinnedSkylinksCache) Rebuild(skydClient Client, omitBlockedSkylinks bool) RebuildCacheResult { +func (psc *PinnedSkylinksCache) Rebuild(skydClient Client) RebuildCacheResult { psc.mu.Lock() defer psc.mu.Unlock() if !psc.isRebuildInProgress() { psc.result = NewRebuildCacheResult() // Kick off the actual rebuild in a separate goroutine. - go psc.threadedRebuild(skydClient, omitBlockedSkylinks) + go psc.threadedRebuild(skydClient) } return *psc.result } @@ -122,7 +122,7 @@ func (psc *PinnedSkylinksCache) isRebuildInProgress() bool { // threadedRebuild performs the actual cache rebuild process. It reports any // errors by setting the psc.err variable and it always closes the rebuildCh on // exit. -func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client, omitBlockedSkylinks bool) { +func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { var err error // Ensure that we properly wrap up the rebuild process. defer func() { @@ -164,30 +164,29 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client, omitBlockedSk } } - // If omitBlockedSkylinks is true we'll check all skylinks against the list - // of blocked skylinks in skyd and we'll remove the blocked ones from the - // cache. - if omitBlockedSkylinks { - blocklist, err := skydClient.Blocklist() + // Check all skylinks against the list of blocked skylinks in skyd and we'll + // remove the blocked ones from the cache. + blocklist, err := skydClient.Blocklist() + if err != nil { + err = errors.AddContext(err, "failed to fetch blocklist") + return + } + blockMap := make(map[crypto.Hash]struct{}, len(blocklist.Blocklist)) + for _, bl := range blocklist.Blocklist { + blockMap[bl] = struct{}{} + } + var sl skymodules.Skylink + for s := range sls { + err = sl.LoadString(s) if err != nil { - err = errors.AddContext(err, "failed to fetch blocklist") - return + // This is an invalid skylink, remove it. + delete(sls, s) + continue } - var sl skymodules.Skylink - for s := range sls { - err = sl.LoadString(s) - if err != nil { - // This is an invalid skylink, remove it. - delete(sls, s) - } - for _, bl := range blocklist.Blocklist { - mr := sl.MerkleRoot() - if bytes.Equal(bl[:], mr[:]) { - // This skylink is blocked. Remove it from the list. - delete(sls, s) - break - } - } + if _, exists := blockMap[sl.MerkleRoot()]; exists { + // This skylink is blocked. Remove it from the list. + delete(sls, s) + break } } diff --git a/skyd/cache_test.go b/skyd/cache_test.go index 776a595..261c724 100644 --- a/skyd/cache_test.go +++ b/skyd/cache_test.go @@ -71,7 +71,7 @@ func TestCacheRebuild(t *testing.T) { skyd := NewSkydClientMock() skyd.SetBlocklist(blocklist) sls := skyd.MockFilesystem() - rr := c.Rebuild(skyd, true) + rr := c.Rebuild(skyd) // Wait for the rebuild to finish. <-rr.ErrAvail if rr.ExternErr != nil { diff --git a/skyd/mock.go b/skyd/mock.go index fc8de45..5d74a82 100644 --- a/skyd/mock.go +++ b/skyd/mock.go @@ -136,7 +136,7 @@ func (c *ClientMock) Pin(skylink string) (skymodules.SiaPath, error) { } // RebuildCache is a noop mock that takes at least 100ms. -func (c *ClientMock) RebuildCache(_ bool) RebuildCacheResult { +func (c *ClientMock) RebuildCache() RebuildCacheResult { closedCh := make(chan struct{}) close(closedCh) // Do some work. There are tests which rely on this value being above 50ms. diff --git a/skyd/skyd.go b/skyd/skyd.go index 430e587..a5451ba 100644 --- a/skyd/skyd.go +++ b/skyd/skyd.go @@ -41,7 +41,7 @@ type ( // Pin instructs the local skyd to pin the given skylink. Pin(skylink string) (skymodules.SiaPath, error) // RebuildCache rebuilds the cache of skylinks pinned by the local skyd. - RebuildCache(omitBlockedSkylinks bool) RebuildCacheResult + RebuildCache() RebuildCacheResult // RenterDirRootGet is a direct proxy to the skyd client method with the // same name. RenterDirRootGet(siaPath skymodules.SiaPath) (rd api.RenterDirectory, err error) @@ -156,10 +156,10 @@ func (c *client) Pin(skylink string) (skymodules.SiaPath, error) { // excludes blocked skylinks. The rebuilding happens in a goroutine, allowing // the method to return a channel on which the caller can either wait or select. // The caller can check whether the rebuild was successful by calling Error(). -func (c *client) RebuildCache(omitBlockedSkylinks bool) RebuildCacheResult { +func (c *client) RebuildCache() RebuildCacheResult { c.staticLogger.Trace("Entering RebuildCache") defer c.staticLogger.Trace("Exiting RebuildCache") - return c.staticSkylinksCache.Rebuild(c, omitBlockedSkylinks) + return c.staticSkylinksCache.Rebuild(c) } // RenterDirRootGet is a direct proxy to skyd client's method. diff --git a/sweeper/sweeper.go b/sweeper/sweeper.go index 407ae80..00e22d2 100644 --- a/sweeper/sweeper.go +++ b/sweeper/sweeper.go @@ -83,7 +83,7 @@ func (s *Sweeper) threadedPerformSweep() { // Kick off a skyd client cache rebuild. That happens in a separate // goroutine. We'll block on the result channel only after we're done with // the other tasks we can do while waiting. - res := s.staticSkydClient.RebuildCache(true) + res := s.staticSkydClient.RebuildCache() // We use an independent context because we are not strictly bound to a // specific API call. Also, this operation can take a significant amount of diff --git a/workers/scanner.go b/workers/scanner.go index 367aa5f..646979f 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -213,7 +213,7 @@ func (s *Scanner) threadedScanAndPin() { // Perform a scan: // Rebuild the cache and watch for service shutdown while doing that. - res := s.staticSkydClient.RebuildCache(true) + res := s.staticSkydClient.RebuildCache() select { case <-s.staticTG.StopChan(): return @@ -329,10 +329,10 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S s.mu.Unlock() ctx := context.TODO() + // skipSkylinks is a list of skylinks which we want to skip during this // scan. These might be skylinks which errored out or blocked skylinks. - skipSkylinks := make([]string, 0) - + var skipSkylinks []string sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, skipSkylinks, minPinners) if database.IsNoSkylinksNeedPinning(err) { return skymodules.Skylink{}, skymodules.SiaPath{}, false, err From cd6da38b8c23e4e46a087ffeb949ee22686154d0 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Fri, 12 Aug 2022 17:30:15 +0200 Subject: [PATCH 5/9] Move the list of skipped skylinks to the scanner struct. --- workers/scanner.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/workers/scanner.go b/workers/scanner.go index 646979f..971df5e 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -108,7 +108,10 @@ type ( dryRun bool minPinners int - mu sync.Mutex + // skipSkylinks is a list of skylinks which we want to skip during this + // scan. These might be skylinks which errored out or blocked skylinks. + skipSkylinks []string + mu sync.Mutex } ) @@ -279,6 +282,12 @@ func (s *Scanner) staticScheduleNextScan() bool { func (s *Scanner) managedPinUnderpinnedSkylinks() { s.staticLogger.Trace("Entering managedPinUnderpinnedSkylinks") defer s.staticLogger.Trace("Exiting managedPinUnderpinnedSkylinks") + + // Clear out the skipped skylinks from the previous run. + s.mu.Lock() + s.skipSkylinks = []string{} + s.mu.Unlock() + for { // Check for service shutdown before talking to the DB. select { @@ -326,13 +335,11 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S s.mu.Lock() dryRun := s.dryRun minPinners := s.minPinners + skipSkylinks := s.skipSkylinks s.mu.Unlock() ctx := context.TODO() - // skipSkylinks is a list of skylinks which we want to skip during this - // scan. These might be skylinks which errored out or blocked skylinks. - var skipSkylinks []string sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, skipSkylinks, minPinners) if database.IsNoSkylinksNeedPinning(err) { return skymodules.Skylink{}, skymodules.SiaPath{}, false, err @@ -358,7 +365,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S if errors.Contains(err, skyd.ErrSkylinkAlreadyPinned) { s.staticLogger.Info(err) // The skylink is already pinned locally but it's not marked as such. - skipSkylinks = append(skipSkylinks, sl.String()) + s.managedSkipSkylink(sl) err = s.staticDB.AddServerForSkylinks(ctx, []string{sl.String()}, s.staticServerName, false) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to mark as pinned by this server")) @@ -367,7 +374,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S } if errors.Contains(err, skyd.ErrSkylinkIsBlocked) { s.staticLogger.Info(err) - skipSkylinks = append(skipSkylinks, sl.String()) + s.managedSkipSkylink(sl) // The skylink is blocked by skyd. We'll remove it from the database, so // no other server will try to repin it. err = s.staticDB.DeleteSkylink(ctx, sl) @@ -380,7 +387,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S } if err != nil { s.staticLogger.Warn(errors.AddContext(err, fmt.Sprintf("failed to pin '%s'", sl))) - skipSkylinks = append(skipSkylinks, sl.String()) + s.managedSkipSkylink(sl) // Since this is not an unrecoverable error, we'll signal the caller to // continue trying to pin other skylinks. return skymodules.Skylink{}, skymodules.SiaPath{}, true, err @@ -393,6 +400,13 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S return sl, sp, true, nil } +// managedSkipSkylink adds a skylink to the list of skipped skylinks. +func (s *Scanner) managedSkipSkylink(sl skymodules.Skylink) { + s.mu.Lock() + s.skipSkylinks = append(s.skipSkylinks, sl.String()) + s.mu.Unlock() +} + // staticEstimateTimeToFull calculates how long we should sleep after pinning // the given skylink in order to give the renter time to fully upload it before // we pin another one. It returns a ballpark value. From 9cc4e5e0c9d7182e00736686ea6c786a9a925bb0 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Mon, 15 Aug 2022 15:04:45 +0200 Subject: [PATCH 6/9] Periodically print out information about the currently running pinning session. --- workers/scanner.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/workers/scanner.go b/workers/scanner.go index 971df5e..b88db53 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -78,6 +78,14 @@ var ( Testing: time.Millisecond, }).(time.Duration) + // printPinningStatisticsPeriod defines how often we print intermediate + // statistics while pinning underpinned files. + printPinningStatisticsPeriod = build.Select(build.Var{ + Standard: 10 * time.Minute, + Dev: 10 * time.Second, + Testing: 500 * time.Millisecond, + }).(time.Duration) + // sleepBetweenScans defines how often we'll scan the DB for underpinned // skylinks. // Needs to be at least twice as long as conf.SleepBetweenChecksForScan. @@ -288,6 +296,21 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { s.skipSkylinks = []string{} s.mu.Unlock() + intermediateStatsTicker := time.NewTicker(printPinningStatisticsPeriod) + defer intermediateStatsTicker.Stop() + countPinned := 0 + t0 := lib.Now() + + // Print final statistics when finishing the method. + defer func() { + t1 := lib.Now() + s.staticLogger.Infof("Finished at %s, runtime %s, pinned skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned) + s.mu.Lock() + skipped := s.skipSkylinks + s.mu.Unlock() + s.staticLogger.Tracef("Skipped %d skylinks: %v", len(skipped), skipped) + }() + for { // Check for service shutdown before talking to the DB. select { @@ -297,7 +320,20 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { default: } + // Print intermediate statistics. + select { + case <-intermediateStatsTicker.C: + t1 := lib.Now() + s.staticLogger.Infof("Time %s, runtime %s, pinned skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned) + default: + } + skylink, sp, continueScanning, err := s.managedFindAndPinOneUnderpinnedSkylink() + if err == nil { + countPinned++ + } else { + s.staticLogger.Trace(err) + } if !continueScanning { return } @@ -310,7 +346,6 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { s.staticWaitUntilHealthy(skylink, sp) continue } - s.staticLogger.Trace(err) // In case of error we still want to sleep for a moment in order to // avoid a tight(ish) loop of errors when we either fail to pin or // fail to mark as pinned. Note that this only happens when we want From de1465de991efbfdb0dcdd23121bd3bae75ac53e Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Mon, 15 Aug 2022 15:18:15 +0200 Subject: [PATCH 7/9] Clean up filtering blocked skylinks. --- skyd/cache.go | 51 +++++++++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/skyd/cache.go b/skyd/cache.go index a1ff0c3..375356b 100644 --- a/skyd/cache.go +++ b/skyd/cache.go @@ -135,10 +135,23 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { psc.mu.Unlock() }() + // Check all skylinks against the list of blocked skylinks in skyd and we'll + // remove the blocked ones from the cache. + blocklist, err := skydClient.Blocklist() + if err != nil { + err = errors.AddContext(err, "failed to fetch blocklist") + return + } + blockMap := make(map[crypto.Hash]struct{}, len(blocklist.Blocklist)) + for _, bl := range blocklist.Blocklist { + blockMap[bl] = struct{}{} + } + // Walk the entire Skynet folder and scan all files we find for skylinks. dirsToWalk := []skymodules.SiaPath{skymodules.SkynetFolder} sls := make(map[string]struct{}) var rd api.RenterDirectory + var sl skymodules.Skylink for len(dirsToWalk) > 0 { // Pop the first dir and walk it. dir := dirsToWalk[0] @@ -149,12 +162,16 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { return } for _, f := range rd.Files { - for _, sl := range f.Skylinks { - if !validSkylink(sl) { - build.Critical(fmt.Errorf("Detected invalid skylink in a sia file: skylink '%s', siapath: '%s'", sl, f.SiaPath)) + for _, s := range f.Skylinks { + if err = sl.LoadString(s); err != nil { + build.Critical(fmt.Errorf("Detected invalid skylink in a sia file: skylink '%s', siapath: '%s'", s, f.SiaPath)) + continue + } + // Check if the skylink is blocked. + if _, exists := blockMap[sl.MerkleRoot()]; exists { continue } - sls[sl] = struct{}{} + sls[s] = struct{}{} } } // Grab all subdirs and queue them for walking. @@ -164,32 +181,6 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) { } } - // Check all skylinks against the list of blocked skylinks in skyd and we'll - // remove the blocked ones from the cache. - blocklist, err := skydClient.Blocklist() - if err != nil { - err = errors.AddContext(err, "failed to fetch blocklist") - return - } - blockMap := make(map[crypto.Hash]struct{}, len(blocklist.Blocklist)) - for _, bl := range blocklist.Blocklist { - blockMap[bl] = struct{}{} - } - var sl skymodules.Skylink - for s := range sls { - err = sl.LoadString(s) - if err != nil { - // This is an invalid skylink, remove it. - delete(sls, s) - continue - } - if _, exists := blockMap[sl.MerkleRoot()]; exists { - // This skylink is blocked. Remove it from the list. - delete(sls, s) - break - } - } - // Update the cache. psc.mu.Lock() psc.skylinks = sls From 8f5eac8f02f6e735a854c6e8f1f845692693e1f9 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Mon, 15 Aug 2022 16:51:12 +0200 Subject: [PATCH 8/9] Add an integration test for deleting blocked skylinks. --- workers/scanner.go | 7 +++++++ workers/scanner_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/workers/scanner.go b/workers/scanner.go index b88db53..f286773 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -383,7 +383,13 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S s.staticLogger.Warn(errors.AddContext(err, "failed to fetch underpinned skylink")) return skymodules.Skylink{}, skymodules.SiaPath{}, false, err } + // This is a flag we are going to raise if we delete the skylink from the DB + // while processing it. + var deleted bool defer func() { + if deleted { + return + } err = s.staticDB.UnlockSkylink(ctx, sl, s.staticServerName) if err != nil { s.staticLogger.Debug(errors.AddContext(err, "failed to unlock skylink after trying to pin it")) @@ -413,6 +419,7 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S // The skylink is blocked by skyd. We'll remove it from the database, so // no other server will try to repin it. err = s.staticDB.DeleteSkylink(ctx, sl) + deleted = err == nil return skymodules.Skylink{}, skymodules.SiaPath{}, true, err } if err != nil && (strings.Contains(err.Error(), "API authentication failed.") || strings.Contains(err.Error(), "connect: connection refused")) { diff --git a/workers/scanner_test.go b/workers/scanner_test.go index dc5f537..39ca2c9 100644 --- a/workers/scanner_test.go +++ b/workers/scanner_test.go @@ -449,6 +449,39 @@ func TestFindAndPinOneUnderpinnedSkylink(t *testing.T) { if !test.Contains(sls, sl.String()) { t.Fatalf("Expected to find '%s' among the skylinks pinned by this server, got '%v'", sl.String(), sls) } + + // Make sure we handle blocked skylinks correctly. + blockedSl := test.RandomSkylink() + _, err = db.CreateSkylink(ctx, blockedSl, serverName) + if err != nil { + t.Fatal(err) + } + // Make sure the skylink is in the DB. + _, err = db.FindSkylink(ctx, blockedSl) + if err != nil { + t.Fatal(err) + } + err = db.RemoveServerFromSkylinks(ctx, []string{blockedSl.String()}, serverName) + if err != nil { + t.Fatal(err) + } + skydcm.SetPinError(skyd.ErrSkylinkIsBlocked) + _, _, _, err = s.managedFindAndPinOneUnderpinnedSkylink() + if err != nil { + t.Fatal(err) + } + sls, err = db.SkylinksForServer(ctx, serverName) + if err != nil { + t.Fatal(err) + } + if test.Contains(sls, blockedSl.String()) { + t.Fatalf("Expected to NOT find '%s' among the skylinks pinned by this server, got '%v'", blockedSl.String(), sls) + } + // Make sure the skylink is gone from the DB. + _, err = db.FindSkylink(ctx, blockedSl) + if !errors.Contains(err, database.ErrSkylinkNotExist) { + t.Fatalf("Expected '%s', got '%v'", database.ErrSkylinkNotExist, err) + } } // TestEligibleToPin makes sure that we can follow our eligibility rules: From 29cb49480af4b654a062908a384bf5a6e310dd48 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Mon, 15 Aug 2022 16:58:08 +0200 Subject: [PATCH 9/9] Add number of skipped skylinks to the logged info. --- workers/scanner.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/workers/scanner.go b/workers/scanner.go index f286773..e0ef1ef 100644 --- a/workers/scanner.go +++ b/workers/scanner.go @@ -304,10 +304,10 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { // Print final statistics when finishing the method. defer func() { t1 := lib.Now() - s.staticLogger.Infof("Finished at %s, runtime %s, pinned skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned) s.mu.Lock() skipped := s.skipSkylinks s.mu.Unlock() + s.staticLogger.Infof("Finished at %s, runtime %s, pinned skylinks %d, skipped skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned, len(skipped)) s.staticLogger.Tracef("Skipped %d skylinks: %v", len(skipped), skipped) }() @@ -324,7 +324,10 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() { select { case <-intermediateStatsTicker.C: t1 := lib.Now() - s.staticLogger.Infof("Time %s, runtime %s, pinned skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned) + s.mu.Lock() + numSkipped := len(s.skipSkylinks) + s.mu.Unlock() + s.staticLogger.Infof("Time %s, runtime %s, pinned skylinks %d, skipped skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned, numSkipped) default: }