Skip to content

Commit

Permalink
Use k8s informer cache instead of active API server calls in ingress …
Browse files Browse the repository at this point in the history
…and service sources.
  • Loading branch information
jlamillan committed Feb 26, 2019
1 parent 3301c96 commit 73d34db
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 34 deletions.
56 changes: 43 additions & 13 deletions source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import (
"strings"
"text/template"

"github.com/kubernetes-incubator/external-dns/endpoint"
log "github.com/sirupsen/logrus"

"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"

"github.com/kubernetes-incubator/external-dns/endpoint"
"k8s.io/client-go/tools/cache"
"time"
)

// ingressSource is an implementation of Source for Kubernetes ingress objects.
Expand All @@ -44,6 +47,7 @@ type ingressSource struct {
fqdnTemplate *template.Template
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
ingressInformer extinformers.IngressInformer
}

// NewIngressSource creates a new ingressSource with the given config.
Expand All @@ -61,31 +65,57 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}

return &ingressSource{
// Use shared informer to listen for add/update/delete of ingresses in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed.
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
ingressInformer := informerFactory.Extensions().V1beta1().Ingresses()

// Add default resource event handlers to properly initialize informer.
ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressInformer.Informer().HasSynced() == true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}

sc := &ingressSource{
client: kubeClient,
namespace: namespace,
annotationFilter: annotationFilter,
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
}, nil
ingressInformer: ingressInformer,
}
return sc, nil
}

// Endpoints returns endpoint objects for each host-target combination that should be processed.
// Retrieves all ingress resources on all namespaces
func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
ingresses, err := sc.client.Extensions().Ingresses(sc.namespace).List(metav1.ListOptions{})
ingresses, err := sc.ingressInformer.Lister().Ingresses(sc.namespace).List(labels.Everything())
if err != nil {
return nil, err
}
ingresses.Items, err = sc.filterByAnnotations(ingresses.Items)
ingresses, err = sc.filterByAnnotations(ingresses)
if err != nil {
return nil, err
}

endpoints := []*endpoint.Endpoint{}

for _, ing := range ingresses.Items {
for _, ing := range ingresses {
// Check controller annotation to see if we are responsible.
controller, ok := ing.Annotations[controllerAnnotationKey]
if ok && controller != controllerAnnotationValue {
Expand All @@ -94,11 +124,11 @@ func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}

ingEndpoints := endpointsFromIngress(&ing, sc.ignoreHostnameAnnotation)
ingEndpoints := endpointsFromIngress(ing, sc.ignoreHostnameAnnotation)

// apply template if host is missing on ingress
if (sc.combineFQDNAnnotation || len(ingEndpoints) == 0) && sc.fqdnTemplate != nil {
iEndpoints, err := sc.endpointsFromTemplate(&ing)
iEndpoints, err := sc.endpointsFromTemplate(ing)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -161,7 +191,7 @@ func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoin
}

// filterByAnnotations filters a list of ingresses by a given annotation selector.
func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1beta1.Ingress, error) {
func (sc *ingressSource) filterByAnnotations(ingresses []*v1beta1.Ingress) ([]*v1beta1.Ingress, error) {
labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
Expand All @@ -176,7 +206,7 @@ func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1b
return ingresses, nil
}

filteredList := []v1beta1.Ingress{}
filteredList := []*v1beta1.Ingress{}

for _, ingress := range ingresses {
// convert the ingress' annotations to an equivalent label selector
Expand All @@ -191,7 +221,7 @@ func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1b
return filteredList, nil
}

func (sc *ingressSource) setResourceLabel(ingress v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
func (sc *ingressSource) setResourceLabel(ingress *v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
for _, ep := range endpoints {
ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("ingress/%s/%s", ingress.Namespace, ingress.Name)
}
Expand Down
16 changes: 15 additions & 1 deletion source/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package source

import (
"testing"
"time"

"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"

"github.com/kubernetes-incubator/external-dns/endpoint"
Expand Down Expand Up @@ -990,7 +992,19 @@ func testIngressEndpoints(t *testing.T) {
require.NoError(t, err)
}

res, err := ingressSource.Endpoints()
var res []*endpoint.Endpoint
var err error

// wait up to a few seconds for new resources to appear in informer cache.
err = wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
res, err = ingressSource.Endpoints()
if err != nil {
// stop waiting if we get an error
return true, err
}
return len(res) >= len(ti.expected), nil
})

if ti.expectError {
assert.Error(t, err)
} else {
Expand Down
93 changes: 77 additions & 16 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package source
import (
"bytes"
"fmt"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"sort"
"strings"
"text/template"
Expand All @@ -29,9 +32,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/kubernetes-incubator/external-dns/endpoint"
"time"
)

const (
Expand All @@ -54,6 +59,9 @@ type serviceSource struct {
ignoreHostnameAnnotation bool
publishInternal bool
publishHostIP bool
serviceInformer coreinformers.ServiceInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
}

Expand All @@ -72,6 +80,47 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}

// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()

// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("pod added")
},
},
)
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("node added")
},
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return serviceInformer.Informer().HasSynced() == true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}

