From f06f9d9c83b7c4d0a51ac9f239c0328bfbd4c4de Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 29 Jan 2025 09:51:56 -0500 Subject: [PATCH 01/12] state store: fix logic for evaluating job status --- nomad/job_endpoint.go | 4 + nomad/state/state_store.go | 54 +++--- nomad/state/state_store_test.go | 298 +++++++++----------------------- 3 files changed, 112 insertions(+), 244 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 7267f0a15b6..bc15edc1580 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -659,6 +659,10 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR // Clear out the VersionTag to prevent tag duplication revJob.VersionTag = nil + // Set the stable flag to false as this is functionally a new registration + // and should handle deployment updates + revJob.Stable = false + reg := &structs.JobRegisterRequest{ Job: revJob, WriteRequest: args.WriteRequest, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f5b690291f3..87f905d7c11 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3716,6 +3716,7 @@ func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitia } } + // TODO: should we really be doing this here? We don't do it for filtered evals. // Set the job's status if err := s.setJobStatuses(index, txn, jobs, true); err != nil { return fmt.Errorf("setting job status failed: %v", err) @@ -4824,14 +4825,13 @@ func (s *StateStore) UpdateDeploymentStatus(msgType structs.MessageType, index u return err } - // Upsert the job if necessary + // On failed deployments with auto_revert set to true, a new eval and job will be included on the request. 
+ // We should upsert them both if req.Job != nil { if err := s.upsertJobImpl(index, nil, req.Job, false, txn); err != nil { return err } } - - // Upsert the optional eval if req.Eval != nil { if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil { return err @@ -5532,6 +5532,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus); err != nil { return fmt.Errorf("job summary update failed %w", err) } + + // Update the job version details + if err := s.upsertJobVersion(index, updated, txn); err != nil { + return err + } + return nil } @@ -5599,14 +5605,16 @@ func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, } func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) { + // If the job has been stopped, it's status is dead + if job.Stopped() { + return structs.JobStatusDead, nil + } + // System, Periodic and Parameterized jobs are running until explicitly // stopped. if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() { - if job.Stop { - return structs.JobStatusDead, nil - } return structs.JobStatusRunning, nil } @@ -5616,40 +5624,28 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( } // If there is a non-terminal allocation, the job is running. 
- hasAlloc := false + terminalAllocs := false for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { - hasAlloc = true - if !alloc.(*structs.Allocation).TerminalStatus() { - return structs.JobStatusRunning, nil - } - } - - evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID) - if err != nil { - return "", err - } + a := alloc.(*structs.Allocation) - hasEval := false - for raw := evals.Next(); raw != nil; raw = evals.Next() { - e := raw.(*structs.Evaluation) - - // Filter non-exact matches - if e.JobID != job.ID { + if a.Job.Version < job.Version { continue } - hasEval = true - if !e.TerminalStatus() { - return structs.JobStatusPending, nil + if !a.TerminalStatus() { + return structs.JobStatusRunning, nil } + + terminalAllocs = true } - // The job is dead if all the allocations and evals are terminal or if there - // are no evals because of garbage collection. - if evalDelete || hasEval || hasAlloc { + // The job is dead if all allocations for this version are terminal. + if terminalAllocs { return structs.JobStatusDead, nil } + // There are no allocs yet, which will happen for new job submissions, + // running new versions of a job, or reverting. 
return structs.JobStatusPending, nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index c0efa31112f..da876241d86 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7678,232 +7678,100 @@ func TestStateStore_SetJobStatus(t *testing.T) { } } -func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { +func TestStateStore_GetJobStatus_new(t *testing.T) { ci.Parallel(t) - job := mock.Job() - state := testStateStore(t) - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusPending { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) - } -} - -func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { - ci.Parallel(t) - - job := mock.PeriodicJob() - state := testStateStore(t) - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusRunning { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) - } -} - -func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { - ci.Parallel(t) - - job := mock.Job() - state := testStateStore(t) - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, true) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusDead { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) - } -} - -func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { - ci.Parallel(t) - - state := testStateStore(t) - job := mock.Job() - - // Create a mock alloc that is dead. 
- alloc := mock.Alloc() - alloc.JobID = job.ID - alloc.DesiredStatus = structs.AllocDesiredStatusStop - state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) - if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}); err != nil { - t.Fatalf("err: %v", err) - } - - // Create a mock eval that is complete - eval := mock.Eval() - eval.JobID = job.ID - eval.Status = structs.EvalStatusComplete - if err := state.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}); err != nil { - t.Fatalf("err: %v", err) - } - - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusDead { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) - } -} - -func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { - ci.Parallel(t) - - state := testStateStore(t) - job := mock.Job() - - // Create a mock alloc that is running. 
- alloc := mock.Alloc() - alloc.JobID = job.ID - alloc.DesiredStatus = structs.AllocDesiredStatusRun - state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) - if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}); err != nil { - t.Fatalf("err: %v", err) - } - - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, true) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusRunning { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) - } -} - -func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) { - ci.Parallel(t) - - state := testStateStore(t) - job := mock.PeriodicJob() - - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusRunning { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) - } - - // Mark it as stopped - job.Stop = true - status, err = state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusDead { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) - } -} - -func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) { - ci.Parallel(t) - - state := testStateStore(t) - job := mock.Job() - job.ParameterizedJob = &structs.ParameterizedJobConfig{} - - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusRunning { - t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) - } - - // Mark it as stopped - job.Stop = true - status, err = state.getJobStatus(txn, job, false) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } - - if status != structs.JobStatusDead { - 
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) - } -} - -func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { - ci.Parallel(t) - - state := testStateStore(t) - job := mock.Job() - - // Create a mock eval that is pending. - eval := mock.Eval() - eval.JobID = job.ID - eval.Status = structs.EvalStatusPending - if err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}); err != nil { - t.Fatalf("err: %v", err) - } - - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, true) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) + testCases := []struct { + name string + hasAlloc bool + allocSetup func(*structs.Allocation) + jobSetup func(*structs.Job) + exp string + }{ + { + name: "stopped job", + hasAlloc: false, + jobSetup: func(j *structs.Job) { + j.Stop = true + }, + exp: structs.JobStatusDead, + }, + { + name: "parameterized job", + hasAlloc: false, + jobSetup: func(j *structs.Job) { + j.ParameterizedJob = &structs.ParameterizedJobConfig{} + j.Dispatched = false + }, + exp: structs.JobStatusRunning, + }, + { + name: "periodic job", + hasAlloc: false, + jobSetup: func(j *structs.Job) { + j.Periodic = &structs.PeriodicConfig{} + }, + exp: structs.JobStatusRunning, + }, + { + name: "no allocs", + hasAlloc: false, + jobSetup: func(j *structs.Job) {}, + exp: structs.JobStatusPending, + }, + { + name: "current job has running alloc", + hasAlloc: true, + jobSetup: func(j *structs.Job) {}, + exp: structs.JobStatusRunning, + }, + { + name: "current job with all terminal allocs", + hasAlloc: true, + allocSetup: func(a *structs.Allocation) { + a.ClientStatus = structs.AllocClientStatusComplete + }, + jobSetup: func(j *structs.Job) {}, + exp: structs.JobStatusDead, + }, + { + name: "previous job version had allocs", + hasAlloc: true, + jobSetup: func(j *structs.Job) { + j.Version += 1 + }, + exp: structs.JobStatusPending, + }, } - if status != structs.JobStatusPending { - 
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) - } -} + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + state := testStateStore(t) -// TestStateStore_SetJobStatus_SystemJob asserts that system jobs are still -// considered running until explicitly stopped. -func TestStateStore_SetJobStatus_SystemJob(t *testing.T) { - ci.Parallel(t) + txn := state.db.WriteTxn(0) - state := testStateStore(t) - job := mock.SystemJob() + var job structs.Job + if tc.hasAlloc { + a := mock.Alloc() - // Create a mock eval that is pending. - eval := mock.Eval() - eval.JobID = job.ID - eval.Type = job.Type - eval.Status = structs.EvalStatusComplete - if err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}); err != nil { - t.Fatalf("err: %v", err) - } + if tc.allocSetup != nil { + tc.allocSetup(a) + } - txn := state.db.ReadTxn() - status, err := state.getJobStatus(txn, job, true) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } + err := txn.Insert("allocs", a) + require.NoError(t, err) - if expected := structs.JobStatusRunning; status != expected { - t.Fatalf("getJobStatus() returned %v; expected %v", status, expected) - } + job = *a.Job + } else { + job = *structs.MockJob() + } - // Stop the job - job.Stop = true - status, err = state.getJobStatus(txn, job, true) - if err != nil { - t.Fatalf("getJobStatus() failed: %v", err) - } + tc.jobSetup(&job) - if expected := structs.JobStatusDead; status != expected { - t.Fatalf("getJobStatus() returned %v; expected %v", status, expected) + status, err := state.getJobStatus(txn, &job, false) + require.NoError(t, err) + require.Equal(t, tc.exp, status) + }) } } From 03a0b4eeb274070a56c276bebcc8724a0be9a10e Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 29 Jan 2025 10:27:41 -0500 Subject: [PATCH 02/12] fix test name --- nomad/state/state_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index da876241d86..dea9444dd02 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7678,7 +7678,7 @@ func TestStateStore_SetJobStatus(t *testing.T) { } } -func TestStateStore_GetJobStatus_new(t *testing.T) { +func TestStateStore_GetJobStatus(t *testing.T) { ci.Parallel(t) testCases := []struct { From dcf51cc9bacef0c454a0dd093a3f4068fd8e2d21 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 29 Jan 2025 14:51:57 -0500 Subject: [PATCH 03/12] add back evals to differentiate rescheduling jobs --- nomad/fsm.go | 15 ++++++++------- nomad/state/state_store.go | 21 +++++++++++++++------ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 3cc4bf101b4..d804839b0f6 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -948,13 +948,8 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt } } - // Update all the client allocations - if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil { - n.logger.Error("UpdateAllocFromClient failed", "error", err) - return err - } - - // Update any evals + // Update any evals first. During a reschedule, we don't want to mark the job dead, which + // would happen if all the allocs were terminal and there wasn't a pending eval if len(req.Evals) > 0 { if err := n.upsertEvals(msgType, index, req.Evals); err != nil { n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err) @@ -962,6 +957,12 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt } } + // Update all the client allocations + if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil { + n.logger.Error("UpdateAllocFromClient failed", "error", err) + return err + } + // Unblock evals for the nodes computed node class if the client has // finished running an allocation. 
for _, alloc := range req.Alloc { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 87f905d7c11..f7c37c241ad 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3852,11 +3852,6 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* e := raw.(*structs.Evaluation) - // Filter non-exact matches - if e.JobID != jobID { - continue - } - out = append(out, e) } return out, nil @@ -5639,13 +5634,27 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( terminalAllocs = true } + evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID) + if err != nil { + return "", err + } + + for raw := evals.Next(); raw != nil; raw = evals.Next() { + e := raw.(*structs.Evaluation) + + if !e.TerminalStatus() { + return structs.JobStatusPending, nil + } + } + // The job is dead if all allocations for this version are terminal. if terminalAllocs { return structs.JobStatusDead, nil } // There are no allocs yet, which will happen for new job submissions, - // running new versions of a job, or reverting. + // running new versions of a job, or reverting. 
This will happen if + // the evaluation is persisted after the job is persisted return structs.JobStatusPending, nil } From 8ea2135b3ba46bed876fc7bc9471c1d837aaff53 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 29 Jan 2025 16:10:08 -0500 Subject: [PATCH 04/12] refactor tests, add eval test case --- nomad/state/state_store_test.go | 138 ++++++++++++++++++++------------ 1 file changed, 88 insertions(+), 50 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index dea9444dd02..1b32599ea6a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7682,63 +7682,116 @@ func TestStateStore_GetJobStatus(t *testing.T) { ci.Parallel(t) testCases := []struct { - name string - hasAlloc bool - allocSetup func(*structs.Allocation) - jobSetup func(*structs.Job) - exp string + name string + setup func(*txn) (*structs.Job, error) + exp string }{ { - name: "stopped job", - hasAlloc: false, - jobSetup: func(j *structs.Job) { + name: "stopped job", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() j.Stop = true + return j, nil }, exp: structs.JobStatusDead, }, { - name: "parameterized job", - hasAlloc: false, - jobSetup: func(j *structs.Job) { + name: "parameterized job", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() j.ParameterizedJob = &structs.ParameterizedJobConfig{} j.Dispatched = false + return j, nil }, exp: structs.JobStatusRunning, }, { - name: "periodic job", - hasAlloc: false, - jobSetup: func(j *structs.Job) { + name: "periodic job", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() j.Periodic = &structs.PeriodicConfig{} + return j, nil }, exp: structs.JobStatusRunning, }, { - name: "no allocs", - hasAlloc: false, - jobSetup: func(j *structs.Job) {}, - exp: structs.JobStatusPending, + name: "no allocs", + setup: func(txn *txn) (*structs.Job, error) { + return mock.Job(), nil + }, + exp: structs.JobStatusPending, }, { - name: "current job 
has running alloc", - hasAlloc: true, - jobSetup: func(j *structs.Job) {}, - exp: structs.JobStatusRunning, + name: "current job has running alloc", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() + a := mock.Alloc() + + a.JobID = j.ID + + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } + return j, nil + }, + exp: structs.JobStatusRunning, }, { - name: "current job with all terminal allocs", - hasAlloc: true, - allocSetup: func(a *structs.Allocation) { - a.ClientStatus = structs.AllocClientStatusComplete + name: "previous job version had allocs", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() + a := mock.Alloc() + + a.JobID = j.ID + a.ClientStatus = structs.AllocClientStatusFailed + + j.Version += 1 + + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } + return j, nil }, - jobSetup: func(j *structs.Job) {}, - exp: structs.JobStatusDead, + exp: structs.JobStatusPending, }, { - name: "previous job version had allocs", - hasAlloc: true, - jobSetup: func(j *structs.Job) { - j.Version += 1 + name: "job has all terminal allocs, with no evals", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() + a := mock.Alloc() + + a.ClientStatus = structs.AllocClientStatusFailed + a.JobID = j.ID + + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } + return j, nil + }, + exp: structs.JobStatusDead, + }, + { + name: "job has all terminal allocs, with pending eval", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() + a := mock.Alloc() + + a.ClientStatus = structs.AllocClientStatusFailed + a.JobID = j.ID + + e := mock.Eval() + e.JobID = j.ID + e.Status = structs.EvalStatusPending + + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } + + if err := txn.Insert("evals", e); err != nil { + return nil, err + } + return j, nil + }, exp: structs.JobStatusPending, }, @@ -7749,26 +7802,11 @@ func TestStateStore_GetJobStatus(t *testing.T) { state := 
testStateStore(t) txn := state.db.WriteTxn(0) + txn.Insert("allocs", mock.Alloc()) - var job structs.Job - if tc.hasAlloc { - a := mock.Alloc() - - if tc.allocSetup != nil { - tc.allocSetup(a) - } - - err := txn.Insert("allocs", a) - require.NoError(t, err) - - job = *a.Job - } else { - job = *structs.MockJob() - } - - tc.jobSetup(&job) + job, err := tc.setup(txn) - status, err := state.getJobStatus(txn, &job, false) + status, err := state.getJobStatus(txn, job, false) require.NoError(t, err) require.Equal(t, tc.exp, status) }) From 81f62041bf9435e99adfd99c0f3f03a93dec6787 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 30 Jan 2025 15:32:00 -0500 Subject: [PATCH 05/12] fix tests and add back eval logic --- nomad/core_sched_test.go | 17 ++++++++++------- nomad/state/state_store.go | 24 ++++++++++++++++-------- nomad/state/state_store_test.go | 1 + 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index fe700090f35..a1f906970fb 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -117,28 +117,28 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) { defer cleanupS1() testutil.WaitForLeader(t, s1.RPC) + job := mock.Job() + // Insert "dead" eval store := s1.fsm.State() eval := mock.Eval() + eval.JobModifyIndex = job.ModifyIndex eval.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano() eval.Status = structs.EvalStatusFailed - must.NoError(t, store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))) - must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})) + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{eval})) // Insert "pending" eval for same job eval2 := mock.Eval() eval2.JobID = eval.JobID + eval2.JobModifyIndex = job.ModifyIndex + 1 
eval2.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval2.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano() - must.NoError(t, store.UpsertJobSummary(999, mock.JobSummary(eval2.JobID))) - must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})) + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 2, []*structs.Evaluation{eval2})) // Insert mock job with default reschedule policy of 2 in 10 minutes - job := mock.Job() job.ID = eval.JobID - - must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job)) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 3, nil, job)) // Insert failed alloc with an old reschedule attempt, can be GCed alloc := mock.Alloc() @@ -1049,12 +1049,14 @@ func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) { // Insert two evals, one terminal and one not eval := mock.Eval() eval.JobID = job.ID + eval.JobModifyIndex = job.ModifyIndex eval.Status = structs.EvalStatusComplete eval.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() eval2 := mock.Eval() eval2.JobID = job.ID + eval2.JobModifyIndex = job.ModifyIndex eval2.Status = structs.EvalStatusPending eval2.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval2.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() @@ -1484,6 +1486,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { // Insert a terminal eval eval := mock.Eval() eval.JobID = job.ID + eval.JobModifyIndex = job.ModifyIndex eval.Status = structs.EvalStatusComplete eval.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() diff --git a/nomad/state/state_store.go 
b/nomad/state/state_store.go index f7c37c241ad..7814c8d43ed 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5600,16 +5600,12 @@ func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, } func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) { - // If the job has been stopped, it's status is dead - if job.Stopped() { - return structs.JobStatusDead, nil - } - // System, Periodic and Parameterized jobs are running until explicitly // stopped. if job.Type == structs.JobTypeSystem || job.IsParameterized() || - job.IsPeriodic() { + job.IsPeriodic() || + job.Stopped() { return structs.JobStatusRunning, nil } @@ -5639,16 +5635,28 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( return "", err } + terminalEvals := false for raw := evals.Next(); raw != nil; raw = evals.Next() { e := raw.(*structs.Evaluation) + // Ignore evals created for previous jobs + if e.JobModifyIndex < job.ModifyIndex { + continue + } + if !e.TerminalStatus() { return structs.JobStatusPending, nil } + + terminalEvals = true } - // The job is dead if all allocations for this version are terminal. - if terminalAllocs { + // The job is dead if all allocations for this version are terminal, + // all evals are terminal + // + // Also, in the event a jobs allocs and evals are all GC'd, we don't + // want the job to be marked pending. 
+ if terminalAllocs || terminalEvals || evalDelete { return structs.JobStatusDead, nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1b32599ea6a..54cec4248f9 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7781,6 +7781,7 @@ func TestStateStore_GetJobStatus(t *testing.T) { e := mock.Eval() e.JobID = j.ID + e.JobModifyIndex = j.ModifyIndex e.Status = structs.EvalStatusPending if err := txn.Insert("allocs", a); err != nil { From c29c2bb17cecea592bd7056a29501e4b6ea51546 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Fri, 31 Jan 2025 08:34:31 -0500 Subject: [PATCH 06/12] fixes job stop and adds tests --- nomad/state/state_store.go | 10 +++++++--- nomad/state/state_store_test.go | 35 ++++++++++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7814c8d43ed..b0105f95993 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3716,7 +3716,9 @@ func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitia } } - // TODO: should we really be doing this here? We don't do it for filtered evals. + // TODO: should we really be doing this here? The only time this will affect the + // status of a job is if it's the last eval and alloc for a client, at which point + // the status of the job will already be "dead" from handling the alloc update. // Set the job's status if err := s.setJobStatuses(index, txn, jobs, true); err != nil { return fmt.Errorf("setting job status failed: %v", err) @@ -5604,8 +5606,10 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( // stopped. 
if job.Type == structs.JobTypeSystem || job.IsParameterized() || - job.IsPeriodic() || - job.Stopped() { + job.IsPeriodic() { + if job.Stopped() { + return structs.JobStatusDead, nil + } return structs.JobStatusRunning, nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 54cec4248f9..e647a3dba44 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3653,19 +3653,22 @@ func TestStateStore_JobsByGC(t *testing.T) { } for i := 0; i < 20; i += 2 { + idx := 2000 + uint64(i+1) job := mock.Job() job.Type = structs.JobTypeBatch + job.ModifyIndex = idx gc[job.ID] = struct{}{} - if err := state.UpsertJob(structs.MsgTypeTestSetup, 2000+uint64(i), nil, job); err != nil { + if err := state.UpsertJob(structs.MsgTypeTestSetup, idx, nil, job); err != nil { t.Fatalf("err: %v", err) } // Create an eval for it eval := mock.Eval() eval.JobID = job.ID + eval.JobModifyIndex = job.ModifyIndex eval.Status = structs.EvalStatusComplete - if err := state.UpsertEvals(structs.MsgTypeTestSetup, 2000+uint64(i+1), []*structs.Evaluation{eval}); err != nil { + if err := state.UpsertEvals(structs.MsgTypeTestSetup, idx, []*structs.Evaluation{eval}); err != nil { t.Fatalf("err: %v", err) } @@ -5084,6 +5087,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { require.Equal(t, uint64(1002), evalsIndex) } +// This tests the evalDelete boolean by deleting a Pending eval and Pending Alloc. 
func TestStateStore_DeleteEval_ChildJob(t *testing.T) { ci.Parallel(t) @@ -7687,10 +7691,35 @@ func TestStateStore_GetJobStatus(t *testing.T) { exp string }{ { - name: "stopped job", + name: "stopped job with running allocations is still running", setup: func(txn *txn) (*structs.Job, error) { j := mock.Job() j.Stop = true + + a := mock.Alloc() + a.JobID = j.ID + a.Job = j + a.ClientStatus = structs.AllocClientStatusRunning + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } + return j, nil + }, + exp: structs.JobStatusRunning, + }, + { + name: "stopped job with terminal allocs is dead", + setup: func(txn *txn) (*structs.Job, error) { + j := mock.Job() + j.Stop = true + + a := mock.Alloc() + a.JobID = j.ID + a.Job = j + a.ClientStatus = structs.AllocClientStatusComplete + if err := txn.Insert("allocs", a); err != nil { + return nil, err + } return j, nil }, exp: structs.JobStatusDead, From 9275bfb3ccf9fbfe67703723fa357d3745a0762d Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Fri, 31 Jan 2025 12:03:01 -0500 Subject: [PATCH 07/12] more test fixes --- nomad/core_sched_test.go | 18 +++++++++++------- nomad/job_endpoint.go | 2 +- nomad/state/state_store_test.go | 1 + nomad/system_endpoint_test.go | 1 + 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a1f906970fb..84494990998 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -126,19 +126,19 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) { eval.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano() eval.Status = structs.EvalStatusFailed - must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{eval})) + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})) + + // Insert mock job 
with default reschedule policy of 2 in 10 minutes + job.ID = eval.JobID + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1002, nil, job)) // Insert "pending" eval for same job eval2 := mock.Eval() eval2.JobID = eval.JobID - eval2.JobModifyIndex = job.ModifyIndex + 1 + eval2.JobModifyIndex = job.ModifyIndex // must have same modify index as job in order to set job status correctly eval2.CreateTime = time.Now().UTC().Add(-6 * time.Hour).UnixNano() // make sure objects we insert are older than GC thresholds eval2.ModifyTime = time.Now().UTC().Add(-5 * time.Hour).UnixNano() - must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 2, []*structs.Evaluation{eval2})) - - // Insert mock job with default reschedule policy of 2 in 10 minutes - job.ID = eval.JobID - must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 3, nil, job)) + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval2})) // Insert failed alloc with an old reschedule attempt, can be GCed alloc := mock.Alloc() @@ -1704,6 +1704,7 @@ func TestCoreScheduler_jobGC(t *testing.T) { mockJob1Alloc1 := mock.Alloc() mockJob1Alloc1.EvalID = mockEval1.ID mockJob1Alloc1.JobID = inputJob.ID + mockJob1Alloc1.Job.Version = inputJob.Version mockJob1Alloc1.ClientStatus = structs.AllocClientStatusRunning mockJob1Alloc1.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() mockJob1Alloc1.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() @@ -1711,6 +1712,7 @@ func TestCoreScheduler_jobGC(t *testing.T) { mockJob1Alloc2 := mock.Alloc() mockJob1Alloc2.EvalID = mockEval1.ID mockJob1Alloc2.JobID = inputJob.ID + mockJob1Alloc2.Job.Version = inputJob.Version mockJob1Alloc2.ClientStatus = structs.AllocClientStatusRunning mockJob1Alloc2.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() mockJob1Alloc2.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() @@ -1778,8 +1780,10 @@ func TestCoreScheduler_jobGC(t *testing.T) { // Mark that the allocations have reached a 
terminal state. mockJob1Alloc1.DesiredStatus = structs.AllocDesiredStatusStop mockJob1Alloc1.ClientStatus = structs.AllocClientStatusComplete + mockJob1Alloc1.Job.Version = inputJob.Version mockJob1Alloc2.DesiredStatus = structs.AllocDesiredStatusStop mockJob1Alloc2.ClientStatus = structs.AllocClientStatusComplete + mockJob1Alloc2.Job.Version = inputJob.Version must.NoError(t, testServer.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 30, []*structs.Allocation{ diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index bc15edc1580..60f2d67c101 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -422,7 +422,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } - if eval != nil && !submittedEval { + if !submittedEval { eval.JobModifyIndex = reply.JobModifyIndex update := &structs.EvalUpdateRequest{ Evals: []*structs.Evaluation{eval}, diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e647a3dba44..5ee6150bb76 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4870,6 +4870,7 @@ func TestStateStore_UpsertEvals_Eval_ChildJob(t *testing.T) { eval := mock.Eval() eval.Status = structs.EvalStatusComplete eval.JobID = child.ID + eval.JobModifyIndex = child.ModifyIndex // Create watchsets so we can test that upsert fires the watch ws := memdb.NewWatchSet() diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 209e875492b..f43d2791b45 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -41,6 +41,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { eval := mock.Eval() eval.Status = structs.EvalStatusComplete eval.JobID = job.ID + eval.JobModifyIndex = job.ModifyIndex // set modify time older than now but still newer than default GC threshold eval.ModifyTime = time.Now().Add(-10 * time.Millisecond).UnixNano() must.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1001, 
[]*structs.Evaluation{eval})) From 49614feb587323da95c9c282a54fffb00eb65db1 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 3 Feb 2025 15:05:59 -0500 Subject: [PATCH 08/12] adds reschedule tests --- nomad/core_sched_test.go | 4 -- nomad/state/state_store.go | 57 +++++++++++++++++++----- nomad/state/state_store_test.go | 77 ++++++++++++++++++++++++++++++--- 3 files changed, 116 insertions(+), 22 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 84494990998..416f855ff74 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -1704,7 +1704,6 @@ func TestCoreScheduler_jobGC(t *testing.T) { mockJob1Alloc1 := mock.Alloc() mockJob1Alloc1.EvalID = mockEval1.ID mockJob1Alloc1.JobID = inputJob.ID - mockJob1Alloc1.Job.Version = inputJob.Version mockJob1Alloc1.ClientStatus = structs.AllocClientStatusRunning mockJob1Alloc1.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() mockJob1Alloc1.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() @@ -1712,7 +1711,6 @@ func TestCoreScheduler_jobGC(t *testing.T) { mockJob1Alloc2 := mock.Alloc() mockJob1Alloc2.EvalID = mockEval1.ID mockJob1Alloc2.JobID = inputJob.ID - mockJob1Alloc2.Job.Version = inputJob.Version mockJob1Alloc2.ClientStatus = structs.AllocClientStatusRunning mockJob1Alloc2.CreateTime = time.Now().Add(-6 * time.Hour).UnixNano() mockJob1Alloc2.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() @@ -1780,10 +1778,8 @@ func TestCoreScheduler_jobGC(t *testing.T) { // Mark that the allocations have reached a terminal state. 
mockJob1Alloc1.DesiredStatus = structs.AllocDesiredStatusStop mockJob1Alloc1.ClientStatus = structs.AllocClientStatusComplete - mockJob1Alloc1.Job.Version = inputJob.Version mockJob1Alloc2.DesiredStatus = structs.AllocDesiredStatusStop mockJob1Alloc2.ClientStatus = structs.AllocClientStatusComplete - mockJob1Alloc2.Job.Version = inputJob.Version must.NoError(t, testServer.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 30, []*structs.Allocation{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b0105f95993..468522ef147 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1800,6 +1800,7 @@ func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job // Compute the job status var err error job.Status, err = s.getJobStatus(txn, job, false) + s.logger.Info("setting job status", "status", job.Status) if err != nil { return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) } @@ -5618,20 +5619,26 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( return "", err } - // If there is a non-terminal allocation, the job is running. terminalAllocs := false for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { a := alloc.(*structs.Allocation) - if a.Job.Version < job.Version { - continue - } - + // If there is a non-terminal allocation, the job is running. 
if !a.TerminalStatus() { return structs.JobStatusRunning, nil } - terminalAllocs = true + // If all the allocs are terminal and any are not reschedulable + // mark this job as having terminal allocs, possibly dead + if !isReschedulable(a) { + terminalAllocs = true + } + } + + // The job is dead if it is stopped and there are no allocs + // or all allocs are terminal + if job.Stopped() { + return structs.JobStatusDead, nil } evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID) @@ -5643,7 +5650,10 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( for raw := evals.Next(); raw != nil; raw = evals.Next() { e := raw.(*structs.Evaluation) - // Ignore evals created for previous jobs + // This handles restarting stopped jobs, or else they are marked dead. + // We need to be careful with this, because an eval can technically + // still apply to a job with a greater modify index, i.e. during reschedule + // but we handle reschedule above. if e.JobModifyIndex < job.ModifyIndex { continue } @@ -5656,17 +5666,16 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( } // The job is dead if all allocations for this version are terminal, - // all evals are terminal - // + // all evals are terminal. // Also, in the event a jobs allocs and evals are all GC'd, we don't // want the job to be marked pending. if terminalAllocs || terminalEvals || evalDelete { return structs.JobStatusDead, nil } - // There are no allocs yet, which will happen for new job submissions, + // There are no allocs/evals yet, which can happen for new job submissions, // running new versions of a job, or reverting. This will happen if - // the evaluation is persisted after the job is persisted + // the evaluation is persisted after the job is persisted. 
return structs.JobStatusPending, nil } @@ -7365,6 +7374,32 @@ func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace stri return iter, nil } +// RescheduleInfo is used to calculate remaining reschedule attempts +// according to the given time and the task groups reschedule policy +// This is modified from the API package +func isReschedulable(a *structs.Allocation) bool { + if a.ReschedulePolicy() == nil { + return false + } + reschedulePolicy := a.ReschedulePolicy() + availableAttempts := reschedulePolicy.Attempts + interval := reschedulePolicy.Interval + attempted := 0 + currTime := time.Now() + + // Loop over reschedule tracker to find attempts within the reschedule policy's interval + if a.RescheduleTracker != nil && availableAttempts > 0 && interval > 0 { + for j := len(a.RescheduleTracker.Events) - 1; j >= 0; j-- { + lastAttempt := a.RescheduleTracker.Events[j].RescheduleTime + timeDiff := currTime.UTC().UnixNano() - lastAttempt + if timeDiff < interval.Nanoseconds() { + attempted += 1 + } + } + } + return attempted < availableAttempts +} + // scalingPolicyNamespaceFilter returns a filter function that filters all // scaling policies not targeting the given namespace.
func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 5ee6150bb76..89ddd57318f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -5,6 +5,7 @@ package state import ( "context" + "errors" "fmt" "reflect" "sort" @@ -7695,7 +7696,6 @@ func TestStateStore_GetJobStatus(t *testing.T) { name: "stopped job with running allocations is still running", setup: func(txn *txn) (*structs.Job, error) { j := mock.Job() - j.Stop = true a := mock.Alloc() a.JobID = j.ID @@ -7704,7 +7704,11 @@ func TestStateStore_GetJobStatus(t *testing.T) { if err := txn.Insert("allocs", a); err != nil { return nil, err } - return j, nil + + stoppedJob := j.Copy() + stoppedJob.Stop = true + stoppedJob.Version += 1 + return stoppedJob, nil }, exp: structs.JobStatusRunning, }, @@ -7758,7 +7762,6 @@ func TestStateStore_GetJobStatus(t *testing.T) { a := mock.Alloc() a.JobID = j.ID - if err := txn.Insert("allocs", a); err != nil { return nil, err } @@ -7771,15 +7774,23 @@ func TestStateStore_GetJobStatus(t *testing.T) { setup: func(txn *txn) (*structs.Job, error) { j := mock.Job() a := mock.Alloc() + e := mock.Eval() + + e.JobID = j.ID + e.JobModifyIndex = j.ModifyIndex + e.Status = structs.EvalStatusPending a.JobID = j.ID + a.Job = j a.ClientStatus = structs.AllocClientStatusFailed j.Version += 1 - if err := txn.Insert("allocs", a); err != nil { return nil, err } + if err := txn.Insert("evals", e); err != nil { + return nil, err + } return j, nil }, exp: structs.JobStatusPending, @@ -7801,7 +7812,7 @@ func TestStateStore_GetJobStatus(t *testing.T) { exp: structs.JobStatusDead, }, { - name: "job has all terminal allocs, with pending eval", + name: "job has all terminal allocs, but pending eval", setup: func(txn *txn) (*structs.Job, error) { j := mock.Job() a := mock.Alloc() @@ -7826,6 +7837,59 @@ func TestStateStore_GetJobStatus(t *testing.T) { }, exp: 
structs.JobStatusPending, }, + { + name: "reschedulable alloc is pending waiting for replacement", + setup: func(t *txn) (*structs.Job, error) { + j := mock.Job() + if j.TaskGroups[0].ReschedulePolicy == nil { + return nil, errors.New("mock job doesn't have replacement policy") + } + a := mock.Alloc() + a.Job = j + a.JobID = j.ID + a.ClientStatus = structs.AllocClientStatusFailed + if err := t.Insert("allocs", a); err != nil { + return nil, err + } + return j, nil + }, + exp: structs.JobStatusPending, + }, + { + name: "reschedulable alloc is dead after replacement fails", + setup: func(t *txn) (*structs.Job, error) { + j := mock.Job() + // give job one reschedule attempt + j.TaskGroups[0].ReschedulePolicy.Attempts = 1 + j.TaskGroups[0].ReschedulePolicy.Interval = time.Hour + + // Replacement alloc + a := mock.Alloc() + a.Job = j + a.JobID = j.ID + a.ClientStatus = structs.AllocClientStatusFailed + a.RescheduleTracker = &structs.RescheduleTracker{ + Events: []*structs.RescheduleEvent{ + structs.NewRescheduleEvent(time.Now().UTC().UnixNano(), "", "", time.Minute), + }, + } + if err := t.Insert("allocs", a); err != nil { + return nil, err + } + + // Original alloc + a2 := mock.Alloc() + a2.Job = j + a2.JobID = j.ID + a2.ClientStatus = structs.AllocClientStatusFailed + a2.NextAllocation = a.ID + if err := t.Insert("allocs", a2); err != nil { + return nil, err + } + return j, nil + }, + exp: structs.JobStatusDead, + }, } for _, tc := range testCases { @@ -7833,9 +7897,8 @@ func TestStateStore_GetJobStatus(t *testing.T) { state := testStateStore(t) txn := state.db.WriteTxn(0) - txn.Insert("allocs", mock.Alloc()) - job, err := tc.setup(txn) + job, _ := tc.setup(txn) status, err := state.getJobStatus(txn, job, false) require.NoError(t, err) From 43c65251a2833926bb9740c8470cdfd0fcda0e09 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 3 Feb 2025 17:45:39 -0500 Subject: [PATCH 09/12] filter batch jobs during reschedule check --- nomad/core_sched_test.go | 2 ++ 
nomad/state/state_store.go | 11 +++++------ nomad/state/state_store_test.go | 7 +++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 416f855ff74..9f16d0cdb0e 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -1147,6 +1147,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { // Insert two allocs, one terminal and one not alloc := mock.Alloc() alloc.JobID = job.ID + alloc.Job = job alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusComplete @@ -1154,6 +1155,7 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) { alloc2 := mock.Alloc() alloc2.JobID = job.ID + alloc.Job = job alloc2.EvalID = eval.ID alloc2.DesiredStatus = structs.AllocDesiredStatusRun alloc2.ClientStatus = structs.AllocClientStatusRunning diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 468522ef147..ef5927c2831 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5628,8 +5628,9 @@ func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) ( return structs.JobStatusRunning, nil } - // If all the allocs are terminal and any are not reschedulable - // mark this job as having terminal allocs, possibly dead + // Check if the allocs are reschedulable before + // marking the job dead. If any of the allocs are terminal + // and not reschedulable, mark the job dead.
if !isReschedulable(a) { terminalAllocs = true } @@ -7374,13 +7375,11 @@ func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace stri return iter, nil } -// RescheduleInfo is used to calculate remaining reschedule attempts -// according to the given time and the task groups reschedule policy -// This is modified from the API package func isReschedulable(a *structs.Allocation) bool { - if a.ReschedulePolicy() == nil { + if a.ReschedulePolicy() == nil || a.Job.Type != structs.JobTypeService { return false } + reschedulePolicy := a.ReschedulePolicy() availableAttempts := reschedulePolicy.Attempts interval := reschedulePolicy.Interval diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 89ddd57318f..f00f27b9bf3 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -5798,6 +5798,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 998, nil, parent)) child := mock.Job() + child.Type = structs.JobTypeBatch child.Status = "" child.ParentID = parent.ID must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 999, nil, child)) @@ -7796,13 +7797,15 @@ func TestStateStore_GetJobStatus(t *testing.T) { exp: structs.JobStatusPending, }, { - name: "job has all terminal allocs, with no evals", + name: "batch job has all terminal allocs, with no evals", setup: func(txn *txn) (*structs.Job, error) { j := mock.Job() - a := mock.Alloc() + j.Type = structs.JobTypeBatch + a := mock.Alloc() a.ClientStatus = structs.AllocClientStatusFailed a.JobID = j.ID + a.Job = j if err := txn.Insert("allocs", a); err != nil { return nil, err From 379040f962856fdd44615e57b3eaba9e1020c7d8 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 4 Feb 2025 09:54:34 -0500 Subject: [PATCH 10/12] remove log --- nomad/state/state_store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 
ef5927c2831..0ea7e2510a5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1800,7 +1800,6 @@ func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job // Compute the job status var err error job.Status, err = s.getJobStatus(txn, job, false) - s.logger.Info("setting job status", "status", job.Status) if err != nil { return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) } From 6539b861cc5f6a45a9b6ac143dc717f12e5cbb8e Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 4 Feb 2025 14:07:35 -0500 Subject: [PATCH 11/12] remove unnecessary change --- nomad/fsm.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index d804839b0f6..3cc4bf101b4 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -948,8 +948,13 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt } } - // Update any evals first. During a reschedule, we don't want to mark the job dead, which - // would happen if all the allocs were terminal and there wasn't a pending eval + // Update all the client allocations + if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil { + n.logger.Error("UpdateAllocFromClient failed", "error", err) + return err + } + + // Update any evals if len(req.Evals) > 0 { if err := n.upsertEvals(msgType, index, req.Evals); err != nil { n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err) @@ -957,12 +962,6 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt } } - // Update all the client allocations - if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil { - n.logger.Error("UpdateAllocFromClient failed", "error", err) - return err - } - // Unblock evals for the nodes computed node class if the client has // finished running an allocation. 
for _, alloc := range req.Alloc { From 28c2b075621813b74ec413f816a3917909243474 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 4 Feb 2025 14:19:40 -0500 Subject: [PATCH 12/12] adds changelog --- .changelog/24974.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/24974.txt diff --git a/.changelog/24974.txt b/.changelog/24974.txt new file mode 100644 index 00000000000..b45d5ef63c4 --- /dev/null +++ b/.changelog/24974.txt @@ -0,0 +1,3 @@ +```release-note:bug +state store: fix for setting correct job status in various scenarios +```