Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Watch namespace objects, allow partial targetNamespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
kaczyns committed Jun 4, 2020
1 parent dfffa7d commit de5e240
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 57 deletions.
6 changes: 5 additions & 1 deletion pkg/controller/kabaneroplatform/gitops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ type testLogger struct{}
func (t testLogger) Info(msg string, keysAndValues ...interface{}) { fmt.Printf("Info: %v \n", msg) }
func (t testLogger) Enabled() bool { return true }
func (t testLogger) Error(err error, msg string, keysAndValues ...interface{}) {
fmt.Printf("Error: %v: %v\n", msg, err.Error())
if err != nil {
fmt.Printf("Error: %v: %v\n", msg, err.Error())
} else {
fmt.Printf("Error: %v\n", msg)
}
}
func (t testLogger) V(level int) logr.InfoLogger { return t }
func (t testLogger) WithValues(keysAndValues ...interface{}) logr.Logger { return t }
Expand Down
77 changes: 58 additions & 19 deletions pkg/controller/kabaneroplatform/kabaneroplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
kabanerov1alpha1 "github.com/kabanero-io/kabanero-operator/pkg/apis/kabanero/v1alpha1"
kabanerov1alpha2 "github.com/kabanero-io/kabanero-operator/pkg/apis/kabanero/v1alpha2"
"github.com/kabanero-io/kabanero-operator/pkg/controller/utils/timer"
"github.com/operator-framework/operator-sdk/pkg/k8sutil"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -60,19 +61,24 @@ var reconcileFuncs = []reconcileFuncType{
// Add creates a new Kabanero Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
}
// It is very unlikely that this would fail, since the main also checks for it.
watchNamespace, err := k8sutil.GetWatchNamespace()
if err != nil {
return err
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileKabanero{
// Lets be sure a single namespace is specified.
numberOfWatchNamespaces := len(strings.Split(watchNamespace, ","))
if numberOfWatchNamespaces != 1 {
return fmt.Errorf("%v watch namespaces were specified, but only a single watch namespace is supported: %v", numberOfWatchNamespaces, watchNamespace)
}

r := &ReconcileKabanero{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
requeueDelayMap: make(map[string]RequeueData)}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
requeueDelayMap: make(map[string]RequeueData),
watchNamespace: watchNamespace}

// Create a new controller
c, err := controller.New("kabaneroplatform-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
Expand Down Expand Up @@ -105,6 +111,16 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch Namespace instances. We only care about create and delete events, not update events.
// When we see that a namespace has been created/deleted, we need to process any Kabanero objects that
// reference that namespace.
err = c.Watch(&source.Kind{Type: &corev1.Namespace{}}, &handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(r.targetNamespaceMapFunc)}, predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool { return false }})
if err != nil {
return err
}

