Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stateful deployments: use TaskGroupVolumeClaim table to associate volume requests with volume IDs #24993

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
32 changes: 32 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@
return n.applyHostVolumeRegister(msgType, buf[1:], log.Index)
case structs.HostVolumeDeleteRequestType:
return n.applyHostVolumeDelete(msgType, buf[1:], log.Index)
case structs.TaskGroupVolumeClaimRegisterRequestType:
return n.applyTaskGroupVolumeClaim(msgType, buf[1:], log.Index)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -2450,6 +2452,36 @@
return nil
}

func (n *nomadFSM) applyTaskVolumeClaimRegister(msgType structs.MessageType, buf []byte, index uint64) interface{} {
tgross marked this conversation as resolved.
Show resolved Hide resolved
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_volume_claim_register"}, time.Now())

var req structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpsertTaskGroupVolumeClaim(index, req.Volume); err != nil {
n.logger.Error("UpsertTaskGroupVolumeClaim failed", "error", err)
return err
}
return nil
}

func (n *nomadFSM) applyTaskVolumeClaimDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_volume_claim_delete"}, time.Now())

var req structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteTaskGroupVolumeClaim(index, req.RequestNamespace(), req.VolumeID); err != nil {

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim
n.logger.Error("DeleteTaskGroupVolumeClaim failed", "error", err)
return err
}
return nil
}

func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
Expand Down
27 changes: 27 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
TableTaskGroupVolumeClaim = "task_volume"
)

const (
Expand All @@ -45,6 +46,7 @@ const (
indexSigningKey = "signing_key"
indexAuthMethod = "auth_method"
indexNodePool = "node_pool"
indexTaskGroupName = "tg_name"
)

var (
Expand Down Expand Up @@ -102,6 +104,7 @@ func init() {
aclAuthMethodsTableSchema,
bindingRulesTableSchema,
hostVolumeTableSchema,
taskVolumeClaimSchema,
}...)
}

Expand Down Expand Up @@ -1706,3 +1709,27 @@ func hostVolumeTableSchema() *memdb.TableSchema {
},
}
}

// taskVolumeClaimSchema returns the memdb table schema for task group volume
// claims. Claims are indexed by their unique claim ID and, non-uniquely, by
// the name of the task group that owns them.
func taskVolumeClaimSchema() *memdb.TableSchema {
	idIndex := &memdb.IndexSchema{
		Name:         indexID,
		AllowMissing: false,
		Unique:       true,
		Indexer:      &memdb.StringFieldIndex{Field: "ID"},
	}

	tgNameIndex := &memdb.IndexSchema{
		Name:         indexTaskGroupName,
		AllowMissing: false,
		Unique:       false,
		Indexer:      &memdb.StringFieldIndex{Field: "TaskGroupName"},
	}

	return &memdb.TableSchema{
		Name: TableTaskGroupVolumeClaim,
		Indexes: map[string]*memdb.IndexSchema{
			indexID:            idIndex,
			indexTaskGroupName: tgNameIndex,
		},
	}
}
43 changes: 43 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/lang"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -4125,6 +4126,45 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
if alloc.Job == nil {
return fmt.Errorf("attempting to upsert allocation %q without a job", alloc.ID)
}

// Check if the alloc requires sticky volumes. If yes, find a node
// that has the right volume and update the task group volume
// claims table
stickyVolumes := []*structs.TaskGroupVolumeClaim{}
for _, tg := range alloc.Job.TaskGroups {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range tg.Volumes {
if !v.Sticky {
continue
}
stickyVolumes = append(stickyVolumes, &structs.TaskGroupVolumeClaim{
ID: uuid.Generate(),
JobID: alloc.JobID,
TaskGroupName: tg.Name,
AllocID: alloc.ID,
VolumeName: v.Source,
})
}
}

if len(stickyVolumes) > 0 {
allocNode, err := s.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}

