Skip to content

Commit

Permalink
fix(edgestacks): remove edge stacks even after a system crash or powe…
Browse files Browse the repository at this point in the history
…r-off BE-10822 (#208)
  • Loading branch information
andres-portainer authored Dec 4, 2024
1 parent 0f6729f commit a028181
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 17 deletions.
10 changes: 9 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/edge"
"github.com/portainer/portainer/pkg/libstack"
)
Expand Down Expand Up @@ -68,6 +69,11 @@ type (
} `json:"Agent"`
}

EdgeStack struct {
ID int
Name string
}

EdgeMetaFields struct {
// EdgeGroupsIDs - Used for AEEC, the created environment will be added to these edge groups
EdgeGroupsIDs []int
Expand Down Expand Up @@ -190,6 +196,7 @@ type (
// WaitForStatus waits until status is reached or an error occurred
// if the received value is an empty string it means the status was
WaitForStatus(ctx context.Context, name string, status libstack.Status, options CheckStatusOptions) <-chan libstack.WaitResult
GetEdgeStacks(ctx context.Context) ([]EdgeStack, error)
}

DeployerBaseOptions struct {
Expand All @@ -202,7 +209,8 @@ type (

DeployOptions struct {
DeployerBaseOptions
Prune bool
Prune bool
EdgeStackID portainer.EdgeStackID
}

RemoveOptions struct {
Expand Down
40 changes: 24 additions & 16 deletions edge/poll.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edge

import (
"context"
"encoding/base64"
"errors"
"math/rand"
Expand Down Expand Up @@ -43,6 +44,7 @@ type PollService struct {
tunnelServerAddr string
tunnelServerFingerprint string
tunnelProxy string
firstPoll bool

// Async mode only
pingInterval time.Duration
Expand Down Expand Up @@ -99,6 +101,7 @@ func newPollService(edgeManager *Manager, edgeStackManager *stack.StackManager,
tunnelServerFingerprint: config.TunnelServerFingerprint,
tunnelProxy: config.TunnelProxy,
portainerClient: portainerClient,
firstPoll: true,
}

if config.TunnelCapability {
Expand Down Expand Up @@ -140,6 +143,7 @@ func (service *PollService) startStatusPollLoop() {
Msg("starting Portainer short-polling client")

lastPollFailed := false

for {
select {
case <-pollCh:
Expand All @@ -153,7 +157,7 @@ func (service *PollService) startStatusPollLoop() {

err := service.poll()
if err != nil {
log.Error().Err(err).Msg("an error occured during short poll")
log.Error().Err(err).Msg("an error occurred during short poll")

lastPollFailed = true
service.pollTicker.Reset(time.Duration(service.pollIntervalInSeconds) * time.Second)
Expand Down Expand Up @@ -233,9 +237,8 @@ func (service *PollService) poll() error {
Float64("checkin_interval_seconds", environmentStatus.CheckinInterval).
Msg("")

tunnelErr := service.manageUpdateTunnel(*environmentStatus)
if tunnelErr != nil {
return tunnelErr
if err := service.manageUpdateTunnel(*environmentStatus); err != nil {
return err
}

service.processSchedules(environmentStatus.Schedules)
Expand Down Expand Up @@ -266,17 +269,15 @@ func (service *PollService) manageUpdateTunnel(environmentStatus client.PollStat
Str("status", environmentStatus.Status).
Msg("idle status detected, shutting down tunnel")

err := service.tunnelClient.CloseTunnel()
if err != nil {
if err := service.tunnelClient.CloseTunnel(); err != nil {
log.Error().Err(err).Msg("unable to shutdown tunnel")
}
}

if environmentStatus.Status == agent.TunnelStatusRequired && !service.tunnelClient.IsTunnelOpen() {
log.Debug().Msg("required status detected, creating reverse tunnel")

err := service.createTunnel(environmentStatus.Credentials, environmentStatus.Port)
if err != nil {
if err := service.createTunnel(environmentStatus.Credentials, environmentStatus.Port); err != nil {
log.Error().Err(err).Msg("unable to create tunnel")

return err
Expand Down Expand Up @@ -306,8 +307,7 @@ func (service *PollService) createTunnel(encodedCredentials string, remotePort i
RemotePort: strconv.Itoa(remotePort),
}

err = service.tunnelClient.CreateTunnel(tunnelConfig)
if err != nil {
if err := service.tunnelClient.CreateTunnel(tunnelConfig); err != nil {
return err
}

Expand All @@ -316,24 +316,32 @@ func (service *PollService) createTunnel(encodedCredentials string, remotePort i
}

func (service *PollService) processSchedules(schedules []agent.Schedule) {
err := service.scheduleManager.Schedule(schedules)
if err != nil {
if err := service.scheduleManager.Schedule(schedules); err != nil {
log.Error().Err(err).Msg("an error occurred during schedule management")
}
}

func (service *PollService) processStacks(pollResponseStacks []client.StackStatus) error {
if pollResponseStacks == nil {
return nil
// Load existing edge stacks so they can be removed using the initial poll response
if service.firstPoll {
log.Info().Msg("loading the existing edge stacks")

ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute)
defer cancelFn()

if err := service.edgeStackManager.LoadExistingEdgeStacks(ctx); err == nil {
service.firstPoll = false
} else {
log.Warn().Err(err).Msg("unable to retrieve the existing edge stacks")
}
}

stacks := map[int]client.StackStatus{}
for _, s := range pollResponseStacks {
stacks[s.ID] = s
}

err := service.edgeStackManager.UpdateStacksStatus(stacks)
if err != nil {
if err := service.edgeStackManager.UpdateStacksStatus(stacks); err != nil {
log.Error().Err(err).Msg("an error occurred during stack management")

return err
Expand Down
30 changes: 30 additions & 0 deletions edge/stack/manager.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package stack

import (
"context"
"fmt"
"sync"

"github.com/portainer/agent"
"github.com/portainer/agent/edge/client"
"github.com/portainer/agent/exec"
"github.com/portainer/agent/kubernetes"
"github.com/portainer/portainer/api/edge"

"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -121,6 +124,33 @@ func (manager *StackManager) SetEngineType(engineTyp engineType) error {
return nil
}

// LoadExistingEdgeStacks loads all the edge stacks deployed by Portainer
func (manager *StackManager) LoadExistingEdgeStacks(ctx context.Context) error {
edgeStacks, err := manager.deployer.GetEdgeStacks(ctx)
if err != nil {
return err
}

manager.mu.Lock()
for _, s := range edgeStacks {
if _, found := manager.stacks[edgeStackID(s.ID)]; found {
continue
}

manager.stacks[edgeStackID(s.ID)] = &edgeStack{
StackPayload: edge.StackPayload{
ID: s.ID,
Name: s.Name,
},
Action: actionIdle,
Status: StatusPending,
}
}
manager.mu.Unlock()

return nil
}

func (manager *StackManager) buildDeployerService(assetsPath string, engineStatus engineType) (agent.Deployer, error) {
switch engineStatus {
case EngineTypeDockerStandalone:
Expand Down
1 change: 1 addition & 0 deletions edge/stack/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func (manager *StackManager) deployStack(ctx context.Context, stack *edgeStack,
Env: envVars,
Registries: manager.ensureRegCreds(stack),
},
EdgeStackID: portainer.EdgeStackID(stack.ID),
},
)
manager.mu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions edge/stack/stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func TestStackManager_deployStack(t *testing.T) {

mockPortainerClient.EXPECT().SetEdgeStackStatus(stack.ID, portainer.EdgeStackStatusDeploying, stack.RollbackTo, "").Return(nil)
mockDeployer.EXPECT().Deploy(ctx, stackName, []string{stackFileLocation}, agent.DeployOptions{
EdgeStackID: portainer.EdgeStackID(stack.ID),
DeployerBaseOptions: agent.DeployerBaseOptions{
Namespace: stack.Namespace,
WorkingDir: stack.FileFolder,
Expand Down Expand Up @@ -175,6 +176,7 @@ func TestStackManager_deployStack(t *testing.T) {

mockPortainerClient.EXPECT().SetEdgeStackStatus(stack.ID, portainer.EdgeStackStatusDeploying, stack.RollbackTo, "").Return(nil)
mockDeployer.EXPECT().Deploy(ctx, stackName, []string{stackFileLocation}, agent.DeployOptions{
EdgeStackID: portainer.EdgeStackID(stack.ID),
DeployerBaseOptions: agent.DeployerBaseOptions{
Namespace: stack.Namespace,
WorkingDir: stack.FileFolder,
Expand Down Expand Up @@ -210,6 +212,7 @@ func TestStackManager_deployStack(t *testing.T) {

mockPortainerClient.EXPECT().SetEdgeStackStatus(stack.ID, portainer.EdgeStackStatusDeploying, stack.RollbackTo, "").Return(nil)
mockDeployer.EXPECT().Deploy(ctx, stackName, []string{stackFileLocation}, agent.DeployOptions{
EdgeStackID: portainer.EdgeStackID(stack.ID),
DeployerBaseOptions: agent.DeployerBaseOptions{
Namespace: stack.Namespace,
WorkingDir: stack.FileFolder,
Expand Down
22 changes: 22 additions & 0 deletions exec/docker_compose_stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exec

import (
"context"
"strings"

"github.com/docker/cli/cli/config/types"
"github.com/portainer/agent"
Expand All @@ -10,6 +11,8 @@ import (
"github.com/portainer/portainer/pkg/libstack/compose"
)

var _ agent.Deployer = &DockerComposeStackService{}

// DockerComposeStackService represents a service for managing stacks by using the Docker binary.
type DockerComposeStackService struct {
deployer libstack.Deployer
Expand All @@ -33,6 +36,7 @@ func (service *DockerComposeStackService) Deploy(ctx context.Context, name strin
Registries: registryCredsToAuthConfigs(options.Registries),
},
RemoveOrphans: options.Prune,
EdgeStackID: options.EdgeStackID,
})
}

Expand Down Expand Up @@ -69,6 +73,24 @@ func (service *DockerComposeStackService) WaitForStatus(ctx context.Context, nam
return service.deployer.WaitForStatus(ctx, name, status)
}

func (service *DockerComposeStackService) GetEdgeStacks(ctx context.Context) ([]agent.EdgeStack, error) {
var r []agent.EdgeStack

edgeStacks, err := service.deployer.GetExistingEdgeStacks(ctx)
if err != nil {
return nil, err
}

for _, s := range edgeStacks {
// Remove the prefix because it will get added back by the stack manager
s.Name = strings.TrimPrefix(s.Name, "edge_")

r = append(r, agent.EdgeStack(s))
}

return r, nil
}

func registryCredsToAuthConfigs(registryCreds []edge.RegistryCredentials) []types.AuthConfig {
var authConfigs []types.AuthConfig

Expand Down
6 changes: 6 additions & 0 deletions exec/docker_swarm_stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/portainer/portainer/pkg/libstack/compose"
)

var _ agent.Deployer = &DockerSwarmStackService{}

// DockerSwarmStackService represents a service for managing stacks by using the Docker binary.
type DockerSwarmStackService struct {
command string
Expand Down Expand Up @@ -90,3 +92,7 @@ func (service *DockerSwarmStackService) Remove(ctx context.Context, name string,

return err
}

func (service *DockerSwarmStackService) GetEdgeStacks(ctx context.Context) ([]agent.EdgeStack, error) {
return nil, nil
}
6 changes: 6 additions & 0 deletions exec/kubernetes_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/portainer/agent/kubernetes"
)

var _ agent.Deployer = &KubernetesDeployer{}

// KubernetesDeployer represents a service to deploy resources inside a Kubernetes environment.
type KubernetesDeployer struct {
command string
Expand Down Expand Up @@ -90,6 +92,10 @@ func (deployer *KubernetesDeployer) DeployRawConfig(token, config string, namesp
return runCommandAndCaptureStdErr(deployer.command, args, &cmdOpts{Input: config})
}

func (service *KubernetesDeployer) GetEdgeStacks(ctx context.Context) ([]agent.EdgeStack, error) {
return nil, nil
}

type argOptions struct {
Namespace string
Token string
Expand Down
15 changes: 15 additions & 0 deletions internals/mocks/mock_agent.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a028181

Please sign in to comment.