scheduler: preserve allocations enriched during placement as 'informational' #24960

Draft · wants to merge 3 commits into base: main
21 changes: 16 additions & 5 deletions scheduler/reconcile.go
@@ -462,8 +462,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of terminal allocations need to be rescheduled and
// ensure that we don't discard allocations that carry important information
// for future reschedules or deployments
untainted, rescheduleNow, rescheduleLater, informational := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
@@ -509,7 +511,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// the reschedule policy won't be enabled and the lost allocations
// wont be rescheduled, and PreventRescheduleOnLost is ignored.
if tg.GetDisconnectLostTimeout() != 0 {
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)

rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
untainted = untainted.union(untaintedDisconnecting)
@@ -591,7 +593,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * An alloc was lost
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying, informational)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -798,7 +800,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
isCanarying bool) []allocPlaceResult {
isCanarying bool, informational []*structs.Allocation) []allocPlaceResult {

// Add rescheduled placement results
var place []allocPlaceResult
@@ -843,9 +845,18 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// Add remaining placement results
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {

// If there are any informational allocs, pop one and attach it as
// previousAlloc to our new placement
var a *structs.Allocation
if len(informational) > 0 {
a, informational = informational[len(informational)-1], informational[:len(informational)-1]
}

place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
previousAlloc: a,
downgradeNonCanary: isCanarying,
})
}
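The informational handling added to computePlacements above amounts to popping an alloc off a slice and threading it into the new placement as previousAlloc. A minimal standalone sketch of that pattern (a hypothetical helper, not part of this PR):

// popInformational returns the last alloc in the slice (or nil when the
// slice is empty) together with the shrunk slice, mirroring the pop done
// inline in computePlacements.
func popInformational(informational []*structs.Allocation) (*structs.Allocation, []*structs.Allocation) {
	if len(informational) == 0 {
		return nil, informational
	}
	return informational[len(informational)-1], informational[:len(informational)-1]
}

Which end of the slice gets popped is arbitrary here; the reconciler only needs some informational alloc attached to each new placement so that the information it carries (such as host volume IDs) is available during placement.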
160 changes: 160 additions & 0 deletions scheduler/reconcile_test.go
@@ -388,6 +388,9 @@ func TestReconciler_Place_Existing(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
Comment on lines +391 to +393

Member:
These are a great quick-and-dirty addition for avoiding regressions 👍

allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -429,6 +432,9 @@ func TestReconciler_ScaleDown_Partial(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -471,6 +477,9 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -514,6 +523,9 @@ func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) {
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i%2))
allocs = append(allocs, alloc)
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
expectedStopped = append(expectedStopped, i%2)
}

Expand Down Expand Up @@ -552,6 +564,9 @@ func TestReconciler_Inplace(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -593,6 +608,9 @@ func TestReconciler_Inplace_ScaleUp(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -636,6 +654,9 @@ func TestReconciler_Inplace_ScaleDown(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

Expand Down Expand Up @@ -686,6 +707,9 @@ func TestReconciler_Inplace_Rollback(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}
// allocs[0] is an allocation from version 0
@@ -746,6 +770,9 @@ func TestReconciler_Destructive(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

@@ -782,6 +809,9 @@ func TestReconciler_DestructiveMaxParallel(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

@@ -821,6 +851,9 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

@@ -863,6 +896,9 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}
allocs = append(allocs, alloc)
}

@@ -1018,6 +1054,9 @@ func TestReconciler_LostNode_PreventRescheduleOnLost(t *testing.T) {
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.DesiredStatus = structs.AllocDesiredStatusRun

// set host volume IDs on running allocations to make sure their presence doesn't
// interfere with reconciler behavior
alloc.HostVolumeIDs = []string{"host-volume1", "host-volume2"}

// Set one of the allocations to failed
if i == 4 {
alloc.ClientStatus = structs.AllocClientStatusFailed
@@ -1063,6 +1103,126 @@ func TestReconciler_LostNode_PreventRescheduleOnLost(t *testing.T) {
}
}

func TestReconciler_InformationalAllocs(t *testing.T) {
disabledReschedulePolicy := &structs.ReschedulePolicy{
Attempts: 0,
Unlimited: false,
}

ci.Parallel(t)
now := time.Now()

testCases := []struct {
name string
count int
stoppedCount int
failedCount int
reschedulePolicy *structs.ReschedulePolicy
expectPlace int
expectStop int
expectIgnore int
}{
{
name: "Count 3, 2 allocs failed, 1 stopped, no reschedule",
count: 3,
stoppedCount: 1,
failedCount: 2,
reschedulePolicy: disabledReschedulePolicy,
expectPlace: 2,
expectStop: 1,
expectIgnore: 1,
},
Comment on lines +1125 to +1134

Contributor Author:
This case appears to cover your comment @tgross, does it not? The desired behavior, if I'm correct, should be 2 placed allocs, 1 stopped, and 1 ignored in this case, which is exactly what we're getting.

Member:
If rescheduling is disabled, can't we only replace the stopped allocation? Where does the other placement come from?

Oh, I see... this test is actually quite complicated, as the first failed alloc is on the down node and the second failed alloc is on the disconnected node. So the 2nd failed alloc is resulting in a replacement for the disconnect? We should probably leave a comment on the expectPlace noting where those placements come from, to help future readers.
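A sketch of the kind of annotation being suggested (the wording is an assumption based on this thread; the values are copied from the case above):

{
	// The first failed alloc is on the down node and the second is on the
	// disconnected node, so the two placements are one replacement for the
	// lost alloc and one temporary replacement for the disconnected alloc;
	// rescheduling is disabled, so neither comes from the reschedule policy.
	name:             "Count 3, 2 allocs failed, 1 stopped, no reschedule",
	count:            3,
	stoppedCount:     1,
	failedCount:      2,
	reschedulePolicy: disabledReschedulePolicy,
	expectPlace:      2,
	expectStop:       1,
	expectIgnore:     1,
},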

{
name: "Count 1, 1 alloc failed, 1 stopped, reschedule",
count: 1,
stoppedCount: 1,
failedCount: 1,
reschedulePolicy: &structs.ReschedulePolicy{
Attempts: 1,
},
expectPlace: 1,
expectStop: 2,
expectIgnore: 0,
},
{
name: "Count 2, no allocs failed, 2 stopped, no reschedule",
count: 2,
stoppedCount: 2,
failedCount: 0,
reschedulePolicy: disabledReschedulePolicy,
expectPlace: 2,
expectStop: 1,
expectIgnore: 0,
Comment on lines +1153 to +1155

Contributor Author:
I'm not 100% confident about these. Intuitively, I would expect 2 allocs to place, 0 to ignore, and 0 to stop. This might have to do with which nodes are available in this case; I will look into it and do some additional manual testing to be sure we're setting the alloc desired status and client status correctly.

Member:
Yeah, agreed, this one looks funny.

Like the one above, I'd annotate the expectations here because they're not intuitive. You've got no failed allocs, so one stopped alloc is sitting on a down node and the other stopped alloc is disconnected. So I'd expect 1 placement for the down node and 1 temporary replacement for the disconnected alloc. Where does the stop come from? Are we calling stop for an allocation that's already been stopped?
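In the same spirit, a hedged sketch of an annotation for this case (the source of the extra stop is still an open question in this thread):

{
	// Both stopped allocs sit on tainted nodes: one on the down node and
	// one on the disconnected node, which accounts for the two placements.
	// Why a stop is reported for allocs that are already stopped is the
	// open question discussed in this review.
	name:             "Count 2, no allocs failed, 2 stopped, no reschedule",
	count:            2,
	stoppedCount:     2,
	failedCount:      0,
	reschedulePolicy: disabledReschedulePolicy,
	expectPlace:      2,
	expectStop:       1,
	expectIgnore:     0,
},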

},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = tc.count
job.TaskGroups[0].ReschedulePolicy = tc.reschedulePolicy

var allocs []*structs.Allocation
for i := 0; i < tc.failedCount; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.HostVolumeIDs = []string{"foo"}
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusFailed

allocs = append(allocs, alloc)
}

for i := 0; i < tc.stoppedCount; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.HostVolumeIDs = []string{"foo"}
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete

allocs = append(allocs, alloc)
}

// Build a map of tainted nodes, one down and one disconnected
tainted := make(map[string]*structs.Node, 2)
downNode := mock.Node()
downNode.ID = allocs[0].NodeID
downNode.Status = structs.NodeStatusDown
tainted[downNode.ID] = downNode

disconnected := mock.Node()
disconnected.ID = allocs[1].NodeID
disconnected.Status = structs.NodeStatusDisconnected
tainted[disconnected.ID] = disconnected

reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, tainted, "", 50, true, AllocRenconcilerWithNow(now))
r := reconciler.Compute()

// Assert the correct results
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: tc.expectPlace,
stop: tc.expectStop,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: uint64(tc.expectPlace),
Stop: uint64(tc.expectStop),
Ignore: uint64(tc.expectIgnore),
},
},
})
})
}
}

// Tests the reconciler properly handles lost nodes with allocations
func TestReconciler_LostNode(t *testing.T) {
ci.Parallel(t)
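The table test above asserts only aggregate counts. A hedged follow-up check, sketched under the assumption that the reconciler results expose the computed placements as r.place (as the place count assertion implies), could verify that the informational allocs actually surface as previousAlloc:

// Sketch only, not part of this PR: confirm the host volume IDs carried by
// the informational allocs survive on the placements' previousAlloc.
for _, p := range r.place {
	if p.previousAlloc == nil {
		continue
	}
	if len(p.previousAlloc.HostVolumeIDs) == 0 {
		t.Errorf("placement %q: expected previousAlloc to carry host volume IDs", p.name)
	}
}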
33 changes: 25 additions & 8 deletions scheduler/reconcile_util.go
@@ -218,8 +218,8 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
return from
}

// filterByTainted takes a set of tainted nodes and filters the allocation set
// into the following groups:
// filterByTainted takes a set of tainted nodes and partitions the allocation
// set into the following groups:
// 1. Those that exist on untainted nodes
// 2. Those exist on nodes that are draining
// 3. Those that exist on lost nodes or have expired
@@ -385,14 +385,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
return
}

// filterByRescheduleable filters the allocation set to return the set of allocations that are either
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (allocSet, allocSet, []*delayedRescheduleInfo) {
// filterByRescheduleable filters the allocation set to return the set of
// allocations that are either untainted or a set of allocations that must be
// rescheduled now. Allocations that can be rescheduled at a future time are
// also returned so that we can create follow-up evaluations for them. Allocs
// are skipped or considered untainted according to the logic defined in the
// shouldFilter method.
//
// filterByRescheduleable returns an extra slice of allocations as its last
// output: these allocs are "informational." They will not be rescheduled now
// or later, but they carry important information for future allocations that
// might get rescheduled. An example is allocs in stateful deployments that
// require particular host volume IDs.
func (a allocSet) filterByRescheduleable(
isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment,
) (allocSet, allocSet, []*delayedRescheduleInfo, []*structs.Allocation) {
untainted := make(map[string]*structs.Allocation)
rescheduleNow := make(map[string]*structs.Allocation)
rescheduleLater := []*delayedRescheduleInfo{}
informational := []*structs.Allocation{}

for _, alloc := range a {
// Ignore disconnecting allocs that are already unknown. This can happen
@@ -411,6 +422,12 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
continue
}

// Allocations with host volume IDs can be ignored, but we must keep
// the information they carry for future migrated allocs
if len(alloc.HostVolumeIDs) > 0 {
informational = append(informational, alloc)
}
tgross marked this conversation as resolved.

isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
Expand All @@ -436,7 +453,7 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
}

}
return untainted, rescheduleNow, rescheduleLater
return untainted, rescheduleNow, rescheduleLater, informational
}

// shouldFilter returns whether the alloc should be ignored or considered untainted.