Skip to content

Commit

Permalink
Fix replace with update (#235)
Browse files Browse the repository at this point in the history
* plan1 : (1) delete new pod, (2) label origin pod with replaceUpdate label

* fix replace update

* fix ut: replace update change to inplaceUpdate

* add ut: test-replace-pod-with-recreate-update

* add e2e: replace pod with recreate update

* fix golint

* fix ut
  • Loading branch information
ColdsteelRail authored Jul 30, 2024
1 parent e9c2e18 commit 104f503
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 35 deletions.
216 changes: 200 additions & 16 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,166 @@ var (

var _ = Describe("collaset controller", func() {

It("replace pod with update", func() {
for _, updateStrategy := range []appsv1alpha1.PodUpdateStrategyType{appsv1alpha1.CollaSetRecreatePodUpdateStrategyType, appsv1alpha1.CollaSetInPlaceIfPossiblePodUpdateStrategyType, appsv1alpha1.CollaSetReplacePodUpdateStrategyType} {
testcase := fmt.Sprintf("test-replace-pod-with-%s-update", strings.ToLower(string(updateStrategy)))
Expect(createNamespace(c, testcase)).Should(BeNil())
csName := fmt.Sprintf("foo-%s", strings.ToLower(string(updateStrategy)))
cs := &appsv1alpha1.CollaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: testcase,
Name: csName,
},
Spec: appsv1alpha1.CollaSetSpec{
Replicas: int32Pointer(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": csName,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": csName,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: csName,
Image: "nginx:v1",
},
},
},
},
UpdateStrategy: appsv1alpha1.UpdateStrategy{
PodUpdatePolicy: updateStrategy,
OperationDelaySeconds: int32Pointer(1),
},
},
}

Expect(c.Create(context.TODO(), cs)).Should(BeNil())
var originPodName, replaceNewPodName, replaceNewUpdatedPodName string

podList := &corev1.PodList{}
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 1
}, 5*time.Second, 1*time.Second).Should(BeTrue())
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil())
Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 1, 1, 0, 0, 0)).Should(BeNil())

// label pod to trigger replace
originPod := podList.Items[0]
originPodName = originPod.Name
Expect(updatePodWithRetry(c, originPod.Namespace, originPodName, func(pod *corev1.Pod) bool {
pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey] = "true"
return true
})).Should(BeNil())
Eventually(func() error {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 2, 0, 0, 0)
}, 5*time.Second, 1*time.Second).Should(BeNil())

// update collaset with recreate update
observedGeneration := cs.Status.ObservedGeneration
Expect(updateCollaSetWithRetry(c, cs.Namespace, cs.Name, func(cls *appsv1alpha1.CollaSet) bool {
cls.Spec.UpdateStrategy.PodUpdatePolicy = updateStrategy
cls.Spec.Template.Spec.Containers[0].Image = "nginx:v2"
return true
})).Should(BeNil())
Expect(observedGeneration != cs.Status.ObservedGeneration)
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
Eventually(func() error {
if updateStrategy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 0, 0, 0, 0)
} else {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 0, 1, 0, 0)
}
}, 5*time.Second, 1*time.Second).Should(BeNil())

// allow origin pod to update
Expect(updatePodWithRetry(c, originPod.Namespace, originPodName, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
if updateStrategy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
Eventually(func() error {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 0, 0, 0, 0)
}, 10*time.Second, 1*time.Second).Should(BeNil())
} else {
Eventually(func() error {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 0, 1, 0, 0)
}, 10*time.Second, 1*time.Second).Should(BeNil())
}
// allow replaceNewPod to delete
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for i := range podList.Items {
pod := &podList.Items[i]
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
if _, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
replaceNewPodName = pod.Name
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
}
return true
})).Should(BeNil())
}

// wait for replaceNewPod deleted and replaceNewUpdatedPod created
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
if pod.Name == replaceNewPodName {
return false
}
}
return true
}, 5*time.Second, time.Second).Should(BeTrue())
Eventually(func() error {
if updateStrategy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 1, 0, 0, 0)
} else {
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 1, 1, 0, 0)
}
}, 5*time.Second, 1*time.Second).Should(BeNil())

