Skip to content

Commit

Permalink
Merge pull request #585 from karlkfi/karl-watch-perm-error
Browse files Browse the repository at this point in the history
fix: Stop StatusWatcher on Forbidden API error
  • Loading branch information
k8s-ci-robot authored May 17, 2022
2 parents 2d68222 + 57bbe71 commit 0cb95ee
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 65 deletions.
6 changes: 6 additions & 0 deletions pkg/apply/taskrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ func (tsr *TaskStatusRunner) Run(
statusEvent.Error)
if currentTask != nil {
currentTask.Cancel(taskContext)
} else {
// tasks not started yet - abort now
return complete(abortReason)
}
continue
}
Expand Down Expand Up @@ -207,6 +210,9 @@ func (tsr *TaskStatusRunner) Run(
klog.V(7).Infof("Runner aborting: %v", abortReason)
if currentTask != nil {
currentTask.Cancel(taskContext)
} else {
// tasks not started yet - abort now
return complete(abortReason)
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/kstatus/watcher/dynamic_informer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package watcher

import (
"context"
"regexp"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -55,3 +57,36 @@ func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.
f.Indexers,
)
}

// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound.
// This is necessary because the Informer doesn't properly wrap list errors.
// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L448
// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076
const resourceNotFoundMessage = "the server could not find the requested resource"

// containsNotFoundMessage checks if the error string contains the message for
// StatusReasonNotFound.
func containsNotFoundMessage(err error) bool {
return strings.Contains(err.Error(), resourceNotFoundMessage)
}

// resourceForbiddenMessagePattern is a regex pattern to match the condition
// message for metav1.StatusForbidden.
// This is necessary because the Informer doesn't properly wrap list errors.
// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L458
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L208
// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51
// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076
const resourceForbiddenMessagePattern = `(.+) is forbidden: User "(.*)" cannot (.+) resource "(.*)" in API group "(.*)"`

// resourceForbiddenMessageRegexp is the pre-compiled Regexp of
// resourceForbiddenMessagePattern.
var resourceForbiddenMessageRegexp = regexp.MustCompile(resourceForbiddenMessagePattern)

// containsForbiddenMessage checks if the error string contains the message for
// StatusForbidden.
func containsForbiddenMessage(err error) bool {
return resourceForbiddenMessageRegexp.Match([]byte(err.Error()))
}
120 changes: 104 additions & 16 deletions pkg/kstatus/watcher/dynamic_informer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package watcher

import (
"context"
"net/http"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -54,13 +54,7 @@ func TestResourceNotFoundError(t *testing.T) {
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
name := "" // unused by LIST requests
// The apisevrer confusingly does not return apierrors.NewNotFound,
// which has a nice constant for its error message.
// err = apierrors.NewNotFound(exampleGR, name)
// Instead it uses apierrors.NewGenericServerResponse, which uses
// a hard-coded error message.
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
err = newGenericServerResponse(action, newNotFoundResourceStatusError(action))
return true, nil, err
})
},
Expand Down Expand Up @@ -88,13 +82,7 @@ func TestResourceNotFoundError(t *testing.T) {
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
name := "" // unused by LIST requests
// The apisevrer confusingly does not return apierrors.NewNotFound,
// which has a nice constant for its error message.
// err = apierrors.NewNotFound(exampleGR, name)
// Instead it uses apierrors.NewGenericServerResponse, which uses
// a hard-coded error message.
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
err = newGenericServerResponse(action, newNotFoundResourceStatusError(action))
return true, nil, err
})
},
Expand All @@ -110,7 +98,67 @@ func TestResourceNotFoundError(t *testing.T) {
t.Errorf("Expected typed NotFound error, but got untyped NotFound error: %v", err)
default:
// If we got this error, the test is probably broken.
t.Errorf("Expected untyped NotFound error, but got a different error: %v", err)
t.Errorf("Expected typed NotFound error, but got a different error: %v", err)
}
},
},
{
name: "List resource forbidden error",
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
fakeClient.PrependReactor("list", exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
listAction := action.(clienttesting.ListAction)
if listAction.GetNamespace() != namespace {
assert.Fail(t, "Received unexpected LIST namespace: %s", listAction.GetNamespace())
return false, nil, nil
}
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
err = newGenericServerResponse(action, newForbiddenResourceStatusError(action))
return true, nil, err
})
},
errorHandler: func(t *testing.T, err error) {
switch {
case apierrors.IsForbidden(err):
// If we got this error, something changed in the apiserver or
// client. If the client changed, it might be safe to stop parsing
// the error string.
t.Errorf("Expected untyped Forbidden error, but got typed Forbidden error: %v", err)
case containsForbiddenMessage(err):
// This is the expected hack, because the Informer/Reflector
// doesn't wrap the error with "%w".
t.Logf("Received expected untyped Forbidden error: %v", err)
default:
// If we got this error, the test is probably broken.
t.Errorf("Expected untyped Forbidden error, but got a different error: %v", err)
}
},
},
{
name: "Watch resource forbidden error",
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
fakeClient.PrependWatchReactor(exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
// dynamicClient converts Status objects from the apiserver into errors.
// So we can just return the right error here to simulate an error from
// the apiserver.
err = newGenericServerResponse(action, newForbiddenResourceStatusError(action))
return true, nil, err
})
},
errorHandler: func(t *testing.T, err error) {
switch {
case apierrors.IsForbidden(err):
// This is the expected behavior, because the
// Informer/Reflector DOES wrap watch errors
t.Logf("Received expected untyped Forbidden error: %v", err)
case containsForbiddenMessage(err):
// If this happens, there was a regression.
// Watch errors are expected to be wrapped with "%w"
t.Errorf("Expected typed Forbidden error, but got untyped Forbidden error: %v", err)
default:
// If we got this error, the test is probably broken.
t.Errorf("Expected typed Forbidden error, but got a different error: %v", err)
}
},
},
Expand Down Expand Up @@ -164,3 +212,43 @@ func TestResourceNotFoundError(t *testing.T) {
})
}
}

