Skip to content

Commit

Permalink
add target group policy controller and status updates (#509)
Browse files Browse the repository at this point in the history
* add target group policy controller and status updates
  • Loading branch information
mikhail-aws authored Nov 17, 2023
1 parent 61689bc commit ed90166
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 19 deletions.
5 changes: 5 additions & 0 deletions cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ func main() {
setupLog.Fatalf("iam auth policy controller setup failed: %s", err)
}

err = controllers.RegisterTargetGroupPolicyController(ctrlLog.Named("target-group-policy"), mgr)
if err != nil {
setupLog.Fatalf("target group policy controller setup failed: %s", err)
}

err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), cloud, finalizerManager, mgr)
if err != nil {
setupLog.Fatalf("vpc association policy controller setup failed: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ spec:
- targetRef
type: object
status:
description: TargetGroupPolicyStatus defines the observed state of AccessLogPolicy.
default:
conditions:
- lastTransitionTime: "1970-01-01T00:00:00Z"
message: Waiting for controller
reason: NotReconciled
status: Unknown
type: Accepted
description: Status defines the current state of TargetGroupPolicy.
properties:
conditions:
default:
Expand Down Expand Up @@ -254,4 +261,5 @@ spec:
type: object
served: true
storage: true
subresources: {}
subresources:
status: {}
1 change: 1 addition & 0 deletions controllers/eventhandlers/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (h *policyEventHandler[T]) MapObjectToPolicy() handler.EventHandler {

func (h *policyEventHandler[T]) mapObjectToPolicy(ctx context.Context, eventObj client.Object) []reconcile.Request {
var requests []reconcile.Request

policies, err := policyhelper.GetAttachedPolicies(ctx, h.client, k8s.NamespacedName(eventObj), *new(T))
if err != nil {
h.log.Errorf("Failed calling k8s operation: %s", err.Error())
Expand Down
142 changes: 142 additions & 0 deletions controllers/targetgrouppolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package controllers

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/k8s/policyhelper"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

type TargetGroupPolicyController struct {
log gwlog.Logger
client client.Client
}

func RegisterTargetGroupPolicyController(log gwlog.Logger, mgr ctrl.Manager) error {
controller := &TargetGroupPolicyController{
log: log,
client: mgr.GetClient(),
}
mapfn := targetGroupPolicyMapFunc(mgr.GetClient(), log)
return ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.TargetGroupPolicy{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(mapfn)).
Complete(controller)
}

func (c *TargetGroupPolicyController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
tgPolicy := &anv1alpha1.TargetGroupPolicy{}
err := c.client.Get(ctx, req.NamespacedName, tgPolicy)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
c.log.Infow("reconcile target group policy", "req", req, "targetRef", tgPolicy.Spec.TargetRef)

validationErr := c.validateSpec(ctx, tgPolicy)
reason := validationErrToStatusReason(validationErr)
msg := ""
if validationErr != nil {
msg = validationErr.Error()
}
c.updatePolicyCondition(tgPolicy, reason, msg)
err = c.client.Status().Update(ctx, tgPolicy)
if err != nil {
return ctrl.Result{}, err
}

c.log.Infow("reconciled target group policy",
"req", req,
"targetRef", tgPolicy.Spec.TargetRef,
)
return ctrl.Result{}, nil
}

func (c *TargetGroupPolicyController) validateSpec(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) error {
tr := tgPolicy.Spec.TargetRef
if tr.Group != corev1.GroupName {
return fmt.Errorf("%w: %s", GroupNameError, tr.Group)
}
if string(tr.Kind) != "Service" {
return fmt.Errorf("%w: %s", KindError, tr.Kind)
}
tgref := types.NamespacedName{
Namespace: tgPolicy.Namespace,
Name: string(tgPolicy.Spec.TargetRef.Name),
}
valid, err := policyhelper.GetValidPolicy(ctx, c.client, tgref, tgPolicy)
if err != nil {
return nil
}
if valid != nil && valid.GetNamespacedName() != tgPolicy.GetNamespacedName() {
return fmt.Errorf("%w, with policy %s", TargetRefConflict, valid.GetName())
}
refExists, err := c.targetRefExists(ctx, tgPolicy)
if err != nil {
return err
}
if !refExists {
return fmt.Errorf("%w: %s", TargetRefNotFound, tr.Name)
}
return nil
}

func (c *TargetGroupPolicyController) targetRefExists(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) (bool, error) {
tr := tgPolicy.Spec.TargetRef
var obj client.Object
switch tr.Kind {
case "Service":
obj = &corev1.Service{}
default:
panic("unexpected targetRef Kind=" + tr.Kind)
}
return k8s.ObjExists(ctx, c.client, types.NamespacedName{
Namespace: tgPolicy.Namespace,
Name: string(tr.Name),
}, obj)
}

func (c *TargetGroupPolicyController) updatePolicyCondition(tgPolicy *anv1alpha1.TargetGroupPolicy, reason gwv1alpha2.PolicyConditionReason, msg string) {
status := metav1.ConditionTrue
if reason != gwv1alpha2.PolicyReasonAccepted {
status = metav1.ConditionFalse
}
cnd := metav1.Condition{
Type: string(gwv1alpha2.PolicyConditionAccepted),
Status: status,
Reason: string(reason),
Message: msg,
}
meta.SetStatusCondition(&tgPolicy.Status.Conditions, cnd)
}

func targetGroupPolicyMapFunc(c client.Client, log gwlog.Logger) handler.MapFunc {
return func(ctx context.Context, obj client.Object) []ctrl.Request {
requests := []ctrl.Request{}
policies := &anv1alpha1.TargetGroupPolicyList{}
err := c.List(ctx, policies, &client.ListOptions{Namespace: obj.GetNamespace()})
if err != nil {
log.Error(err)
return requests
}
for _, policy := range policies.Items {
if obj.GetName() == string(policy.Spec.TargetRef.Name) {
requests = append(requests, ctrl.Request{NamespacedName: policy.GetNamespacedName()})
}
}
return requests
}
}
12 changes: 10 additions & 2 deletions helm/crds/application-networking.k8s.aws_targetgrouppolicies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ spec:
- targetRef
type: object
status:
description: TargetGroupPolicyStatus defines the observed state of AccessLogPolicy.
default:
conditions:
- lastTransitionTime: "1970-01-01T00:00:00Z"
message: Waiting for controller
reason: NotReconciled
status: Unknown
type: Accepted
description: Status defines the current state of TargetGroupPolicy.
properties:
conditions:
default:
Expand Down Expand Up @@ -254,4 +261,5 @@ spec:
type: object
served: true
storage: true
subresources: {}
subresources:
status: {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ const (
// +kubebuilder:resource:categories=gateway-api,shortName=tgp
// +kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`
// +kubebuilder:subresource:status
type TargetGroupPolicy struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec TargetGroupPolicySpec `json:"spec"`

// Status defines the current state of TargetGroupPolicy.
//
// +kubebuilder:default={conditions: {{type: "Accepted", status: "Unknown", reason:"NotReconciled", message:"Waiting for controller", lastTransitionTime: "1970-01-01T00:00:00Z"}}}
Status TargetGroupPolicyStatus `json:"status,omitempty"`
}

Expand Down
23 changes: 8 additions & 15 deletions pkg/gateway/model_build_targetgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ import (
"errors"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"github.com/aws/aws-sdk-go/service/vpclattice"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/aws-sdk-go/service/vpclattice"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/k8s/policyhelper"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

type InvalidBackendRefError struct {
Expand Down Expand Up @@ -149,17 +145,16 @@ func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Contex
}
}

tgps, err := policyhelper.GetAttachedPolicies(ctx, t.client, k8s.NamespacedName(t.serviceExport), &anv1alpha1.TargetGroupPolicy{})
tgp, err := policyhelper.GetValidPolicy(ctx, t.client,
k8s.NamespacedName(t.serviceExport), &anv1alpha1.TargetGroupPolicy{})
if err != nil {
return nil, err
}

protocol := "HTTP"
protocolVersion := vpclattice.TargetGroupProtocolVersionHttp1
var healthCheckConfig *vpclattice.HealthCheckConfig
if len(tgps) > 0 {
// TODO: TGP conflicts should be handled correctly w/ status update, for now just picking up one
tgp := tgps[0]
if tgp != nil {
if tgp.Spec.Protocol != nil {
protocol = *tgp.Spec.Protocol
}
Expand Down Expand Up @@ -318,17 +313,15 @@ func (t *backendRefTargetGroupModelBuildTask) buildTargetGroupSpec(ctx context.C
}
}

tgps, err := policyhelper.GetAttachedPolicies(ctx, t.client, backendRefNsName, &anv1alpha1.TargetGroupPolicy{})
tgp, err := policyhelper.GetValidPolicy(ctx, t.client, backendRefNsName, &anv1alpha1.TargetGroupPolicy{})
if err != nil {
return model.TargetGroupSpec{}, err
}

protocol := "HTTP"
protocolVersion := vpclattice.TargetGroupProtocolVersionHttp1
var healthCheckConfig *vpclattice.HealthCheckConfig
if len(tgps) > 0 {
// TODO: TGP conflicts should be handled correctly w/ status update, for now just picking up one
tgp := tgps[0]
if tgp != nil {
if tgp.Spec.Protocol != nil {
protocol = *tgp.Spec.Protocol
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/k8s/policyhelper/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package policyhelper
import (
"context"
"fmt"
"strings"

"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -20,6 +22,19 @@ type policyInfo struct {
kind gwv1beta1.Kind
}

func GetValidPolicy[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) (T, error) {
var empty T
policies, err := GetAttachedPolicies(ctx, k8sClient, searchTargetRef, policy)
conflictResolutionSort(policies)
if err != nil {
return empty, err
}
if len(policies) == 0 {
return empty, nil
}
return policies[0], nil
}

func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) ([]T, error) {
var policies []T
info, err := getPolicyInfo(policy)
Expand Down Expand Up @@ -76,3 +91,31 @@ func getPolicyInfo(policyType core.Policy) (policyInfo, error) {
return policyInfo{}, fmt.Errorf("unsupported policy type %T", policyType)
}
}

// sort in-place for policy conflict resolution
// 1. older policy (CreationTimeStamp) has precedence
// 2. alphabetical order namespace, then name
func conflictResolutionSort[T core.Policy](policies []T) {
slices.SortFunc(policies, func(a, b T) int {
tsA := a.GetCreationTimestamp().Time
tsB := b.GetCreationTimestamp().Time
switch {
case tsA.Before(tsB):
return -1
case tsA.After(tsB):
return 1
default:
nsnA := a.GetNamespacedName()
nsnB := b.GetNamespacedName()
nsA := nsnA.Namespace
nsB := nsnB.Namespace
nsCmp := strings.Compare(nsA, nsB)
if nsCmp != 0 {
return nsCmp
}
nA := nsnA.Name
nB := nsnB.Name
return strings.Compare(nA, nB)
}
})
}
Loading

0 comments on commit ed90166

Please sign in to comment.