Skip to content

Commit

Permalink
Add support for multiple controller instances (#160)
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan Lima <[email protected]>
  • Loading branch information
greenboxal authored and hjacobs committed May 24, 2018
1 parent e60791e commit ce3fe3e
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 29 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,21 @@ built from this codebase. You can deploy it with 2 easy steps:
If you use [Kops](https://github.com/kubernetes/kops) to create your
cluster, please use our [deployment guide for Kops](deploy/kops.md)

## Running multiple instances

In some cases it might be useful to run multiple instances of this controller:

* Isolating internal vs external traffic
* Using a different set of traffic processing nodes
* Using different frontend routers (e.g.: Skipper and Traefik)

You can use the flag `-operator-id` to set a token that will be used to isolate resources between controller instances.
This value will be used to tag those resources.

If you don't pass an ID, the default `kube-ingress-aws-controller` will be used.

Usually you would want to combine this flag with `ingress-class-filter` so different types of ingresses are associated with the different controllers.

## Trying it out

The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully
Expand Down
14 changes: 13 additions & 1 deletion aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Adapter struct {
singleInstances map[string]*instanceDetails
obsoleteInstances []string
stackTerminationProtection bool
controllerID string
}

type manifest struct {
Expand All @@ -71,6 +72,7 @@ const (
DefaultCreationTimeout = 5 * time.Minute
DefaultStackTTL = 5 * time.Minute
DefaultIdleConnectionTimeout = 1 * time.Minute
DefaultControllerID = "kube-ingress-aws-controller"

nameTag = "Name"

Expand Down Expand Up @@ -133,6 +135,7 @@ func NewAdapter() (adapter *Adapter, err error) {
ec2Details: make(map[string]*instanceDetails),
singleInstances: make(map[string]*instanceDetails),
obsoleteInstances: make([]string, 0),
controllerID: DefaultControllerID,
}

adapter.manifest, err = buildManifest(adapter)
Expand Down Expand Up @@ -198,6 +201,13 @@ func (a *Adapter) WithCustomTemplate(template string) *Adapter {
return a
}

// WithControllerID returns the receiver adapter after changing the CloudFormation template that should be used
// to create Load Balancer stacks
func (a *Adapter) WithControllerID(id string) *Adapter {
a.controllerID = id
return a
}

// WithStackTerminationProtection returns the receiver adapter after changing
// the stack termination protection value.
func (a *Adapter) WithStackTerminationProtection(terminationProtection bool) *Adapter {
Expand Down Expand Up @@ -283,7 +293,7 @@ func (a *Adapter) SecurityGroupID() string {
// FindManagedStacks returns all CloudFormation stacks containing the controller management tags
// that match the current cluster and are ready to be used. The stack status is used to filter.
func (a *Adapter) FindManagedStacks() ([]*Stack, error) {
stacks, err := findManagedStacks(a.cloudformation, a.ClusterID())
stacks, err := findManagedStacks(a.cloudformation, a.ClusterID(), a.controllerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -356,6 +366,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, owner string) (s
timeoutInMinutes: uint(a.creationTimeout.Minutes()),
stackTerminationProtection: a.stackTerminationProtection,
idleConnectionTimeoutSeconds: uint(a.idleConnectionTimeout.Seconds()),
controllerID: a.controllerID,
}

return createStack(a.cloudformation, spec)
Expand All @@ -378,6 +389,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
timeoutInMinutes: uint(a.creationTimeout.Minutes()),
stackTerminationProtection: a.stackTerminationProtection,
idleConnectionTimeoutSeconds: uint(a.idleConnectionTimeout.Seconds()),
controllerID: a.controllerID,
}

return updateStack(a.cloudformation, spec)
Expand Down
15 changes: 9 additions & 6 deletions aws/cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type stackSpec struct {
customTemplate string
stackTerminationProtection bool
idleConnectionTimeoutSeconds uint
controllerID string
}

type healthCheck struct {
Expand All @@ -170,7 +171,7 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st
cfParam(parameterTargetGroupVPCIDParameter, spec.vpcID),
},
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, spec.controllerID),
cfTag(clusterIDTagPrefix+spec.clusterID, resourceLifecycleOwned),
},
TemplateBody: aws.String(template),
Expand Down Expand Up @@ -217,7 +218,7 @@ func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st
cfParam(parameterTargetGroupVPCIDParameter, spec.vpcID),
},
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, spec.controllerID),
cfTag(clusterIDTagPrefix+spec.clusterID, resourceLifecycleOwned),
},
TemplateBody: aws.String(template),
Expand Down Expand Up @@ -361,12 +362,12 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack {
}
}