// mock replaceNewUpdatedPod pod service available
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
if pod.Labels[appsv1alpha1.PodReplacePairOriginName] == originPodName {
replaceNewUpdatedPodName = pod.Name
Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v2"))
Expect(updatePodWithRetry(c, pod.Namespace, replaceNewUpdatedPodName, func(pod *corev1.Pod) bool {
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())
}
}

// wait for originPod is deleted
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
if pod.Name == originPodName {
// allow originPod pod to be deleted
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
return false
}
}
return true
}, 5*time.Second, time.Second).Should(BeTrue())
Eventually(func() error {
return expectedStatusReplicas(c, cs, 0, 0, 1, 1, 1, 0, 0, 1)
}, 5*time.Second, 1*time.Second).Should(BeNil())
}
})

It("scale reconcile", func() {
testcase := "test-scale"
Expect(createNamespace(c, testcase)).Should(BeNil())
Expand Down Expand Up @@ -1454,7 +1614,7 @@ var _ = Describe("collaset controller", func() {
Name: "foo",
},
Spec: appsv1alpha1.CollaSetSpec{
Replicas: int32Pointer(1),
Replicas: int32Pointer(2),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "foo",
Expand Down Expand Up @@ -1487,10 +1647,10 @@ var _ = Describe("collaset controller", func() {
podList := &corev1.PodList{}
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 1
return len(podList.Items) == 2
}, 5*time.Second, 1*time.Second).Should(BeTrue())
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil())
Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 1, 1, 0, 0, 0)).Should(BeNil())
Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 2, 2, 0, 0, 0)).Should(BeNil())

observedGeneration := cs.Status.ObservedGeneration
// update CollaSet image
Expand All @@ -1508,10 +1668,11 @@ var _ = Describe("collaset controller", func() {
}, 5*time.Second, 1*time.Second).Should(BeTrue())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
originPod := podList.Items[0]
originPod1 := podList.Items[0]
originPod2 := podList.Items[1]

// label pod to trigger update
Expect(updatePodWithRetry(c, originPod.Namespace, originPod.Name, func(pod *corev1.Pod) bool {
// label originPod1 to trigger update
Expect(updatePodWithRetry(c, originPod1.Namespace, originPod1.Name, func(pod *corev1.Pod) bool {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
Expand All @@ -1521,7 +1682,7 @@ var _ = Describe("collaset controller", func() {

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 1, 0, 0, 0)
return expectedStatusReplicas(c, cs, 0, 0, 0, 3, 1, 0, 0, 0)
}, 30*time.Second, 1*time.Second).Should(BeNil())

Expect(updateCollaSetWithRetry(c, cs.Namespace, cs.Name, func(cls *appsv1alpha1.CollaSet) bool {
Expand All @@ -1531,22 +1692,45 @@ var _ = Describe("collaset controller", func() {

Eventually(func() bool {
pod := &corev1.Pod{}
error := c.Get(context.TODO(), types.NamespacedName{Namespace: originPod.Namespace, Name: originPod.Name}, pod)
error := c.Get(context.TODO(), types.NamespacedName{Namespace: originPod1.Namespace, Name: originPod1.Name}, pod)
Expect(error).Should(BeNil())
Expect(pod.Labels).ShouldNot(BeNil())
_, replaceIndicate := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]
_, replaceByUpdate := pod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]
// check updated pod replicas by CollaSet status
return !replaceIndicate && !replaceByUpdate
return replaceIndicate && replaceByUpdate
}, 5*time.Second, 1*time.Second).Should(BeTrue())
// double check updated pod replicas

// label originPod2 to trigger update
Expect(updatePodWithRetry(c, originPod2.Namespace, originPod2.Name, func(pod *corev1.Pod) bool {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels[appsv1alpha1.CollaSetUpdateIndicateLabelKey] = "true"
return true
})).Should(BeNil())

// allow originPod2 to do inPlace update
Expect(updatePodWithRetry(c, originPod2.Namespace, originPod2.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())

time.Sleep(3 * time.Second)

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 0, 3, 2, 2, 0, 0)
}, 30*time.Second, 1*time.Second).Should(BeNil())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
if pod.Spec.Containers[0].Image == cs.Spec.Template.Spec.Containers[0].Image {
// check new pod to delete
Expect(pod.Labels).ShouldNot(BeNil())
Expect(pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]).ShouldNot(BeNil())
Expect(pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]).ShouldNot(BeNil())
for i := range podList.Items {
pod := podList.Items[i]
// originPod1 is during inPlaceUpdate lifecycle, but continue to be replaced by updated pod
// originPod2 is during inPlaceUpdate lifecycle, and being updated by inPlace
if pod.Name == originPod1.Name || pod.Name == originPod2.Name {
Expect(podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, &pod)).Should(BeTrue())
}
}
})
Expand Down
65 changes: 55 additions & 10 deletions pkg/controllers/collaset/synccontrol/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"fmt"
"strconv"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
Expand Down Expand Up @@ -181,7 +183,7 @@ func (r *RealSyncControl) replaceOriginPods(
return successCount, err
}

func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needReplacePods []*corev1.Pod, needCleanLabelPods []*corev1.Pod, podNeedCleanLabels [][]string, needDeletePods []*corev1.Pod, replaceIndicateCount int) {
func dealReplacePods(pods []*corev1.Pod) (needReplacePods []*corev1.Pod, needCleanLabelPods []*corev1.Pod, podNeedCleanLabels [][]string, needDeletePods []*corev1.Pod, replaceIndicateCount int) {
var podInstanceIdMap = make(map[string]*corev1.Pod)
var podNameMap = make(map[string]*corev1.Pod)
for _, pod := range pods {
Expand Down Expand Up @@ -215,16 +217,10 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR

needReplacePods = append(needReplacePods, pod)
}
isReplaceUpdate := instance.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType
// deal pods need to delete when pod update strategy is not replace update

for _, pod := range pods {
_, inReplace := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]
_, replaceByUpdate := pod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]
var needCleanLabels []string
if inReplace && replaceByUpdate && !isReplaceUpdate {
needCleanLabels = []string{appsv1alpha1.PodReplaceIndicationLabelKey, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey}
}

// pod is replace new created pod, skip replace
if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
Expand All @@ -245,10 +241,8 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR
}

if newPairPodId, exist := pod.Labels[appsv1alpha1.PodReplacePairNewId]; exist {
if newPod, exist := podInstanceIdMap[newPairPodId]; !exist {
if _, exist := podInstanceIdMap[newPairPodId]; !exist {
needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairNewId)
} else if replaceByUpdate && !isReplaceUpdate {
needDeletePods = append(needDeletePods, newPod)
}
}

Expand All @@ -261,6 +255,57 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR
return
}

