Skip to content

Commit

Permalink
Add karpenter_machine_approver controller
Browse files Browse the repository at this point in the history
  • Loading branch information
muraee committed Jan 8, 2025
1 parent dc9b949 commit 7530a57
Show file tree
Hide file tree
Showing 8 changed files with 578 additions and 76 deletions.
1 change: 1 addition & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ FROM registry.access.redhat.com/ubi9:latest
COPY --from=builder /hypershift/bin/hypershift \
/hypershift/bin/hcp \
/hypershift/bin/hypershift-operator \
/hypershift/bin/karpenter-operator \
/hypershift/bin/control-plane-operator \
/hypershift/bin/control-plane-pki-operator \
/usr/bin/
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ all: build e2e tests

pre-commit: all verify test

build: hypershift-operator control-plane-operator control-plane-pki-operator hypershift product-cli
build: hypershift-operator control-plane-operator control-plane-pki-operator karpenter-operator hypershift product-cli

.PHONY: update
update: workspace-sync api-deps api api-docs deps clients
Expand Down
7 changes: 7 additions & 0 deletions control-plane-pki-operator/certificates/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ func HasTrueCondition(csr *certificatesv1.CertificateSigningRequest, conditionTy
return false
}

// IsCertificateRequestPending returns true if a certificate request has no
// "Approved" or "Denied" conditions; false otherwise.
func IsCertificateRequestPending(csr *certificatesv1.CertificateSigningRequest) bool {
approved, denied := GetCertApprovalCondition(&csr.Status)
return !approved && !denied
}

func GetCertApprovalCondition(status *certificatesv1.CertificateSigningRequestStatus) (approved bool, denied bool) {
for _, c := range status.Conditions {
if c.Type == certificatesv1.CertificateApproved {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/openshift/hypershift/karpenter-operator/controllers/karpenter/assets"
supportassets "github.com/openshift/hypershift/support/assets"
"github.com/openshift/hypershift/support/upsert"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand Down Expand Up @@ -73,7 +75,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, man
return []ctrl.Request{{NamespacedName: client.ObjectKey{Namespace: r.Namespace}}}
}
return nil
}))); err != nil {
},
))); err != nil {
return fmt.Errorf("failed to watch CRDs: %w", err)
}

Expand All @@ -92,7 +95,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, man
return nil
}
return []ctrl.Request{{NamespacedName: client.ObjectKeyFromObject(o)}}
}))); err != nil {
},
))); err != nil {
return fmt.Errorf("failed to watch Deployment: %w", err)
}

Expand Down
232 changes: 232 additions & 0 deletions karpenter-operator/controllers/karpenter/machine_approver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package karpenter

import (
"context"
"fmt"
"os"
"strings"

awsutil "github.com/openshift/hypershift/cmd/infra/aws/util"
"github.com/openshift/hypershift/control-plane-pki-operator/certificates"

certificatesv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
certificatesv1client "k8s.io/client-go/kubernetes/typed/certificates/v1"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
)

const (
nodeBootstrapperUsername = "system:serviceaccount:openshift-machine-config-operator:node-bootstrapper"
)

type MachineApproverController struct {
client client.Client
certClient *certificatesv1client.CertificatesV1Client
}

func (r *MachineApproverController) SetupWithManager(mgr ctrl.Manager) error {
certClient, err := certificatesv1client.NewForConfig(mgr.GetConfig())
if err != nil {
return err
}
r.certClient = certClient
r.client = mgr.GetClient()

c, err := controller.New("karpenter_machine_approver", mgr, controller.Options{Reconciler: r})
if err != nil {
return fmt.Errorf("failed to construct karpenter_machine_approver controller: %w", err)
}

csrFilterFn := func(csr *certificatesv1.CertificateSigningRequest) bool {
if csr.Spec.SignerName != certificatesv1.KubeAPIServerClientKubeletSignerName {
return false
}
// only reconcile pending CSRs (not approved and not denied).
if !certificates.IsCertificateRequestPending(csr) {
return false
}
// only reconcile kubernetes.io/kube-apiserver-client-kubelet when it is created by the node bootstrapper
if csr.Spec.Username != nodeBootstrapperUsername {
mgr.GetLogger().Info("Ignoring csr because it is not from the node bootstrapper", "csr", csr.Name)
return false
}
return true
}

if err := c.Watch(source.Kind(
mgr.GetCache(),
&certificatesv1.CertificateSigningRequest{},
&handler.TypedEnqueueRequestForObject[*certificatesv1.CertificateSigningRequest]{},
predicate.NewTypedPredicateFuncs(csrFilterFn),
)); err != nil {
return fmt.Errorf("failed to watch CertificateSigningRequest: %v", err)
}

return nil
}