func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID string) ([]*Stack, error) {
func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID, controllerID string) ([]*Stack, error) {
stacks := make([]*Stack, 0)
err := svc.DescribeStacksPages(&cloudformation.DescribeStacksInput{},
func(page *cloudformation.DescribeStacksOutput, lastPage bool) bool {
for _, s := range page.Stacks {
if isManagedStack(s.Tags, clusterID) {
if isManagedStack(s.Tags, clusterID, controllerID) {
stacks = append(stacks, mapToManagedStack(s))
}
}
Expand All @@ -378,11 +379,13 @@ func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID stri
return stacks, nil
}

func isManagedStack(cfTags []*cloudformation.Tag, clusterID string) bool {
func isManagedStack(cfTags []*cloudformation.Tag, clusterID string, controllerID string) bool {
tags := convertCloudFormationTags(cfTags)
if tags[kubernetesCreatorTag] != kubernetesCreatorValue {

if tags[kubernetesCreatorTag] != controllerID {
return false
}

// TODO(sszuecs): remove 2nd condition, only for migration
return tags[clusterIDTagPrefix+clusterID] == resourceLifecycleOwned || tags[clusterIDTag] == clusterID
}
Expand Down
43 changes: 24 additions & 19 deletions aws/cf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func TestManagementAssertion(t *testing.T) {
want bool
}{
{"managed", []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
cfTag("foo", "bar"),
}, true},
{"missing-cluster-tag", []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
}, false},
{"missing-kube-mgmt-tag", []*cloudformation.Tag{
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
Expand All @@ -151,13 +151,18 @@ func TestManagementAssertion(t *testing.T) {
cfTag("foo", "bar"),
}, false},
{"mismatch-cluster-tag", []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"other-cluster", resourceLifecycleOwned),
cfTag("foo", "bar"),
}, false},
{"mismatch-controller-id", []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, "the-other-one"),
cfTag(clusterIDTagPrefix+"other-cluster", resourceLifecycleOwned),
cfTag("foo", "bar"),
}, false},
} {
t.Run(ti.name, func(t *testing.T) {
got := isManagedStack(ti.given, "test-cluster")
got := isManagedStack(ti.given, "test-cluster", DefaultControllerID)
if ti.want != got {
t.Errorf("unexpected result. wanted %+v, got %+v", ti.want, got)
}
Expand Down Expand Up @@ -227,7 +232,7 @@ func TestFindManagedStacks(t *testing.T) {
StackName: aws.String("managed-stack-not-ready"),
StackStatus: aws.String(cloudformation.StackStatusUpdateInProgress),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
cfTag(certificateARNTagPrefix+"cert-arn", time.Time{}.Format(time.RFC3339)),
},
Expand All @@ -240,7 +245,7 @@ func TestFindManagedStacks(t *testing.T) {
StackName: aws.String("managed-stack"),
StackStatus: aws.String(cloudformation.StackStatusCreateComplete),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
cfTag(certificateARNTagPrefix+"cert-arn", time.Time{}.Format(time.RFC3339)),
},
Expand All @@ -253,7 +258,7 @@ func TestFindManagedStacks(t *testing.T) {
StackName: aws.String("managed-stack-not-ready"),
StackStatus: aws.String(cloudformation.StackStatusUpdateInProgress),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
},
},
Expand All @@ -266,13 +271,13 @@ func TestFindManagedStacks(t *testing.T) {
{
StackName: aws.String("another-unmanaged-stack"),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
},
},
{
StackName: aws.String("belongs-to-other-cluster"),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"other-cluster", resourceLifecycleOwned),
},
},
Expand All @@ -288,7 +293,7 @@ func TestFindManagedStacks(t *testing.T) {
},
targetGroupARN: "tg-arn",
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
certificateARNTagPrefix + "cert-arn": time.Time{}.Format(time.RFC3339),
},
Expand All @@ -302,7 +307,7 @@ func TestFindManagedStacks(t *testing.T) {
},
targetGroupARN: "tg-arn",
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
certificateARNTagPrefix + "cert-arn": time.Time{}.Format(time.RFC3339),
},
Expand All @@ -312,7 +317,7 @@ func TestFindManagedStacks(t *testing.T) {
name: "managed-stack-not-ready",
certificateARNs: map[string]time.Time{},
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
},
status: cloudformation.StackStatusUpdateInProgress,
Expand All @@ -330,7 +335,7 @@ func TestFindManagedStacks(t *testing.T) {
StackName: aws.String("managed-stack-not-ready"),
StackStatus: aws.String(cloudformation.StackStatusReviewInProgress),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
},
Outputs: []*cloudformation.Output{
Expand All @@ -342,7 +347,7 @@ func TestFindManagedStacks(t *testing.T) {
StackName: aws.String("managed-stack"),
StackStatus: aws.String(cloudformation.StackStatusRollbackComplete),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
},
Outputs: []*cloudformation.Output{
Expand All @@ -360,7 +365,7 @@ func TestFindManagedStacks(t *testing.T) {
targetGroupARN: "tg-arn",
certificateARNs: map[string]time.Time{},
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
},
status: cloudformation.StackStatusReviewInProgress,
Expand All @@ -371,7 +376,7 @@ func TestFindManagedStacks(t *testing.T) {
targetGroupARN: "tg-arn",
certificateARNs: map[string]time.Time{},
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
},
status: cloudformation.StackStatusRollbackComplete,
Expand Down Expand Up @@ -399,7 +404,7 @@ func TestFindManagedStacks(t *testing.T) {
} {
t.Run(ti.name, func(t *testing.T) {
c := &mockCloudFormationClient{outputs: ti.given}
got, err := findManagedStacks(c, "test-cluster")
got, err := findManagedStacks(c, "test-cluster", DefaultControllerID)
if err != nil {
if !ti.wantErr {
t.Error("unexpected error", err)
Expand Down Expand Up @@ -430,7 +435,7 @@ func TestGetStack(t *testing.T) {
StackName: aws.String("managed-stack"),
StackStatus: aws.String(cloudformation.StackStatusCreateComplete),
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(kubernetesCreatorTag, DefaultControllerID),
cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned),
cfTag(certificateARNTagPrefix+"cert-arn", time.Time{}.Format(time.RFC3339)),
},
Expand All @@ -450,7 +455,7 @@ func TestGetStack(t *testing.T) {
},
targetGroupARN: "tg-arn",
tags: map[string]string{
kubernetesCreatorTag: kubernetesCreatorValue,
kubernetesCreatorTag: DefaultControllerID,
clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned,
certificateARNTagPrefix + "cert-arn": time.Time{}.Format(time.RFC3339),
},
Expand Down
3 changes: 1 addition & 2 deletions aws/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const (
clusterIDTagPrefix = "kubernetes.io/cluster/"
resourceLifecycleOwned = "owned"
kubernetesCreatorTag = "kubernetes:application"
kubernetesCreatorValue = "kube-ingress-aws-controller"
autoScalingGroupNameTag = "aws:autoscaling:groupName"
runningState = 16 // See https://github.com/aws/aws-sdk-go/blob/master/service/ec2/api.go, type InstanceState
stoppedState = 80 // See https://github.com/aws/aws-sdk-go/blob/master/service/ec2/api.go, type InstanceState
Expand Down Expand Up @@ -288,7 +287,7 @@ func findSecurityGroupWithClusterID(svc ec2iface.EC2API, clusterID string) (*sec
{
Name: aws.String("tag-value"),
Values: []*string{
aws.String(kubernetesCreatorValue),
aws.String(DefaultControllerID),
},
},
},
Expand Down
5 changes: 4 additions & 1 deletion controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
stackTerminationProtection bool
idleConnectionTimeout time.Duration
ingressClassFilters string
controllerID string
)

func loadSettings() error {
Expand Down Expand Up @@ -76,6 +77,7 @@ func loadSettings() error {
"sets the idle connection timeout of all ALBs. The flag accepts a value acceptable to time.ParseDuration and are between 1s and 4000s.")
flag.StringVar(&metricsAddress, "metrics-address", ":7979", "defines where to serve metrics")
flag.StringVar(&ingressClassFilters, "ingress-class-filter", "", "optional comma-seperated list of kubernetes.io/ingress.class annotation values to filter behaviour on. ")
flag.StringVar(&controllerID, "controller-id", aws.DefaultControllerID, "controller ID used to differentiate resources from multiple aws ingress controller instances")

flag.Parse()

Expand Down Expand Up @@ -163,7 +165,8 @@ func main() {
WithCreationTimeout(creationTimeout).
WithCustomTemplate(cfCustomTemplate).
WithStackTerminationProtection(stackTerminationProtection).
WithIdleConnectionTimeout(idleConnectionTimeout)
WithIdleConnectionTimeout(idleConnectionTimeout).
WithControllerID(controllerID)

certificatesProvider, err := certs.NewCachingProvider(
certPollingInterval,
Expand Down

0 comments on commit ce3fe3e

Please sign in to comment.