func updateReplaceOriginPod(
ctx context.Context,
c client.Client,
recorder record.EventRecorder,
originPodUpdateInfo, newPodUpdateInfo *PodUpdateInfo,
updatedRevision *appsv1.ControllerRevision) error {

originPod := originPodUpdateInfo.Pod
// 1. delete the new pod if not updated
if newPodUpdateInfo != nil {
newPod := newPodUpdateInfo.Pod
_, deletionIndicate := newPod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]
currentRevision, exist := newPod.Labels[appsv1.ControllerRevisionHashLabelKey]
if exist && currentRevision != updatedRevision.Name && !deletionIndicate {
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
if patchErr := c.Patch(ctx, newPod, patch); patchErr != nil {
err := fmt.Errorf("failed to delete replace pair new pod %s/%s %s",
newPod.Namespace, newPod.Name, patchErr)
return err
}
recorder.Eventf(originPod,
corev1.EventTypeNormal,
"DeleteOldNewPod",
"succeed to delete replace new Pod %s/%s by label to-replace",
originPod.Namespace,
originPod.Name,
)
}
}

// 2. replace the origin pod with updated pod
_, replaceIndicate := originPod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]
_, replaceByUpdate := originPod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]
if !replaceIndicate || !replaceByUpdate {
now := time.Now().UnixNano()
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%v", "%s": "%v"}}}`, appsv1alpha1.PodReplaceIndicationLabelKey, now, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey, true)))
if err := c.Patch(ctx, originPod, patch); err != nil {
return fmt.Errorf("fail to label origin pod %s/%s with replace indicate label by replaceUpdate: %s", originPod.Namespace, originPod.Name, err)
}
recorder.Eventf(originPod,
corev1.EventTypeNormal,
"UpdateOriginPod",
"succeed to update Pod %s/%s by label to-replace",
originPod.Namespace,
originPod.Name,
)
}

return nil
}

func getReplaceRevision(originPod *corev1.Pod, resources *collasetutils.RelatedResources) *appsv1.ControllerRevision {
if _, exist := originPod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]; exist {
return resources.UpdatedRevision
Expand Down
Loading

0 comments on commit 104f503

Please sign in to comment.