/* Useful if RoleBindingList is changed to use Structured instead of Unstructured
// Index Rolebindings by name
if err := mgr.GetFieldIndexer().IndexField(&rbacv1.RoleBinding{}, "metadata.name", func(rawObj runtime.Object) []string {
Expand All @@ -118,22 +134,17 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return nil
}

func getOperatorImage(c client.Client) (string, error) {
func (r *ReconcileKabanero) getOperatorImage() (string, error) {
// First, read the POD_NAME env variable. This is set in the deployment spec in the CSV.
podName := os.Getenv("POD_NAME")
if len(podName) == 0 {
return "", fmt.Errorf("The POD_NAME environment variable is not set, or is empty")
}

namespace := os.Getenv("WATCH_NAMESPACE")
if len(namespace) == 0 {
return "", fmt.Errorf("The WATCH_NAMESPACE environment variable is not set, or is empty")
}

// Second, get the Pod instance with that name
pod := &corev1.Pod{}
kubePodName := types.NamespacedName{Name: podName, Namespace: namespace}
err := c.Get(context.TODO(), kubePodName, pod)
kubePodName := types.NamespacedName{Name: podName, Namespace: r.watchNamespace}
err := r.client.Get(context.TODO(), kubePodName, pod)
if err != nil {
return "", fmt.Errorf("Pod %v could not be retrieved: %v", podName, err.Error())
}
Expand Down Expand Up @@ -178,6 +189,7 @@ type ReconcileKabanero struct {
client client.Client
scheme *runtime.Scheme
requeueDelayMap map[string]RequeueData
watchNamespace string
}

// RequeueData stores information that enables reconcile operations to be retried.
Expand All @@ -186,6 +198,33 @@ type RequeueData struct {
futureTime time.Time
}

// When we see that a namespace has changed, we want to reconcile any Kabanero instances that
// reference that namespace in its targetNamespaces list.
func (r *ReconcileKabanero) targetNamespaceMapFunc(a handler.MapObject) []reconcile.Request {
log.Info(fmt.Sprintf("Processing for change in namespace %v", a.Meta.GetName()))

// List Kabanero instances
kabaneros := &kabanerov1alpha2.KabaneroList{}
err := r.client.List(context.TODO(), kabaneros, client.InNamespace(r.watchNamespace))
if err != nil {
log.Error(err, fmt.Sprintf("Could not process namespace event for \"%v\"", a.Meta.GetName()))
return nil
}

// For each Kabanero instance, if spec.targetNamespaces includes a.meta.name then add a reconcile request.
requests := []reconcile.Request{}
for _, kabanero := range kabaneros.Items {
for _, namespace := range kabanero.Spec.TargetNamespaces {
if namespace == a.Meta.GetName() {
requests = append(requests, reconcile.Request{types.NamespacedName{Name: kabanero.Name, Namespace: kabanero.Namespace}})
break
}
}
}

return requests
}

// Determine if requeue is needed or not.
// If requeue is required set RequeueAfter to 60 seconds the first time.
// After the first time increase RequeueAfter by 60 seconds up to a max of 15 minutes.
Expand Down Expand Up @@ -289,7 +328,7 @@ func (r *ReconcileKabanero) Reconcile(request reconcile.Request) (reconcile.Resu
// in the add() method because the client is not started yet (that would have been ideal).
operatorContainerImageOp.Do(func() {
var err error
operatorContainerImage, err = getOperatorImage(r.client)
operatorContainerImage, err = r.getOperatorImage()
if err != nil {
log.Error(err, "Could not read the kabanero-operator container image from the pod")
}
Expand Down
70 changes: 48 additions & 22 deletions pkg/controller/kabaneroplatform/targetnamespaces.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kabaneroplatform
import (
"context"
"errors"
"fmt"
"strings"

kabanerov1alpha2 "github.com/kabanero-io/kabanero-operator/pkg/apis/kabanero/v1alpha2"

Expand All @@ -10,7 +12,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -82,31 +84,36 @@ func reconcileTargetNamespaces(ctx context.Context, k *kabanerov1alpha2.Kabanero
Controller: &ownerIsController,
}

// Compute the new, deleted, and common namespace names
// Be sure each requested namespace exists. This will catch namespaces added to the list, as well as
// namespaces that were deleted but not removed from the targetNamespaces list.
specTargetNamespaces := sets.NewString(getTargetNamespaces(k.Spec.TargetNamespaces, k.GetNamespace())...)
statusTargetNamespaces := sets.NewString(getTargetNamespaces(k.Status.TargetNamespaces.Namespaces, k.GetNamespace())...)
oldNamespaces := statusTargetNamespaces.Difference(specTargetNamespaces)
newNamespaces := specTargetNamespaces.Difference(statusTargetNamespaces)
unchangedNamespaces := specTargetNamespaces.Intersection(statusTargetNamespaces)

// For new namespaces, be sure they exist.
for namespace, _ := range newNamespaces {
var errorNamespaces []string
for namespace, _ := range specTargetNamespaces {
exists, err := namespaceExists(ctx, namespace, cl)
if err != nil {
// Caller will log the error. Just update the status and return.
k.Status.TargetNamespaces.Ready = "False"
k.Status.TargetNamespaces.Message = fmt.Sprintf("Could not check status of namespace %v: %v", namespace, err.Error())
return err
reqLogger.Error(err, fmt.Sprintf("Could not check status of namespace %v", namespace))
errorNamespaces = append(errorNamespaces, namespace)
}
if exists == false {
// Caller will log the error. Just update the status and return.
err = fmt.Errorf("targetNamespace %v does not exist", namespace)
k.Status.TargetNamespaces.Ready = "False"
k.Status.TargetNamespaces.Message = err.Error()
return err
reqLogger.Error(nil, fmt.Sprintf("Target namespace %v does not exist", namespace))
errorNamespaces = append(errorNamespaces, namespace)
}
}

for _, namespace := range errorNamespaces {
delete(specTargetNamespaces, namespace)
}

// TODO: did I do this right? need to process the namespaces, then look at errorNamespaces and
// generate an error message for namespaces that did not exist. Once we have a watch set
// up, that should take care of partially active lists, and the delete case.

// Compute the new, deleted, and common namespace names
statusTargetNamespaces := sets.NewString(getTargetNamespaces(k.Status.TargetNamespaces.Namespaces, k.GetNamespace())...)
oldNamespaces := statusTargetNamespaces.Difference(specTargetNamespaces)
newNamespaces := specTargetNamespaces.Difference(statusTargetNamespaces)
unchangedNamespaces := specTargetNamespaces.Intersection(statusTargetNamespaces)

// Create the templates
bindingTemplates := createBindingTemplates(k.GetNamespace())

Expand Down Expand Up @@ -144,9 +151,28 @@ func reconcileTargetNamespaces(ctx context.Context, k *kabanerov1alpha2.Kabanero
}

// Update the Status to reflect the new target namespaces.
k.Status.TargetNamespaces.Namespaces = k.Spec.TargetNamespaces
k.Status.TargetNamespaces.Ready = "True"
k.Status.TargetNamespaces.Message = ""
k.Status.TargetNamespaces.Namespaces = nil
for _, namespace := range k.Spec.TargetNamespaces {
isErrorNamespace := false
for _, errorNamespace := range errorNamespaces {
if errorNamespace == namespace {
isErrorNamespace = true
break
}
}
if isErrorNamespace == false {
k.Status.TargetNamespaces.Namespaces = append(k.Status.TargetNamespaces.Namespaces, namespace)
}
}

if len(errorNamespaces) == 0 {
k.Status.TargetNamespaces.Ready = "True"
k.Status.TargetNamespaces.Message = ""
} else {
k.Status.TargetNamespaces.Ready = "False"
k.Status.TargetNamespaces.Message = fmt.Sprintf("The following namespaces could not be processed: %v", strings.Join(errorNamespaces, ","))
return errors.New(k.Status.TargetNamespaces.Message)
}

return nil
}
Expand All @@ -164,7 +190,7 @@ func namespaceExists(ctx context.Context, inNamespace string, cl client.Client)
return true, nil
}

if errors.IsNotFound(err) {
if kerrors.IsNotFound(err) {
return false, nil
}

Expand Down
78 changes: 63 additions & 15 deletions pkg/controller/kabaneroplatform/targetnamespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,8 @@ func TestReconcileTargetNamespacesNamespaceNotExist(t *testing.T) {

// Make sure the kabanero status was not updated with the target namespace,
// since it did not exist.
if len(k.Status.TargetNamespaces.Namespaces) != 1 {
t.Fatal(fmt.Sprintf("Kabanero status should have 1 target namespace, but has %v: %v", len(k.Status.TargetNamespaces.Namespaces), k.Status.TargetNamespaces.Namespaces))
}

if k.Status.TargetNamespaces.Namespaces[0] != activeNamespace {
t.Fatal(fmt.Sprintf("Kabanero status target namespace should be %v, but is %v", activeNamespace, k.Status.TargetNamespaces.Namespaces[0]))
if len(k.Status.TargetNamespaces.Namespaces) != 0 {
t.Fatal(fmt.Sprintf("Kabanero status should have 0 target namespace, but has %v: %v", len(k.Status.TargetNamespaces.Namespaces), k.Status.TargetNamespaces.Namespaces))
}

if k.Status.TargetNamespaces.Ready != "False" {
Expand All @@ -209,15 +205,9 @@ func TestReconcileTargetNamespacesNamespaceNotExist(t *testing.T) {
t.Fatal("Kabanero target namespace status contains no error message")
}

// Make sure the RoleBinding map did not get updated.
if len(client.objs) != 1 {
t.Fatal(fmt.Sprintf("Should have created one RoleBinding, but created %v: %#v", len(client.objs), client.objs))
}

for key, _ := range client.objs {
if key.Namespace != activeNamespace {
t.Fatal(fmt.Sprintf("Should have created RoleBinding in %v namespace, but created in %v namespace", activeNamespace, key.Namespace))
}
// Make sure the RoleBinding map got cleared.
if len(client.objs) != 0 {
t.Fatal(fmt.Sprintf("Should have created 0 RoleBindings, but created %v: %#v", len(client.objs), client.objs))
}

// OK, now create the namespace and make sure things resolve as per normal.
Expand Down Expand Up @@ -258,6 +248,64 @@ func TestReconcileTargetNamespacesNamespaceNotExist(t *testing.T) {
}
}

// Apply the role bindings to a namespace that does not exist.
func TestTargetNamespacesGotDeleted(t *testing.T) {
targetNamespace1 := "fred"
targetNamespace2 := "george"
activeNamespace1 := "fred"
activeNamespace2 := "george"

k := kabanerov1alpha2.Kabanero{
ObjectMeta: metav1.ObjectMeta{Name: "kabanero", Namespace: "kabanero"},
Spec: kabanerov1alpha2.KabaneroSpec{
TargetNamespaces: []string{targetNamespace1, targetNamespace2},
},
Status: kabanerov1alpha2.KabaneroStatus {
TargetNamespaces: kabanerov1alpha2.TargetNamespaceStatus {
Namespaces: []string{activeNamespace1, activeNamespace2},
Ready: "True",
},
},
}

// Set up pre-existing objects
existingNamespaces := make(map[string]bool)
existingNamespaces[activeNamespace1] = true
existingRoleBinding := client.ObjectKey{Name: "kabanero-pipeline-deploy-rolebinding", Namespace: activeNamespace1}
existingRoleBindings := make(map[client.ObjectKey]bool)
existingRoleBindings[existingRoleBinding] = true
client := targetnamespaceTestClient{existingRoleBindings, existingNamespaces}

err := reconcileTargetNamespaces(context.TODO(), &k, client, nslog)

if err == nil {
t.Fatal("Did not return an error, but should have because namespace \"george\" does not exist")
}

// Make sure the kabanero status was not updated with the target namespace,
// since it did not exist.
if len(k.Status.TargetNamespaces.Namespaces) != 1 {
t.Fatal(fmt.Sprintf("Kabanero status should have 1 target namespace, but has %v: %v", len(k.Status.TargetNamespaces.Namespaces), k.Status.TargetNamespaces.Namespaces))
}

if k.Status.TargetNamespaces.Namespaces[0] != "fred" {
t.Fatal(fmt.Sprintf("Kabanero status target namespace is not \"fred\", but is %v", k.Status.TargetNamespaces.Namespaces[0]))
}

if k.Status.TargetNamespaces.Ready != "False" {
t.Fatal(fmt.Sprintf("Kabanero target namespace status is not False: %v", k.Status.TargetNamespaces.Ready))
}

if len(k.Status.TargetNamespaces.Message) == 0 {
t.Fatal("Kabanero target namespace status contains no error message")
}

// Make sure the RoleBinding map got cleared.
if len(client.objs) != 1 {
t.Fatal(fmt.Sprintf("Should have created 1 RoleBindings, but created %v: %#v", len(client.objs), client.objs))
}
}

// Test callout from finalizer
func TestCleanupTargetNamespaces(t *testing.T) {
targetNamespace := "fred"
Expand Down

0 comments on commit de5e240

Please sign in to comment.