func (r *MachineApproverController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.Info("Reconciling CSR", "req", req)

csr := &certificatesv1.CertificateSigningRequest{}
if err := r.client.Get(ctx, req.NamespacedName, csr); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("failed to get csr %s: %v", req.NamespacedName, err)
}

// Return early if deleted
if !csr.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

// If a CSR is approved/denied after being added to the queue,
// but before we reconcile it, trying to approve it will result in an error and cause a loop.
// Return early if the CSR has been approved/denied externally.
if !certificates.IsCertificateRequestPending(csr) {
log.Info("CSR is already processed ", "csr", csr.Name)
return ctrl.Result{}, nil
}

ec2Client, err := getEC2Client()
if err != nil {
return ctrl.Result{}, err
}

authorized, err := r.authorize(ctx, csr, ec2Client)
if err != nil {
return ctrl.Result{}, err
}

if authorized {
log.Info("Attempting to approve CSR", "csr", csr.Name)
if err := r.approve(ctx, csr); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to approve csr %s: %v", csr.Name, err)
}
}

return ctrl.Result{}, nil
}

func (r *MachineApproverController) authorize(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, ec2Client ec2iface.EC2API) (bool, error) {
x509cr, err := certificates.ParseCSR(csr.Spec.Request)
if err != nil {
return false, err
}

nodeName := strings.TrimPrefix(x509cr.Subject.CommonName, "system:node:")
if len(nodeName) == 0 {
return false, fmt.Errorf("subject common name does not have a valid node name")
}

nodeClaims, err := listNodeClaims(ctx, r.client)
if err != nil {
return false, err
}

dnsNames, err := getEC2InstancesDNSNames(ctx, nodeClaims, ec2Client)
if err != nil {
return false, err
}

for _, dnsName := range dnsNames {
if nodeName == dnsName {
return true, nil // approve node client cert
}
}

return false, nil
}

func getEC2InstancesDNSNames(ctx context.Context, nodeClaims *unstructured.UnstructuredList, ec2Client ec2iface.EC2API) ([]string, error) {
ec2InstanceIDs := []string{}
for _, claim := range nodeClaims.Items {
nodeName := claim.UnstructuredContent()["status"].(map[string]interface{})["nodeName"]
if nodeName != nil {
// skip if a node is already created for this nodeClaim.
continue
}
providerID := claim.UnstructuredContent()["status"].(map[string]interface{})["providerID"].(string)
instanceID := providerID[strings.LastIndex(providerID, "/")+1:]

ec2InstanceIDs = append(ec2InstanceIDs, instanceID)
}

if len(ec2InstanceIDs) == 0 {
return nil, nil
}

output, err := ec2Client.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{
InstanceIds: awssdk.StringSlice(ec2InstanceIDs),
})
if err != nil {
return nil, err
}

dnsNames := []string{}
for _, reservation := range output.Reservations {
for _, instance := range reservation.Instances {
dnsNames = append(dnsNames, *instance.PrivateDnsName)
}
}
return dnsNames, nil
}

func (r *MachineApproverController) approve(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateApproved,
Reason: "KarpenterCSRApprove",
Message: "Auto approved by karpenter_machine_approver",
Status: corev1.ConditionTrue,
})

_, err := r.certClient.CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating approval for csr: %v", err)
}

return nil
}

func getEC2Client() (ec2iface.EC2API, error) {
// AWS_SHARED_CREDENTIALS_FILE and AWS_REGION envvar should be set in operator deployment
// when reconciling an AWS hosted control plane
if os.Getenv("AWS_SHARED_CREDENTIALS_FILE") == "" {
return nil, fmt.Errorf("AWS credentials not set")
}

awsSession := awsutil.NewSession("karpenter-operator", "", "", "", "")
ec2Client := ec2.New(awsSession, awssdk.NewConfig())
return ec2Client, nil
}

func listNodeClaims(ctx context.Context, client client.Client) (*unstructured.UnstructuredList, error) {
nodeClaimList := &unstructured.UnstructuredList{}
nodeClaimList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "karpenter.sh",
Version: "v1",
Kind: "NodeClaim",
})
err := client.List(ctx, nodeClaimList)
if err != nil {
return nil, fmt.Errorf("failed to list NodeClaims: %w", err)
}

return nodeClaimList, nil
}
Loading

0 comments on commit 7530a57

Please sign in to comment.