From 7d21d0c39d9d81f1d22f04bcc4b3f9a42f64c401 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Thu, 18 Apr 2024 17:14:33 +0200 Subject: [PATCH 1/7] Fix execution of failpoint should not block deactivation There are 2 main flows of the gofail library: namely enable/disable and execution (`Acquire`) of the failpoints. Currently, a mutex is protecting both flows, thus only one action can make progress at a time. This PR proposes a fine-grained mutex, as each failpoint is protected under a dedicated `RWMutex`. The existing `failpointsMu` will only be protecting the main shared data structures, such as `failpoints` map. Notice that in our current implementation, the execution of the same failpoint is still sequential (there is a lock within `eval` on the term being executed) Reference: - https://github.com/etcd-io/gofail/issues/64 Signed-off-by: Chun-Hung Tseng --- runtime/failpoint.go | 16 ++++++++++++---- runtime/runtime.go | 11 +++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/runtime/failpoint.go b/runtime/failpoint.go index a470417..7559140 100644 --- a/runtime/failpoint.go +++ b/runtime/failpoint.go @@ -16,10 +16,13 @@ package runtime import ( "fmt" + "sync" ) type Failpoint struct { t *terms + + failpointMu sync.RWMutex } func NewFailpoint(name string) *Failpoint { @@ -28,14 +31,19 @@ func NewFailpoint(name string) *Failpoint { // Acquire gets evalutes the failpoint terms; if the failpoint // is active, it will return a value. Otherwise, returns a non-nil error. +// +// Notice that during the exection of Acquire(), the failpoint can be disabled, +// but the already in-flight execution won't be terminated func (fp *Failpoint) Acquire() (interface{}, error) { - failpointsMu.RLock() - defer failpointsMu.RUnlock() + fp.failpointMu.RLock() + // terms are locked during execution, so deepcopy is not required as no change can be made during execution + cachedT := fp.t + fp.failpointMu.RUnlock() - if fp.t == nil { + if cachedT == nil { return nil, ErrDisabled } - result := fp.t.eval() + result := cachedT.eval() if result == nil { return nil, ErrDisabled } diff --git a/runtime/runtime.go b/runtime/runtime.go index 938aa4d..2356980 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -86,6 +86,10 @@ func enable(name, inTerms string) error { fmt.Printf("failed to enable \"%s=%s\" (%v)\n", name, inTerms, err) return err } + + fp.failpointMu.Lock() + defer fp.failpointMu.Unlock() + fp.t = t return nil @@ -104,6 +108,9 @@ func disable(name string) error { return ErrNoExist } + fp.failpointMu.Lock() + defer fp.failpointMu.Unlock() + if fp.t == nil { return ErrDisabled } @@ -116,6 +123,7 @@ func disable(name string) error { func Status(failpath string) (string, int, error) { failpointsMu.Lock() defer failpointsMu.Unlock() + return status(failpath) } @@ -125,6 +133,9 @@ func status(failpath string) (string, int, error) { return "", 0, ErrNoExist } + fp.failpointMu.RLock() + defer fp.failpointMu.RUnlock() + t := fp.t if t == nil { return "", 0, ErrDisabled From 6ef0387f21847d493a2c40f76e8bd7f14fa7ca30 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Fri, 19 Apr 2024 11:20:21 +0200 Subject: [PATCH 2/7] Reduce failpointsMu locking scope Signed-off-by: Chun-Hung Tseng --- runtime/runtime.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index 2356980..71a3408 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -69,14 +69,14 @@ func parseFailpoints(fps string) (map[string]string, error) { // Enable sets a failpoint to a given failpoint description. func Enable(name, inTerms string) error { - failpointsMu.Lock() - defer failpointsMu.Unlock() return enable(name, inTerms) } // enable enables a failpoint func enable(name, inTerms string) error { + failpointsMu.RLock() fp := failpoints[name] + failpointsMu.RUnlock() if fp == nil { return ErrNoExist } @@ -97,13 +97,13 @@ func enable(name, inTerms string) error { // Disable stops a failpoint from firing. func Disable(name string) error { - failpointsMu.Lock() - defer failpointsMu.Unlock() return disable(name) } func disable(name string) error { + failpointsMu.RLock() fp := failpoints[name] + failpointsMu.RUnlock() if fp == nil { return ErrNoExist } @@ -121,14 +121,13 @@ func disable(name string) error { // Status gives the current setting and execution count for the failpoint func Status(failpath string) (string, int, error) { - failpointsMu.Lock() - defer failpointsMu.Unlock() - return status(failpath) } func status(failpath string) (string, int, error) { + failpointsMu.RLock() fp := failpoints[failpath] + failpointsMu.RUnlock() if fp == nil { return "", 0, ErrNoExist } @@ -160,13 +159,14 @@ func list() []string { func register(name string) *Failpoint { failpointsMu.Lock() - defer failpointsMu.Unlock() if _, ok := failpoints[name]; ok { + failpointsMu.Unlock() panic(fmt.Sprintf("failpoint name %s is already registered.", name)) } fp := &Failpoint{} failpoints[name] = fp + failpointsMu.Unlock() if t, ok := envTerms[name]; ok { enable(name, t) } From c03f587c331a91d122a4bcb8435006dbfac25e65 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Fri, 3 May 2024 23:35:49 +0800 Subject: [PATCH 3/7] Introduces the panicMu mutex Ensures that panic failpoints and serving of the http requests won't be able to be executed at the same time. Signed-off-by: Chun-Hung Tseng --- runtime/http.go | 13 +++++-------- runtime/runtime.go | 5 ++++- runtime/terms.go | 3 +++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/runtime/http.go b/runtime/http.go index 62df301..3c27681 100644 --- a/runtime/http.go +++ b/runtime/http.go @@ -37,14 +37,11 @@ func serve(host string) error { } func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // This prevents all failpoints from being triggered. It ensures - // the server(runtime) doesn't panic due to any failpoints during - // processing the HTTP request. - // It may be inefficient, but correctness is more important than - // efficiency. Usually users will not enable too many failpoints - // at a time, so it (the efficiency) isn't a problem. - failpointsMu.Lock() - defer failpointsMu.Unlock() + // Ensures the server(runtime) doesn't panic due to the execution of + // panic failpoints during processing of the HTTP request + panicMu.Lock() + defer panicMu.Unlock() + // flush before unlocking so a panic failpoint won't // take down the http server before it sends the response defer flush(w) diff --git a/runtime/runtime.go b/runtime/runtime.go index 71a3408..5aa7588 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -27,7 +27,10 @@ var ( failpoints map[string]*Failpoint failpointsMu sync.RWMutex - envTerms map[string]string + + envTerms map[string]string + + panicMu sync.Mutex ) func init() { diff --git a/runtime/terms.go b/runtime/terms.go index f94d22b..b28b468 100644 --- a/runtime/terms.go +++ b/runtime/terms.go @@ -317,6 +317,9 @@ func actSleep(t *term) interface{} { } func actPanic(t *term) interface{} { + panicMu.Lock() + defer panicMu.Unlock() + if t.val != nil { panic(fmt.Sprintf("failpoint panic: %v", t.val)) } From 260f9157a22e0415e0df39122d9b6241821d32c6 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Fri, 3 May 2024 23:36:13 +0800 Subject: [PATCH 4/7] Refactor enable, status, and disable functions Signed-off-by: Chun-Hung Tseng --- runtime/http.go | 10 +++++----- runtime/runtime.go | 15 +-------------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/runtime/http.go b/runtime/http.go index 3c27681..f006869 100644 --- a/runtime/http.go +++ b/runtime/http.go @@ -72,7 +72,7 @@ func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } for k, v := range fpMap { - if err := enable(k, v); err != nil { + if err := Enable(k, v); err != nil { http.Error(w, fmt.Sprintf("fail to set failpoint: %v", err), http.StatusBadRequest) return } @@ -86,13 +86,13 @@ func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { sort.Strings(fps) lines := make([]string, len(fps)) for i := range lines { - s, _, _ := status(fps[i]) + s, _, _ := Status(fps[i]) lines[i] = fps[i] + "=" + s } w.Write([]byte(strings.Join(lines, "\n") + "\n")) } else if strings.HasSuffix(key, "/count") { fp := key[:len(key)-len("/count")] - _, count, err := status(fp) + _, count, err := Status(fp) if err != nil { if errors.Is(err, ErrNoExist) { http.Error(w, "failed to GET: "+err.Error(), http.StatusNotFound) @@ -103,7 +103,7 @@ func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.Write([]byte(strconv.Itoa(count))) } else { - status, _, err := status(key) + status, _, err := Status(key) if err != nil { http.Error(w, "failed to GET: "+err.Error(), http.StatusNotFound) } @@ -112,7 +112,7 @@ func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // deactivates a failpoint case r.Method == "DELETE": - if err := disable(key); err != nil { + if err := Disable(key); err != nil { http.Error(w, "failed to delete failpoint "+err.Error(), http.StatusBadRequest) return } diff --git a/runtime/runtime.go b/runtime/runtime.go index 5aa7588..1e57932 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -72,11 +72,6 @@ func parseFailpoints(fps string) (map[string]string, error) { // Enable sets a failpoint to a given failpoint description. func Enable(name, inTerms string) error { - return enable(name, inTerms) -} - -// enable enables a failpoint -func enable(name, inTerms string) error { failpointsMu.RLock() fp := failpoints[name] failpointsMu.RUnlock() @@ -100,10 +95,6 @@ func enable(name, inTerms string) error { // Disable stops a failpoint from firing. func Disable(name string) error { - return disable(name) -} - -func disable(name string) error { failpointsMu.RLock() fp := failpoints[name] failpointsMu.RUnlock() @@ -124,10 +115,6 @@ func disable(name string) error { // Status gives the current setting and execution count for the failpoint func Status(failpath string) (string, int, error) { - return status(failpath) -} - -func status(failpath string) (string, int, error) { failpointsMu.RLock() fp := failpoints[failpath] failpointsMu.RUnlock() @@ -171,7 +158,7 @@ func register(name string) *Failpoint { failpoints[name] = fp failpointsMu.Unlock() if t, ok := envTerms[name]; ok { - enable(name, t) + Enable(name, t) } return fp } From 499c363c690d12e2d358a343974ec553cde2e4ca Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Mon, 13 May 2024 14:10:49 +0200 Subject: [PATCH 5/7] Refactor runtime/failpoint.go Co-authored-by: Benjamin Wang Co-authored-by: Marek Siarkowicz Signed-off-by: Chun-Hung Tseng --- runtime/failpoint.go | 34 ++++++++++++++++++++++++++++++++-- runtime/runtime.go | 25 +++---------------------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/runtime/failpoint.go b/runtime/failpoint.go index 7559140..4bb3d8e 100644 --- a/runtime/failpoint.go +++ b/runtime/failpoint.go @@ -20,8 +20,7 @@ import ( ) type Failpoint struct { - t *terms - + t *terms failpointMu sync.RWMutex } @@ -54,3 +53,34 @@ func (fp *Failpoint) Acquire() (interface{}, error) { func (fp *Failpoint) BadType(v interface{}, t string) { fmt.Printf("failpoint: %q got value %v of type \"%T\" but expected type %q\n", fp.t.fpath, v, v, t) } + +func (fp *Failpoint) SetTerm(t *terms) { + fp.failpointMu.Lock() + defer fp.failpointMu.Unlock() + + fp.t = t +} + +func (fp *Failpoint) ClearTerm() error { + fp.failpointMu.Lock() + defer fp.failpointMu.Unlock() + + if fp.t == nil { + return ErrDisabled + } + fp.t = nil + + return nil +} + +func (fp *Failpoint) Status() (string, int, error) { + fp.failpointMu.RLock() + defer fp.failpointMu.RUnlock() + + t := fp.t + if t == nil { + return "", 0, ErrDisabled + } + + return t.desc, t.counter, nil +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 1e57932..3230fb6 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -85,10 +85,7 @@ func Enable(name, inTerms string) error { return err } - fp.failpointMu.Lock() - defer fp.failpointMu.Unlock() - - fp.t = t + fp.SetTerm(t) return nil } @@ -102,15 +99,7 @@ func Disable(name string) error { return ErrNoExist } - fp.failpointMu.Lock() - defer fp.failpointMu.Unlock() - - if fp.t == nil { - return ErrDisabled - } - fp.t = nil - - return nil + return fp.ClearTerm() } // Status gives the current setting and execution count for the failpoint @@ -122,15 +111,7 @@ func Status(failpath string) (string, int, error) { return "", 0, ErrNoExist } - fp.failpointMu.RLock() - defer fp.failpointMu.RUnlock() - - t := fp.t - if t == nil { - return "", 0, ErrDisabled - } - - return t.desc, t.counter, nil + return fp.Status() } func List() []string { From 60b65e206f0c72a987c70e1112b2e12048f15927 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Thu, 16 May 2024 03:30:58 +0800 Subject: [PATCH 6/7] Rename mutex within Failpoint from failpointMu to mux Signed-off-by: Chun-Hung Tseng Co-authored-by: Marek Siarkowicz --- runtime/failpoint.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/runtime/failpoint.go b/runtime/failpoint.go index 4bb3d8e..ea8db3f 100644 --- a/runtime/failpoint.go +++ b/runtime/failpoint.go @@ -20,8 +20,8 @@ import ( ) type Failpoint struct { - t *terms - failpointMu sync.RWMutex + t *terms + mux sync.RWMutex } func NewFailpoint(name string) *Failpoint { @@ -34,10 +34,10 @@ func NewFailpoint(name string) *Failpoint { // Notice that during the exection of Acquire(), the failpoint can be disabled, // but the already in-flight execution won't be terminated func (fp *Failpoint) Acquire() (interface{}, error) { - fp.failpointMu.RLock() + fp.mux.RLock() // terms are locked during execution, so deepcopy is not required as no change can be made during execution cachedT := fp.t - fp.failpointMu.RUnlock() + fp.mux.RUnlock() if cachedT == nil { return nil, ErrDisabled @@ -55,15 +55,15 @@ func (fp *Failpoint) BadType(v interface{}, t string) { } func (fp *Failpoint) SetTerm(t *terms) { - fp.failpointMu.Lock() - defer fp.failpointMu.Unlock() + fp.mux.Lock() + defer fp.mux.Unlock() fp.t = t } func (fp *Failpoint) ClearTerm() error { - fp.failpointMu.Lock() - defer fp.failpointMu.Unlock() + fp.mux.Lock() + defer fp.mux.Unlock() if fp.t == nil { return ErrDisabled @@ -74,8 +74,8 @@ func (fp *Failpoint) ClearTerm() error { } func (fp *Failpoint) Status() (string, int, error) { - fp.failpointMu.RLock() - defer fp.failpointMu.RUnlock() + fp.mux.RLock() + defer fp.mux.RUnlock() t := fp.t if t == nil { From 13d483e268529eebe679f613f71cc8454c8775d2 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 15 May 2024 21:40:20 +0200 Subject: [PATCH 7/7] Add comments regarding failpointsMu and panicMu Signed-off-by: Chun-Hung Tseng Co-authored-by: Marek Siarkowicz --- runtime/http.go | 4 +++- runtime/runtime.go | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/runtime/http.go b/runtime/http.go index f006869..84ecaf7 100644 --- a/runtime/http.go +++ b/runtime/http.go @@ -38,7 +38,9 @@ func serve(host string) error { func (*httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Ensures the server(runtime) doesn't panic due to the execution of - // panic failpoints during processing of the HTTP request + // panic failpoints during processing of the HTTP request, as the + // sender of the HTTP request should not be affected by the execution + // of the panic failpoints and crash as a side effect panicMu.Lock() defer panicMu.Unlock() diff --git a/runtime/runtime.go b/runtime/runtime.go index 3230fb6..f6e1589 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -25,11 +25,17 @@ var ( ErrNoExist = fmt.Errorf("failpoint: failpoint does not exist") ErrDisabled = fmt.Errorf("failpoint: failpoint is disabled") - failpoints map[string]*Failpoint + failpoints map[string]*Failpoint + // failpointsMu protects the failpoints map, preventing concurrent + // accesses during commands such as Enabling and Disabling failpointsMu sync.RWMutex envTerms map[string]string + // panicMu (panic mutex) ensures that the action of panic failpoints + // and serving of the HTTP requests won't be executed at the same time, + // avoiding the possibility that the server runtime panics during processing + // requests panicMu sync.Mutex )