From b9918cc8bb84a453d4a3b0708be26fc1682a0214 Mon Sep 17 00:00:00 2001 From: lou-lan Date: Wed, 6 Mar 2024 14:53:04 +0800 Subject: [PATCH] Optimize egress gateway controller reconcile logic (#1243) Signed-off-by: lou-lan --- pkg/controller/controller.go | 22 +- .../policy/egress_cluster_policy.go | 125 - pkg/controller/policy/egress_policy.go | 129 -- pkg/egressgateway/egress_gateway.go | 2010 +++++++++-------- pkg/k8s/apis/v1beta1/egressgateway_types.go | 10 + pkg/k8s/apis/v1beta1/egresstunnel_types.go | 12 + 6 files changed, 1087 insertions(+), 1221 deletions(-) delete mode 100644 pkg/controller/policy/egress_cluster_policy.go delete mode 100644 pkg/controller/policy/egress_policy.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8dab79988..1850f844b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,9 +6,6 @@ package controller import ( "context" "fmt" - "github.com/spidernet-io/egressgateway/pkg/controller/endpoint" - "github.com/spidernet-io/egressgateway/pkg/controller/policy" - "github.com/spidernet-io/egressgateway/pkg/controller/tunnel" "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" @@ -19,7 +16,9 @@ import ( "github.com/spidernet-io/egressgateway/pkg/config" egressclusterinfo "github.com/spidernet-io/egressgateway/pkg/controller/egress_cluster_info" + "github.com/spidernet-io/egressgateway/pkg/controller/endpoint" "github.com/spidernet-io/egressgateway/pkg/controller/metrics" + "github.com/spidernet-io/egressgateway/pkg/controller/tunnel" "github.com/spidernet-io/egressgateway/pkg/controller/webhook" "github.com/spidernet-io/egressgateway/pkg/egressgateway" "github.com/spidernet-io/egressgateway/pkg/logger" @@ -61,27 +60,22 @@ func New(cfg *config.Config) (types.Service, error) { return nil, fmt.Errorf("failed to create manager: %w", err) } + cli, err := client.New(cfg.KubeConfig, client.Options{Scheme: schema.GetScheme()}) + if err != nil { + return nil, err + } + if err = setManger(mgr, cfg, log); err != nil { return nil, err } metrics.RegisterMetricCollectors() - err = egressgateway.NewEgressGatewayController(mgr, log, cfg) + err = egressgateway.NewEgressGatewayController(mgr, log, cfg, cli) if err != nil { return nil, fmt.Errorf("failed to create egress gateway controller: %w", err) } - err = policy.NewEgressPolicyController(mgr, log, cfg) - if err != nil { - return nil, fmt.Errorf("failed to create egress policy controller: %w", err) - } - - err = policy.NewEgressClusterPolicyController(mgr, log, cfg) - if err != nil { - return nil, fmt.Errorf("failed to create egress cluster policy controller: %w", err) - } - err = tunnel.NewEgressTunnelController(mgr, log, cfg) if err != nil { return nil, fmt.Errorf("failed to create egress tunnel controller: %w", err) diff --git a/pkg/controller/policy/egress_cluster_policy.go b/pkg/controller/policy/egress_cluster_policy.go deleted file mode 100644 index 33298e696..000000000 --- a/pkg/controller/policy/egress_cluster_policy.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2022 Authors of spidernet-io -// SPDX-License-Identifier: Apache-2.0 - -package policy - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/spidernet-io/egressgateway/pkg/config" - "github.com/spidernet-io/egressgateway/pkg/egressgateway" - "github.com/spidernet-io/egressgateway/pkg/k8s/apis/v1beta1" - "github.com/spidernet-io/egressgateway/pkg/utils" -) - -type egcpReconciler struct { - client client.Client - log logr.Logger - config *config.Config -} - -func (r *egcpReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - kind, newReq, err := utils.ParseKindWithReq(req) - if err != nil { - return reconcile.Result{}, err - } - - log := r.log.WithValues("name", newReq.Name, "kind", kind) - log.Info("reconciling") - switch kind { - case "EgressGateway": - return r.reconcileEGW(ctx, newReq, log) - default: - return reconcile.Result{}, nil - } -} - -// reconcileEN reconcile egressgateway -// goal: -// - update egressclusterpolicy -func (r *egcpReconciler) reconcileEGW(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - deleted := false - egw := new(v1beta1.EgressGateway) - err := r.client.Get(ctx, req.NamespacedName, egw) - if err != nil { - if !errors.IsNotFound(err) { - return reconcile.Result{Requeue: true}, err - } - deleted = true - } - deleted = deleted || !egw.GetDeletionTimestamp().IsZero() - - if deleted { - return reconcile.Result{Requeue: false}, nil - } - - egcpList := &v1beta1.EgressClusterPolicyList{} - if err := r.client.List(ctx, egcpList); err != nil { - log.Error(err, "failed to list") - return reconcile.Result{Requeue: true}, err - } - - for _, item := range egcpList.Items { - if item.Spec.EgressGatewayName == egw.Name { - newEGCP := item.DeepCopy() - - newEGCP.Status.Eip.Ipv4 = "" - newEGCP.Status.Eip.Ipv6 = "" - newEGCP.Status.Node = "" - - policy := v1beta1.Policy{Name: item.Name, Namespace: item.Namespace} - eipStatus, isExist := egressgateway.GetEIPStatusByPolicy(policy, *egw) - if isExist { - for _, eip := range eipStatus.Eips { - for _, p := range eip.Policies { - if p == policy { - newEGCP.Status.Eip.Ipv4 = eip.IPv4 - newEGCP.Status.Eip.Ipv6 = eip.IPv6 - newEGCP.Status.Node = eipStatus.Name - } - } - } - } - - log.V(1).Info("update egressclusterpolicy status", "status", newEGCP.Status) - err = r.client.Status().Update(ctx, newEGCP) - if err != nil { - log.Error(err, "update egressclusterpolicy status", "status", newEGCP.Status) - return reconcile.Result{Requeue: true}, err - } - } - } - - return reconcile.Result{Requeue: false}, nil -} - -func NewEgressClusterPolicyController(mgr manager.Manager, log logr.Logger, cfg *config.Config) error { - if cfg == nil { - return fmt.Errorf("cfg can not be nil") - } - - log.Info("new egressclusterpolicy controller") - - r := &egcpReconciler{client: mgr.GetClient(), log: log, config: cfg} - c, err := controller.New("egressclusterpolicy", mgr, controller.Options{Reconciler: r}) - if err != nil { - return err - } - - if err := c.Watch(source.Kind(mgr.GetCache(), &v1beta1.EgressGateway{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressGateway"))); err != nil { - return fmt.Errorf("failed to watch EgressGateway: %w", err) - } - - return nil -} diff --git a/pkg/controller/policy/egress_policy.go b/pkg/controller/policy/egress_policy.go deleted file mode 100644 index 963028c66..000000000 --- a/pkg/controller/policy/egress_policy.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2022 Authors of spidernet-io -// SPDX-License-Identifier: Apache-2.0 - -package policy - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/spidernet-io/egressgateway/pkg/config" - "github.com/spidernet-io/egressgateway/pkg/egressgateway" - "github.com/spidernet-io/egressgateway/pkg/k8s/apis/v1beta1" - "github.com/spidernet-io/egressgateway/pkg/utils" -) - -type egpReconciler struct { - client client.Client - log logr.Logger - config *config.Config -} - -func (r *egpReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - kind, newReq, err := utils.ParseKindWithReq(req) - if err != nil { - return reconcile.Result{}, err - } - - log := r.log.WithValues("name", newReq.Name, "kind", kind) - log.V(1).Info("reconciling") - switch kind { - case "EgressGateway": - return r.reconcileEGW(ctx, newReq, log) - default: - return reconcile.Result{}, nil - } -} - -// reconcileEN reconcile EgressGateway -// goal: -// - update EgressPolicy -func (r *egpReconciler) reconcileEGW(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - deleted := false - egw := new(v1beta1.EgressGateway) - err := r.client.Get(ctx, req.NamespacedName, egw) - if err != nil { - if !errors.IsNotFound(err) { - return reconcile.Result{Requeue: true}, err - } - deleted = true - } - deleted = deleted || !egw.GetDeletionTimestamp().IsZero() - - if deleted { - return reconcile.Result{Requeue: false}, nil - } - - egpList := &v1beta1.EgressPolicyList{} - if err := r.client.List(ctx, egpList); err != nil { - log.Error(err, "failed to list") - return reconcile.Result{Requeue: true}, err - } - - for _, item := range egpList.Items { - if item.Spec.EgressGatewayName == egw.Name { - newEGP := item.DeepCopy() - - newEGP.Status.Eip.Ipv4 = "" - newEGP.Status.Eip.Ipv6 = "" - newEGP.Status.Node = "" - - policy := v1beta1.Policy{Name: item.Name, Namespace: item.Namespace} - eipStatus, isExist := egressgateway.GetEIPStatusByPolicy(policy, *egw) - if isExist { - for _, eip := range eipStatus.Eips { - for _, p := range eip.Policies { - if p == policy { - newEGP.Status.Eip.Ipv4 = eip.IPv4 - newEGP.Status.Eip.Ipv6 = eip.IPv6 - newEGP.Status.Node = eipStatus.Name - } - } - } - } - - log.V(1).Info("update EgressPolicy status", "status", newEGP.Status) - err = r.client.Status().Update(ctx, newEGP) - if err != nil { - log.Error(err, "update EgressPolicy status", "status", newEGP.Status) - return reconcile.Result{Requeue: true}, err - } - } - } - - return reconcile.Result{Requeue: false}, nil -} - -func NewEgressPolicyController(mgr manager.Manager, log logr.Logger, cfg *config.Config) error { - if cfg == nil { - return fmt.Errorf("cfg can not be nil") - } - - r := &egpReconciler{ - client: mgr.GetClient(), - log: log, - config: cfg, - } - - log.Info("new egress policy controller") - c, err := controller.New("egresspolicy", mgr, controller.Options{Reconciler: r}) - if err != nil { - return err - } - - if err := c.Watch(source.Kind(mgr.GetCache(), &v1beta1.EgressGateway{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressGateway"))); err != nil { - return fmt.Errorf("failed to watch EgressGateway: %w", err) - } - - return nil -} diff --git a/pkg/egressgateway/egress_gateway.go b/pkg/egressgateway/egress_gateway.go index 704ac5583..15b7af345 100644 --- a/pkg/egressgateway/egress_gateway.go +++ b/pkg/egressgateway/egress_gateway.go @@ -8,6 +8,7 @@ import ( "fmt" "math/rand" "net" + "reflect" "time" "github.com/go-logr/logr" @@ -18,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -35,19 +37,10 @@ type egnReconciler struct { client client.Client log logr.Logger config *config.Config + cli client.Client } -type policyInfo struct { - egw string - ipv4 string - ipv6 string - node string - policy egress.Policy - isUseNodeIP bool - allocatorPolicy string -} - -func (r egnReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { +func (r *egnReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { kind, newReq, err := utils.ParseKindWithReq(req) if err != nil { return reconcile.Result{}, err @@ -57,284 +50,228 @@ func (r egnReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re switch kind { case "EgressGateway": - return r.reconcileEGW(ctx, newReq, log) + return r.reconcileGateway(ctx, newReq, log) case "EgressClusterPolicy": - fallthrough + return r.reconcileEgressClusterPolicy(ctx, newReq, log) case "EgressPolicy": - return r.reconcileEGP(ctx, newReq, log) + return r.reconcileEgressPolicy(ctx, newReq, log) case "Node": return r.reconcileNode(ctx, newReq, log) case "EgressTunnel": - return r.reconcileEGT(ctx, newReq, log) + return r.reconcileTunnel(ctx, newReq, log) default: return reconcile.Result{}, nil } } -// reconcileNode reconcile node -func (r egnReconciler) reconcileNode(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { +func (r *egnReconciler) reconcileTunnel(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { deleted := false - node := new(corev1.Node) - err := r.client.Get(ctx, req.NamespacedName, node) + tunnel := &egress.EgressTunnel{} + err := r.client.Get(ctx, req.NamespacedName, tunnel) if err != nil { if !errors.IsNotFound(err) { - return reconcile.Result{}, err + return reconcile.Result{Requeue: true}, err } deleted = true } - deleted = deleted || !node.GetDeletionTimestamp().IsZero() + deleted = deleted || !tunnel.GetDeletionTimestamp().IsZero() - egwList := &egress.EgressGatewayList{} - if err := r.client.List(ctx, egwList); err != nil { - return reconcile.Result{Requeue: true}, nil - } - - // Node NoReady event, complete in reconcile EgressTunnel event if deleted { - r.log.Info("request item is deleted") - err := r.deleteNodeFromEGs(ctx, log, req.Name, egwList) - if err != nil { - return reconcile.Result{Requeue: true}, nil - } - - return reconcile.Result{}, nil - } + // case 1, tunnel delete + // move ip - // Checking the node label - for _, egw := range egwList.Items { - selNode, err := metav1.LabelSelectorAsSelector(egw.Spec.NodeSelector.Selector) + egwList := new(egress.EgressGatewayList) + err := r.cli.List(ctx, egwList) if err != nil { - return reconcile.Result{Requeue: true}, nil - } - isMatch := selNode.Matches(labels.Set(node.Labels)) - if isMatch { - // If the tag matches, check whether information about the node exists. If it does not exist, add an empty one - _, isExist := GetPoliciesByNode(node.Name, egw) - if !isExist { - egt := new(egress.EgressTunnel) - err := r.client.Get(ctx, types.NamespacedName{Name: node.Name}, egt) - if err == nil { - egw.Status.NodeList = append(egw.Status.NodeList, egress.EgressIPStatus{Name: node.Name, Status: string(egt.Status.Phase)}) - } else { - egw.Status.NodeList = append(egw.Status.NodeList, egress.EgressIPStatus{Name: node.Name, Status: string(egress.EgressTunnelFailed)}) - } + return reconcile.Result{Requeue: true}, err + } - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(&egw) - if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return reconcile.Result{Requeue: true}, nil + for _, egw := range egwList.Items { + var needMoveIPs []egress.Eips + needUpdate := false + for nodeIndex, node := range egw.Status.NodeList { + if node.Name == req.Name { + needUpdate = true + needMoveIPs = append(needMoveIPs, node.Eips...) + egw.Status.NodeList = append(egw.Status.NodeList[:nodeIndex], egw.Status.NodeList[nodeIndex+1:]...) + break } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal - - r.log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, &egw) + } + if len(needMoveIPs) > 0 { + moveEipToReadyNode(&egw, needMoveIPs) + } + if needUpdate { + err := updateGatewayStatusWithUsage(ctx, r.client, &egw) if err != nil { - r.log.Error(err, "update egress gateway status", "status", egw.Status) - return reconcile.Result{Requeue: true}, nil + return reconcile.Result{Requeue: true}, err } - } - } else { - // Labels do not match. If there is a node in status, delete the node from status and reallocate the policy - _, isExist := GetPoliciesByNode(node.Name, egw) - if isExist { - err := r.deleteNodeFromEG(ctx, log, node.Name, egw) + // sync all policy status + err = updateAllPolicyStatus(ctx, r.client, &egw) if err != nil { - return reconcile.Result{Requeue: true}, nil + return reconcile.Result{Requeue: true}, err } } } + return reconcile.Result{}, nil } - return reconcile.Result{}, nil -} + fmt.Println("update tunnel") -// reconcileEGW reconcile egress gateway -func (r egnReconciler) reconcileEGW(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - deleted := false - isUpdate := false - egw := &egress.EgressGateway{} - err := r.client.Get(ctx, req.NamespacedName, egw) + egwList := new(egress.EgressGatewayList) + err = r.cli.List(ctx, egwList) if err != nil { - if !errors.IsNotFound(err) { - return reconcile.Result{Requeue: true}, err - } - deleted = true + return reconcile.Result{Requeue: true}, err } - deleted = deleted || !egw.GetDeletionTimestamp().IsZero() - if deleted { - log.Info("request item is deleted") - p, err := getEgressGatewayPolicies(r.client, ctx, egw) - if err != nil { - log.Error(err, "getEgressGatewayPolicies when delete egressgateway") - return reconcile.Result{Requeue: true}, err + for _, egw := range egwList.Items { + var needMoveIPs []egress.Eips + needUpdate := false + for nodeIndex, node := range egw.Status.NodeList { + // case 2, tunnel update + if node.Name == req.Name { + if tunnel.Status.Phase == egress.EgressTunnelReady { + // case 2.1: other status (e.g. NodeNotReady) -> Ready + if tunnel.Status.Phase.IsNotEqual(node.Status) { + needUpdate = true + egw.Status.NodeList[nodeIndex].Status = tunnel.Status.Phase.String() + if egw.Status.ReadyCount() == 1 { + // if it is the first tunnel in the node list, + // we need do more (list all policy, recheck all) + res, err := r.checkAndUpdateAllPolicyIfNeedWhenFirstNodeReady(ctx, req, log, &egw) + if err != nil { + return res, err + } + } + } + } else { + // case 2.2: ready -> not ready / statue not sync + // move ip + if len(node.Eips) > 0 { + // need move ip + needUpdate = true + needMoveIPs = append(needMoveIPs, node.Eips...) + egw.Status.NodeList[nodeIndex].Eips = make([]egress.Eips, 0) + } + // check state are sync + if tunnel.Status.Phase.IsNotEqual(node.Status) { + needUpdate = true + egw.Status.NodeList[nodeIndex].Status = tunnel.Status.Phase.String() + } + } + break + } } - if containsEgressGatewayFinalizer(egw, egressGatewayFinalizers) && len(p) == 0 { - log.Info("remove the egressGatewayFinalizer") - removeEgressGatewayFinalizer(egw) - log.V(1).Info("remove the egressGatewayFinalizer", "ObjectMeta", egw.ObjectMeta) - - err = r.client.Update(ctx, egw) + if len(needMoveIPs) > 0 { + moveEipToReadyNode(&egw, needMoveIPs) + } + if needUpdate { + err := updateGatewayStatusWithUsage(ctx, r.client, &egw) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + // sync all policy status + err = updateAllPolicyStatus(ctx, r.client, &egw) if err != nil { - log.Error(err, "remove the egressGatewayFinalizer", "ObjectMeta", egw.ObjectMeta) return reconcile.Result{Requeue: true}, err } } - return reconcile.Result{Requeue: false}, nil - } - - if egw.Spec.NodeSelector.Selector == nil { - log.Info("nodeSelector is nil, skip reconcile") - return reconcile.Result{}, nil } - // Obtain the latest node that meets the conditions - newNodeList := &corev1.NodeList{} - selNodes, err := metav1.LabelSelectorAsSelector(egw.Spec.NodeSelector.Selector) - if err != nil { - return reconcile.Result{}, err - } - err = r.client.List(ctx, newNodeList, &client.ListOptions{ - LabelSelector: selNodes, - }) - if err != nil { - return reconcile.Result{}, err - } - - log.Info("obtained nodes", - "numberOfNodes", len(newNodeList.Items), - "selector", egw.Spec.NodeSelector.Selector.String()) + return reconcile.Result{}, nil +} - // Get the node you want to delete - delNodeMap := make(map[string]egress.EgressIPStatus) - for _, oldNode := range egw.Status.NodeList { - delNodeMap[oldNode.Name] = oldNode - } +func (r *egnReconciler) checkAndUpdateAllPolicyIfNeedWhenFirstNodeReady(ctx context.Context, + req reconcile.Request, log logr.Logger, egw *egress.EgressGateway) (reconcile.Result, error) { - for _, newNode := range newNodeList.Items { - delete(delNodeMap, newNode.Name) - } - - perNodeMap := make(map[string]egress.EgressIPStatus) - for _, node := range egw.Status.NodeList { - _, ok := delNodeMap[node.Name] - if !ok { - perNodeMap[node.Name] = node - } + clusterPolicy := new(egress.EgressClusterPolicyList) + err := r.client.List(ctx, clusterPolicy) + if err != nil { + return reconcile.Result{Requeue: true}, err } - - for _, node := range newNodeList.Items { - _, ok := perNodeMap[node.Name] - if !ok { - egt := new(egress.EgressTunnel) - err := r.client.Get(ctx, types.NamespacedName{Name: node.Name}, egt) - if err == nil { - perNodeMap[node.Name] = egress.EgressIPStatus{Name: node.Name, Status: string(egt.Status.Phase)} - } else { - perNodeMap[node.Name] = egress.EgressIPStatus{Name: node.Name, Status: string(egress.EgressTunnelFailed)} + for _, p := range clusterPolicy.Items { + if p.Spec.EgressGatewayName == egw.Name { + res, err := r.reAssignEgressClusterPolicyIP(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, + }, egw, &p) + if err != nil { + return res, err } - isUpdate = true } } - - if len(egw.Status.NodeList) != len(newNodeList.Items) { - isUpdate = true + policy := new(egress.EgressPolicyList) + err = r.client.List(ctx, policy) + if err != nil { + return reconcile.Result{Requeue: true}, err } - - log.Info("deleted gateway nodes", "delNodeMap", delNodeMap) - - if len(delNodeMap) != 0 { - // Select a gateway node for the policy again - var reSetPolicies []egress.Policy - for _, item := range delNodeMap { - for _, eip := range item.Eips { - reSetPolicies = append(reSetPolicies, eip.Policies...) - } - } - - for _, policy := range reSetPolicies { - if err = r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap); err != nil { - log.Error(err, "failed to reallocate a gateway node for EgressPolicy", - "policy", policy, - "egressGateway", egw.Name, - "namespace", egw.Namespace) - return reconcile.Result{Requeue: true}, err + for _, p := range policy.Items { + if p.Spec.EgressGatewayName == egw.Name { + res, err := r.reAssignEgressPolicyIP(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, + }, egw, &p) + if err != nil { + return res, err } } - - isUpdate = true } + return reconcile.Result{}, nil +} + +func (r *egnReconciler) reAssignEgressPolicyIP(ctx context.Context, + req reconcile.Request, gateway *egress.EgressGateway, policy *egress.EgressPolicy) (reconcile.Result, error) { - // When the first gateway node of an egw recovers, you need to rebind the policy that references the egw - readyNum := 0 - policyNum := 0 - for _, node := range perNodeMap { - if node.Status == string(egress.EgressTunnelReady) { - readyNum++ - policyNum += len(node.Eips) + var err error + assignedIP := getAssignedIP(gateway, req.Namespace, req.Name) + if assignedIP == nil { + assignedIP, err = assignIP(gateway, req, policy.Spec.EgressIP) + if err != nil { + return reconcile.Result{Requeue: true}, err } - } - if readyNum == 1 && policyNum == 0 { - var policies []egress.Policy - egpList := &egress.EgressPolicyList{} - if err := r.client.List(ctx, egpList); err != nil { - log.Error(err, "list EgressPolicy failed") + if assignedIP == nil { + return reconcile.Result{Requeue: true}, fmt.Errorf("not enough ip") + } + err = updateGatewayStatusWithUsage(ctx, r.client, gateway) + if err != nil { return reconcile.Result{Requeue: true}, err } - - for _, egp := range egpList.Items { - if egp.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: egp.Name, Namespace: egp.Namespace}) - } + //err = updateEgressPolicyIfNeed(ctx, r.client, policy, assignedIP) + //if err != nil { + // return reconcile.Result{Requeue: true}, err + //} + err := updateEgressPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { + return reconcile.Result{Requeue: true}, err } - - egcpList := &egress.EgressClusterPolicyList{} - if err := r.client.List(ctx, egcpList); err != nil { - log.Error(err, "list EgressClusterPolicy failed") + } else { + err := updateEgressPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { return reconcile.Result{Requeue: true}, err } + } - for _, egcp := range egcpList.Items { - if egcp.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: egcp.Name}) - } - } + return reconcile.Result{}, nil +} - for _, policy := range policies { - err = r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap) - if err != nil { - log.Error(err, "failed to reassign a gateway node for EgressPolicy", "policy", policy) - return reconcile.Result{Requeue: true}, err - } - } +func (r *egnReconciler) reAssignEgressClusterPolicyIP(ctx context.Context, + req reconcile.Request, gateway *egress.EgressGateway, policy *egress.EgressClusterPolicy) (reconcile.Result, error) { - isUpdate = true - } + var err error - if isUpdate { - var perNodeList []egress.EgressIPStatus - for _, node := range perNodeMap { - perNodeList = append(perNodeList, node) + assignedIP := getAssignedIP(gateway, req.Namespace, req.Name) + if assignedIP == nil { + assignedIP, err = assignIP(gateway, req, policy.Spec.EgressIP) + if err != nil { + return reconcile.Result{Requeue: true}, err } - egw.Status.NodeList = perNodeList - - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(egw) + if assignedIP == nil { + return reconcile.Result{Requeue: true}, fmt.Errorf("not enough ip") + } + err := updateEgressClusterPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return reconcile.Result{Requeue: true}, nil + return reconcile.Result{Requeue: true}, err } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal - - log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, egw) + } else { + err := updateEgressClusterPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) if err != nil { - log.Error(err, "update egress gateway status", "status", egw.Status) return reconcile.Result{Requeue: true}, err } } @@ -342,714 +279,906 @@ func (r egnReconciler) reconcileEGW(ctx context.Context, req reconcile.Request, return reconcile.Result{}, nil } -// reconcileEG reconcile egress tunnel -func (r egnReconciler) reconcileEGT(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { +func (r *egnReconciler) reconcileNode(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { deleted := false - egt := new(egress.EgressTunnel) - egt.Name = req.Name - err := r.client.Get(ctx, req.NamespacedName, egt) + node := &corev1.Node{} + err := r.client.Get(ctx, req.NamespacedName, node) if err != nil { if !errors.IsNotFound(err) { return reconcile.Result{Requeue: true}, err } + deleted = true } - deleted = deleted || !egt.GetDeletionTimestamp().IsZero() + deleted = deleted || !node.GetDeletionTimestamp().IsZero() - // The node deletion event has already been handled, so there is no need to do that here if deleted { - log.Info("request item is deleted") + // case1: do nothing return reconcile.Result{}, nil } - egwList := &egress.EgressGatewayList{} - - if err := r.client.List(context.Background(), egwList); err != nil { - return reconcile.Result{Requeue: true}, nil + // case2: node label update + egwList := new(egress.EgressGatewayList) + err = r.cli.List(ctx, egwList) + if err != nil { + return reconcile.Result{Requeue: true}, err } - for _, item := range egwList.Items { - policies, isExist := GetPoliciesByNode(egt.Name, item) - if isExist { - perNodeMap := make(map[string]egress.EgressIPStatus) - egw := item.DeepCopy() - - // If the node is not in success state, the policy on the node is reassigned - if egt.Status.Phase != egress.EgressTunnelReady { - for _, node := range egw.Status.NodeList { - if node.Name != egt.Name { - perNodeMap[node.Name] = node - } else { - perNodeMap[node.Name] = egress.EgressIPStatus{Name: node.Name, Status: string(egt.Status.Phase)} - } + for _, egw := range egwList.Items { + selector, err := metav1.LabelSelectorAsSelector(egw.Spec.NodeSelector.Selector) + if err != nil { + return reconcile.Result{}, err + } + // + needUpdate := false + if selector.Matches(labels.Set(node.Labels)) { + // case2.1: label match + // case2.1.1: not in list, add it + // case2.1.1: already int list, do nothing + var find bool + for _, item := range egw.Status.NodeList { + if item.Name == node.Name { + find = true + break } - - for _, policy := range policies { - err = r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap) - if err != nil { - log.Error(err, "failed to reassign a gateway node for EgressPolicy", "policy", policy) - return reconcile.Result{Requeue: true}, err + } + if !find { + needUpdate = true + status := egress.EgressTunnelPending.String() + tunnel := new(egress.EgressTunnel) + err := r.client.Get(ctx, types.NamespacedName{Name: node.Name}, tunnel) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{}, err } + } else { + status = tunnel.Status.Phase.String() } - } else { - for _, node := range egw.Status.NodeList { - if node.Name == egt.Name { - for _, node := range egw.Status.NodeList { - perNodeMap[node.Name] = node - } - - if node.Status != string(egress.EgressTunnelReady) { - perNodeMap[node.Name] = egress.EgressIPStatus{Name: node.Name, Eips: node.Eips, Status: string(egress.EgressTunnelReady)} - - // When the first gateway node of an egw recovers, you need to rebind the policy that references the egw - readyNum := 0 - policyNum := 0 - for _, node := range perNodeMap { - if node.Status == string(egress.EgressTunnelReady) { - readyNum++ - policyNum += len(node.Eips) - } - } - if readyNum == 1 && policyNum == 0 { - var policies []egress.Policy - egpList := &egress.EgressPolicyList{} - if err := r.client.List(ctx, egpList); err != nil { - log.Error(err, "list EgressPolicy failed") - return reconcile.Result{Requeue: true}, err - } - - for _, egp := range egpList.Items { - if egp.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: egp.Name, Namespace: egp.Namespace}) - } - } - - egcpList := &egress.EgressClusterPolicyList{} - if err := r.client.List(ctx, egpList); err != nil { - log.Error(err, "list EgressClusterPolicy failed") - return reconcile.Result{Requeue: true}, err - } - - for _, egcp := range egcpList.Items { - if egcp.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: egcp.Name}) - } - } - - for _, policy := range policies { - err = r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap) - if err != nil { - log.Error(err, "failed to reassign a gateway node for EgressPolicy", "policy", policy) - return reconcile.Result{Requeue: true}, err - } - } - } - break - } - return reconcile.Result{Requeue: false}, nil - } + egw.Status.NodeList = append(egw.Status.NodeList, egress.EgressIPStatus{ + Name: node.Name, + Eips: make([]egress.Eips, 0), + Status: status, + }) + // if it is the first ready + // if it is the first tunnel in the node list, + // we need do more (list all policy, recheck all) + res, err := r.checkAndUpdateAllPolicyIfNeedWhenFirstNodeReady(ctx, req, log, &egw) + if err != nil { + return res, err } } - - var perNodeList []egress.EgressIPStatus - for _, node := range perNodeMap { - perNodeList = append(perNodeList, node) + } else { + // case2.2: label not match + // case2.2.1: not in list, do nothing + // case2.2.1: already int list, delete it + var needMoveIPs []egress.Eips + for nodeIndex, item := range egw.Status.NodeList { + if item.Name == node.Name { + needUpdate = true + needMoveIPs = item.Eips + egw.Status.NodeList = append(egw.Status.NodeList[:nodeIndex], egw.Status.NodeList[nodeIndex+1:]...) + break + } } - - egw.Status.NodeList = perNodeList - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(egw) + if len(needMoveIPs) > 0 { + moveEipToReadyNode(&egw, needMoveIPs) + } + } + if needUpdate { + err := updateGatewayStatusWithUsage(ctx, r.client, &egw) if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return reconcile.Result{Requeue: true}, nil + return reconcile.Result{Requeue: true}, err } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal - - log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, egw) + // sync all policy status + err = updateAllPolicyStatus(ctx, r.client, &egw) if err != nil { - log.Error(err, "update egress gateway status", "status", egw.Status) return reconcile.Result{Requeue: true}, err } } } - return reconcile.Result{}, nil } -// reconcileEN reconcile EgressPolicy and EgressClusterPolicy -func (r egnReconciler) reconcileEGP(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - if req.Namespace == "" { - log = log.WithValues("name", req.Name) - } else { - log = log.WithValues("name", req.Name, "namespace", req.Namespace) - } - log.V(1).Info("reconciling") - - deleted := false - isUpdate := false - egp := &egress.EgressPolicy{} - egcp := &egress.EgressClusterPolicy{} - pi := policyInfo{} - - if len(req.Namespace) == 0 { - err := r.client.Get(ctx, req.NamespacedName, egcp) - if err != nil { - if !errors.IsNotFound(err) { - log.Error(err, "retrieves an obj from the k8s") - return reconcile.Result{}, err +func updateAllPolicyStatus(ctx context.Context, cli client.Client, egw *egress.EgressGateway) error { + for _, node := range egw.Status.NodeList { + for _, eip := range node.Eips { + assignedIP := &AssignedIP{ + Node: node.Name, + IPv4: eip.IPv4, + IPv6: eip.IPv6, + UseNodeIP: false, } - deleted = true - } - deleted = deleted || !egcp.GetDeletionTimestamp().IsZero() - pi.policy = egress.Policy{Name: req.Name} - if !deleted { - if len(egcp.Spec.EgressIP.IPv4) != 0 { - pi.ipv4 = egcp.Spec.EgressIP.IPv4 - } else { - pi.ipv4 = egcp.Status.Eip.Ipv4 + if assignedIP.IPv4 == "" && assignedIP.IPv6 == "" { + assignedIP.UseNodeIP = true } - - if len(egcp.Spec.EgressIP.IPv6) != 0 { - pi.ipv6 = egcp.Spec.EgressIP.IPv6 - } else { - pi.ipv6 = egcp.Status.Eip.Ipv6 + for _, p := range eip.Policies { + if p.Namespace != "" { + policy := new(egress.EgressPolicy) + err := cli.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, policy) + if err != nil { + return err + } + err = updateEgressPolicyStatusIfNeed(ctx, cli, policy, assignedIP) + if err != nil { + return err + } + } else { + policy := new(egress.EgressClusterPolicy) + err := cli.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, policy) + if err != nil { + return err + } + err = updateEgressClusterPolicyStatusIfNeed(ctx, cli, policy, assignedIP) + if err != nil { + return err + } + } } + } + } + return nil +} - pi.isUseNodeIP = egcp.Spec.EgressIP.UseNodeIP - pi.egw = egcp.Spec.EgressGatewayName +func (r *egnReconciler) reconcileGateway(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { + deleted := false + egw := &egress.EgressGateway{} + err := r.cli.Get(ctx, req.NamespacedName, egw) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err } - } else { - err := r.client.Get(ctx, req.NamespacedName, egp) + deleted = true + } + deleted = deleted || !egw.GetDeletionTimestamp().IsZero() + + if deleted { + // case 1 + log.Info("request item is deleted") + count, err := getPolicyCountByGatewayName(ctx, r.client, req.Name) if err != nil { - if !errors.IsNotFound(err) { - log.Error(err, "retrieves an obj from the k8s") - return reconcile.Result{}, err - } - deleted = true + return reconcile.Result{Requeue: true}, err } - deleted = deleted || !egp.GetDeletionTimestamp().IsZero() - pi.policy = egress.Policy{Name: req.Name, Namespace: req.Namespace} - if !deleted { - if len(egp.Spec.EgressIP.IPv4) != 0 { - pi.ipv4 = egp.Spec.EgressIP.IPv4 - } else { - pi.ipv4 = egp.Status.Eip.Ipv4 + if count == 0 && egw.Name != "" { + log.Info("remove the egressGatewayFinalizer") + removeEgressGatewayFinalizer(egw) + log.V(1).Info("remove the egressGatewayFinalizer", "ObjectMeta", egw.ObjectMeta) + err = r.client.Update(ctx, egw) + if err != nil { + log.Error(err, "remove the egressGatewayFinalizer", "ObjectMeta", egw.ObjectMeta) + return reconcile.Result{Requeue: true}, err } + } + return reconcile.Result{Requeue: false}, nil + } - if len(egp.Spec.EgressIP.IPv6) != 0 { - pi.ipv6 = egp.Spec.EgressIP.IPv6 - } else { - pi.ipv6 = egp.Status.Eip.Ipv6 - } + // case2: egw match label update + k8sNodeList := &corev1.NodeList{} + selector, err := metav1.LabelSelectorAsSelector(egw.Spec.NodeSelector.Selector) + if err != nil { + return reconcile.Result{}, err + } + opt := &client.ListOptions{LabelSelector: selector} + err = r.client.List(ctx, k8sNodeList, opt) + if err != nil { + return reconcile.Result{}, err + } - pi.isUseNodeIP = egp.Spec.EgressIP.UseNodeIP - pi.egw = egp.Spec.EgressGatewayName - } + // + k8sNodeMap := make(map[string]struct{}) + for _, node := range k8sNodeList.Items { + k8sNodeMap[node.Name] = struct{}{} } - policy := pi.policy - if deleted { - egwList := &egress.EgressGatewayList{} - if err := r.client.List(ctx, egwList); err != nil { - return reconcile.Result{Requeue: true}, nil - } - for _, egw := range egwList.Items { - _, isExist := GetEIPStatusByPolicy(policy, egw) - if isExist { - log.Info("delete policy", "policy", policy, "egw", egw.Name) - // Delete the policy from the EgressGateway. If the referenced EIP is not used by any other policy, - // the system reclaims the EIP. - DeletePolicyFromEG(log, policy, &egw) - - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(&egw) - if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return reconcile.Result{Requeue: true}, nil - } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal + needUpdate := false - log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, &egw) - if err != nil { - log.Error(err, "update egress gateway status", "status", egw.Status) - return reconcile.Result{Requeue: true}, err + // need to move eips + var needMoveIPs []egress.Eips + + for i := 0; i < len(egw.Status.NodeList); { + node := egw.Status.NodeList[i] + if _, ok := k8sNodeMap[node.Name]; ok { + delete(k8sNodeMap, node.Name) + tunnel := new(egress.EgressTunnel) + err := r.client.Get(ctx, types.NamespacedName{Name: node.Name}, tunnel) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{}, err } - return reconcile.Result{}, nil + // case 1.1: node exists, but tunnel not exists, set statue to pending + if egress.EgressTunnelPending.IsNotEqual(node.Status) { + needMoveIPs = append(needMoveIPs, node.Eips...) + // sync status + egw.Status.NodeList[i].Status = egress.EgressTunnelPending.String() + egw.Status.NodeList[i].Eips = []egress.Eips{} + needUpdate = true + } + continue + } + // case 1.2: node exists, but node status not equal tunnel statue + // just sync it + if tunnel.Status.Phase.IsNotEqual(node.Status) { + egw.Status.NodeList[i].Status = egress.EgressTunnelPending.String() + needUpdate = true + } + // case 1.3: status has been synchronized, do nothing + i++ + } else { + needMoveIPs = append(needMoveIPs, node.Eips...) + needUpdate = true + // Remove the element by shifting the subsequent elements forward and reducing the length of the slice. + copy(egw.Status.NodeList[i:], egw.Status.NodeList[i+1:]) + egw.Status.NodeList = egw.Status.NodeList[:len(egw.Status.NodeList)-1] + } + } + + // add k8s nodes to gateway match list + beforeReadyCount := egw.Status.ReadyCount() + for node := range k8sNodeMap { + fmt.Println("") + needUpdate = true + status := egress.EgressTunnelPending.String() + tunnel := new(egress.EgressTunnel) + err := r.client.Get(ctx, types.NamespacedName{Name: node}, tunnel) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{}, err } + } else { + status = tunnel.Status.Phase.String() } - return reconcile.Result{}, nil + egw.Status.NodeList = append(egw.Status.NodeList, egress.EgressIPStatus{ + Name: node, + Eips: make([]egress.Eips, 0), + Status: status, + }) } - egwName := pi.egw - egw := &egress.EgressGateway{} - err := r.client.Get(ctx, types.NamespacedName{Name: egwName}, egw) - if err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, err - } - log.Error(err, "get EgressGateway") - return reconcile.Result{Requeue: true}, err + if len(needMoveIPs) > 0 { + moveEipToReadyNode(egw, needMoveIPs) } - // Assigned if the policy does not have a gateway node - eipStatus, isExist := GetEIPStatusByPolicy(policy, *egw) - if !isExist { - perNodeMap := make(map[string]egress.EgressIPStatus) - for _, item := range egw.Status.NodeList { - perNodeMap[item.Name] = item + if beforeReadyCount == 0 { + res, err := r.checkAndUpdateAllPolicyIfNeedWhenFirstNodeReady(ctx, req, log, egw) + if err != nil { + return res, err } + } - err := r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap) + if needUpdate { + // update + err := updateGatewayStatusWithUsage(ctx, r.client, egw) if err != nil { - r.log.Error(err, "reallocator Failed to reassign a gateway node for EgressPolicy", "policy", policy) return reconcile.Result{Requeue: true}, err } - - var perNodeList []egress.EgressIPStatus - for _, node := range perNodeMap { - perNodeList = append(perNodeList, node) + // sync all policy status + err = updateAllPolicyStatus(ctx, r.client, egw) + if err != nil { + return reconcile.Result{Requeue: true}, err } - egw.Status.NodeList = perNodeList + } - isUpdate = true - } else { - // Check whether the EIP is correct - for i, eip := range eipStatus.Eips { - for j, p := range eip.Policies { - if p == policy { - isReAllocatorPolicy := false - if pi.isUseNodeIP && (eip.IPv4 != "" || eip.IPv6 != "") { - isReAllocatorPolicy = true - } else if pi.ipv4 != "" && pi.ipv4 != eip.IPv4 { - log.Info("policy", policy, ", pi.ipv4=", pi.ipv4, ", eip.IPv4", "=", eip.IPv4) - isReAllocatorPolicy = true - } else if pi.ipv6 != "" && pi.ipv6 != eip.IPv6 { - log.Info("policy", policy, ", pi.ipv6=", pi.ipv6, ", eip.IPv6", "=", eip.IPv6) - isReAllocatorPolicy = true - } + return reconcile.Result{}, nil +} - if isReAllocatorPolicy { - eipStatus.Eips[i].Policies = append(eipStatus.Eips[i].Policies[:j], eipStatus.Eips[i].Policies[j+1:]...) - perNodeMap := make(map[string]egress.EgressIPStatus) - for _, node := range egw.Status.NodeList { - if node.Name == eipStatus.Name { - perNodeMap[node.Name] = eipStatus - } else { - perNodeMap[node.Name] = node - } - } - err := r.reAllocatorPolicy(ctx, log, policy, egw, perNodeMap) - if err != nil { - log.Error(err, "failed to reassign a gateway node for EgressPolicy", - "policy", policy, - "egressGateway", egw.Name, - "namespace", egw.Namespace) - - return reconcile.Result{Requeue: true}, err - } +func moveEipToReadyNode(gateway *egress.EgressGateway, needMoveIPs []egress.Eips) { + if gateway.Status.ReadyCount() <= 0 { + return + } - var perNodeList []egress.EgressIPStatus - for _, node := range perNodeMap { - perNodeList = append(perNodeList, node) - } - egw.Status.NodeList = perNodeList - } else { - // check policy status - var policyStatus egress.EgressPolicyStatus - policyStatus.Eip.Ipv4 = eip.IPv4 - policyStatus.Eip.Ipv6 = eip.IPv6 - policyStatus.Node = eipStatus.Name - - if len(policy.Namespace) == 0 { - if len(egcp.Status.Node) == 0 { - egcp.Status = policyStatus - log.V(1).Info("update egressclusterpolicy status", "status", egcp.Status) - err = r.client.Status().Update(ctx, egcp) - if err != nil { - log.Error(err, "update egressclusterpolicy status", "status", egcp.Status) - return reconcile.Result{Requeue: true}, err - } - } - } else { - if len(egp.Status.Node) == 0 { - egp.Status = policyStatus - log.V(1).Info("update egresspolicy status", "status", egp.Status) - err = r.client.Status().Update(ctx, egp) - if err != nil { - log.Error(err, "update egresspolicy status", "status", egp.Status) - return reconcile.Result{Requeue: true}, err - } - } - } - } + minEipNodeIndex := -1 + minEipCount := -1 + useNodeIPIndex := -1 - isUpdate = true - goto update + for i, node := range gateway.Status.NodeList { + if node.Status == "Ready" { + eipCount := len(node.Eips) + if minEipCount == -1 || eipCount < minEipCount { + minEipNodeIndex = i + minEipCount = eipCount + for tmp, eip := range node.Eips { + if eip.IPv4 == "" && eip.IPv6 == "" { + useNodeIPIndex = tmp + } } } } - } -update: - if isUpdate { - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(egw) - if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return reconcile.Result{Requeue: true}, nil - } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal - r.log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, egw) - if err != nil { - r.log.Error(err, "update egress gateway status", "status", egw.Status) - return reconcile.Result{Requeue: true}, err - } - } - return reconcile.Result{}, nil -} - -func (r egnReconciler) deleteNodeFromEGs(ctx context.Context, log logr.Logger, nodeName string, egwList *egress.EgressGatewayList) error { - for _, egw := range egwList.Items { - for _, eipStatus := range egw.Status.NodeList { - if nodeName == eipStatus.Name { - err := r.deleteNodeFromEG(ctx, log, nodeName, egw) - if err != nil { - return err + if minEipNodeIndex != -1 { + for _, eip := range needMoveIPs { + if eip.IPv4 == "" && eip.IPv6 == "" { + // case 1: move user node ip case + if useNodeIPIndex != -1 { + // case: append policy to target node + gateway.Status.NodeList[minEipNodeIndex].Eips[useNodeIPIndex].Policies = append( + gateway.Status.NodeList[minEipNodeIndex].Eips[useNodeIPIndex].Policies, + eip.Policies..., + ) + } else { + // case: need create new + useNodeIPIndex = len(gateway.Status.NodeList[minEipNodeIndex].Eips) + gateway.Status.NodeList[minEipNodeIndex].Eips = append( + gateway.Status.NodeList[minEipNodeIndex].Eips, + egress.Eips{IPv4: "", IPv6: "", Policies: eip.Policies}, + ) } - break + } else { + // case 2: move eip case + gateway.Status.NodeList[minEipNodeIndex].Eips = append(gateway.Status.NodeList[minEipNodeIndex].Eips, eip) } } } - return nil + // case: no healthy nodes to move(migrate) eip, do nothing } -// Delete the node from the EgressGateway -func (r egnReconciler) deleteNodeFromEG(ctx context.Context, log logr.Logger, nodeName string, egw egress.EgressGateway) error { - // Get the policy that needs to be reassigned - policies, isExist := GetPoliciesByNode(nodeName, egw) +func (r *egnReconciler) reconcileEgressClusterPolicy(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { + deleted := false + policy := new(egress.EgressClusterPolicy) + err := r.client.Get(ctx, req.NamespacedName, policy) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + deleted = true + } + deleted = deleted || !policy.GetDeletionTimestamp().IsZero() + if deleted { + return r.reconcileDeletePolicy(ctx, req, policy.Spec.EgressGatewayName, log) + } - if isExist { - perNodeMap := make(map[string]egress.EgressIPStatus) - for _, item := range egw.Status.NodeList { - if nodeName != item.Name { - perNodeMap[item.Name] = item + if policy != nil && policy.Name != "" && policy.Spec.EgressGatewayName != "" { + gateway := new(egress.EgressGateway) + gatewayName := policy.Spec.EgressGatewayName + err := r.cli.Get(ctx, types.NamespacedName{Name: gatewayName}, gateway) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err } + return reconcile.Result{Requeue: false}, fmt.Errorf("reconcile EgressPolicy %s, not found egress gateway: %s", req, gatewayName) } - - // Redistribute network gateway nodes - for _, policy := range policies { - err := r.reAllocatorPolicy(ctx, log, policy, &egw, perNodeMap) + assignedIP := getAssignedIP(gateway, req.Namespace, req.Name) + if assignedIP == nil { + assignedIP, err = assignIP(gateway, req, policy.Spec.EgressIP) if err != nil { - r.log.Error(err, "failed to reassign a gateway node for EgressPolicy", "policy", policy) - return err + return reconcile.Result{Requeue: true}, err + } + if assignedIP == nil { + return reconcile.Result{Requeue: true}, fmt.Errorf("not enough ip") + } + err = updateGatewayStatusWithUsage(ctx, r.client, gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + err := updateEgressClusterPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + } else { + err := updateEgressClusterPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { + return reconcile.Result{Requeue: true}, err } } + } + return reconcile.Result{}, nil +} - var perNodeList []egress.EgressIPStatus - for _, node := range perNodeMap { - perNodeList = append(perNodeList, node) - } - - egw.Status.NodeList = perNodeList - ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(&egw) +func (r *egnReconciler) reconcileDeletePolicy(ctx context.Context, req reconcile.Request, egwName string, log logr.Logger) (reconcile.Result, error) { + if egwName == "" { + gatewayList := new(egress.EgressGatewayList) + err := r.cli.List(ctx, gatewayList) if err != nil { - r.log.Error(err, "count egress gateway ippools", "nodeList", egw.Status.NodeList) - return err + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{Requeue: false}, nil } - egw.Status.IPUsage.IPv4Free = ipv4sFree - egw.Status.IPUsage.IPv4Total = ipv4sTotal - egw.Status.IPUsage.IPv6Free = ipv6sFree - egw.Status.IPUsage.IPv6Total = ipv6sTotal - - r.log.V(1).Info("update egress gateway status", "status", egw.Status) - err = r.client.Status().Update(ctx, &egw) + if len(gatewayList.Items) == 0 { + return reconcile.Result{Requeue: false}, nil + } + for _, gateway := range gatewayList.Items { + update, err := deleteEgressPolicy(&gateway, req.Namespace, req.Name) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + if update { + err := updateGatewayStatusWithUsage(ctx, r.client, &gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + break + } + } + } else { + gateway := new(egress.EgressGateway) + err := r.cli.Get(ctx, types.NamespacedName{Name: egwName}, gateway) if err != nil { - r.log.Error(err, "update egress gateway status", "status", egw.Status) - return err + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{Requeue: false}, nil + } + if gateway.Name != "" { + update, err := deleteEgressPolicy(gateway, req.Namespace, req.Name) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + if update { + err := updateGatewayStatusWithUsage(ctx, r.client, gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + } } } - - return nil + return reconcile.Result{Requeue: false}, nil } -func (r egnReconciler) reAllocatorPolicy(ctx context.Context, log logr.Logger, policy egress.Policy, egw *egress.EgressGateway, nodeMap map[string]egress.EgressIPStatus) error { - var perNode string - var ipv4, ipv6 string - var err error - pi := policyInfo{} - pi.policy = policy - - if len(nodeMap) == 0 { - r.log.Info("egw: ", egw.Name, " does not have a matching node") - return nil +func (r *egnReconciler) reconcileEgressPolicy(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { + deleted := false + policy := new(egress.EgressPolicy) + err := r.client.Get(ctx, req.NamespacedName, policy) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + deleted = true } + deleted = deleted || !policy.GetDeletionTimestamp().IsZero() - if len(pi.policy.Namespace) == 0 { - egcp := &egress.EgressClusterPolicy{} - err := r.client.Get(ctx, types.NamespacedName{Name: pi.policy.Name}, egcp) - if err != nil { - return err + if deleted { + egwName := "" + if policy != nil && policy.Spec.EgressGatewayName != "" { + egwName = policy.Spec.EgressGatewayName } - - if len(egcp.Spec.EgressIP.IPv4) != 0 { - pi.ipv4 = egcp.Spec.EgressIP.IPv4 + if egwName == "" { + gatewayList := new(egress.EgressGatewayList) + err := r.cli.List(ctx, gatewayList) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{Requeue: false}, nil + } + if len(gatewayList.Items) == 0 { + return reconcile.Result{Requeue: false}, nil + } + for _, gateway := range gatewayList.Items { + update, err := deleteEgressPolicy(&gateway, req.Namespace, req.Name) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + if update { + err := updateGatewayStatusWithUsage(ctx, r.client, &gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + break + } + } } else { - pi.ipv4 = egcp.Status.Eip.Ipv4 + gateway := new(egress.EgressGateway) + err := r.cli.Get(ctx, types.NamespacedName{Name: egwName}, gateway) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{Requeue: false}, nil + } + if gateway.Name != "" { + update, err := deleteEgressPolicy(gateway, req.Namespace, req.Name) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + if update { + err := updateGatewayStatusWithUsage(ctx, r.client, gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + } + } } + return reconcile.Result{Requeue: false}, nil + } - if len(egcp.Spec.EgressIP.IPv6) != 0 { - pi.ipv6 = egcp.Spec.EgressIP.IPv6 + if policy != nil && policy.Name != "" && policy.Spec.EgressGatewayName != "" { + gateway := new(egress.EgressGateway) + gatewayName := policy.Spec.EgressGatewayName + err := r.cli.Get(ctx, types.NamespacedName{Name: gatewayName}, gateway) + if err != nil { + if !errors.IsNotFound(err) { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{Requeue: false}, fmt.Errorf("reconcile EgressPolicy %s, not found egress gateway: %s", req, gatewayName) + } + assignedIP := getAssignedIP(gateway, req.Namespace, req.Name) + if assignedIP == nil { + assignedIP, err = assignIP(gateway, req, policy.Spec.EgressIP) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + if assignedIP == nil { + return reconcile.Result{Requeue: true}, fmt.Errorf("not enough ip") + } + err = updateGatewayStatusWithUsage(ctx, r.client, gateway) + if err != nil { + return reconcile.Result{Requeue: true}, err + } + //err = updateEgressPolicyIfNeed(ctx, r.client, policy, assignedIP) + //if err != nil { + // return reconcile.Result{Requeue: true}, err + //} + err := updateEgressPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { + return reconcile.Result{Requeue: true}, err + } } else { - pi.ipv6 = egcp.Status.Eip.Ipv6 + err := updateEgressPolicyStatusIfNeed(ctx, r.client, policy, assignedIP) + if err != nil { + return reconcile.Result{Requeue: true}, err + } } + } - pi.isUseNodeIP = egcp.Spec.EgressIP.UseNodeIP - pi.egw = egcp.Spec.EgressGatewayName - pi.allocatorPolicy = egcp.Spec.EgressIP.AllocatorPolicy - } else { - egp := &egress.EgressPolicy{} - err := r.client.Get(ctx, types.NamespacedName{Namespace: pi.policy.Namespace, Name: pi.policy.Name}, egp) - if err != nil { - return err - } + return reconcile.Result{}, nil +} - if len(egp.Spec.EgressIP.IPv4) != 0 { - pi.ipv4 = egp.Spec.EgressIP.IPv4 - } else { - pi.ipv4 = egp.Status.Eip.Ipv4 +func assignIP(from *egress.EgressGateway, req reconcile.Request, specEgressIP egress.EgressIP) (*AssignedIP, error) { + // apply node policy to select node + nIndex := -1 + eipNum := -1 + for nodeIndex, node := range from.Status.NodeList { + if node.Status != string(egress.EgressTunnelReady) { + continue } - - if len(egp.Spec.EgressIP.IPv6) != 0 { - pi.ipv6 = egp.Spec.EgressIP.IPv6 - } else { - pi.ipv6 = egp.Status.Eip.Ipv6 + if eipNum == -1 { + eipNum = len(node.Eips) + nIndex = nodeIndex + } else if eipNum > len(node.Eips) { + eipNum = len(node.Eips) + nIndex = nodeIndex + } + } + + // case1 + if specEgressIP.UseNodeIP { + if nIndex != -1 { + for eipIndex, eip := range from.Status.NodeList[nIndex].Eips { + if eip.IPv4 == "" && eip.IPv6 == "" { + from.Status.NodeList[nIndex].Eips[eipIndex].Policies = append( + from.Status.NodeList[nIndex].Eips[eipIndex].Policies, + egress.Policy{Name: req.Name, Namespace: req.Namespace}, + ) + return &AssignedIP{ + Node: from.Status.NodeList[nIndex].Name, + UseNodeIP: true, + }, nil + } + } + // not found + from.Status.NodeList[nIndex].Eips = append( + from.Status.NodeList[nIndex].Eips, + egress.Eips{ + IPv4: "", IPv6: "", + Policies: []egress.Policy{{Name: req.Name, Namespace: req.Namespace}}, + }, + ) + return &AssignedIP{ + Node: from.Status.NodeList[nIndex].Name, + UseNodeIP: true, + }, nil + } + return nil, nil + } + + // case2 reuse eip + if specEgressIP.IPv4 != "" || specEgressIP.IPv6 != "" { + // check + for nodeIndex, node := range from.Status.NodeList { + for eipIndex, eip := range node.Eips { + if eip.IPv4 == specEgressIP.IPv4 || eip.IPv6 == specEgressIP.IPv6 { + from.Status.NodeList[nodeIndex].Eips[eipIndex].Policies = append( + from.Status.NodeList[nodeIndex].Eips[eipIndex].Policies, egress.Policy{ + Name: req.Name, + Namespace: req.Namespace, + }) + return &AssignedIP{ + Node: node.Name, + IPv4: eip.IPv4, + IPv6: eip.IPv6, + UseNodeIP: false, + }, nil + } + } } - - pi.isUseNodeIP = egp.Spec.EgressIP.UseNodeIP - pi.egw = egp.Spec.EgressGatewayName - pi.allocatorPolicy = egp.Spec.EgressIP.AllocatorPolicy } - ipv4 = pi.ipv4 - if len(ipv4) != 0 { - perNode = GetNodeByIP(ipv4, *egw) - if nodeMap[perNode].Status != string(egress.EgressTunnelReady) { - perNode = "" + // case3 assign new IP use eip assign policy + // + if specEgressIP.AllocatorPolicy == egress.EipAllocatorRR { + randObj := rand.New(rand.NewSource(time.Now().UnixNano())) + assignedIP := &AssignedIP{ + Node: "", + IPv4: "", + IPv6: "", + UseNodeIP: false, } - - if len(perNode) == 0 { - perNode, err = r.allocatorNode("rr", nodeMap) + if len(from.Spec.Ippools.IPv4) > 0 { + ipv4Ranges, err := ip.MergeIPRanges(constant.IPv4, from.Spec.Ippools.IPv4) if err != nil { - return err + return nil, fmt.Errorf("assignIP MergeIPRanges with error: %s", err) + } + // user specify ipv4 + if specEgressIP.IPv4 != "" { + ok, err := ip.IsIPIncludedRange(constant.IPv4, specEgressIP.IPv4, ipv4Ranges) + if err != nil { + return nil, fmt.Errorf("encountered an error while trying to check if the Egress IP of Policy %s/%s exists in the ippool: %v", req.Namespace, req.Name, err) + } + if !ok { + return nil, fmt.Errorf("the specified egress IPv4 %s is not in the gateway's ippool", specEgressIP.IPv4) + } + assignedIP.IPv4 = specEgressIP.IPv4 + } else { + ipv4s, err := ip.ParseIPRanges(constant.IPv4, ipv4Ranges) + if err != nil { + return nil, err + } + var useIpv4s []net.IP + for _, node := range from.Status.NodeList { + for _, eip := range node.Eips { + if len(eip.IPv4) != 0 { + useIpv4s = append(useIpv4s, net.ParseIP(eip.IPv4)) + } + } + } + freeIpv4s := ip.IPsDiffSet(ipv4s, useIpv4s, false) + if len(freeIpv4s) == 0 { + return nil, fmt.Errorf("EgressGateway %s does not have enough IPs to allocate for Policy %s/%s", from.Name, req.Namespace, req.Name) + } + assignedIP.IPv4 = freeIpv4s[randObj.Intn(len(freeIpv4s))].String() } } - ipv4, ipv6, err = r.allocatorEIP("", perNode, pi, *egw) - if err != nil { - return err - } - } else { - allocatorPolicy := pi.allocatorPolicy - if allocatorPolicy == egress.EipAllocatorRR { - perNode, err = r.allocatorNode("rr", nodeMap) + if len(from.Spec.Ippools.IPv6) > 0 { + ipv6Ranges, err := ip.MergeIPRanges(constant.IPv6, from.Spec.Ippools.IPv6) if err != nil { - return err + return nil, fmt.Errorf("assignIP MergeIPRanges with error: %s", err) } - - ipv4, ipv6, err = r.allocatorEIP("", perNode, pi, *egw) - if err != nil { - return err + // user specify ipv6 + if specEgressIP.IPv6 != "" { + ok, err := ip.IsIPIncludedRange(constant.IPv6, specEgressIP.IPv6, ipv6Ranges) + if err != nil { + return nil, fmt.Errorf("encountered an error while trying to check if the Egress IP of Policy %s/%s exists in the ippool: %v", req.Namespace, req.Name, err) + } + if !ok { + return nil, fmt.Errorf("the specified egress IPv6 %s is not in the gateway's ippool", specEgressIP.IPv6) + } + assignedIP.IPv6 = specEgressIP.IPv6 + } else { + ipv6s, err := ip.ParseIPRanges(constant.IPv6, ipv6Ranges) + if err != nil { + return nil, err + } + var useIpv6s []net.IP + for _, node := range from.Status.NodeList { + for _, eip := range node.Eips { + if len(eip.IPv6) != 0 { + useIpv6s = append(useIpv6s, net.ParseIP(eip.IPv6)) + } + } + } + freeIpv6s := ip.IPsDiffSet(ipv6s, useIpv6s, false) + if len(freeIpv6s) == 0 { + return nil, fmt.Errorf("EgressGateway %s does not have enough IPs to allocate for Policy %s/%s", from.Name, req.Namespace, req.Name) + } + assignedIP.IPv6 = freeIpv6s[randObj.Intn(len(freeIpv6s))].String() } - } else { - ipv4 = egw.Spec.Ippools.Ipv4DefaultEIP - ipv6 = egw.Spec.Ippools.Ipv6DefaultEIP + } - perNode = GetNodeByIP(ipv4, *egw) - if nodeMap[perNode].Status != string(egress.EgressTunnelReady) { - perNode = "" + assignedIP.Node = from.Status.NodeList[nIndex].Name + // append assignedIP to egw + from.Status.NodeList[nIndex].Eips = append( + from.Status.NodeList[nIndex].Eips, + egress.Eips{ + IPv4: assignedIP.IPv4, + IPv6: assignedIP.IPv6, + Policies: []egress.Policy{{Name: req.Name, Namespace: req.Namespace}}, + }, + ) + return assignedIP, nil + } else { + assignedIP := &AssignedIP{ + Node: "", + IPv4: from.Spec.Ippools.Ipv4DefaultEIP, + IPv6: from.Spec.Ippools.Ipv6DefaultEIP, + UseNodeIP: false, + } + defaultEipIndex := -1 + for i, node := range from.Status.NodeList { + for _, eip := range node.Eips { + if eip.IPv4 != "" && eip.IPv4 == from.Spec.Ippools.Ipv4DefaultEIP { + defaultEipIndex = i + break + } + if eip.IPv6 != "" && eip.IPv6 == from.Spec.Ippools.Ipv6DefaultEIP { + defaultEipIndex = i + break + } } - - if len(perNode) == 0 { - perNode, err = r.allocatorNode("rr", nodeMap) - if err != nil { - return err + if defaultEipIndex != -1 { + assignedIP.Node = node.Name + break + } + } + if defaultEipIndex == -1 { + for i, node := range from.Status.NodeList { + if node.Status != string(egress.EgressTunnelReady) { + continue } + from.Status.NodeList[i].Eips = append( + from.Status.NodeList[i].Eips, + egress.Eips{ + IPv4: from.Spec.Ippools.Ipv4DefaultEIP, + IPv6: from.Spec.Ippools.Ipv6DefaultEIP, + Policies: []egress.Policy{{Name: req.Name, Namespace: req.Namespace}}, + }, + ) } } - } - - log.Info("reAllocatorPolicy", " policy=", pi.policy, " perNode=", perNode, " ipv4=", ipv4, " ipv6=", ipv6) + if assignedIP.Node == "" { + return nil, fmt.Errorf("EgressGateway %s does not have an available Node", from.Name) + } - err = setEipStatus(ipv4, ipv6, perNode, pi.policy, nodeMap) - if err != nil { - return err + return assignedIP, nil } - - return nil } -func (r egnReconciler) allocatorNode(selNodePolicy string, nodeMap map[string]egress.EgressIPStatus) (string, error) { - - if len(nodeMap) == 0 { - err := fmt.Errorf("nodeList is empty") - return "", err - } +type AssignedIP struct { + Node string + IPv4 string + IPv6 string + UseNodeIP bool +} - var perNode string - perNodePolicyNum := 0 - i := 0 - for _, node := range nodeMap { - if node.Status != string(egress.EgressTunnelReady) { - continue +func updateEgressPolicyIfNeed(ctx context.Context, cli client.Client, policy *egress.EgressPolicy, assignedIP *AssignedIP) error { + if policy.Spec.EgressIP.IPv4 != assignedIP.IPv4 || policy.Spec.EgressIP.IPv4 != assignedIP.IPv6 { + policy.Spec.EgressIP.IPv4 = assignedIP.IPv4 + policy.Spec.EgressIP.IPv6 = assignedIP.IPv6 + err := cli.Update(ctx, policy) + if err != nil { + return err } + } + return nil +} - policyNum := 0 +func getAssignedIP(from *egress.EgressGateway, policyNs, policyName string) *AssignedIP { + for _, node := range from.Status.NodeList { for _, eip := range node.Eips { - policyNum += len(eip.Policies) - } - - if i == 0 { - i++ - perNode = node.Name - perNodePolicyNum = policyNum - } else if policyNum <= perNodePolicyNum { - perNode = node.Name - perNodePolicyNum = policyNum + for _, policy := range eip.Policies { + if policy.Name == policyName && policy.Namespace == policyNs { + return &AssignedIP{Node: node.Name, IPv4: eip.IPv4, IPv6: eip.IPv6} + } + } } } - - return perNode, nil + return nil } -func (r egnReconciler) allocatorEIP(selEipLolicy string, nodeName string, pi policyInfo, egw egress.EgressGateway) (string, string, error) { +func updateEgressPolicyStatusIfNeed(ctx context.Context, cli client.Client, policy *egress.EgressPolicy, assignedIP *AssignedIP) error { + if policy.Status.Eip.Ipv4 != assignedIP.IPv4 || policy.Status.Eip.Ipv6 != assignedIP.IPv6 || policy.Status.Node != assignedIP.Node { + policy.Status.Eip.Ipv4 = assignedIP.IPv4 + policy.Status.Eip.Ipv6 = assignedIP.IPv6 + policy.Status.Node = assignedIP.Node - if pi.isUseNodeIP || len(nodeName) == 0 { - return "", "", nil - } - var perIpv4 string - var perIpv6 string - rander := rand.New(rand.NewSource(time.Now().UnixNano())) - - if len(egw.Spec.Ippools.IPv4) > 0 { - var useIpv4s []net.IP - - ipv4Ranges, _ := ip.MergeIPRanges(constant.IPv4, egw.Spec.Ippools.IPv4) - perIpv4 = pi.ipv4 - if len(perIpv4) != 0 { - result, err := ip.IsIPIncludedRange(constant.IPv4, perIpv4, ipv4Ranges) - if err != nil { - return "", "", err - } - if !result { - return "", "", fmt.Errorf("%v is not within the EIP range of EgressGateway %v", perIpv4, egw.Name) - } - } else { - for _, node := range egw.Status.NodeList { - for _, eip := range node.Eips { - if len(eip.IPv4) != 0 { - useIpv4s = append(useIpv4s, net.ParseIP(eip.IPv4)) + err := cli.Status().Update(ctx, policy) + if err != nil { + if errors.IsConflict(err) { + newPolicy := new(egress.EgressPolicy) + err := cli.Get(ctx, types.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policy) + if err != nil { + if !errors.IsNotFound(err) { + return nil } + return err + } + newPolicy.Status = policy.Status + err = cli.Status().Update(ctx, policy) + if err != nil { + return err } } - - ipv4s, _ := ip.ParseIPRanges(constant.IPv4, ipv4Ranges) - freeIpv4s := ip.IPsDiffSet(ipv4s, useIpv4s, false) - - if len(freeIpv4s) == 0 { - return "", "", fmt.Errorf("No Egress IPV4 is available; policy=%v egw=%v", pi.policy, egw.Name) - - // save it for later policy - // var useIpv4sByNode []net.IP - // for _, node := range egw.Status.NodeList { - // if node.Name == nodeName { - // for _, eip := range node.Eips { - // if len(eip.IPv4) != 0 { - // useIpv4sByNode = append(useIpv4sByNode, net.ParseIP(eip.IPv4)) - // } - // } - // } - // } - - // if len(useIpv4sByNode) == 0 { - // return "", "", fmt.Errorf("No EIP meeting requirements is found on node %v; EG %v", nodeName, egw.Name) - // } - - // perIpv4 = useIpv4sByNode[rander.Intn(len(useIpv4sByNode))].String() - } else { - perIpv4 = freeIpv4s[rander.Intn(len(freeIpv4s))].String() - } + return err } } + return nil +} - if len(egw.Spec.Ippools.IPv6) > 0 { - if len(perIpv4) != 0 && len(GetEipByIPV4(perIpv4, egw).IPv6) != 0 { - return perIpv4, GetEipByIPV4(perIpv4, egw).IPv6, nil +func updateEgressClusterPolicyStatusIfNeed(ctx context.Context, cli client.Client, policy *egress.EgressClusterPolicy, assignedIP *AssignedIP) error { + if policy.Status.Eip.Ipv4 != assignedIP.IPv4 || policy.Status.Eip.Ipv6 != assignedIP.IPv6 || policy.Status.Node != assignedIP.Node { + policy.Status.Eip.Ipv4 = assignedIP.IPv4 + policy.Status.Eip.Ipv6 = assignedIP.IPv6 + policy.Status.Node = assignedIP.Node + err := cli.Status().Update(ctx, policy) + if err != nil { + if errors.IsConflict(err) { + newPolicy := new(egress.EgressPolicy) + err := cli.Get(ctx, types.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, policy) + if err != nil { + if !errors.IsNotFound(err) { + return nil + } + return err + } + newPolicy.Status = policy.Status + err = cli.Status().Update(ctx, policy) + if err != nil { + return err + } + } + return err } + } + return nil +} - var useIpv6s []net.IP - - ipv6Ranges, _ := ip.MergeIPRanges(constant.IPv6, egw.Spec.Ippools.IPv6) +func updateGatewayStatusWithUsage(ctx context.Context, cli client.Client, gateway *egress.EgressGateway) error { + if gateway == nil { + return fmt.Errorf("gateway is nil") + } + ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal, err := countGatewayIP(gateway) + if err != nil { + return fmt.Errorf("failed to calculate gateway ip usage") + } + gateway.Status.IPUsage.IPv4Free = ipv4sFree + gateway.Status.IPUsage.IPv6Free = ipv6sFree + gateway.Status.IPUsage.IPv4Total = ipv4sTotal + gateway.Status.IPUsage.IPv6Total = ipv6sTotal + err = cli.Status().Update(ctx, gateway) + if err != nil { + return err + } + return nil +} - perIpv6 = pi.ipv6 - if len(perIpv6) != 0 { - result, err := ip.IsIPIncludedRange(constant.IPv6, perIpv6, ipv6Ranges) - if err != nil { - return "", "", err - } - if !result { - return "", "", fmt.Errorf("%v is not within the EIP range of EgressGateway %v", perIpv6, egw.Name) - } - } else { - for _, node := range egw.Status.NodeList { - for _, eip := range node.Eips { - if len(eip.IPv6) != 0 { - useIpv6s = append(useIpv6s, net.ParseIP(eip.IPv6)) +func deleteEgressPolicy(gateway *egress.EgressGateway, policyNs, policyName string) (bool, error) { + if gateway == nil { + return false, fmt.Errorf("gateway is nil") + } + + policyFound := false + + for nodeIndex, node := range gateway.Status.NodeList { + for eipIndex, eip := range node.Eips { + for policyIndex, policy := range eip.Policies { + if policy.Name == policyName && policy.Namespace == policyNs { + gateway.Status.NodeList[nodeIndex].Eips[eipIndex].Policies = append( + gateway.Status.NodeList[nodeIndex].Eips[eipIndex].Policies[:policyIndex], + gateway.Status.NodeList[nodeIndex].Eips[eipIndex].Policies[policyIndex+1:]..., + ) + // if it is the latest policy, we delete this eip + if len(gateway.Status.NodeList[nodeIndex].Eips[eipIndex].Policies) == 0 { + gateway.Status.NodeList[nodeIndex].Eips = append( + gateway.Status.NodeList[nodeIndex].Eips[:eipIndex], + gateway.Status.NodeList[nodeIndex].Eips[eipIndex+1:]..., + ) } + policyFound = true + break } } - - ipv6s, _ := ip.ParseIPRanges(constant.IPv6, ipv6Ranges) - freeIpv6s := ip.IPsDiffSet(ipv6s, useIpv6s, false) - - if len(freeIpv6s) == 0 { - return "", "", fmt.Errorf("No Egress IPV6 is available; policy=%v egw=%v", pi.policy, egw.Name) - - // save it for later policy - // var useIpv6sByNode []net.IP - // for _, node := range egw.Status.NodeList { - // if node.Name == nodeName { - // for _, eip := range node.Eips { - // if len(eip.IPv6) != 0 { - // useIpv6sByNode = append(useIpv6sByNode, net.ParseIP(eip.IPv6)) - // } - // } - // } - // } - - // if len(useIpv6sByNode) == 0 { - // return "", "", fmt.Errorf("No EIP meeting requirements is found on node %v; EG %v", nodeName, egw.Name) - // } - // perIpv6 = useIpv6sByNode[rander.Intn(len(useIpv6sByNode))].String() - } else { - perIpv6 = freeIpv6s[rander.Intn(len(freeIpv6s))].String() + if policyFound { + break } } + if policyFound { + break + } } - - return perIpv4, perIpv6, nil + return policyFound, nil } -func NewEgressGatewayController(mgr manager.Manager, log logr.Logger, cfg *config.Config) error { +func NewEgressGatewayController(mgr manager.Manager, log logr.Logger, cfg *config.Config, client client.Client) error { if cfg == nil { return fmt.Errorf("cfg can not be nil") } @@ -1057,6 +1186,7 @@ func NewEgressGatewayController(mgr manager.Manager, log logr.Logger, cfg *confi client: mgr.GetClient(), log: log, config: cfg, + cli: client, } c, err := controller.New("egressGateway", mgr, @@ -1066,27 +1196,27 @@ func NewEgressGatewayController(mgr manager.Manager, log logr.Logger, cfg *confi } if err = c.Watch(source.Kind(mgr.GetCache(), &egress.EgressGateway{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressGateway"))); err != nil { + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressGateway")), egressGatewayPredicate{}); err != nil { return fmt.Errorf("failed to watch EgressGateway: %w", err) } if err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Node{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("Node"))); err != nil { + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("Node")), nodePredicate{}); err != nil { return fmt.Errorf("failed to watch Node: %w", err) } if err = c.Watch(source.Kind(mgr.GetCache(), &egress.EgressPolicy{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressPolicy"))); err != nil { + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressPolicy")), egressPolicyPredicate{}); err != nil { return fmt.Errorf("failed to watch EgressPolicy: %w", err) } if err = c.Watch(source.Kind(mgr.GetCache(), &egress.EgressClusterPolicy{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressClusterPolicy"))); err != nil { + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressClusterPolicy")), egressClusterPolicyPredicate{}); err != nil { return fmt.Errorf("failed to watch EgressClusterPolicy: %w", err) } if err = c.Watch(source.Kind(mgr.GetCache(), &egress.EgressTunnel{}), - handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressTunnel"))); err != nil { + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressTunnel")), egressTunnelPredicate{}); err != nil { return fmt.Errorf("failed to watch EgressTunnel: %w", err) } @@ -1119,144 +1249,6 @@ func GetEipByIPV6(ipv6 string, egw egress.EgressGateway) egress.Eips { return eipInfo } -func GetNodeByIP(ipv4 string, egw egress.EgressGateway) string { - var nodeName string - for _, node := range egw.Status.NodeList { - for _, eip := range node.Eips { - if eip.IPv4 == ipv4 { - nodeName = node.Name - } - } - } - - return nodeName -} - -func setEipStatus(ipv4, ipv6 string, nodeName string, policy egress.Policy, nodeMap map[string]egress.EgressIPStatus) error { - if len(nodeName) == 0 { - return nil - } - - eipStatus, ok := nodeMap[nodeName] - if !ok { - return fmt.Errorf("the %v node is not a gateway node", nodeName) - } - isExist := false - newEipStatus := egress.EgressIPStatus{} - - for _, eip := range eipStatus.Eips { - if (len(ipv4) != 0 && ipv4 == eip.IPv4) || (len(ipv6) != 0 && ipv6 == eip.IPv6) { - eip.Policies = append(eip.Policies, policy) - - isExist = true - } - newEipStatus.Eips = append(newEipStatus.Eips, eip) - } - - if !isExist { - newEip := egress.Eips{} - newEip.IPv4 = ipv4 - newEip.IPv6 = ipv6 - newEip.Policies = append(newEip.Policies, policy) - eipStatus.Eips = append(eipStatus.Eips, newEip) - nodeMap[nodeName] = eipStatus - } else { - newEipStatus.Name = nodeName - newEipStatus.Status = eipStatus.Status - nodeMap[nodeName] = newEipStatus - } - - return nil -} - -func GetPoliciesByNode(nodeName string, egw egress.EgressGateway) ([]egress.Policy, bool) { - - var eipStatus egress.EgressIPStatus - var policies []egress.Policy - isExist := false - for _, node := range egw.Status.NodeList { - if node.Name == nodeName { - eipStatus = node - isExist = true - } - } - - if isExist { - for _, eip := range eipStatus.Eips { - policies = append(policies, eip.Policies...) - } - } - - return policies, isExist -} - -func GetEIPStatusByPolicy(policy egress.Policy, egw egress.EgressGateway) (egress.EgressIPStatus, bool) { - var eipStatus egress.EgressIPStatus - isExist := false - - for _, item := range egw.Status.NodeList { - for _, eip := range item.Eips { - for _, p := range eip.Policies { - if p == policy { - eipStatus = item - isExist = true - } - } - } - } - - return eipStatus, isExist -} - -func DeletePolicyFromEG(log logr.Logger, policy egress.Policy, egw *egress.EgressGateway) { - var policies []egress.Policy - var eips []egress.Eips - for i, node := range egw.Status.NodeList { - for j, eip := range node.Eips { - for k, item := range eip.Policies { - if item == policy { - policies = append(eip.Policies[:k], eip.Policies[k+1:]...) - - if len(policies) == 0 { - // Release EIP - for x, e := range node.Eips { - if (len(eip.IPv4) != 0 && eip.IPv4 == e.IPv4) || (len(eip.IPv6) != 0 && eip.IPv6 == e.IPv6) { - eips = append(node.Eips[:x], node.Eips[x+1:]...) - log.Info("release", " EIP= ", node.Eips[x], " policy=", policy) - break - } - } - egw.Status.NodeList[i].Eips = eips - } else { - egw.Status.NodeList[i].Eips[j].Policies = policies - } - goto breakHere - } - } - } - } -breakHere: - return -} - -func egwIpsCount(status egress.EgressGatewayStatus) (int, int) { - ipv4Usage := 0 - ipv6Usage := 0 - for i := range status.NodeList { - for j := range status.NodeList[i].Eips { - if len(status.NodeList[i].Eips[j].IPv4) != 0 { - ipv4Usage++ - } - - if len(status.NodeList[i].Eips[j].IPv6) != 0 { - ipv6Usage++ - } - } - } - - return ipv4Usage, ipv6Usage -} - func countGatewayIP(egw *egress.EgressGateway) (ipv4sFree, ipv6sFree, ipv4sTotal, ipv6sTotal int, err error) { ipv4s, err := ip.ConvertCidrOrIPrangeToIPs(egw.Spec.Ippools.IPv4, constant.IPv4) if err != nil { @@ -1288,38 +1280,37 @@ func countGatewayIP(egw *egress.EgressGateway) (ipv4sFree, ipv6sFree, ipv4sTotal // removeEgressGatewayFinalizer if the egress gateway is being deleted func removeEgressGatewayFinalizer(egw *egress.EgressGateway) { - if !egw.DeletionTimestamp.IsZero() { - if containsEgressGatewayFinalizer(egw, egressGatewayFinalizers) { - egw.Finalizers = slice.RemoveElement(egw.Finalizers, egressGatewayFinalizers) - } + if containsEgressGatewayFinalizer(egw, egressGatewayFinalizers) { + egw.Finalizers = slice.RemoveElement(egw.Finalizers, egressGatewayFinalizers) } } -func getEgressGatewayPolicies(client client.Client, ctx context.Context, egw *egress.EgressGateway) ([]egress.Policy, error) { - policies := make([]egress.Policy, 0) - // list policy - policyList := &egress.EgressPolicyList{} - err := client.List(ctx, policyList) +func getPolicyCountByGatewayName(ctx context.Context, client client.Client, name string) (int, error) { + var num int + + list := new(egress.EgressPolicyList) + err := client.List(ctx, list) if err != nil { - return nil, err + return num, err } - for _, p := range policyList.Items { - if p.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: p.Name, Namespace: p.Namespace}) + for _, p := range list.Items { + if p.Spec.EgressGatewayName == name { + num++ } } - // list cluster policy - clusterPolicyList := &egress.EgressClusterPolicyList{} - err = client.List(ctx, clusterPolicyList) + + policyList := new(egress.EgressClusterPolicyList) + err = client.List(ctx, list) if err != nil { - return nil, err + return num, err } - for _, cp := range clusterPolicyList.Items { - if cp.Spec.EgressGatewayName == egw.Name { - policies = append(policies, egress.Policy{Name: cp.Name, Namespace: cp.Namespace}) + for _, p := range policyList.Items { + if p.Spec.EgressGatewayName == name { + num++ } } - return policies, nil + + return num, nil } func containsEgressGatewayFinalizer(gateway *egress.EgressGateway, finalizer string) bool { @@ -1330,3 +1321,116 @@ func containsEgressGatewayFinalizer(gateway *egress.EgressGateway, finalizer str } return false } + +type egressPolicyPredicate struct{} + +func (p egressPolicyPredicate) Create(_ event.CreateEvent) bool { return true } +func (p egressPolicyPredicate) Delete(_ event.DeleteEvent) bool { return true } +func (p egressPolicyPredicate) Update(updateEvent event.UpdateEvent) bool { + oldObj, ok := updateEvent.ObjectOld.(*egress.EgressPolicy) + if !ok { + return false + } + newObj, ok := updateEvent.ObjectNew.(*egress.EgressPolicy) + if !ok { + return false + } + if !reflect.DeepEqual(oldObj.Spec, newObj.Spec) { + return true + } + return false +} +func (p egressPolicyPredicate) Generic(_ event.GenericEvent) bool { return true } + +type egressClusterPolicyPredicate struct{} + +func (p egressClusterPolicyPredicate) Create(_ event.CreateEvent) bool { return true } +func (p egressClusterPolicyPredicate) Delete(_ event.DeleteEvent) bool { return true } +func (p egressClusterPolicyPredicate) Update(updateEvent event.UpdateEvent) bool { + oldObj, ok := updateEvent.ObjectOld.(*egress.EgressClusterPolicy) + if !ok { + return false + } + newObj, ok := updateEvent.ObjectNew.(*egress.EgressClusterPolicy) + if !ok { + return false + } + if !reflect.DeepEqual(oldObj.Spec, newObj.Spec) { + return true + } + return false +} +func (p egressClusterPolicyPredicate) Generic(_ event.GenericEvent) bool { return true } + +type egressGatewayPredicate struct{} + +func (p egressGatewayPredicate) Create(_ event.CreateEvent) bool { return true } +func (p egressGatewayPredicate) Delete(_ event.DeleteEvent) bool { return true } +func (p egressGatewayPredicate) Update(updateEvent event.UpdateEvent) bool { + oldObj, ok := updateEvent.ObjectOld.(*egress.EgressGateway) + if !ok { + return false + } + newObj, ok := updateEvent.ObjectNew.(*egress.EgressGateway) + if !ok { + return false + } + if !reflect.DeepEqual(oldObj.Spec, newObj.Spec) { + return true + } + return false +} +func (p egressGatewayPredicate) Generic(_ event.GenericEvent) bool { return true } + +type nodePredicate struct{} + +func (p nodePredicate) Create(_ event.CreateEvent) bool { return true } +func (p nodePredicate) Delete(_ event.DeleteEvent) bool { return true } +func (p nodePredicate) Update(updateEvent event.UpdateEvent) bool { + oldObj, ok := updateEvent.ObjectOld.(*corev1.Node) + if !ok { + return false + } + newObj, ok := updateEvent.ObjectNew.(*corev1.Node) + if !ok { + return false + } + if areMapsEqual(oldObj.Labels, newObj.Labels) { + return false + } + return true +} + +func areMapsEqual(mapA, mapB map[string]string) bool { + if len(mapA) != len(mapB) { + return false + } + for key, valueA := range mapA { + if valueB, ok := mapB[key]; !ok || valueA != valueB { + return false + } + } + return true +} + +func (p nodePredicate) Generic(_ event.GenericEvent) bool { return true } + +type egressTunnelPredicate struct{} + +func (p egressTunnelPredicate) Create(_ event.CreateEvent) bool { return true } +func (p egressTunnelPredicate) Delete(_ event.DeleteEvent) bool { return true } +func (p egressTunnelPredicate) Update(updateEvent event.UpdateEvent) bool { + oldEgressTunnel, ok := updateEvent.ObjectOld.(*egress.EgressTunnel) + if !ok { + return false + } + newEgressTunnel, ok := updateEvent.ObjectNew.(*egress.EgressTunnel) + if !ok { + return false + } + if oldEgressTunnel.Status.Phase != newEgressTunnel.Status.Phase { + return true + } + return false +} +func (p egressTunnelPredicate) Generic(_ event.GenericEvent) bool { return true } diff --git a/pkg/k8s/apis/v1beta1/egressgateway_types.go b/pkg/k8s/apis/v1beta1/egressgateway_types.go index 52aba8dc7..318fc0351 100644 --- a/pkg/k8s/apis/v1beta1/egressgateway_types.go +++ b/pkg/k8s/apis/v1beta1/egressgateway_types.go @@ -99,6 +99,16 @@ type EgressIPStatus struct { Status string `json:"status,omitempty"` } +func (status *EgressGatewayStatus) ReadyCount() int { + res := 0 + for _, node := range status.NodeList { + if EgressTunnelReady.IsEqual(node.Status) { + res++ + } + } + return res +} + type Eips struct { // +kubebuilder:validation:Optional IPv4 string `json:"ipv4,omitempty"` diff --git a/pkg/k8s/apis/v1beta1/egresstunnel_types.go b/pkg/k8s/apis/v1beta1/egresstunnel_types.go index 5e75dbb0e..a76cd0c1e 100644 --- a/pkg/k8s/apis/v1beta1/egresstunnel_types.go +++ b/pkg/k8s/apis/v1beta1/egresstunnel_types.go @@ -68,6 +68,18 @@ type Parent struct { type EgressTunnelPhase string +func (e EgressTunnelPhase) String() string { + return string(e) +} + +func (e EgressTunnelPhase) IsEqual(s string) bool { + return string(e) == s +} + +func (e EgressTunnelPhase) IsNotEqual(s string) bool { + return string(e) != s +} + const ( // EgressTunnelPending wait for tunnel address available EgressTunnelPending EgressTunnelPhase = "Pending"