Skip to content

Commit

Permalink
Fix backup post hook issue
Browse files Browse the repository at this point in the history
Fix backup post hook issue

Fixes #8159

Signed-off-by: Wenkai Yin(尹文开) <[email protected]>
  • Loading branch information
ywk253100 committed Dec 12, 2024
1 parent cb7758f commit c406423
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8509-ywk253100
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled
10 changes: 7 additions & 3 deletions internal/hook/hook_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ type HookTracker struct {
// HookExecutedCnt indicates the number of executed hooks.
hookExecutedCnt int
// hookErrs records hook execution errors if any.
hookErrs []HookErrInfo
hookErrs []HookErrInfo
AsyncItemBlocks *sync.WaitGroup
}

// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
return &HookTracker{
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
AsyncItemBlocks: &sync.WaitGroup{},
}
}

Expand Down Expand Up @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
ht.AsyncItemBlocks.Wait()

ht.lock.RLock()
defer ht.lock.RUnlock()

Expand Down
113 changes: 104 additions & 9 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/hook"
Expand Down Expand Up @@ -488,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(*itemBlock)
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
Expand Down Expand Up @@ -649,7 +652,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
Expand All @@ -672,7 +675,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}
}
postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
// if pre hook fails, flag pod as backed-up and move on
Expand All @@ -692,10 +695,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}

itemBlock.Log.Debug("Executing post hooks")
_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
}

return grList
Expand All @@ -714,12 +716,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem)
return metadata, key, nil
}

func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
for _, pod := range hookPods {
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
if err == nil {
successPods = append(successPods, pod)
} else {
Expand All @@ -730,6 +732,99 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h
return successPods, failedPods, errs
}

func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log

// Handle post hooks for file system backup
// The hooks cannot execute until the PVBs to be processed
isFileSystemBackup := itemBlock.itemBackupper.backupRequest.Spec.DefaultVolumesToFsBackup
if isFileSystemBackup != nil && *isFileSystemBackup {
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go func() {
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}

Check warning on line 749 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L742-L749

Added lines #L742 - L749 were not covered by tests

for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}

Check warning on line 755 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L751-L755

Added lines #L751 - L755 were not covered by tests
}
}()
return

Check warning on line 758 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L758

Added line #L758 was not covered by tests
}

// not file system backup
for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}

Check warning on line 766 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L765-L766

Added lines #L765 - L766 were not covered by tests
}
}

func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

Check warning on line 781 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L770-L781

Added lines #L770 - L781 were not covered by tests

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

Check warning on line 786 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L783-L786

Added lines #L783 - L786 were not covered by tests

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for _, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue

Check warning on line 791 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L788-L791

Added lines #L788 - L791 were not covered by tests
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[&pvb] = processed

Check failure on line 799 in pkg/backup/backup.go

View workflow job for this annotation

GitHub Actions / Run Linter Check

G601: Implicit memory aliasing in for loop. (gosec)

Check warning on line 799 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L794-L799

Added lines #L794 - L799 were not covered by tests
}

checkFunc := func(context.Context) (done bool, err error) {
allProcessed := true
for pvb, processed := range pvbMap {
if processed {
continue

Check warning on line 806 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L802-L806

Added lines #L802 - L806 were not covered by tests
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

Check warning on line 812 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L808-L812

Added lines #L808 - L812 were not covered by tests
}
if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
pvbMap[pvb] = true
continue

Check warning on line 817 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L814-L817

Added lines #L814 - L817 were not covered by tests
}
allProcessed = false

Check warning on line 819 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L819

Added line #L819 was not covered by tests
}

return allProcessed, nil

Check warning on line 822 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L822

Added line #L822 was not covered by tests
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)

Check warning on line 825 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L825

Added line #L825 was not covered by tests
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
Expand Down

0 comments on commit c406423

Please sign in to comment.