for _, v := range allocNode.HostVolumes {
// Record volumes that this allocation uses in the claims table
for _, sv := range stickyVolumes {
if sv.VolumeName != v.Name {
continue
}
sv.VolumeID = v.ID
if err := s.UpsertTaskGroupVolumeClaim(index, sv); err != nil {
tgross marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
}
}
} else {
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
Expand All @@ -4133,6 +4173,9 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
// Keep the clients task states
alloc.TaskStates = exist.TaskStates

// Keep the volume info
alloc.HostVolumeIDs = exist.HostVolumeIDs

// If the scheduler is marking this allocation as lost or unknown we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost &&
Expand Down
97 changes: 97 additions & 0 deletions nomad/state/state_store_task_group_volume_claims.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package state

import (
"errors"
"fmt"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)

// UpsertTaskGroupVolumeClaim is used to insert or update a task group volume
// claim in the state store. If an identical claim already exists (compared by
// hash), the upsert is a no-op. (The previous doc comment named
// UpsertACLBindingRules, a copy-paste leftover.)
func (s *StateStore) UpsertTaskGroupVolumeClaim(
	index uint64, claim *structs.TaskGroupVolumeClaim) error {

	// Grab a write transaction.
	txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeClaimRegisterRequestType, index)
	defer txn.Abort()

	existingRaw, err := txn.First(TableTaskGroupVolumeClaim, indexID, claim.ID)
	if err != nil {
		return fmt.Errorf("task group volume claim lookup failed: %v", err)
	}

	var existing *structs.TaskGroupVolumeClaim
	if existingRaw != nil {
		existing = existingRaw.(*structs.TaskGroupVolumeClaim)
	}

	if existing != nil {
		// An unchanged claim requires no write.
		if existing.Equal(claim) {
			return nil
		}

		// Preserve creation metadata across updates.
		claim.CreateIndex = existing.CreateIndex
		claim.ModifyIndex = index
		claim.CreateTime = existing.CreateTime
	} else {
		claim.CreateIndex = index
		claim.ModifyIndex = index
	}

	// Insert the claim into the table.
	if err := txn.Insert(TableTaskGroupVolumeClaim, claim); err != nil {
		return fmt.Errorf("task group volume claim insert failed: %v", err)
	}

	// Perform the index table update to mark the new insert.
	if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupVolumeClaim, index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// DeleteTaskGroupVolumeClaim removes the volume claim with the given ID from
// the state store. It returns an error if no claim with that ID exists. (The
// previous not-found message said "ACL binding rule not found", a copy-paste
// leftover.)
func (s *StateStore) DeleteTaskGroupVolumeClaim(index uint64, claimID string) error {
	txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeClaimDeleteRequestType, index)
	defer txn.Abort()

	existing, err := txn.First(TableTaskGroupVolumeClaim, indexID, claimID)
	if err != nil {
		return fmt.Errorf("task group volume claim lookup failed: %v", err)
	}
	if existing == nil {
		return errors.New("task group volume claim not found")
	}

	// Delete the existing entry from the table.
	if err := txn.Delete(TableTaskGroupVolumeClaim, existing); err != nil {
		return fmt.Errorf("task group volume claim deletion failed: %v", err)
	}

	// Update the index table to indicate an update has occurred.
	if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupVolumeClaim, index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// GetTaskGroupVolumeClaims returns an iterator over every task group volume
// claim stored within state, and registers the table's watch channel on ws.
func (s *StateStore) GetTaskGroupVolumeClaims(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	it, err := s.db.ReadTxn().Get(TableTaskGroupVolumeClaim, indexID)
	if err != nil {
		return nil, fmt.Errorf("Task group volume claims lookup failed: %v", err)
	}

	ws.Add(it.WatchCh())
	return it, nil
}
69 changes: 67 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ const (
NamespaceDeleteRequestType MessageType = 65

// MessageTypes 66-74 are in Nomad Enterprise
HostVolumeRegisterRequestType MessageType = 75
HostVolumeDeleteRequestType MessageType = 76
HostVolumeRegisterRequestType MessageType = 75
HostVolumeDeleteRequestType MessageType = 76
TaskGroupVolumeClaimRegisterRequestType MessageType = 77
TaskGroupVolumeClaimDeleteRequestType MessageType = 78

// NOTE: MessageTypes are shared between CE and ENT. If you need to add a
// new type, check that ENT is not already using that value.
Expand Down Expand Up @@ -7774,6 +7776,69 @@ func (tg *TaskGroup) SetConstraints(newConstraints []*Constraint) {
tg.Constraints = newConstraints
}

// TaskGroupVolumeClaim associates a task group with a host volume ID. It's
// used for stateful deployments, i.e., volume requests with "sticky" set to
// true.
type TaskGroupVolumeClaim struct {
	// ID uniquely identifies this claim record.
	ID string

	// JobID is the ID of the job whose task group holds this claim.
	JobID string

	// TaskGroupName is the name of the claiming task group; the claims
	// table carries a non-unique index on this field.
	TaskGroupName string

	// AllocID is the allocation that established the claim.
	AllocID string // TODO: do we need this?

	// VolumeID is the ID of the claimed host volume on the node.
	VolumeID string

	// VolumeName is the volume request source name used to match against a
	// node's host volumes.
	VolumeName string

	// Hash is the hashed value of the claim and is generated using all fields
	// from the full object except the create and modify times and indexes.
	Hash []byte

	// Raft bookkeeping. NOTE(review): ModifyTime does not appear to be set
	// anywhere in the visible state-store code — confirm it is maintained.
	CreateTime  time.Time
	ModifyTime  time.Time
	CreateIndex uint64
	ModifyIndex uint64
}

// SetHash is used to compute and set the hash of the task group volume claim.
// This should be called every time a user specified field on the method is
// changed before updating the Nomad state store.
func (tgvc *TaskGroupVolumeClaim) SetHash() []byte {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved

// Initialize a 256bit Blake2 hash (32 bytes).
hash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}

_, _ = hash.Write([]byte(tgvc.ID))
_, _ = hash.Write([]byte(tgvc.JobID))
_, _ = hash.Write([]byte(tgvc.TaskGroupName))
_, _ = hash.Write([]byte(tgvc.AllocID))
_, _ = hash.Write([]byte(tgvc.VolumeID))
_, _ = hash.Write([]byte(tgvc.VolumeName))

// Finalize the hash.
hashVal := hash.Sum(nil)

// Set and return the hash.
tgvc.Hash = hashVal
return hashVal
}

// Equal reports whether two claims hash to the same value, computing and
// caching either side's hash if it has not been set yet. Two nil claims are
// equal; a nil and a non-nil claim are not.
func (tgvc *TaskGroupVolumeClaim) Equal(otherClaim *TaskGroupVolumeClaim) bool {
	if tgvc == nil || otherClaim == nil {
		return tgvc == otherClaim
	}

	for _, claim := range []*TaskGroupVolumeClaim{tgvc, otherClaim} {
		if len(claim.Hash) == 0 {
			claim.SetHash()
		}
	}

	return bytes.Equal(tgvc.Hash, otherClaim.Hash)
}

// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
Expand Down
Loading