diff --git a/aws/adapter.go b/aws/adapter.go index fad42ceb..dff5fd95 100644 --- a/aws/adapter.go +++ b/aws/adapter.go @@ -234,7 +234,7 @@ func (a *Adapter) FindManagedStacks() ([]*Stack, error) { // All the required resources (listeners and target group) are created in a transactional fashion. // Failure to create the stack causes it to be deleted automatically. func (a *Adapter) CreateStack(certificateARN string) (string, error) { - spec := &createStackSpec{ + spec := &stackSpec{ name: a.stackName(certificateARN), scheme: elbv2.LoadBalancerSchemeEnumInternetFacing, certificateARN: certificateARN, @@ -253,6 +253,26 @@ func (a *Adapter) CreateStack(certificateARN string) (string, error) { return createStack(a.cloudformation, spec) } +func (a *Adapter) UpdateStack(certificateARN string) (string, error) { + spec := &stackSpec{ + name: a.stackName(certificateARN), + scheme: elbv2.LoadBalancerSchemeEnumInternetFacing, + certificateARN: certificateARN, + securityGroupID: a.SecurityGroupID(), + subnets: a.PublicSubnetIDs(), + vpcID: a.VpcID(), + clusterID: a.ClusterID(), + healthCheck: &healthCheck{ + path: a.healthCheckPath, + port: a.healthCheckPort, + interval: a.healthCheckInterval, + }, + timeoutInMinutes: uint(a.creationTimeout.Minutes()), + } + + return updateStack(a.cloudformation, spec) +} + func (a *Adapter) stackName(certificateARN string) string { return normalizeStackName(a.ClusterID(), certificateARN) } diff --git a/aws/cf.go b/aws/cf.go index 2b3d6847..08e7da38 100644 --- a/aws/cf.go +++ b/aws/cf.go @@ -116,7 +116,7 @@ const ( parameterListenerCertificateParameter = "ListenerCertificateParameter" ) -type createStackSpec struct { +type stackSpec struct { name string scheme string subnets []string @@ -135,7 +135,7 @@ type healthCheck struct { interval time.Duration } -func createStack(svc cloudformationiface.CloudFormationAPI, spec *createStackSpec) (string, error) { +func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (string, error) { template := templateYAML if spec.customTemplate != "" { template = spec.customTemplate @@ -175,6 +175,44 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *createStackSpe return aws.StringValue(resp.StackId), nil } +func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (string, error) { + template := templateYAML + if spec.customTemplate != "" { + template = spec.customTemplate + } + params := &cloudformation.UpdateStackInput{ + StackName: aws.String(spec.name), + Parameters: []*cloudformation.Parameter{ + cfParam(parameterLoadBalancerSchemeParameter, spec.scheme), + cfParam(parameterLoadBalancerSecurityGroupParameter, spec.securityGroupID), + cfParam(parameterLoadBalancerSubnetsParameter, strings.Join(spec.subnets, ",")), + cfParam(parameterTargetGroupVPCIDParameter, spec.vpcID), + cfParam(parameterListenerCertificateParameter, spec.certificateARN), + }, + Tags: []*cloudformation.Tag{ + cfTag(kubernetesCreatorTag, kubernetesCreatorValue), + cfTag(clusterIDTag, spec.clusterID), + }, + TemplateBody: aws.String(template), + } + if spec.certificateARN != "" { + params.Tags = append(params.Tags, cfTag(certificateARNTag, spec.certificateARN)) + } + if spec.healthCheck != nil { + params.Parameters = append(params.Parameters, + cfParam(parameterTargetGroupHealthCheckPathParameter, spec.healthCheck.path), + cfParam(parameterTargetGroupHealthCheckPortParameter, fmt.Sprintf("%d", spec.healthCheck.port)), + cfParam(parameterTargetGroupHealthCheckIntervalParameter, fmt.Sprintf("%.0f", spec.healthCheck.interval.Seconds())), + ) + } + resp, err := svc.UpdateStack(params) + if err != nil { + return spec.name, err + } + + return aws.StringValue(resp.StackId), nil +} + func cfParam(key, value string) *cloudformation.Parameter { return &cloudformation.Parameter{ ParameterKey: aws.String(key), diff --git a/aws/cf_test.go b/aws/cf_test.go index 24353c64..40d1a177 100644 --- a/aws/cf_test.go +++ b/aws/cf_test.go @@ -12,28 +12,28 @@ import ( func TestCreatingStack(t *testing.T) { for _, ti := range []struct { name string - givenSpec createStackSpec + givenSpec stackSpec givenOutputs cfMockOutputs want string wantErr bool }{ { "successful-call", - createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, + stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, cfMockOutputs{createStack: R(mockCSOutput("fake-stack-id"), nil)}, "fake-stack-id", false, }, { "successful-call", - createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, + stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, cfMockOutputs{createStack: R(mockCSOutput("fake-stack-id"), nil)}, "fake-stack-id", false, }, { "fail-call", - createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, + stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"}, cfMockOutputs{createStack: R(nil, dummyErr)}, "fake-stack-id", true, @@ -58,19 +58,19 @@ func TestCreatingStack(t *testing.T) { func TestDeleteStack(t *testing.T) { for _, ti := range []struct { msg string - givenSpec createStackSpec + givenSpec stackSpec givenOutputs cfMockOutputs wantErr bool }{ { "delete-existing-stack", - createStackSpec{name: "existing-stack-id"}, + stackSpec{name: "existing-stack-id"}, cfMockOutputs{deleteStack: R(mockDeleteStackOutput("existing-stack-id"), nil)}, false, }, { "delete-non-existing-stack", - createStackSpec{name: "non-existing-stack-id"}, + stackSpec{name: "non-existing-stack-id"}, cfMockOutputs{deleteStack: R(mockDeleteStackOutput("existing-stack-id"), nil)}, false, }, diff --git a/controller.go b/controller.go index 527f847f..4d36c3a4 100644 --- a/controller.go +++ b/controller.go @@ -27,6 +27,7 @@ var ( healthCheckPort uint healthcheckInterval time.Duration metricsAddress string + updateStackInterval time.Duration ) func loadSettings() error { @@ -35,6 +36,7 @@ func loadSettings() error { "server base url. If empty will try to use the configuration from the running cluster, else it will use InsecureConfig, that does not use encryption or authentication (use case to develop with kubectl proxy).") flag.DurationVar(&pollingInterval, "polling-interval", 30*time.Second, "sets the polling interval for "+ "ingress resources. The flag accepts a value acceptable to time.ParseDuration") + flag.DurationVar(&updateStackInterval, "update-stack-interval", 1*time.Hour, "sets the interval for update AWS ALB stack resources, which can fix migrations, if you add for example one subnet to your VPC your ALB has not interface in that. An update stack will add these interfaces automatically. The flag accepts a value acceptable to time.ParseDuration") flag.StringVar(&cfCustomTemplate, "cf-custom-template", "", "filename for a custom cloud formation template to use instead of the built in") flag.DurationVar(&creationTimeout, "creation-timeout", aws.DefaultCreationTimeout, @@ -61,6 +63,9 @@ func loadSettings() error { if err := loadDurationFromEnv("POLLING_INTERVAL", &pollingInterval); err != nil { return err } + if err := loadDurationFromEnv("UPDATE_STACK_INTERVAL", &updateStackInterval); err != nil { + return err + } if err := loadDurationFromEnv("CREATION_TIMEOUT", &creationTimeout); err != nil { return err @@ -162,7 +167,7 @@ func main() { go serveMetrics(metricsAddress) quitCH := make(chan struct{}) - go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval) + go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval, updateStackInterval) <-quitCH log.Printf("terminating %s", os.Args[0]) diff --git a/worker.go b/worker.go index ec2abd69..98ff5f04 100644 --- a/worker.go +++ b/worker.go @@ -32,6 +32,10 @@ const ( orphan ) +const ( + maxTargetGroupSupported = 1000 +) + func (item *managedItem) Status() int { if item.stack.ShouldDelete() { return orphan @@ -51,7 +55,9 @@ func waitForTerminationSignals(signals ...os.Signal) chan os.Signal { return c } -func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval time.Duration) { +func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval, updateStackInterval time.Duration) { + items := make(chan *managedItem, maxTargetGroupSupported) + go updateStacks(awsAdapter, updateStackInterval, items) for { log.Printf("Start polling sleep %s", pollingInterval) select { @@ -59,14 +65,38 @@ func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider quitCH <- struct{}{} return case <-time.After(pollingInterval): - if err := doWork(certsProvider, awsAdapter, kubeAdapter); err != nil { + if err := doWork(certsProvider, awsAdapter, kubeAdapter, items); err != nil { log.Println(err) } } } } -func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter) error { +func updateStacks(awsAdapter *aws.Adapter, interval time.Duration, items <-chan *managedItem) { + for { + itemsMap := map[string]*managedItem{} + done := make(chan struct{}) + go func() { + for { + select { + case item := <-items: + if _, ok := itemsMap[item.stack.CertificateARN()]; !ok { + itemsMap[item.stack.CertificateARN()] = item + } + case <-done: + return + } + } + }() + time.Sleep(interval) + done <- struct{}{} + for _, item := range itemsMap { + updateStack(awsAdapter, item) + } + } +} + +func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, items chan<- *managedItem) error { defer func() error { if r := recover(); r != nil { log.Println("shit has hit the fan:", errors.Wrap(r.(error), "panic caused by")) @@ -98,8 +128,9 @@ func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, k markToDeleteStack(awsAdapter, managedItem) case missing: createStack(awsAdapter, managedItem) - fallthrough + updateIngress(kubeAdapter, managedItem) case ready: + items <- managedItem updateIngress(kubeAdapter, managedItem) } } @@ -168,6 +199,20 @@ func createStack(awsAdapter *aws.Adapter, item *managedItem) { } } +func updateStack(awsAdapter *aws.Adapter, item *managedItem) { + certificateARN := item.ingresses[0].CertificateARN() + log.Printf("updating stack for certificate %q / ingress %q", certificateARN, item.ingresses) + + stackId, err := awsAdapter.UpdateStack(certificateARN) + if isNoUpdatesToBePerformedError(err) { + log.Printf("stack(%q) is already up to date", certificateARN) + } else if err != nil { + log.Printf("updateStack(%q) failed: %v", certificateARN, err) + } else { + log.Printf("stack %q for certificate %q updated", stackId, certificateARN) + } +} + func isAlreadyExistsError(err error) bool { if awsErr, ok := err.(awserr.Error); ok { return awsErr.Code() == cloudformation.ErrCodeAlreadyExistsException @@ -175,6 +220,16 @@ func isAlreadyExistsError(err error) bool { return false } +func isNoUpdatesToBePerformedError(err error) bool { + if err == nil { + return false + } + if _, ok := err.(awserr.Error); ok { + return strings.Contains(err.Error(), "No updates are to be performed") + } + return false +} + func updateIngress(kubeAdapter *kubernetes.Adapter, item *managedItem) { if item.stack == nil { return