Skip to content

Commit

Permalink
update-stack on ready CFs (#79)
Browse files Browse the repository at this point in the history
* always update stacks on ready states
* run updatestacks in a separate goroutine
* rename createStackSpec to stackSpec
* expose updateStackInterval as flag and environment variable
  • Loading branch information
szuecs authored Jul 26, 2017
1 parent 8c0726b commit 384c4b8
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 15 deletions.
22 changes: 21 additions & 1 deletion aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (a *Adapter) FindManagedStacks() ([]*Stack, error) {
// All the required resources (listeners and target group) are created in a transactional fashion.
// Failure to create the stack causes it to be deleted automatically.
func (a *Adapter) CreateStack(certificateARN string) (string, error) {
spec := &createStackSpec{
spec := &stackSpec{
name: a.stackName(certificateARN),
scheme: elbv2.LoadBalancerSchemeEnumInternetFacing,
certificateARN: certificateARN,
Expand All @@ -253,6 +253,26 @@ func (a *Adapter) CreateStack(certificateARN string) (string, error) {
return createStack(a.cloudformation, spec)
}

func (a *Adapter) UpdateStack(certificateARN string) (string, error) {
spec := &stackSpec{
name: a.stackName(certificateARN),
scheme: elbv2.LoadBalancerSchemeEnumInternetFacing,
certificateARN: certificateARN,
securityGroupID: a.SecurityGroupID(),
subnets: a.PublicSubnetIDs(),
vpcID: a.VpcID(),
clusterID: a.ClusterID(),
healthCheck: &healthCheck{
path: a.healthCheckPath,
port: a.healthCheckPort,
interval: a.healthCheckInterval,
},
timeoutInMinutes: uint(a.creationTimeout.Minutes()),
}

return updateStack(a.cloudformation, spec)
}

func (a *Adapter) stackName(certificateARN string) string {
return normalizeStackName(a.ClusterID(), certificateARN)
}
Expand Down
42 changes: 40 additions & 2 deletions aws/cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ const (
parameterListenerCertificateParameter = "ListenerCertificateParameter"
)

type createStackSpec struct {
type stackSpec struct {
name string
scheme string
subnets []string
Expand All @@ -135,7 +135,7 @@ type healthCheck struct {
interval time.Duration
}

func createStack(svc cloudformationiface.CloudFormationAPI, spec *createStackSpec) (string, error) {
func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (string, error) {
template := templateYAML
if spec.customTemplate != "" {
template = spec.customTemplate
Expand Down Expand Up @@ -175,6 +175,44 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *createStackSpe
return aws.StringValue(resp.StackId), nil
}

func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (string, error) {
template := templateYAML
if spec.customTemplate != "" {
template = spec.customTemplate
}
params := &cloudformation.UpdateStackInput{
StackName: aws.String(spec.name),
Parameters: []*cloudformation.Parameter{
cfParam(parameterLoadBalancerSchemeParameter, spec.scheme),
cfParam(parameterLoadBalancerSecurityGroupParameter, spec.securityGroupID),
cfParam(parameterLoadBalancerSubnetsParameter, strings.Join(spec.subnets, ",")),
cfParam(parameterTargetGroupVPCIDParameter, spec.vpcID),
cfParam(parameterListenerCertificateParameter, spec.certificateARN),
},
Tags: []*cloudformation.Tag{
cfTag(kubernetesCreatorTag, kubernetesCreatorValue),
cfTag(clusterIDTag, spec.clusterID),
},
TemplateBody: aws.String(template),
}
if spec.certificateARN != "" {
params.Tags = append(params.Tags, cfTag(certificateARNTag, spec.certificateARN))
}
if spec.healthCheck != nil {
params.Parameters = append(params.Parameters,
cfParam(parameterTargetGroupHealthCheckPathParameter, spec.healthCheck.path),
cfParam(parameterTargetGroupHealthCheckPortParameter, fmt.Sprintf("%d", spec.healthCheck.port)),
cfParam(parameterTargetGroupHealthCheckIntervalParameter, fmt.Sprintf("%.0f", spec.healthCheck.interval.Seconds())),
)
}
resp, err := svc.UpdateStack(params)
if err != nil {
return spec.name, err
}

return aws.StringValue(resp.StackId), nil
}

func cfParam(key, value string) *cloudformation.Parameter {
return &cloudformation.Parameter{
ParameterKey: aws.String(key),
Expand Down
14 changes: 7 additions & 7 deletions aws/cf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import (
func TestCreatingStack(t *testing.T) {
for _, ti := range []struct {
name string
givenSpec createStackSpec
givenSpec stackSpec
givenOutputs cfMockOutputs
want string
wantErr bool
}{
{
"successful-call",
createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
cfMockOutputs{createStack: R(mockCSOutput("fake-stack-id"), nil)},
"fake-stack-id",
false,
},
{
"successful-call",
createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
cfMockOutputs{createStack: R(mockCSOutput("fake-stack-id"), nil)},
"fake-stack-id",
false,
},
{
"fail-call",
createStackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
stackSpec{name: "foo", securityGroupID: "bar", vpcID: "baz"},
cfMockOutputs{createStack: R(nil, dummyErr)},
"fake-stack-id",
true,
Expand All @@ -58,19 +58,19 @@ func TestCreatingStack(t *testing.T) {
func TestDeleteStack(t *testing.T) {
for _, ti := range []struct {
msg string
givenSpec createStackSpec
givenSpec stackSpec
givenOutputs cfMockOutputs
wantErr bool
}{
{
"delete-existing-stack",
createStackSpec{name: "existing-stack-id"},
stackSpec{name: "existing-stack-id"},
cfMockOutputs{deleteStack: R(mockDeleteStackOutput("existing-stack-id"), nil)},
false,
},
{
"delete-non-existing-stack",
createStackSpec{name: "non-existing-stack-id"},
stackSpec{name: "non-existing-stack-id"},
cfMockOutputs{deleteStack: R(mockDeleteStackOutput("existing-stack-id"), nil)},
false,
},
Expand Down
7 changes: 6 additions & 1 deletion controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
healthCheckPort uint
healthcheckInterval time.Duration
metricsAddress string
updateStackInterval time.Duration
)

func loadSettings() error {
Expand All @@ -35,6 +36,7 @@ func loadSettings() error {
"server base url. If empty will try to use the configuration from the running cluster, else it will use InsecureConfig, that does not use encryption or authentication (use case to develop with kubectl proxy).")
flag.DurationVar(&pollingInterval, "polling-interval", 30*time.Second, "sets the polling interval for "+
"ingress resources. The flag accepts a value acceptable to time.ParseDuration")
flag.DurationVar(&updateStackInterval, "update-stack-interval", 1*time.Hour, "sets the interval for update AWS ALB stack resources, which can fix migrations, if you add for example one subnet to your VPC your ALB has not interface in that. An update stack will add these interfaces automatically. The flag accepts a value acceptable to time.ParseDuration")
flag.StringVar(&cfCustomTemplate, "cf-custom-template", "",
"filename for a custom cloud formation template to use instead of the built in")
flag.DurationVar(&creationTimeout, "creation-timeout", aws.DefaultCreationTimeout,
Expand All @@ -61,6 +63,9 @@ func loadSettings() error {
if err := loadDurationFromEnv("POLLING_INTERVAL", &pollingInterval); err != nil {
return err
}
if err := loadDurationFromEnv("UPDATE_STACK_INTERVAL", &updateStackInterval); err != nil {
return err
}

if err := loadDurationFromEnv("CREATION_TIMEOUT", &creationTimeout); err != nil {
return err
Expand Down Expand Up @@ -162,7 +167,7 @@ func main() {

go serveMetrics(metricsAddress)
quitCH := make(chan struct{})
go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval)
go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval, updateStackInterval)
<-quitCH

log.Printf("terminating %s", os.Args[0])
Expand Down
63 changes: 59 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
orphan
)

const (
maxTargetGroupSupported = 1000
)

func (item *managedItem) Status() int {
if item.stack.ShouldDelete() {
return orphan
Expand All @@ -51,22 +55,48 @@ func waitForTerminationSignals(signals ...os.Signal) chan os.Signal {
return c
}

func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval time.Duration) {
func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval, updateStackInterval time.Duration) {
items := make(chan *managedItem, maxTargetGroupSupported)
go updateStacks(awsAdapter, updateStackInterval, items)
for {
log.Printf("Start polling sleep %s", pollingInterval)
select {
case <-waitForTerminationSignals(syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT):
quitCH <- struct{}{}
return
case <-time.After(pollingInterval):
if err := doWork(certsProvider, awsAdapter, kubeAdapter); err != nil {
if err := doWork(certsProvider, awsAdapter, kubeAdapter, items); err != nil {
log.Println(err)
}
}
}
}

func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter) error {
func updateStacks(awsAdapter *aws.Adapter, interval time.Duration, items <-chan *managedItem) {
for {
itemsMap := map[string]*managedItem{}
done := make(chan struct{})
go func() {
for {
select {
case item := <-items:
if _, ok := itemsMap[item.stack.CertificateARN()]; !ok {
itemsMap[item.stack.CertificateARN()] = item
}
case <-done:
return
}
}
}()
time.Sleep(interval)
done <- struct{}{}
for _, item := range itemsMap {
updateStack(awsAdapter, item)
}
}
}

func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, items chan<- *managedItem) error {
defer func() error {
if r := recover(); r != nil {
log.Println("shit has hit the fan:", errors.Wrap(r.(error), "panic caused by"))
Expand Down Expand Up @@ -98,8 +128,9 @@ func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, k
markToDeleteStack(awsAdapter, managedItem)
case missing:
createStack(awsAdapter, managedItem)
fallthrough
updateIngress(kubeAdapter, managedItem)
case ready:
items <- managedItem
updateIngress(kubeAdapter, managedItem)
}
}
Expand Down Expand Up @@ -168,13 +199,37 @@ func createStack(awsAdapter *aws.Adapter, item *managedItem) {
}
}

func updateStack(awsAdapter *aws.Adapter, item *managedItem) {
certificateARN := item.ingresses[0].CertificateARN()
log.Printf("updating stack for certificate %q / ingress %q", certificateARN, item.ingresses)

stackId, err := awsAdapter.UpdateStack(certificateARN)
if isNoUpdatesToBePerformedError(err) {
log.Printf("stack(%q) is already up to date", certificateARN)
} else if err != nil {
log.Printf("updateStack(%q) failed: %v", certificateARN, err)
} else {
log.Printf("stack %q for certificate %q updated", stackId, certificateARN)
}
}

func isAlreadyExistsError(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
return awsErr.Code() == cloudformation.ErrCodeAlreadyExistsException
}
return false
}

func isNoUpdatesToBePerformedError(err error) bool {
if err == nil {
return false
}
if _, ok := err.(awserr.Error); ok {
return strings.Contains(err.Error(), "No updates are to be performed")
}
return false
}

func updateIngress(kubeAdapter *kubernetes.Adapter, item *managedItem) {
if item.stack == nil {
return
Expand Down

0 comments on commit 384c4b8

Please sign in to comment.