From 73d34db6703a7357d82644c168cb58e9ab7d5f33 Mon Sep 17 00:00:00 2001
From: jlamillan
Date: Thu, 21 Feb 2019 13:17:10 -0800
Subject: [PATCH] Use k8s informer cache instead of active API server calls in ingress and service sources.

---
 source/ingress.go      | 56 +++++++++++++++++++------
 source/ingress_test.go | 16 +++++++-
 source/service.go      | 93 ++++++++++++++++++++++++++++++++++--------
 source/service_test.go | 17 +++++++-
 source/shared_test.go  | 12 +++++-
 5 files changed, 160 insertions(+), 34 deletions(-)

diff --git a/source/ingress.go b/source/ingress.go
index f522b52424..163ea2886b 100644
--- a/source/ingress.go
+++ b/source/ingress.go
@@ -23,14 +23,17 @@ import (
     "strings"
     "text/template"

+    "github.com/kubernetes-incubator/external-dns/endpoint"
     log "github.com/sirupsen/logrus"
-
     "k8s.io/api/extensions/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/wait"
+    kubeinformers "k8s.io/client-go/informers"
+    extinformers "k8s.io/client-go/informers/extensions/v1beta1"
     "k8s.io/client-go/kubernetes"
-
-    "github.com/kubernetes-incubator/external-dns/endpoint"
+    "k8s.io/client-go/tools/cache"
+    "time"
 )

 // ingressSource is an implementation of Source for Kubernetes ingress objects.
@@ -44,6 +47,7 @@ type ingressSource struct {
     fqdnTemplate             *template.Template
     combineFQDNAnnotation    bool
     ignoreHostnameAnnotation bool
+    ingressInformer          extinformers.IngressInformer
 }

 // NewIngressSource creates a new ingressSource with the given config.
@@ -61,31 +65,57 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
         }
     }

-    return &ingressSource{
+    // Use shared informer to listen for add/update/delete of ingresses in the specified namespace.
+    // Set resync period to 0, to prevent processing when nothing has changed.
+    informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
+    ingressInformer := informerFactory.Extensions().V1beta1().Ingresses()
+
+    // Add default resource event handlers to properly initialize informer.
+    ingressInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: func(obj interface{}) {
+            },
+        },
+    )
+
+    // TODO informer is not explicitly stopped since controller is not passing in its channel.
+    informerFactory.Start(wait.NeverStop)
+
+    // wait for the local cache to be populated.
+    err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
+        return ingressInformer.Informer().HasSynced() == true, nil
+    })
+    if err != nil {
+        return nil, fmt.Errorf("failed to sync cache: %v", err)
+    }
+
+    sc := &ingressSource{
         client:                   kubeClient,
         namespace:                namespace,
         annotationFilter:         annotationFilter,
         fqdnTemplate:             tmpl,
         combineFQDNAnnotation:    combineFqdnAnnotation,
         ignoreHostnameAnnotation: ignoreHostnameAnnotation,
-    }, nil
+        ingressInformer:          ingressInformer,
+    }
+    return sc, nil
 }

 // Endpoints returns endpoint objects for each host-target combination that should be processed.
 // Retrieves all ingress resources on all namespaces
 func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
-    ingresses, err := sc.client.Extensions().Ingresses(sc.namespace).List(metav1.ListOptions{})
+    ingresses, err := sc.ingressInformer.Lister().Ingresses(sc.namespace).List(labels.Everything())
     if err != nil {
         return nil, err
     }
-    ingresses.Items, err = sc.filterByAnnotations(ingresses.Items)
+    ingresses, err = sc.filterByAnnotations(ingresses)
     if err != nil {
         return nil, err
     }

     endpoints := []*endpoint.Endpoint{}

-    for _, ing := range ingresses.Items {
+    for _, ing := range ingresses {
         // Check controller annotation to see if we are responsible.
         controller, ok := ing.Annotations[controllerAnnotationKey]
         if ok && controller != controllerAnnotationValue {
@@ -94,11 +124,11 @@ func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
             continue
         }

-        ingEndpoints := endpointsFromIngress(&ing, sc.ignoreHostnameAnnotation)
+        ingEndpoints := endpointsFromIngress(ing, sc.ignoreHostnameAnnotation)

         // apply template if host is missing on ingress
         if (sc.combineFQDNAnnotation || len(ingEndpoints) == 0) && sc.fqdnTemplate != nil {
-            iEndpoints, err := sc.endpointsFromTemplate(&ing)
+            iEndpoints, err := sc.endpointsFromTemplate(ing)
             if err != nil {
                 return nil, err
             }
@@ -161,7 +191,7 @@ func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoin
 }

 // filterByAnnotations filters a list of ingresses by a given annotation selector.
-func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1beta1.Ingress, error) {
+func (sc *ingressSource) filterByAnnotations(ingresses []*v1beta1.Ingress) ([]*v1beta1.Ingress, error) {
     labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
     if err != nil {
         return nil, err
@@ -176,7 +206,7 @@
         return ingresses, nil
     }

-    filteredList := []v1beta1.Ingress{}
+    filteredList := []*v1beta1.Ingress{}

     for _, ingress := range ingresses {
         // convert the ingress' annotations to an equivalent label selector
@@ -191,7 +221,7 @@
     return filteredList, nil
 }

-func (sc *ingressSource) setResourceLabel(ingress v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
+func (sc *ingressSource) setResourceLabel(ingress *v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
     for _, ep := range endpoints {
         ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("ingress/%s/%s", ingress.Namespace, ingress.Name)
     }
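For readers unfamiliar with the informer machinery this patch adopts: the sequence above (build a namespaced shared-informer factory with a zero resync period, register a no-op event handler so the factory actually instantiates the informer, call Start, then poll HasSynced) can be exercised on its own. Below is a minimal, hypothetical sketch against a fake clientset; the names and the use of fake.NewSimpleClientset are illustrative and not part of the patch, and it assumes a client-go release contemporary with this change, where Ingress still lives in extensions/v1beta1:

    package main

    import (
        "fmt"
        "time"

        extv1beta1 "k8s.io/api/extensions/v1beta1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/util/wait"
        kubeinformers "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // Seed a fake clientset with one ingress so the initial list is non-empty.
        client := fake.NewSimpleClientset(&extv1beta1.Ingress{
            ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
        })

        // Resync period 0 and a namespace restriction, mirroring the patch.
        factory := kubeinformers.NewSharedInformerFactoryWithOptions(
            client, 0, kubeinformers.WithNamespace("default"))
        ingressInformer := factory.Extensions().V1beta1().Ingresses()

        // Touching the informer (e.g. registering a handler) before Start
        // ensures the factory instantiates and runs it.
        ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {},
        })

        factory.Start(wait.NeverStop)

        // Block until the initial list/watch has populated the local store.
        if err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
            return ingressInformer.Informer().HasSynced(), nil
        }); err != nil {
            panic(err)
        }

        // Subsequent reads are served from the in-memory cache, not the API server.
        ingresses, err := ingressInformer.Lister().Ingresses("default").List(labels.Everything())
        if err != nil {
            panic(err)
        }
        fmt.Printf("cached ingresses: %d\n", len(ingresses))
    }

Once the cache has synced, the Lister call at the end never touches the API server, which is the point of the change: Endpoints() can run on every reconciliation loop without generating list traffic.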
diff --git a/source/ingress_test.go b/source/ingress_test.go
index 2f29f80200..c9b9164de9 100644
--- a/source/ingress_test.go
+++ b/source/ingress_test.go
@@ -18,10 +18,12 @@ package source

 import (
     "testing"
+    "time"

     "k8s.io/api/core/v1"
     "k8s.io/api/extensions/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes/fake"

     "github.com/kubernetes-incubator/external-dns/endpoint"
@@ -990,7 +992,19 @@ func testIngressEndpoints(t *testing.T) {
             require.NoError(t, err)
         }

-        res, err := ingressSource.Endpoints()
+        var res []*endpoint.Endpoint
+        var err error
+
+        // wait up to a few seconds for new resources to appear in informer cache.
+        err = wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
+            res, err = ingressSource.Endpoints()
+            if err != nil {
+                // stop waiting if we get an error
+                return true, err
+            }
+            return len(res) >= len(ti.expected), nil
+        })
+
         if ti.expectError {
             assert.Error(t, err)
         } else {
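The poll above exists because the fake clientset delivers objects to the informer cache asynchronously, through the same list/watch machinery as a real API server, so an immediate call to Endpoints() may run before the cache has caught up. The same loop is duplicated in service_test.go further down; a sketch of how it could be factored into a shared helper (pollForEndpoints is hypothetical, not part of the patch):

    package source

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"

        "github.com/kubernetes-incubator/external-dns/endpoint"
    )

    // pollForEndpoints retries fetch until it returns an error or yields at
    // least want endpoints, giving the watch-driven cache time to catch up.
    func pollForEndpoints(fetch func() ([]*endpoint.Endpoint, error), want int) ([]*endpoint.Endpoint, error) {
        var res []*endpoint.Endpoint
        err := wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
            var ferr error
            res, ferr = fetch()
            if ferr != nil {
                // wait.Poll aborts as soon as the condition returns a non-nil
                // error, so the bool here is effectively ignored.
                return true, ferr
            }
            return len(res) >= want, nil
        })
        return res, err
    }

Note that wait.Poll treats any non-nil error from its condition as terminal and returns it immediately; the tests rely on that to surface Endpoints() failures for the expectError cases instead of timing out.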
diff --git a/source/service.go b/source/service.go
index 67b12c69d9..4c26bb5bf8 100644
--- a/source/service.go
+++ b/source/service.go
@@ -19,6 +19,9 @@ package source
 import (
     "bytes"
     "fmt"
+    kubeinformers "k8s.io/client-go/informers"
+    coreinformers "k8s.io/client-go/informers/core/v1"
+    "k8s.io/client-go/tools/cache"
     "sort"
     "strings"
     "text/template"
@@ -29,9 +32,11 @@ import (
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"

     "github.com/kubernetes-incubator/external-dns/endpoint"
+    "time"
 )

 const (
@@ -54,6 +59,9 @@ type serviceSource struct {
     ignoreHostnameAnnotation bool
     publishInternal          bool
     publishHostIP            bool
+    serviceInformer          coreinformers.ServiceInformer
+    podInformer              coreinformers.PodInformer
+    nodeInformer             coreinformers.NodeInformer
     serviceTypeFilter        map[string]struct{}
 }

@@ -72,6 +80,47 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
         }
     }

+    // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
+    // Set resync period to 0, to prevent processing when nothing has changed
+    informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
+    serviceInformer := informerFactory.Core().V1().Services()
+    podInformer := informerFactory.Core().V1().Pods()
+    nodeInformer := informerFactory.Core().V1().Nodes()
+
+    // Add default resource event handlers to properly initialize informer.
+    serviceInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: func(obj interface{}) {
+                log.Debug("service added")
+            },
+        },
+    )
+    podInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: func(obj interface{}) {
+                log.Debug("pod added")
+            },
+        },
+    )
+    nodeInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: func(obj interface{}) {
+                log.Debug("node added")
+            },
+        },
+    )
+
+    // TODO informer is not explicitly stopped since controller is not passing in its channel.
+    informerFactory.Start(wait.NeverStop)
+
+    // wait for the local cache to be populated.
+    err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
+        return serviceInformer.Informer().HasSynced() == true, nil
+    })
+    if err != nil {
+        return nil, fmt.Errorf("failed to sync cache: %v", err)
+    }
+
     // Transform the slice into a map so it will
     // be way much easier and fast to filter later
     serviceTypes := make(map[string]struct{})
@@ -89,24 +138,27 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
         ignoreHostnameAnnotation: ignoreHostnameAnnotation,
         publishInternal:          publishInternal,
         publishHostIP:            publishHostIP,
+        serviceInformer:          serviceInformer,
+        podInformer:              podInformer,
+        nodeInformer:             nodeInformer,
         serviceTypeFilter:        serviceTypes,
     }, nil
 }

 // Endpoints returns endpoint objects for each service that should be processed.
 func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
-    services, err := sc.client.CoreV1().Services(sc.namespace).List(metav1.ListOptions{})
+    services, err := sc.serviceInformer.Lister().Services(sc.namespace).List(labels.Everything())
     if err != nil {
         return nil, err
     }
-    services.Items, err = sc.filterByAnnotations(services.Items)
+    services, err = sc.filterByAnnotations(services)
     if err != nil {
         return nil, err
     }

     // filter on service types if at least one has been provided
     if len(sc.serviceTypeFilter) > 0 {
-        services.Items = sc.filterByServiceType(services.Items)
+        services = sc.filterByServiceType(services)
     }

     // get the ip addresses of all the nodes and cache them for this run
@@ -117,7 +169,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {

     endpoints := []*endpoint.Endpoint{}

-    for _, svc := range services.Items {
+    for _, svc := range services {
         // Check controller annotation to see if we are responsible.
         controller, ok := svc.Annotations[controllerAnnotationKey]
         if ok && controller != controllerAnnotationValue {
@@ -126,16 +178,16 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
             continue
         }

-        svcEndpoints := sc.endpoints(&svc, nodeTargets)
+        svcEndpoints := sc.endpoints(svc, nodeTargets)

         // process legacy annotations if no endpoints were returned and compatibility mode is enabled.
         if len(svcEndpoints) == 0 && sc.compatibility != "" {
-            svcEndpoints = legacyEndpointsFromService(&svc, sc.compatibility)
+            svcEndpoints = legacyEndpointsFromService(svc, sc.compatibility)
         }

         // apply template if none of the above is found
         if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
-            sEndpoints, err := sc.endpointsFromTemplate(&svc, nodeTargets)
+            sEndpoints, err := sc.endpointsFromTemplate(svc, nodeTargets)
             if err != nil {
                 return nil, err
             }
@@ -167,14 +219,23 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
     var endpoints []*endpoint.Endpoint

-    pods, err := sc.client.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String()})
+    labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
+    if err != nil {
+        return nil
+    }
+    selector, err := metav1.LabelSelectorAsSelector(labelSelector)
+    if err != nil {
+        return nil
+    }
+
+    pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
     if err != nil {
         log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
         return endpoints
     }

     targetsByHeadlessDomain := make(map[string][]string)

-    for _, v := range pods.Items {
+    for _, v := range pods {
         headlessDomain := hostname
         if v.Spec.Hostname != "" {
             headlessDomain = v.Spec.Hostname + "." + headlessDomain
@@ -251,7 +312,7 @@
 }

 // filterByAnnotations filters a list of services by a given annotation selector.
-func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Service, error) {
+func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
     labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
     if err != nil {
         return nil, err
@@ -266,7 +327,7 @@
         return services, nil
     }

-    filteredList := []v1.Service{}
+    filteredList := []*v1.Service{}

     for _, service := range services {
         // convert the service's annotations to an equivalent label selector
@@ -282,8 +343,8 @@
 }

 // filterByServiceType filters services according their types
-func (sc *serviceSource) filterByServiceType(services []v1.Service) []v1.Service {
-    filteredList := []v1.Service{}
+func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
+    filteredList := []*v1.Service{}
     for _, service := range services {
         // Check if the service is of the given type or not
         if _, ok := sc.serviceTypeFilter[string(service.Spec.Type)]; ok {
@@ -294,7 +355,7 @@
     return filteredList
 }

-func (sc *serviceSource) setResourceLabel(service v1.Service, endpoints []*endpoint.Endpoint) {
+func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endpoint.Endpoint) {
     for _, ep := range endpoints {
         ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("service/%s/%s", service.Namespace, service.Name)
     }
@@ -392,7 +453,7 @@ func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
         externalIPs endpoint.Targets
     )

-    nodes, err := sc.client.CoreV1().Nodes().List(metav1.ListOptions{})
+    nodes, err := sc.nodeInformer.Lister().List(labels.Everything())
     if err != nil {
         if errors.IsForbidden(err) {
             // Return an empty list because it makes sense to continue and try other sources.
@@ -402,7 +463,7 @@
         return nil, err
     }

-    for _, node := range nodes.Items {
+    for _, node := range nodes {
         for _, address := range node.Status.Addresses {
             switch address.Type {
             case v1.NodeExternalIP:
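One detail worth noting in extractHeadlessEndpoints above: labels.Set(svc.Spec.Selector).AsSelectorPreValidated() already produces a labels.Selector, so serializing it to a string and re-parsing it through metav1.ParseToLabelSelector and metav1.LabelSelectorAsSelector is a round trip. For a service selector, which is a plain map of equality requirements, the direct form below should be equivalent (a sketch of a possible simplification, not something the patch does):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        svc := &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"app": "web"}}}

        // Usable directly as the argument to podInformer.Lister().Pods(ns).List.
        selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated()

        fmt.Println(selector.String()) // prints: app=web
    }

Also note that the node lister is cluster-scoped: sc.nodeInformer.Lister().List takes no namespace, and the generated node informer ignores the factory's WithNamespace option, so node targets are still computed across the whole cluster.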
diff --git a/source/service_test.go b/source/service_test.go
index acd6a2b96d..6c8b1fc8fc 100644
--- a/source/service_test.go
+++ b/source/service_test.go
@@ -17,8 +17,10 @@ limitations under the License.
 package source

 import (
+    "k8s.io/apimachinery/pkg/util/wait"
     "net"
     "testing"
+    "time"

     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -1087,7 +1089,18 @@ func testServiceSourceEndpoints(t *testing.T) {
             )
             require.NoError(t, err)

-            endpoints, err := client.Endpoints()
+            var res []*endpoint.Endpoint
+
+            // wait up to a few seconds for new resources to appear in informer cache.
+            err = wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
+                res, err = client.Endpoints()
+                if err != nil {
+                    // stop waiting if we get an error
+                    return true, err
+                }
+                return len(res) >= len(tc.expected), nil
+            })
+
             if tc.expectError {
                 require.Error(t, err)
             } else {
@@ -1095,7 +1108,7 @@
             }

             // Validate returned endpoints against desired endpoints.
-            validateEndpoints(t, endpoints, tc.expected)
+            validateEndpoints(t, res, tc.expected)
         })
     }
 }
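As in ingress_test.go, the poll is needed because the fixtures are created through the fake clientset while the informers inside NewServiceSource are already running, so the objects only reach the cache via asynchronous watch delivery. A standalone illustration of that gap (hypothetical; assumes a client-go of this era, where Create takes no context argument):

    package main

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/util/wait"
        kubeinformers "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        client := fake.NewSimpleClientset()
        factory := kubeinformers.NewSharedInformerFactory(client, 0)
        services := factory.Core().V1().Services()
        informer := services.Informer()

        factory.Start(wait.NeverStop)
        cache.WaitForCacheSync(wait.NeverStop, informer.HasSynced)

        // Created after the informer started: it reaches the cache only via an
        // asynchronous watch event, so an immediate List may still miss it.
        client.CoreV1().Services("default").Create(&v1.Service{
            ObjectMeta: metav1.ObjectMeta{Name: "svc", Namespace: "default"},
        })

        err := wait.Poll(10*time.Millisecond, 3*time.Second, func() (bool, error) {
            svcs, err := services.Lister().Services("default").List(labels.Everything())
            if err != nil {
                return true, err
            }
            return len(svcs) == 1, nil
        })
        fmt.Println("cache caught up:", err == nil)
    }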
diff --git a/source/shared_test.go b/source/shared_test.go
index aedd47106f..fe27e626ed 100644
--- a/source/shared_test.go
+++ b/source/shared_test.go
@@ -17,9 +17,10 @@ limitations under the License.
 package source

 import (
-    "testing"
-
     "github.com/kubernetes-incubator/external-dns/endpoint"
+    "sort"
+    "strings"
+    "testing"
 )

 // test helper functions
@@ -28,6 +29,13 @@ func validateEndpoints(t *testing.T, endpoints, expected []*endpoint.Endpoint) {
     if len(endpoints) != len(expected) {
         t.Fatalf("expected %d endpoints, got %d", len(expected), len(endpoints))
     }
+    // Make sure endpoints are sorted - validateEndpoint() depends on it.
+    sort.SliceStable(endpoints, func(i, j int) bool {
+        return strings.Compare(endpoints[i].DNSName, endpoints[j].DNSName) < 0
+    })
+    sort.SliceStable(expected, func(i, j int) bool {
+        return strings.Compare(expected[i].DNSName, expected[j].DNSName) < 0
+    })

     for i := range endpoints {
         validateEndpoint(t, endpoints[i], expected[i])
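A final note on the shared_test.go change: the informer's store is hash-map backed, so Endpoints() no longer returns records in the order the fixtures created them, and sorting both slices restores a deterministic element-wise comparison (strings.Compare(a, b) < 0 is simply a < b). A toy illustration of the idea, using a hypothetical stand-in struct rather than the real endpoint.Endpoint:

    package main

    import (
        "fmt"
        "sort"
    )

    type endpointStub struct{ DNSName, Target string }

    func main() {
        got := []endpointStub{{"b.example.org", "8.8.8.8"}, {"a.example.org", "1.2.3.4"}}
        want := []endpointStub{{"a.example.org", "1.2.3.4"}, {"b.example.org", "8.8.8.8"}}

        // Sort both slices by the same key before comparing index by index.
        // SliceStable preserves the incoming order of entries that share a
        // DNSName (e.g. an A and a TXT record for the same host).
        byName := func(s []endpointStub) func(i, j int) bool {
            return func(i, j int) bool { return s[i].DNSName < s[j].DNSName }
        }
        sort.SliceStable(got, byName(got))
        sort.SliceStable(want, byName(want))

        for i := range got {
            fmt.Println(got[i] == want[i]) // true, true
        }
    }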