diff --git a/pkg/controllers/elastic_jobset.go b/pkg/controllers/elastic_jobset.go index 495a4cc1..cfe119f6 100644 --- a/pkg/controllers/elastic_jobset.go +++ b/pkg/controllers/elastic_jobset.go @@ -15,6 +15,7 @@ package controllers import ( "fmt" + "slices" "strconv" batchv1 "k8s.io/api/batch/v1" @@ -22,29 +23,64 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" ) -// jobsToDeleteDownScale gathers the excess jobs during a downscale +func indexFunc(a, b batchv1.Job) int { + jobIndexA, errA := strconv.Atoi(a.Labels[jobset.JobIndexKey]) + jobIndexB, errB := strconv.Atoi(b.Labels[jobset.JobIndexKey]) + if errA != nil { + return 0 + } + if errB != nil { + return 0 + } + if jobIndexA > jobIndexB { + return 1 + } else if jobIndexA < jobIndexB { + return -1 + } else { + return 0 + } +} + +// jobsToDeleteForDownScale gathers the excess jobs during a downscale // and deletes the jobs -func jobsToDeleteDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatus []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) { +func jobsToDeleteForDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatuses []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) { jobsToDelete := []*batchv1.Job{} + type payload struct { + batchJobs []batchv1.Job + rjStatus jobset.ReplicatedJobStatus + replicas int32 + } + replicatedJobToBatchJobMap := map[string]payload{} for _, replicatedJob := range replicatedJobs { - status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name) - countOfJobsToDelete := status.Ready - replicatedJob.Replicas + status := findReplicatedJobStatus(replicatedJobStatuses, replicatedJob.Name) + newPayload := &payload{} + newPayload.rjStatus = status + newPayload.replicas = replicatedJob.Replicas + for _, val := range jobItems { + if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name { + continue + } + newPayload.batchJobs = append(newPayload.batchJobs, val) + } + slices.SortFunc(newPayload.batchJobs, indexFunc) + replicatedJobToBatchJobMap[replicatedJob.Name] = *newPayload + } + for _, jobAndStatus := range replicatedJobToBatchJobMap { + countOfJobsToDelete := jobAndStatus.rjStatus.Ready - jobAndStatus.replicas if countOfJobsToDelete > 0 { jobsWeDeleted := 0 - for _, val := range jobItems { - if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name { - continue - } - jobIndex, err := strconv.Atoi(val.Labels[jobset.JobIndexKey]) + for i := len(jobAndStatus.batchJobs) - 1; i >= 0; i-- { + + jobIndex, err := strconv.Atoi(jobAndStatus.batchJobs[i].Labels[jobset.JobIndexKey]) if err != nil { return nil, fmt.Errorf("unable get integer from job index key") } if jobIndex >= int(countOfJobsToDelete) { jobsWeDeleted = jobsWeDeleted + 1 - jobsToDelete = append(jobsToDelete, &val) + jobsToDelete = append(jobsToDelete, &jobAndStatus.batchJobs[i]) } if jobsWeDeleted == int(countOfJobsToDelete) { - continue + break } } } diff --git a/pkg/controllers/elastic_jobset_test.go b/pkg/controllers/elastic_jobset_test.go index 838df423..6f5f32fa 100644 --- a/pkg/controllers/elastic_jobset_test.go +++ b/pkg/controllers/elastic_jobset_test.go @@ -26,12 +26,12 @@ import ( func TestJobsToDeleteDownScale(t *testing.T) { tests := []struct { - name string - replicatedJobs []jobset.ReplicatedJob - replicatedJobStatus []jobset.ReplicatedJobStatus - jobs []batchv1.Job - expectedJobsToDelete int32 - gotError error + name string + replicatedJobs []jobset.ReplicatedJob + replicatedJobStatus []jobset.ReplicatedJobStatus + jobs []batchv1.Job + expectedJobsThatWereDeleted []batchv1.Job + gotError error }{ { name: "no elastic downscale", @@ -118,7 +118,16 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, - expectedJobsToDelete: 1, + expectedJobsThatWereDeleted: []batchv1.Job{ + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test", + jobset.JobIndexKey: "1", + }, + }, + }, + }, }, { name: "elastic downscale is needed for second replicated job", @@ -161,6 +170,22 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "2", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + jobset.ReplicatedJobNameKey: "test-2", + jobset.JobIndexKey: "3", + }, + }, + }, { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ @@ -177,11 +202,13 @@ func TestJobsToDeleteDownScale(t *testing.T) { }, }, }, + }, + expectedJobsThatWereDeleted: []batchv1.Job{ { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ jobset.ReplicatedJobNameKey: "test-2", - jobset.JobIndexKey: "2", + jobset.JobIndexKey: "3", }, }, }, @@ -189,23 +216,36 @@ func TestJobsToDeleteDownScale(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Labels: map[string]string{ jobset.ReplicatedJobNameKey: "test-2", - jobset.JobIndexKey: "3", + jobset.JobIndexKey: "2", }, }, }, }, - expectedJobsToDelete: 2, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - actual, err := jobsToDeleteDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs) + actual, err := jobsToDeleteForDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs) if diff := cmp.Diff(tc.gotError, err); diff != "" { t.Errorf("unexpected finished value (+got/-want): %s", diff) } - if diff := cmp.Diff(tc.expectedJobsToDelete, int32(len(actual))); diff != "" { - t.Errorf("unexpected finished value (+got/-want): %s", diff) + if len(actual) != len(tc.expectedJobsThatWereDeleted) { + t.Errorf("unexpected length mismatch for deleted jobs: got: %d want: %d", len(actual), len(tc.expectedJobsThatWereDeleted)) + } + if tc.expectedJobsThatWereDeleted != nil { + for i := range actual { + actualReplicatedJobName := actual[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey] + actualJobIndexKey := actual[i].ObjectMeta.Labels[jobset.JobIndexKey] + expectedReplicatedJobName := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey] + expectedJobIndexKey := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.JobIndexKey] + if diff := cmp.Diff(actualReplicatedJobName, expectedReplicatedJobName); diff != "" { + t.Errorf("unexpected replicated job name (+got/-want): %s", diff) + } + if diff := cmp.Diff(actualJobIndexKey, expectedJobIndexKey); diff != "" { + t.Errorf("unexpected job index (+got/-want): %s", diff) + } + } } }) } diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index b682c54d..21d3d840 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -536,7 +536,7 @@ func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset. return err } - jobsToDelete, err := jobsToDeleteDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items) + jobsToDelete, err := jobsToDeleteForDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items) if err != nil { return err }