Skip to content

Commit

Permalink
Fix deadlock when under high r/w load
Browse files Browse the repository at this point in the history
  • Loading branch information
Woellchen committed Aug 30, 2022
1 parent ade94b0 commit 484bba7
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 34 deletions.
20 changes: 11 additions & 9 deletions weed/mount/dirty_pages_chunked.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package mount

import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/page_writer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
"sync/atomic"
"time"

"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/page_writer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

type ChunkedDirtyPages struct {
Expand All @@ -17,7 +19,7 @@ type ChunkedDirtyPages struct {
collection string
replication string
uploadPipeline *page_writer.UploadPipeline
hasWrites bool
hasWrites int32
}

var (
Expand All @@ -39,7 +41,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
}

func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
pages.hasWrites = true
atomic.StoreInt32(&pages.hasWrites, 1)

glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
Expand All @@ -48,7 +50,7 @@ func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential
}

func (pages *ChunkedDirtyPages) FlushData() error {
if !pages.hasWrites {
if atomic.LoadInt32(&pages.hasWrites) == 0 {
return nil
}
pages.uploadPipeline.FlushAll()
Expand All @@ -59,7 +61,7 @@ func (pages *ChunkedDirtyPages) FlushData() error {
}

func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
if !pages.hasWrites {
if atomic.LoadInt32(&pages.hasWrites) == 0 {
return
}
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
Expand All @@ -80,18 +82,18 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader
}
chunk.Mtime = mtime
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)

}

func (pages ChunkedDirtyPages) Destroy() {
// Destroy shuts down the underlying upload pipeline. The receiver was
// changed to a pointer in this commit so Shutdown acts on the shared
// pipeline state rather than a copy of the struct.
func (pages *ChunkedDirtyPages) Destroy() {
pages.uploadPipeline.Shutdown()
}

// LockForRead delegates read-locking of the byte range
// [startOffset, stopOffset) to the upload pipeline; callers must pair it
// with a matching UnlockForRead on the same range.
func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.LockForRead(startOffset, stopOffset)
}

// UnlockForRead releases the read lock previously taken by LockForRead
// for the byte range [startOffset, stopOffset), delegating to the
// upload pipeline.
func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.UnlockForRead(startOffset, stopOffset)
}
5 changes: 3 additions & 2 deletions weed/mount/page_writer/page_chunk_mem.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package page_writer

import (
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"sync"
"sync/atomic"

"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
)

var (
Expand Down
9 changes: 5 additions & 4 deletions weed/mount/page_writer/page_chunk_swapfile.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package page_writer

import (
"os"
"sync"

"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"os"
"sync"
)

var (
Expand Down Expand Up @@ -138,8 +139,6 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
sc.Lock()
defer sc.Unlock()

for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
Expand All @@ -150,5 +149,7 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
mem.Free(data)
}

sc.Lock()
sc.usage = newChunkWrittenIntervalList()
sc.Unlock()
}
16 changes: 8 additions & 8 deletions weed/mount/page_writer/upload_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package page_writer

import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"

"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)

type LogicChunkIndex int
Expand Down Expand Up @@ -140,8 +141,10 @@ func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex
}

func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&up.uploaderCount, 1)
up.uploaderCountCond.L.Lock()
up.uploaderCount += 1
glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
up.uploaderCountCond.L.Unlock()

up.sealedChunksLock.Lock()

Expand All @@ -162,12 +165,9 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
sealedChunk.chunk.SaveContent(up.saveToStorageFn)

// notify waiting process
atomic.AddInt32(&up.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
up.uploaderCountCond.L.Lock()
up.uploaderCount -= 1
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
up.uploaderCountCond.Broadcast()
up.uploaderCountCond.L.Unlock()

Expand Down
11 changes: 1 addition & 10 deletions weed/mount/page_writer/upload_pipeline_lock.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package page_writer

import (
"sync/atomic"
)

func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
Expand Down Expand Up @@ -51,12 +47,7 @@ func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {

func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
for up.uploaderCount > 0 {
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
Expand Down
6 changes: 5 additions & 1 deletion weed/mount/weedfs_file_write.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package mount

import (
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"syscall"

"github.com/hanwen/go-fuse/v2/fuse"
)

/**
Expand Down Expand Up @@ -53,6 +54,9 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
return 0, fuse.OK
}

fh.entryLock.Lock()
defer fh.entryLock.Unlock()

entry.Content = nil
offset := int64(in.Offset)
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
Expand Down

0 comments on commit 484bba7

Please sign in to comment.