Skip to content

Commit

Permalink
Merge pull request #53 from SkynetLabs/ivo/skip_blocked_skylinks
Browse files Browse the repository at this point in the history
Skip blocked skylinks
  • Loading branch information
MSevey authored Aug 15, 2022
2 parents c0ff5ce + 9409640 commit ea1b3ff
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 28 deletions.
17 changes: 16 additions & 1 deletion database/skylink.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func (db *DB) CreateSkylink(ctx context.Context, skylink skymodules.Skylink, ser
return s, nil
}

// DeleteSkylink removes the skylink from the database.
func (db *DB) DeleteSkylink(ctx context.Context, skylink skymodules.Skylink) error {
filter := bson.M{"skylink": skylink.String()}
dr, err := db.staticDB.Collection(collSkylinks).DeleteOne(ctx, filter)
if err == mongo.ErrNoDocuments || dr.DeletedCount == 0 {
return ErrSkylinkNotExist
}
return nil
}

// FindSkylink fetches a skylink from the DB.
func (db *DB) FindSkylink(ctx context.Context, skylink skymodules.Skylink) (Skylink, error) {
sr := db.staticDB.Collection(collSkylinks).FindOne(ctx, bson.M{"skylink": skylink.String()})
Expand Down Expand Up @@ -237,7 +247,10 @@ func (db *DB) RemoveServerFromSkylinks(ctx context.Context, skylinks []string, s
// { "lock_expires" : { "$lt": new Date() }}
// ]
// })
func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, minPinners int) (skymodules.Skylink, error) {
func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, skipSkylinks []string, minPinners int) (skymodules.Skylink, error) {
if skipSkylinks == nil {
skipSkylinks = make([]string, 0)
}
filter := bson.M{
// We use pinned != false because pinned == true is the default but it's
// possible that we've missed setting that somewhere.
Expand All @@ -246,6 +259,8 @@ func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, minPinn
"$expr": bson.M{"$lt": bson.A{bson.M{"$size": "$servers"}, minPinners}},
// Not pinned by the given server.
"servers": bson.M{"$nin": bson.A{server}},
// Skip all skylinks in the skipSkylinks list.
"skylink": bson.M{"$nin": skipSkylinks},
// Unlocked.
"$or": bson.A{
bson.M{"lock_expires": bson.M{"$exists": false}},
Expand Down
26 changes: 22 additions & 4 deletions skyd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package skyd

import (
"fmt"
"go.sia.tech/siad/crypto"
"sync"

"gitlab.com/NebulousLabs/errors"
Expand Down Expand Up @@ -135,10 +136,23 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) {
psc.mu.Unlock()
}()

// Check all skylinks against the list of blocked skylinks in skyd and we'll
// remove the blocked ones from the cache.
blocklist, err := skydClient.Blocklist()
if err != nil {
err = errors.AddContext(err, "failed to fetch blocklist")
return
}
blockMap := make(map[crypto.Hash]struct{}, len(blocklist.Blocklist))
for _, bl := range blocklist.Blocklist {
blockMap[bl] = struct{}{}
}

// Walk the entire Skynet folder and scan all files we find for skylinks.
dirsToWalk := []skymodules.SiaPath{skymodules.SkynetFolder}
sls := make(map[string]struct{})
var rd api.RenterDirectory
var sl skymodules.Skylink
for len(dirsToWalk) > 0 {
// Pop the first dir and walk it.
dir := dirsToWalk[0]
Expand All @@ -149,12 +163,16 @@ func (psc *PinnedSkylinksCache) threadedRebuild(skydClient Client) {
return
}
for _, f := range rd.Files {
for _, sl := range f.Skylinks {
if !validSkylink(sl) {
build.Critical(fmt.Errorf("Detected invalid skylink in a sia file: skylink '%s', siapath: '%s'", sl, f.SiaPath))
for _, s := range f.Skylinks {
if err = sl.LoadString(s); err != nil {
build.Critical(fmt.Errorf("Detected invalid skylink in a sia file: skylink '%s', siapath: '%s'", s, f.SiaPath))
continue
}
// Check if the skylink is blocked.
if _, exists := blockMap[sl.MerkleRoot()]; exists {
continue
}
sls[sl] = struct{}{}
sls[s] = struct{}{}
}
}
// Grab all subdirs and queue them for walking.
Expand Down
22 changes: 21 additions & 1 deletion skyd/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package skyd

import (
"gitlab.com/SkynetLabs/skyd/node/api"
"gitlab.com/SkynetLabs/skyd/skymodules"
"go.sia.tech/siad/crypto"
"testing"
)

Expand Down Expand Up @@ -50,20 +53,37 @@ func TestCacheRebuild(t *testing.T) {

sl := "XX_uSb3BpGxmSbRAg1xj5T8SdB4hiSFiEW2sEEzxt5MNkg"

// This skylink exists on the mock filesystem.
slBlocked := "CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q__B"
var slbl skymodules.Skylink
err := slbl.LoadString(slBlocked)
if err != nil {
t.Fatal(err)
}
// Add the blocked skylink to a blocklist.
blocklist := api.SkynetBlocklistGET{
Blocklist: []crypto.Hash{slbl.MerkleRoot()},
}

c := NewCache()
// Add a skylink to the cache. Expect this to be gone after the rebuild.
c.Add(sl)
skyd := NewSkydClientMock()
skyd.SetBlocklist(blocklist)
sls := skyd.MockFilesystem()
rr := c.Rebuild(skyd)
// Wait for the rebuild to finish.
<-rr.ErrAvail
if rr.ExternErr != nil {
t.Fatal(rr.ExternErr)
}
// Ensure that the blocked skylink is not in the cache.
if c.Contains(slBlocked) {
t.Fatalf("Expected blocked skylink '%s' to not be present after the rebuild.", sl)
}
// Ensure that all expected skylinks are in the cache now.
for _, s := range sls {
if !c.Contains(s) {
if s != slBlocked && !c.Contains(s) {
t.Fatalf("Expected skylink '%s' to be in the cache.", s)
}
}
Expand Down
15 changes: 15 additions & 0 deletions skyd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type (
// ClientMock is a mock of skyd.Client
ClientMock struct {
blocklist api.SkynetBlocklistGET
contractData uint64
fileHealth map[skymodules.SiaPath]float64
filesystemMock map[skymodules.SiaPath]rdReturnType
Expand Down Expand Up @@ -42,6 +43,13 @@ func NewSkydClientMock() *ClientMock {
}
}

// Blocklist gets the list of blocked skylinks. Noop.
func (c *ClientMock) Blocklist() (blocklist api.SkynetBlocklistGET, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.blocklist, nil
}

// ContractData returns the total data from Active and Passive contracts.
func (c *ClientMock) ContractData() (uint64, error) {
c.mu.Lock()
Expand Down Expand Up @@ -151,6 +159,13 @@ func (c *ClientMock) RenterDirRootGet(siaPath skymodules.SiaPath) (rd api.Renter
return r.RD, r.Err
}

// SetBlocklist allows us to set the blocklist.
func (c *ClientMock) SetBlocklist(bl api.SkynetBlocklistGET) {
c.mu.Lock()
c.blocklist = bl
c.mu.Unlock()
}

// SetHealth allows us to set the health of a sia file.
func (c *ClientMock) SetHealth(sp skymodules.SiaPath, h float64) {
c.mu.Lock()
Expand Down
21 changes: 17 additions & 4 deletions skyd/skyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ var (
// ErrSkylinkAlreadyPinned is returned when the skylink we're trying to pin
// is already pinned.
ErrSkylinkAlreadyPinned = errors.New("skylink already pinned")
// ErrSkylinkIsBlocked is returned when the skylinks we're trying to pin is
// blocked by skyd.
ErrSkylinkIsBlocked = errors.New("skylink is blocked")
)

type (
// Client describes the interface exposed by client.
Client interface {
// Blocklist gets the list of blocked skylinks.
Blocklist() (blocklist api.SkynetBlocklistGET, err error)
// ContractData returns the total data from Active and Passive contracts.
ContractData() (uint64, error)
// DiffPinnedSkylinks returns two lists of skylinks - the ones that
Expand Down Expand Up @@ -70,6 +75,11 @@ func NewClient(host, port, password string, cache *PinnedSkylinksCache, logger l
}
}

// Blocklist gets the list of blocked skylinks.
func (c *client) Blocklist() (blocklist api.SkynetBlocklistGET, err error) {
return c.staticClient.SkynetBlocklistGet()
}

// ContractData returns the total data from Active and Passive contracts.
func (c *client) ContractData() (uint64, error) {
rcs, err := c.staticClient.RenterContractsGet()
Expand Down Expand Up @@ -133,16 +143,19 @@ func (c *client) Pin(skylink string) (skymodules.SiaPath, error) {
return skymodules.SiaPath{}, ErrSkylinkAlreadyPinned
}
sp, err := c.staticClient.SkynetSkylinkPinLazyPost(skylink)
if err != nil && strings.Contains(err.Error(), "failed to pin file to skynet: skylink is blocked") {
return skymodules.SiaPath{}, ErrSkylinkIsBlocked
}
if err == nil || errors.Contains(err, ErrSkylinkAlreadyPinned) {
c.staticSkylinksCache.Add(skylink)
}
return sp, err
}

// RebuildCache rebuilds the cache of skylinks pinned by the local skyd. The
// rebuilding happens in a goroutine, allowing the method to return a channel
// on which the caller can either wait or select. The caller can check whether
// the rebuild was successful by calling Error().
// RebuildCache rebuilds the cache of skylinks pinned by the local skyd. This
// excludes blocked skylinks. The rebuilding happens in a goroutine, allowing
// the method to return a channel on which the caller can either wait or select.
// The caller can check whether the rebuild was successful by calling Error().
func (c *client) RebuildCache() RebuildCacheResult {
c.staticLogger.Trace("Entering RebuildCache")
defer c.staticLogger.Trace("Exiting RebuildCache")
Expand Down
36 changes: 25 additions & 11 deletions test/database/skylink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ func TestSkylink(t *testing.T) {
if !s1.Pinned {
t.Fatal("Expected the skylink to be pinned.")
}
// Delete the skylink.
err = db.DeleteSkylink(ctx, sl1)
if err != nil {
t.Fatal(err)
}
s1, err = db.FindSkylink(ctx, sl1)
if !errors.Contains(err, database.ErrSkylinkNotExist) {
t.Fatalf("Expected error %v, got %v.", database.ErrSkylinkNotExist, err)
}

// Add a server for non-existent skylinks.
sl3 := test.RandomSkylink()
Expand Down Expand Up @@ -231,7 +240,7 @@ func TestFindAndLock(t *testing.T) {
cfg.MinPinners = 1

// Try to fetch an underpinned skylink, expect none to be found.
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand All @@ -241,7 +250,7 @@ func TestFindAndLock(t *testing.T) {
t.Fatal(err)
}
// Try to fetch an underpinned skylink, expect none to be found.
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand All @@ -250,8 +259,13 @@ func TestFindAndLock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// Try to fetch underpinned skylinks but skip sl. Expect none to be found.
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{sl.String()}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
// Try to fetch an underpinned skylink, expect to find one.
underpinned, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
underpinned, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if err != nil {
t.Fatal(err)
}
Expand All @@ -261,7 +275,7 @@ func TestFindAndLock(t *testing.T) {
// Try to fetch an underpinned skylink from the name of a different server.
// Expect to find none because the one we got before is now locked and
// shouldn't be returned.
_, err = db.FindAndLockUnderpinned(ctx, "different server", cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, "different server", []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand All @@ -275,7 +289,7 @@ func TestFindAndLock(t *testing.T) {
t.Fatal(err)
}
// Try to fetch an underpinned skylink, expect none to be found.
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand All @@ -289,13 +303,13 @@ func TestFindAndLock(t *testing.T) {
// Try to fetch an underpinned skylink, expect none to be found.
// Out test skylink is underpinned but it's pinned by the given server, so
// we expect it not to be returned.
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
// Try to fetch an underpinned skylink from the name of a different server.
// Expect one to be found.
_, err = db.FindAndLockUnderpinned(ctx, anotherServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, anotherServerName, []string{}, cfg.MinPinners)
if err != nil {
t.Fatal(err)
}
Expand All @@ -316,7 +330,7 @@ func TestFindAndLock(t *testing.T) {
}
// Try to fetch an underpinned skylink with a third server name, expect none
// to be found because our skylink is now properly pinned.
_, err = db.FindAndLockUnderpinned(ctx, thirdServerName, cfg.MinPinners)
_, err = db.FindAndLockUnderpinned(ctx, thirdServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected to get '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand Down Expand Up @@ -358,7 +372,7 @@ func TestFindAndLockOwnFirst(t *testing.T) {
t.Fatal(err)
}
// Fetch and lock one of those.
locked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
locked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if err != nil {
t.Fatal(err)
}
Expand All @@ -371,7 +385,7 @@ func TestFindAndLockOwnFirst(t *testing.T) {
}
// Try fetching another underpinned skylink before unlocking this one.
// Expect to get a different one.
newLocked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
newLocked, err := db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if err != nil {
t.Fatal(err)
}
Expand All @@ -385,7 +399,7 @@ func TestFindAndLockOwnFirst(t *testing.T) {
}
// Fetch a new underpinned skylink. Expect it to fail because we've run out
// of underpinned skylinks.
newLocked, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, cfg.MinPinners)
newLocked, err = db.FindAndLockUnderpinned(ctx, cfg.ServerName, []string{}, cfg.MinPinners)
if !errors.Contains(err, database.ErrNoUnderpinnedSkylinks) {
t.Fatalf("Expected '%v', got '%v'", database.ErrNoUnderpinnedSkylinks, err)
}
Expand Down
Loading

0 comments on commit ea1b3ff

Please sign in to comment.