// Transform the slice into a map so it will
// be way much easier and fast to filter later
serviceTypes := make(map[string]struct{})
Expand All @@ -89,24 +138,27 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
serviceInformer: serviceInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: serviceTypes,
}, nil
}

// Endpoints returns endpoint objects for each service that should be processed.
func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
services, err := sc.client.CoreV1().Services(sc.namespace).List(metav1.ListOptions{})
services, err := sc.serviceInformer.Lister().Services(sc.namespace).List(labels.Everything())
if err != nil {
return nil, err
}
services.Items, err = sc.filterByAnnotations(services.Items)
services, err = sc.filterByAnnotations(services)
if err != nil {
return nil, err
}

// filter on service types if at least one has been provided
if len(sc.serviceTypeFilter) > 0 {
services.Items = sc.filterByServiceType(services.Items)
services = sc.filterByServiceType(services)
}

// get the ip addresses of all the nodes and cache them for this run
Expand All @@ -117,7 +169,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {

endpoints := []*endpoint.Endpoint{}

for _, svc := range services.Items {
for _, svc := range services {
// Check controller annotation to see if we are responsible.
controller, ok := svc.Annotations[controllerAnnotationKey]
if ok && controller != controllerAnnotationValue {
Expand All @@ -126,16 +178,16 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}

svcEndpoints := sc.endpoints(&svc, nodeTargets)
svcEndpoints := sc.endpoints(svc, nodeTargets)

// process legacy annotations if no endpoints were returned and compatibility mode is enabled.
if len(svcEndpoints) == 0 && sc.compatibility != "" {
svcEndpoints = legacyEndpointsFromService(&svc, sc.compatibility)
svcEndpoints = legacyEndpointsFromService(svc, sc.compatibility)
}

// apply template if none of the above is found
if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
sEndpoints, err := sc.endpointsFromTemplate(&svc, nodeTargets)
sEndpoints, err := sc.endpointsFromTemplate(svc, nodeTargets)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,14 +219,23 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

pods, err := sc.client.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String()})
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil
}

pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}

targetsByHeadlessDomain := make(map[string][]string)
for _, v := range pods.Items {
for _, v := range pods {
headlessDomain := hostname
if v.Spec.Hostname != "" {
headlessDomain = v.Spec.Hostname + "." + headlessDomain
Expand Down Expand Up @@ -251,7 +312,7 @@ func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets
}

// filterByAnnotations filters a list of services by a given annotation selector.
func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Service, error) {
func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
Expand All @@ -266,7 +327,7 @@ func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Servic
return services, nil
}

filteredList := []v1.Service{}
filteredList := []*v1.Service{}

for _, service := range services {
// convert the service's annotations to an equivalent label selector
Expand All @@ -282,8 +343,8 @@ func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Servic
}

// filterByServiceType filters services according their types
func (sc *serviceSource) filterByServiceType(services []v1.Service) []v1.Service {
filteredList := []v1.Service{}
func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
filteredList := []*v1.Service{}
for _, service := range services {
// Check if the service is of the given type or not
if _, ok := sc.serviceTypeFilter[string(service.Spec.Type)]; ok {
Expand All @@ -294,7 +355,7 @@ func (sc *serviceSource) filterByServiceType(services []v1.Service) []v1.Service
return filteredList
}

func (sc *serviceSource) setResourceLabel(service v1.Service, endpoints []*endpoint.Endpoint) {
func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endpoint.Endpoint) {
for _, ep := range endpoints {
ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("service/%s/%s", service.Namespace, service.Name)
}
Expand Down Expand Up @@ -392,7 +453,7 @@ func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
externalIPs endpoint.Targets
)

nodes, err := sc.client.CoreV1().Nodes().List(metav1.ListOptions{})
nodes, err := sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
if errors.IsForbidden(err) {
// Return an empty list because it makes sense to continue and try other sources.
Expand All @@ -402,7 +463,7 @@ func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
return nil, err
}

for _, node := range nodes.Items {
for _, node := range nodes {
for _, address := range node.Status.Addresses {
switch address.Type {
case v1.NodeExternalIP:
Expand Down
Loading

0 comments on commit 73d34db

Please sign in to comment.