
state store: fix logic for evaluating job status #24974

Open
wants to merge 12 commits into main
25 changes: 15 additions & 10 deletions nomad/core_sched_test.go
@@ -117,28 +117,28 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

job := mock.Job()

// Insert "dead" eval
store := s1.fsm.State()
eval := mock.Eval()
eval.JobModifyIndex = job.ModifyIndex
eval.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds
eval.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano()
eval.Status = structs.EvalStatusFailed
must.NoError(t, store.UpsertJobSummary(999, mock.JobSummary(eval.JobID)))
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}))
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}))

// Insert mock job with default reschedule policy of 2 in 10 minutes
job.ID = eval.JobID
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1002, nil, job))

// Insert "pending" eval for same job
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.JobModifyIndex = job.ModifyIndex // must have same modify index as job in order to set job status correctly
eval2.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds
eval2.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano()
must.NoError(t, store.UpsertJobSummary(999, mock.JobSummary(eval2.JobID)))
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2}))

// Insert mock job with default reschedule policy of 2 in 10 minutes
job := mock.Job()
job.ID = eval.JobID

must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job))
must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval2}))

// Insert failed alloc with an old reschedule attempt, can be GCed
alloc := mock.Alloc()
@@ -1049,12 +1049,14 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) {
// Insert two evals, one terminal and one not
eval := mock.Eval()
eval.JobID = job.ID
eval.JobModifyIndex = job.ModifyIndex
eval.Status = structs.EvalStatusComplete
eval.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds
eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano()

eval2 := mock.Eval()
eval2.JobID = job.ID
eval2.JobModifyIndex = job.ModifyIndex
eval2.Status = structs.EvalStatusPending
eval2.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds
eval2.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano()
@@ -1145,13 +1147,15 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) {
// Insert two allocs, one terminal and one not
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.Job = job
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.TaskGroup = job.TaskGroups[0].Name

alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.Job = job
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusRunning
@@ -1484,6 +1488,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
// Insert a terminal eval
eval := mock.Eval()
eval.JobID = job.ID
eval.JobModifyIndex = job.ModifyIndex
eval.Status = structs.EvalStatusComplete
eval.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds
eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano()
15 changes: 8 additions & 7 deletions nomad/fsm.go
@@ -948,20 +948,21 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
}
}

// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil {
n.logger.Error("UpdateAllocFromClient failed", "error", err)
return err
}

// Update any evals
// Update any evals first. During a reschedule, we don't want to mark the job dead, which
// would happen if all the allocs were terminal and there wasn't a pending eval
if len(req.Evals) > 0 {
if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
return err
}
}

// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil {
n.logger.Error("UpdateAllocFromClient failed", "error", err)
return err
}

// Unblock evals for the nodes computed node class if the client has
// finished running an allocation.
for _, alloc := range req.Alloc {
6 changes: 5 additions & 1 deletion nomad/job_endpoint.go
@@ -422,7 +422,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}

if eval != nil && !submittedEval {
if !submittedEval {
eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
@@ -659,6 +659,10 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR
// Clear out the VersionTag to prevent tag duplication
revJob.VersionTag = nil

// Set the stable flag to false as this is functionally a new registration
// and should handle deployment updates
revJob.Stable = false

reg := &structs.JobRegisterRequest{
Job: revJob,
WriteRequest: args.WriteRequest,
91 changes: 71 additions & 20 deletions nomad/state/state_store.go
@@ -1800,6 +1800,7 @@ func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job
// Compute the job status
var err error
job.Status, err = s.getJobStatus(txn, job, false)
s.logger.Info("setting job status", "status", job.Status)
if err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
}
@@ -3716,6 +3717,9 @@ func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitia
}
}

// TODO: should we really be doing this here? The only time this will affect the
// status of a job is if it's the last eval and alloc for a client, at which point
// the status of the job will already be "dead" from handling the alloc update.
// Set the job's status
if err := s.setJobStatuses(index, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
@@ -3851,11 +3855,6 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*

e := raw.(*structs.Evaluation)

// Filter non-exact matches
if e.JobID != jobID {
continue
}

Member Author: This isn't necessary anymore because periodic jobs do not get an evaluation.

out = append(out, e)
}
return out, nil
@@ -4824,14 +4823,13 @@ func (s *StateStore) UpdateDeploymentStatus(msgType structs.MessageType, index u
return err
}

// Upsert the job if necessary
// On failed deployments with auto_revert set to true, a new eval and job will be included on the request.
// We should upsert them both
if req.Job != nil {
if err := s.upsertJobImpl(index, nil, req.Job, false, txn); err != nil {
return err
}
}

// Upsert the optional eval
if req.Eval != nil {
if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil {
return err
@@ -5532,6 +5530,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus); err != nil {
return fmt.Errorf("job summary update failed %w", err)
}

// Update the job version details
if err := s.upsertJobVersion(index, updated, txn); err != nil {
return err
}

Member Author: This feels odd to call here, but it's necessary to also update a job's version status anytime we are updating the job summary, or they can get out of sync.

return nil
}

@@ -5604,7 +5608,7 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (
if job.Type == structs.JobTypeSystem ||
job.IsParameterized() ||
job.IsPeriodic() {
if job.Stop {
if job.Stopped() {
return structs.JobStatusDead, nil
}
return structs.JobStatusRunning, nil
@@ -5615,41 +5619,64 @@
return "", err
}

// If there is a non-terminal allocation, the job is running.
hasAlloc := false
terminalAllocs := false
for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
hasAlloc = true
if !alloc.(*structs.Allocation).TerminalStatus() {
a := alloc.(*structs.Allocation)

// If there is a non-terminal allocation, the job is running.
if !a.TerminalStatus() {
return structs.JobStatusRunning, nil
}

// Check if the allocs are reschedulable before
// marking the job dead. If any of the allocs are terminal
// and not reschedulable, mark the job dead.
if !isReschedulable(a) {
terminalAllocs = true
}
}

// The job is dead if it is stopped and there are no allocs
// or all allocs are terminal
if job.Stopped() {
return structs.JobStatusDead, nil
}

evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID)
if err != nil {
return "", err
}

hasEval := false
terminalEvals := false
for raw := evals.Next(); raw != nil; raw = evals.Next() {
e := raw.(*structs.Evaluation)

// Filter non-exact matches
if e.JobID != job.ID {
// This handles restarting stopped jobs, or else they are marked dead.
// We need to be careful with this, because an eval can technically
// still apply to a job with a greater modify index, e.g. during a reschedule,
// but reschedules are handled above.
if e.JobModifyIndex < job.ModifyIndex {
continue
}

hasEval = true
if !e.TerminalStatus() {
return structs.JobStatusPending, nil
}

terminalEvals = true
}

// The job is dead if all the allocations and evals are terminal or if there
// are no evals because of garbage collection.
if evalDelete || hasEval || hasAlloc {
// The job is dead if all allocations for this version are terminal
// and all evals are terminal.
// Also, in the event a job's allocs and evals are all GC'd, we don't
// want the job to be marked pending.
if terminalAllocs || terminalEvals || evalDelete {
return structs.JobStatusDead, nil
}

// There are no allocs/evals yet, which can happen for new job submissions,
// running new versions of a job, or reverting. This will happen if
// the evaluation is persisted after the job is persisted.
return structs.JobStatusPending, nil
}
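
Not part of the diff, but a minimal sketch of the behavior the new JobModifyIndex check enables, written in the style of the state store's own tests. It assumes the exported TestStateStore helper from nomad/state/testing.go; the test name and index values are illustrative.

package state

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/shoenig/test/must"
)

func TestJobStatus_IgnoresStaleEval(t *testing.T) {
	store := TestStateStore(t)

	// Register a job; with no evals or allocs yet its status is "pending".
	job := mock.Job()
	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job))

	// A terminal eval left over from an earlier registration: its
	// JobModifyIndex is below the job's current ModifyIndex, so the loop
	// above skips it when recomputing the status.
	stale := mock.Eval()
	stale.JobID = job.ID
	stale.Namespace = job.Namespace
	stale.JobModifyIndex = 900
	stale.Status = structs.EvalStatusComplete
	must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{stale}))

	// With the stale eval ignored and no allocs, the job stays pending
	// instead of being marked dead as it would have been previously.
	out, err := store.JobByID(nil, job.Namespace, job.ID)
	must.NoError(t, err)
	must.Eq(t, structs.JobStatusPending, out.Status)
}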

@@ -7348,6 +7375,30 @@ func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace stri
return iter, nil
}

func isReschedulable(a *structs.Allocation) bool {
if a.ReschedulePolicy() == nil || a.Job.Type != structs.JobTypeService {
return false
}

reschedulePolicy := a.ReschedulePolicy()
availableAttempts := reschedulePolicy.Attempts
interval := reschedulePolicy.Interval
attempted := 0
currTime := time.Now()

// Loop over reschedule tracker to find attempts within the reschedule policy's interval
if a.RescheduleTracker != nil && availableAttempts > 0 && interval > 0 {
for j := len(a.RescheduleTracker.Events) - 1; j >= 0; j-- {
lastAttempt := a.RescheduleTracker.Events[j].RescheduleTime
timeDiff := currTime.UTC().UnixNano() - lastAttempt
if timeDiff < interval.Nanoseconds() {
attempted += 1
}
}
}
return attempted < availableAttempts
}
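
For reference (not part of the diff): a small sketch of how isReschedulable counts recent attempts, using an explicit 2-attempts-in-10-minutes policy mirroring the comments in core_sched_test.go above. The test name and policy values are illustrative, not taken from the PR.

package state

import (
	"testing"
	"time"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/shoenig/test/must"
)

func TestIsReschedulable_Sketch(t *testing.T) {
	alloc := mock.Alloc() // service job; reschedule policy set explicitly below

	// Set an explicit 2-in-10-minutes policy so the outcome does not
	// depend on the mock's defaults.
	alloc.Job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
		Attempts: 2,
		Interval: 10 * time.Minute,
	}

	// One attempt five minutes ago falls inside the interval; one attempt
	// remains, so the alloc is still reschedulable.
	alloc.RescheduleTracker = &structs.RescheduleTracker{
		Events: []*structs.RescheduleEvent{
			{RescheduleTime: time.Now().UTC().Add(-5 * time.Minute).UnixNano()},
		},
	}
	must.True(t, isReschedulable(alloc))

	// A second recent attempt exhausts the policy, so a terminal alloc in
	// this state lets getJobStatus mark the job dead.
	alloc.RescheduleTracker.Events = append(alloc.RescheduleTracker.Events,
		&structs.RescheduleEvent{RescheduleTime: time.Now().UTC().Add(-2 * time.Minute).UnixNano()})
	must.False(t, isReschedulable(alloc))
}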

// scalingPolicyNamespaceFilter returns a filter function that filters all
// scaling policies not targeting the given namespace.
func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool {