Skip to content

Commit

Permalink
a few missing pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
pkazmierczak committed Jan 31, 2025
1 parent 22f5827 commit 42a727a
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 85 deletions.
32 changes: 32 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyHostVolumeRegister(msgType, buf[1:], log.Index)
case structs.HostVolumeDeleteRequestType:
return n.applyHostVolumeDelete(msgType, buf[1:], log.Index)
case structs.TaskGroupVolumeClaimRegisterRequestType:
return n.applyTaskGroupVolumeClaim(msgType, buf[1:], log.Index)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)

Check failure on line 393 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

n.applyTaskGroupVolumeClaim undefined (type *nomadFSM has no field or method applyTaskGroupVolumeClaim)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -2450,6 +2452,36 @@ func (n *nomadFSM) applyHostVolumeDelete(msgType structs.MessageType, buf []byte
return nil
}

// applyTaskVolumeClaimRegister decodes a TaskGroupVolumeClaimRegisterRequest
// from buf and upserts req.Volume into the state store at the given index.
// Elapsed time is reported through metrics.MeasureSince. It panics if the
// request cannot be decoded; if the upsert fails the error is logged and
// returned, otherwise nil is returned. msgType is not used in the body.
//
// NOTE(review): the Apply dispatch shown in this diff calls
// n.applyTaskGroupVolumeClaim, which does not match this method's name (see
// the "undefined" CI failures) — confirm which name is intended.
func (n *nomadFSM) applyTaskVolumeClaimRegister(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_volume_claim_register"}, time.Now())

var req structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimRegisterRequest

Check failure on line 2458 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpsertTaskGroupVolumeClaim(index, req.Volume); err != nil {
n.logger.Error("UpsertTaskGroupVolumeClaim failed", "error", err)
return err
}
return nil
}

// applyTaskVolumeClaimDelete decodes a TaskGroupVolumeClaimDeleteRequest from
// buf and asks the state store to delete the referenced claim at the given
// index. Elapsed time is reported through metrics.MeasureSince. It panics if
// the request cannot be decoded; if the delete fails the error is logged and
// returned, otherwise nil is returned. msgType is not used in the body.
//
// NOTE(review): the call below passes (index, namespace, volume ID), while the
// DeleteTaskGroupVolumeClaim shown in this diff takes (index, claimID) — this
// matches the "too many arguments" CI failure; confirm the intended signature.
// NOTE(review): no dispatch case for the delete message type is visible in the
// Apply hunk shown here — confirm this method is wired up.
func (n *nomadFSM) applyTaskVolumeClaimDelete(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_volume_claim_delete"}, time.Now())

var req structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimDeleteRequest

Check failure on line 2473 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

undefined: structs.TaskGroupVolumeClaimDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteTaskGroupVolumeClaim(index, req.RequestNamespace(), req.VolumeID); err != nil {

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-windows

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-vault

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / test-e2e-consul

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim

Check failure on line 2478 in nomad/fsm.go

View workflow job for this annotation

GitHub Actions / checks / checks

too many arguments in call to n.state.DeleteTaskGroupVolumeClaim
n.logger.Error("DeleteTaskGroupVolumeClaim failed", "error", err)
return err
}
return nil
}

func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
Expand Down
40 changes: 40 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/lang"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -4125,6 +4126,45 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
if alloc.Job == nil {
return fmt.Errorf("attempting to upsert allocation %q without a job", alloc.ID)
}

// Check if the alloc requires sticky volumes. If yes, look up the
// alloc's node and, for each of its host volumes whose name matches a
// sticky volume source, record a claim in the task group volume claims table
stickyVolumes := []*structs.TaskGroupVolumeClaim{}
for _, tg := range alloc.Job.TaskGroups {
for _, v := range tg.Volumes {
if !v.Sticky {
continue
}
stickyVolumes = append(stickyVolumes, &structs.TaskGroupVolumeClaim{
ID: uuid.Generate(),
JobID: alloc.JobID,
TaskGroupName: tg.Name,
AllocID: alloc.ID,
VolumeName: v.Source,
})
}
}

if len(stickyVolumes) > 0 {
allocNode, err := s.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}

for _, v := range allocNode.HostVolumes {
// Record volumes that this allocation uses in the claims table
for _, sv := range stickyVolumes {
if sv.VolumeName != v.Name {
continue
}
sv.VolumeID = v.ID
if err := s.UpsertTaskGroupVolumeClaim(index, sv); err != nil {
return err
}
}
}
}
} else {
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
Expand Down
109 changes: 29 additions & 80 deletions nomad/state/state_store_task_group_volume_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (

// UpsertTaskGroupVolumeClaim is used to insert or update a task group volume
// claim in the state store.
func (s *StateStore) UpsertTaskGroupVolumeAssociation(
index uint64, association *structs.TaskGroupVolumeClaim) error {
func (s *StateStore) UpsertTaskGroupVolumeClaim(
index uint64, claim *structs.TaskGroupVolumeClaim) error {

// Grab a write transaction.
txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeAssociationRegisterRequestType, index)
txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeClaimRegisterRequestType, index)
defer txn.Abort()

existingRaw, err := txn.First(TableTaskGroupVolumeClaim, indexID, association.ID)
existingRaw, err := txn.First(TableTaskGroupVolumeClaim, indexID, claim.ID)
if err != nil {
return fmt.Errorf("Task group volume association lookup failed: %v", err)
}
Expand All @@ -31,20 +31,20 @@ func (s *StateStore) UpsertTaskGroupVolumeAssociation(
}

if existing != nil {
if existing.Equal(association) {
if existing.Equal(claim) {
return nil
}

association.CreateIndex = existing.CreateIndex
association.ModifyIndex = index
association.CreateTime = existing.CreateTime
claim.CreateIndex = existing.CreateIndex
claim.ModifyIndex = index
claim.CreateTime = existing.CreateTime
} else {
association.CreateIndex = index
association.ModifyIndex = index
claim.CreateIndex = index
claim.ModifyIndex = index
}

// Insert the claim into the table.
if err := txn.Insert(TableTaskGroupVolumeClaim, association); err != nil {
if err := txn.Insert(TableTaskGroupVolumeClaim, claim); err != nil {
return fmt.Errorf("Task group volume claim insert failed: %v", err)
}

Expand All @@ -56,93 +56,42 @@ func (s *StateStore) UpsertTaskGroupVolumeAssociation(
return txn.Commit()
}

// DeleteACLBindingRules is responsible for batch deleting ACL binding rules.
// It uses a single write transaction for efficiency, however, any error means
// no entries will be committed. An error is produced if a rule is not found
// within state which has been passed within the array.
func (s *StateStore) DeleteTaskGroupVolumeAssociation(index uint64, bindingRuleIDs []string) error {
txn := s.db.WriteTxnMsgT(structs.ACLBindingRulesDeleteRequestType, index)
// DeleteTaskGroupVolumeClaim deletes the task group volume claim with the
// given ID from the state store. It returns an error if no such claim exists.
func (s *StateStore) DeleteTaskGroupVolumeClaim(index uint64, claimID string) error {
txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeClaimDeleteRequestType, index)
defer txn.Abort()

for _, ruleID := range bindingRuleIDs {
if err := s.deleteACLBindingRuleTxn(txn, ruleID); err != nil {
return err
}
}

// Update the index table to indicate an update has occurred.
if err := txn.Insert(tableIndex, &IndexEntry{TableACLBindingRules, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return txn.Commit()
}

// deleteACLBindingRuleTxn deletes a single ACL binding rule from the state
// store using the provided write transaction. It is the responsibility of the
// caller to update the index table.
func (s *StateStore) deleteTaskGroupVolumeAssociationTxn(txn *txn, ruleID string) error {
existing, err := txn.First(TableACLBindingRules, indexID, ruleID)
existing, err := txn.First(TableTaskGroupVolumeClaim, indexID, claimID)
if err != nil {
return fmt.Errorf("ACL binding rule lookup failed: %v", err)
return fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL binding rule not found")
}

// Delete the existing entry from the table.
if err := txn.Delete(TableACLBindingRules, existing); err != nil {
return fmt.Errorf("ACL binding rule deletion failed: %v", err)
if err := txn.Delete(TableTaskGroupVolumeClaim, existing); err != nil {
return fmt.Errorf("Task group volume claim deletion failed: %v", err)
}
return nil
}

// GetACLBindingRules returns an iterator that contains all ACL binding rules
// stored within state.
func (s *StateStore) GetTaskGroupVolumeAssociations(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

// Walk the entire table to get all ACL binding rules.
iter, err := txn.Get(TableACLBindingRules, indexID)
if err != nil {
return nil, fmt.Errorf("ACL binding rules lookup failed: %v", err)
// Update the index table to indicate an update has occurred.
if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupVolumeClaim, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
ws.Add(iter.WatchCh())

return iter, nil
return txn.Commit()
}

// GetACLBindingRule returns a single ACL binding rule specified by the input
// ID. The binding rule object will be nil, if no matching entry was found; it
// is the responsibility of the caller to check for this.
func (s *StateStore) GetTaskGroupVolumeAssociation(ws memdb.WatchSet, ruleID string) (*structs.ACLBindingRule, error) {
// GetTaskGroupVolumeClaims returns an iterator that contains all task group
// volume claims stored within state.
func (s *StateStore) GetTaskGroupVolumeClaims(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

// Perform the ACL binding rule lookup using the ID.
watchCh, existing, err := txn.FirstWatch(TableACLBindingRules, indexID, ruleID)
iter, err := txn.Get(TableTaskGroupVolumeClaim, indexID)
if err != nil {
return nil, fmt.Errorf("ACL binding rule lookup failed: %v", err)
return nil, fmt.Errorf("Task group volume claims lookup failed: %v", err)
}
ws.Add(watchCh)
ws.Add(iter.WatchCh())

if existing != nil {
return existing.(*structs.ACLBindingRule), nil
}
return nil, nil
return iter, nil
}

// GetACLBindingRulesByAuthMethod returns an iterator with all binding rules
// associated with the named authentication method.
// func (s *StateStore) GetACLBindingRulesByAuthMethod(
// ws memdb.WatchSet, authMethod string) (memdb.ResultIterator, error) {
//
// txn := s.db.ReadTxn()
//
// iter, err := txn.Get(TableACLBindingRules, indexAuthMethod, authMethod)
// if err != nil {
// return nil, fmt.Errorf("ACL binding rule lookup failed: %v", err)
// }
// ws.Add(iter.WatchCh())
//
// return iter, nil
// }
13 changes: 8 additions & 5 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ const (
NamespaceDeleteRequestType MessageType = 65

// MessageTypes 66-74 are in Nomad Enterprise
HostVolumeRegisterRequestType MessageType = 75
HostVolumeDeleteRequestType MessageType = 76
TaskGroupVolumeAssociationRegisterRequestType MessageType = 77
TaskGroupVolumeAssociationDeleteRequestType MessageType = 78
HostVolumeRegisterRequestType MessageType = 75
HostVolumeDeleteRequestType MessageType = 76
TaskGroupVolumeClaimRegisterRequestType MessageType = 77
TaskGroupVolumeClaimDeleteRequestType MessageType = 78

// NOTE: MessageTypes are shared between CE and ENT. If you need to add a
// new type, check that ENT is not already using that value.
Expand Down Expand Up @@ -7785,7 +7785,8 @@ type TaskGroupVolumeClaim struct {
TaskGroupName string
AllocID string // TODO: do we need this?

VolumeID string
VolumeID string
VolumeName string

// Hash is the hashed value of the claim and is generated using all fields
// from the full object except the create and modify times and indexes.
Expand Down Expand Up @@ -7813,13 +7814,15 @@ func (tgvc *TaskGroupVolumeClaim) SetHash() []byte {
_, _ = hash.Write([]byte(tgvc.TaskGroupName))
_, _ = hash.Write([]byte(tgvc.AllocID))
_, _ = hash.Write([]byte(tgvc.VolumeID))
_, _ = hash.Write([]byte(tgvc.VolumeName))

// Finalize the hash.
hashVal := hash.Sum(nil)

// Set and return the hash.
tgvc.Hash = hashVal
return hashVal
}

func (tgvc *TaskGroupVolumeClaim) Equal(otherClaim *TaskGroupVolumeClaim) bool {
if tgvc == nil || otherClaim == nil {
Expand Down

0 comments on commit 42a727a

Please sign in to comment.