// newForbiddenResourceStatusError emulates a Forbidden error from the apiserver
// for a namespace-scoped resource.
// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L36
func newForbiddenResourceStatusError(action clienttesting.Action) *apierrors.StatusError {
username := "unused"
verb := action.GetVerb()
resource := action.GetResource().Resource
if subresource := action.GetSubresource(); len(subresource) > 0 {
resource = resource + "/" + subresource
}
apiGroup := action.GetResource().Group
namespace := action.GetNamespace()

// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51
err := fmt.Errorf("User %q cannot %s resource %q in API group %q in the namespace %q",
username, verb, resource, apiGroup, namespace)

qualifiedResource := action.GetResource().GroupResource()
name := "" // unused by ListAndWatch
return apierrors.NewForbidden(qualifiedResource, name, err)
}

// newNotFoundResourceStatusError emulates a NotFOund error from the apiserver
// for a resource (not an object).
func newNotFoundResourceStatusError(action clienttesting.Action) *apierrors.StatusError {
qualifiedResource := action.GetResource().GroupResource()
name := "" // unused by ListAndWatch
return apierrors.NewNotFound(qualifiedResource, name)
}

// newGenericServerResponse emulates a StatusError from the apiserver.
func newGenericServerResponse(action clienttesting.Action, statusError *apierrors.StatusError) *apierrors.StatusError {
errorCode := int(statusError.ErrStatus.Code)
verb := action.GetVerb()
qualifiedResource := action.GetResource().GroupResource()
name := statusError.ErrStatus.Details.Name
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L435
return apierrors.NewGenericServerResponse(errorCode, verb, qualifiedResource, name, statusError.Error(), -1, false)
}
3 changes: 3 additions & 0 deletions pkg/kstatus/watcher/event_funnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"

"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
)

Expand Down Expand Up @@ -37,6 +38,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel {
go func() {
defer func() {
// Don't close counterCh, otherwise AddInputChannel may panic.
klog.V(5).Info("Closing funnel")
close(funnel.outCh)
close(funnel.doneCh)
}()
Expand All @@ -48,6 +50,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel {
select {
case delta := <-funnel.counterCh:
inputs += delta
klog.V(5).Infof("Funnel input channels (%+d): %d", delta, inputs)
case <-ctxDoneCh:
// Stop waiting for context closure.
// Nil channel avoids busy waiting.
Expand Down
Loading

0 comments on commit 0cb95ee

Please sign in to comment.