diff --git a/pkg/common/errors.go b/pkg/common/errors.go index f73c84b46..7e28426bd 100644 --- a/pkg/common/errors.go +++ b/pkg/common/errors.go @@ -20,11 +20,24 @@ package common import "errors" -// InvalidQueueName returned when queue name is invalid -var InvalidQueueName = errors.New("invalid queue name, max 64 characters consisting of alphanumeric characters and '-', '_', '#', '@', '/', ':' allowed") +var ( + // InvalidQueueName returned when queue name is invalid + InvalidQueueName = errors.New("invalid queue name, max 64 characters consisting of alphanumeric characters and '-', '_', '#', '@', '/', ':' allowed") + // ErrorReservingAlloc returned when an ask that is allocated tries to reserve a node. + ErrorReservingAlloc = errors.New("ask already allocated, no reservation allowed") + // ErrorDuplicateReserve returned when the same reservation already exists on the application + ErrorDuplicateReserve = errors.New("reservation already exists") + // ErrorNodeAlreadyReserved returned when the node is already reserved, failing the reservation + ErrorNodeAlreadyReserved = errors.New("node is already reserved") + // ErrorNodeNotFitReserve returned when the allocation does not fit on an empty node, failing the reservation + ErrorNodeNotFitReserve = errors.New("reservation does not fit on node") +) -const PreemptionPreconditionsFailed = "Preemption preconditions failed" -const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed" -const PreemptionShortfall = "Preemption helped but short of resources" -const PreemptionDoesNotHelp = "Preemption does not help" -const NoVictimForRequiredNode = "No fit on required node, preemption does not help" +// Constant messages for AllocationLog entries +const ( + PreemptionPreconditionsFailed = "Preemption preconditions failed" + PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed" + PreemptionShortfall = "Preemption helped but short of resources" + PreemptionDoesNotHelp = "Preemption does not 
help" + NoVictimForRequiredNode = "No fit on required node, preemption does not help" +) diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index a4701cd68..0ef052221 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -48,7 +48,6 @@ type Allocation struct { allowPreemptOther bool originator bool tags map[string]string - resKeyWithoutNode string // the reservation key without node foreign bool preemptable bool @@ -57,9 +56,8 @@ type Allocation struct { allocLog map[string]*AllocationLogEntry preemptionTriggered bool preemptCheckTime time.Time - schedulingAttempted bool // whether scheduler core has tried to schedule this allocation - scaleUpTriggered bool // whether this aloocation has triggered autoscaling or not - resKeyPerNode map[string]string // reservation key for a given node + schedulingAttempted bool // whether scheduler core has tried to schedule this allocation + scaleUpTriggered bool // whether this allocation has triggered autoscaling or not allocatedResource *resources.Resource askEvents *schedEvt.AskEvents userQuotaCheckFailed bool @@ -145,8 +143,6 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation { allowPreemptOther: alloc.PreemptionPolicy.GetAllowPreemptOther(), originator: alloc.Originator, allocLog: make(map[string]*AllocationLogEntry), - resKeyPerNode: make(map[string]string), - resKeyWithoutNode: reservationKeyWithoutNode(alloc.ApplicationID, alloc.AllocationKey), askEvents: schedEvt.NewAskEvents(events.GetEventSystem()), allocated: allocated, nodeID: nodeID, @@ -554,18 +550,6 @@ func (a *Allocation) HasTriggeredScaleUp() bool { return a.scaleUpTriggered } -func (a *Allocation) setReservationKeyForNode(node, resKey string) { - a.Lock() - defer a.Unlock() - a.resKeyPerNode[node] = resKey -} - -func (a *Allocation) getReservationKeyForNode(node string) string { - a.RLock() - defer a.RUnlock() - return a.resKeyPerNode[node] -} - func (a *Allocation) 
setHeadroomCheckFailed(headroom *resources.Resource, queue string) { a.Lock() defer a.Unlock() diff --git a/pkg/scheduler/objects/allocation_result.go b/pkg/scheduler/objects/allocation_result.go index bcd56a86c..99af95b72 100644 --- a/pkg/scheduler/objects/allocation_result.go +++ b/pkg/scheduler/objects/allocation_result.go @@ -36,10 +36,11 @@ func (art AllocationResultType) String() string { } type AllocationResult struct { - ResultType AllocationResultType - Request *Allocation - NodeID string - ReservedNodeID string + ResultType AllocationResultType + Request *Allocation + NodeID string + ReservedNodeID string + CancelledReservations int } func (ar *AllocationResult) String() string { diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go index 62bf316a9..6917acac9 100644 --- a/pkg/scheduler/objects/allocation_test.go +++ b/pkg/scheduler/objects/allocation_test.go @@ -51,7 +51,6 @@ func TestNewAsk(t *testing.T) { askStr := ask.String() expected := "allocationKey ask-1, applicationID app-1, Resource map[first:10], Allocated false" assert.Equal(t, askStr, expected, "Strings should have been equal") - assert.Equal(t, "app-1|ask-1", ask.resKeyWithoutNode) //nolint:staticcheck } func TestAskAllocateDeallocate(t *testing.T) { diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 60ad55ca8..adff38175 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -452,41 +452,44 @@ func (sa *Application) timeoutPlaceholderProcessing() { func (sa *Application) GetReservations() []string { sa.RLock() defer sa.RUnlock() - keys := make([]string, 0) + keys := make([]string, len(sa.reservations)) + var i int for key := range sa.reservations { - keys = append(keys, key) + keys[i] = key + i++ } return keys } -// Return the allocation ask for the key, nil if not found +// GetAllocationAsk returns the allocation for the key, nil if not found func (sa
*Application) GetAllocationAsk(allocationKey string) *Allocation { sa.RLock() defer sa.RUnlock() return sa.requests[allocationKey] } -// Return the allocated resources for this application +// GetAllocatedResource returns the currently allocated resources for this application func (sa *Application) GetAllocatedResource() *resources.Resource { sa.RLock() defer sa.RUnlock() return sa.allocatedResource.Clone() } +// GetMaxAllocatedResource returns the peak of the allocated resources for this application func (sa *Application) GetMaxAllocatedResource() *resources.Resource { sa.RLock() defer sa.RUnlock() return sa.maxAllocatedResource.Clone() } -// Return the allocated placeholder resources for this application +// GetPlaceholderResource returns the currently allocated placeholder resources for this application func (sa *Application) GetPlaceholderResource() *resources.Resource { sa.RLock() defer sa.RUnlock() return sa.allocatedPlaceholder.Clone() } -// Return the total placeholder ask for this application +// GetPlaceholderAsk returns the total placeholder resource request for this application // Is only set on app creation and used when app is added to a queue func (sa *Application) GetPlaceholderAsk() *resources.Resource { sa.RLock() @@ -501,8 +504,8 @@ func (sa *Application) GetPendingResource() *resources.Resource { return sa.pending } -// Remove one or more allocation asks from this application. -// This also removes any reservations that are linked to the ask. +// RemoveAllocationAsk removes one or more allocation asks from this application. +// This also removes any reservations that are linked to the allocations. 
// The return value is the number of reservations released func (sa *Application) RemoveAllocationAsk(allocKey string) int { sa.Lock() @@ -517,23 +520,16 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord return 0 } var deltaPendingResource *resources.Resource = nil - // when allocation key not specified, cleanup all allocation ask + // when allocation key is not specified, cleanup all allocations var toRelease int if allocKey == "" { // cleanup all reservations - for key, reserve := range sa.reservations { - releases, err := sa.unReserveInternal(reserve.node, reserve.ask) - if err != nil { - log.Log(log.SchedApplication).Warn("Removal of reservation failed while removing all allocation asks", - zap.String("appID", sa.ApplicationID), - zap.String("reservationKey", key), - zap.Error(err)) - continue - } - // clean up the queue reservation (one at a time) - sa.queue.UnReserve(sa.ApplicationID, releases) + for _, reserve := range sa.reservations { + releases := sa.unReserveInternal(reserve) toRelease += releases } + // clean up the queue reservation + sa.queue.UnReserve(sa.ApplicationID, toRelease) // Cleanup total pending resource deltaPendingResource = sa.pending sa.pending = resources.NewResource() @@ -546,16 +542,8 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority) } else { // cleanup the reservation for this allocation - for _, key := range sa.GetAskReservations(allocKey) { - reserve := sa.reservations[key] - releases, err := sa.unReserveInternal(reserve.node, reserve.ask) - if err != nil { - log.Log(log.SchedApplication).Warn("Removal of reservation failed while removing allocation ask", - zap.String("appID", sa.ApplicationID), - zap.String("reservationKey", key), - zap.Error(err)) - continue - } + if reserve, ok := sa.reservations[allocKey]; ok { + releases := sa.unReserveInternal(reserve) // clean up the queue reservation 
sa.queue.UnReserve(sa.ApplicationID, releases) toRelease += releases @@ -576,7 +564,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord } // clean up the queue pending resources sa.queue.decPendingResource(deltaPendingResource) - // Check if we need to change state based on the ask removal: + // Check if we need to change state based on the removal: // 1) if pending is zero (no more asks left) // 2) if confirmed allocations is zero (no real tasks running) // Change the state to completing. @@ -808,152 +796,145 @@ func (sa *Application) HasReserved() bool { return len(sa.reservations) > 0 } -// IsReservedOnNode returns true if and only if the node has been reserved by the application -// An empty nodeID is never reserved. -func (sa *Application) IsReservedOnNode(nodeID string) bool { - if nodeID == "" { - return false - } +// NodeReservedForAsk returns the nodeID that has been reserved by the application for the ask +// An empty nodeID means the ask is not reserved. An empty askKey is never reserved. +func (sa *Application) NodeReservedForAsk(askKey string) string { sa.RLock() defer sa.RUnlock() - // make sure matches only for the whole nodeID - separator := nodeID + "|" - for key := range sa.reservations { - if strings.HasPrefix(key, separator) { - return true - } + if reserved, ok := sa.reservations[askKey]; ok { + return reserved.nodeID } - return false + return "" } -// Reserve the application for this node and ask combination. +// Reserve the application for this node and alloc combination. -// If the reservation fails the function returns false, if the reservation is made it returns true. +// If the reservation fails the function returns an error, if the reservation is made it returns nil. -// If the node and ask combination was already reserved for the application this is a noop and returns true. +// If the alloc already has a reservation for the application the call fails with common.ErrorDuplicateReserve.
func (sa *Application) Reserve(node *Node, ask *Allocation) error { + if node == nil || ask == nil { + return fmt.Errorf("reservation creation failed node or alloc are nil on appID %s", sa.ApplicationID) + } sa.Lock() defer sa.Unlock() return sa.reserveInternal(node, ask) } -// Unlocked version for Reserve that really does the work. +// reserveInternal is the unlocked version for Reserve that really does the work. // Must only be called while holding the application lock. func (sa *Application) reserveInternal(node *Node, ask *Allocation) error { + allocKey := ask.GetAllocationKey() + if sa.requests[allocKey] == nil { + log.Log(log.SchedApplication).Debug("alloc is not registered to this app", + zap.String("app", sa.ApplicationID), + zap.String("allocKey", allocKey)) + return fmt.Errorf("reservation creation failed alloc %s not found on appID %s", allocKey, sa.ApplicationID) + } // create the reservation (includes nil checks) nodeReservation := newReservation(node, sa, ask, true) if nodeReservation == nil { log.Log(log.SchedApplication).Debug("reservation creation failed unexpectedly", zap.String("app", sa.ApplicationID), - zap.Any("node", node), - zap.Any("ask", ask)) - return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID) - } - allocKey := ask.GetAllocationKey() - if sa.requests[allocKey] == nil { - log.Log(log.SchedApplication).Debug("ask is not registered to this app", - zap.String("app", sa.ApplicationID), - zap.String("allocKey", allocKey)) - return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID) + zap.Stringer("node", node), + zap.Stringer("alloc", ask)) + return fmt.Errorf("reservation creation failed node or alloc are nil on appID %s", sa.ApplicationID) } - if !sa.canAskReserve(ask) { - if ask.IsAllocated() { - return fmt.Errorf("ask is already allocated") - } else { - return fmt.Errorf("ask is already reserved") - } + // the alloc should not have reserved a 
node yet: do not allow multiple nodes to be reserved + if err := sa.canAllocationReserve(ask); err != nil { + return err } // check if we can reserve the node before reserving on the app if err := node.Reserve(sa, ask); err != nil { return err } - sa.reservations[nodeReservation.getKey()] = nodeReservation + sa.reservations[allocKey] = nodeReservation log.Log(log.SchedApplication).Info("reservation added successfully", zap.String("app", sa.ApplicationID), zap.String("node", node.NodeID), - zap.String("ask", allocKey)) + zap.String("alloc", allocKey)) return nil } -// UnReserve the application for this node and ask combination. -// This first removes the reservation from the node. +// UnReserve the application for this node and alloc combination. // If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1. -// The error is set if the reservation key cannot be removed from the app or node. +// No error is returned: a nil node or alloc is ignored and also returns 0. -func (sa *Application) UnReserve(node *Node, ask *Allocation) (int, error) { +func (sa *Application) UnReserve(node *Node, ask *Allocation) int { + log.Log(log.SchedApplication).Info("unreserving allocation from application", + zap.String("appID", sa.ApplicationID), + zap.Stringer("node", node), + zap.Stringer("alloc", ask)) + if node == nil || ask == nil { + return 0 + } sa.Lock() defer sa.Unlock() - return sa.unReserveInternal(node, ask) + reserve, ok := sa.reservations[ask.allocationKey] + if !ok { + log.Log(log.SchedApplication).Debug("reservation not found on application", + zap.String("appID", sa.ApplicationID), + zap.String("allocationKey", ask.allocationKey)) + return 0 + } + if reserve.nodeID != node.NodeID { + log.Log(log.SchedApplication).Warn("UnReserve: provided info not consistent with reservation", + zap.String("appID", sa.ApplicationID), + zap.String("node", reserve.nodeID), + zap.String("alloc", reserve.allocKey)) + } + return sa.unReserveInternal(reserve) } // Unlocked version for UnReserve that really
does the work. +// This is idempotent and will not fail // Must only be called while holding the application lock. -func (sa *Application) unReserveInternal(node *Node, ask *Allocation) (int, error) { - resKey := reservationKey(node, nil, ask) - if resKey == "" { - log.Log(log.SchedApplication).Debug("unreserve reservation key create failed unexpectedly", - zap.String("appID", sa.ApplicationID), - zap.Stringer("node", node), - zap.Stringer("ask", ask)) - return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID) +func (sa *Application) unReserveInternal(reserve *reservation) int { + // this should not happen + if reserve == nil { + return 0 } // unReserve the node before removing from the app - var num int - var err error - if num, err = node.unReserve(sa, ask); err != nil { - return 0, err - } + num := reserve.node.unReserve(reserve.alloc) // if the unreserve worked on the node check the app - if _, found := sa.reservations[resKey]; found { + if _, found := sa.reservations[reserve.allocKey]; found { // worked on the node means either found or not but no error, log difference here if num == 0 { log.Log(log.SchedApplication).Info("reservation not found while removing from node, app has reservation", zap.String("appID", sa.ApplicationID), - zap.String("nodeID", node.NodeID), - zap.String("ask", ask.GetAllocationKey())) + zap.String("nodeID", reserve.nodeID), + zap.String("alloc", reserve.allocKey)) } - delete(sa.reservations, resKey) - log.Log(log.SchedApplication).Info("reservation removed successfully", zap.String("node", node.NodeID), - zap.String("app", ask.GetApplicationID()), zap.String("ask", ask.GetAllocationKey())) - return 1, nil + delete(sa.reservations, reserve.allocKey) + log.Log(log.SchedApplication).Info("reservation removed successfully", + zap.String("appID", sa.ApplicationID), + zap.String("node", reserve.nodeID), + zap.String("alloc", reserve.allocKey)) + return 1 } // reservation was not found 
log.Log(log.SchedApplication).Info("reservation not found while removing from app", zap.String("appID", sa.ApplicationID), - zap.String("nodeID", node.NodeID), - zap.String("ask", ask.GetAllocationKey()), + zap.String("node", reserve.nodeID), + zap.String("alloc", reserve.allocKey), zap.Int("nodeReservationsRemoved", num)) - return 0, nil + return 0 } -// Return the allocation reservations on any node. -// The returned array is 0 or more keys into the reservations map. +// canAllocationReserve checks if the allocation has already been reserved. An alloc can never reserve more than one node. // No locking must be called while holding the lock -func (sa *Application) GetAskReservations(allocKey string) []string { - reservationKeys := make([]string, 0) - if allocKey == "" { - return reservationKeys - } - for key := range sa.reservations { - if strings.HasSuffix(key, allocKey) { - reservationKeys = append(reservationKeys, key) - } - } - return reservationKeys -} - -// Check if the allocation has already been reserved. An ask can never reserve more than one node.
-// No locking must be called while holding the lock -func (sa *Application) canAskReserve(ask *Allocation) bool { - allocKey := ask.GetAllocationKey() - if ask.IsAllocated() { - log.Log(log.SchedApplication).Debug("ask already allocated, no reservation allowed", - zap.String("askKey", allocKey)) - return false - } - if len(sa.GetAskReservations(allocKey)) > 0 { +func (sa *Application) canAllocationReserve(alloc *Allocation) error { + allocKey := alloc.GetAllocationKey() + if alloc.IsAllocated() { + log.Log(log.SchedApplication).Debug("allocation is marked as allocated, no reservation allowed", + zap.String("allocationKey", allocKey)) + return common.ErrorReservingAlloc + } + reserved := sa.reservations[allocKey] + if reserved != nil { log.Log(log.SchedApplication).Debug("reservation already exists", - zap.String("askKey", allocKey)) - return false + zap.String("allocKey", allocKey), + zap.String("nodeID", reserved.nodeID)) + return common.ErrorDuplicateReserve } - return true + return nil } func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, userHeadRoom *resources.Resource, total *[]*Allocation) { @@ -1045,42 +1026,13 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption requiredNode := request.GetRequiredNode() // does request have any constraint to run on specific node? if requiredNode != "" { - // the iterator might not have the node we need as it could be reserved, or we have not added it yet - node := getNodeFn(requiredNode) - if node == nil { - getRateLimitedAppLog().Info("required node is not found (could be transient)", - zap.String("application ID", sa.ApplicationID), - zap.String("allocationKey", request.GetAllocationKey()), - zap.String("required node", requiredNode)) - return nil - } - // Are there any non daemon set reservations on specific required node? 
- // Cancel those reservations to run daemon set pods - reservations := node.GetReservations() - if len(reservations) > 0 { - if !sa.cancelReservations(reservations) { - return nil - } - } - // we don't care about predicate error messages here - result, _ := sa.tryNode(node, request) //nolint:errcheck + result := sa.tryRequiredNode(request, getNodeFn) if result != nil { - // check if the node was reserved and we allocated after a release - if _, ok := sa.reservations[reservationKey(node, nil, request)]; ok { - log.Log(log.SchedApplication).Debug("allocation on required node after release", - zap.String("appID", sa.ApplicationID), - zap.String("nodeID", requiredNode), - zap.String("allocationKey", request.GetAllocationKey())) - result.ResultType = AllocatedReserved - return result - } - log.Log(log.SchedApplication).Debug("allocation on required node is completed", - zap.String("nodeID", node.NodeID), - zap.String("allocationKey", request.GetAllocationKey()), - zap.Stringer("resultType", result.ResultType)) return result } - return newReservedAllocationResult(node.NodeID, request) + // it did not allocate or reserve: should only happen if the node is not registered yet + // just continue with the next request + continue } iterator := nodeIterator() @@ -1108,50 +1060,86 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption return nil } -func (sa *Application) cancelReservations(reservations []*reservation) bool { - for _, res := range reservations { - // skip the node - if res.ask.GetRequiredNode() != "" { - log.Log(log.SchedApplication).Warn("reservation for ask with required node already exists on the node", - zap.String("required node", res.node.NodeID), - zap.String("existing ask reservation key", res.getKey())) - return false - } +// tryRequiredNode tries to place the allocation in the specific node that is set as the required node in the allocation. 
+// The first time the allocation is seen it will try to make the allocation on the node. If that does not work it will +// always trigger the reservation of the node. +func (sa *Application) tryRequiredNode(request *Allocation, getNodeFn func(string) *Node) *AllocationResult { + requiredNode := request.GetRequiredNode() + allocationKey := request.GetAllocationKey() + // the iterator might not have the node we need as it could be reserved, or we have not added it yet + node := getNodeFn(requiredNode) + if node == nil { + getRateLimitedAppLog().Info("required node is not found (could be transient)", + zap.String("application ID", sa.ApplicationID), + zap.String("allocationKey", allocationKey), + zap.String("required node", requiredNode)) + return nil } - var err error + // Are there any reservations on this node that do not specifically require this node? + // Cancel any reservations to make room for the allocations that require the node var num int + reservations := node.GetReservations() + if len(reservations) > 0 { + num = sa.cancelReservations(reservations) + } + _, thisReserved := sa.reservations[allocationKey] + // now try the request, we don't care about predicate error messages here + result, _ := sa.tryNode(node, request) //nolint:errcheck + if result != nil { + result.CancelledReservations = num + // check if the node was reserved and we allocated after a release + if thisReserved { + log.Log(log.SchedApplication).Debug("allocation on required node after release", + zap.String("appID", sa.ApplicationID), + zap.String("nodeID", requiredNode), + zap.String("allocationKey", allocationKey)) + result.ResultType = AllocatedReserved + return result + } + log.Log(log.SchedApplication).Debug("allocation on required node is completed", + zap.String("nodeID", node.NodeID), + zap.String("allocationKey", allocationKey), + zap.Stringer("resultType", result.ResultType)) + return result + } + // if this ask was already reserved we should not have deleted any
reservations + // we also do not need to send back a reservation result and just return nil to check the next ask + if thisReserved { + return nil + } + result = newReservedAllocationResult(node.NodeID, request) + result.CancelledReservations = num + return result +} + +// cancelReservations will cancel all non required node reservations for a node. The list of reservations passed in is +// a copy of all reservations of a single node. This is called during the required node allocation cycle only. +// The returned int value is used to update the partition counter of active reservations. +func (sa *Application) cancelReservations(reservations []*reservation) int { + var released, num int // un reserve all the apps that were reserved on the node for _, res := range reservations { - thisApp := res.app.ApplicationID == sa.ApplicationID - if thisApp { - num, err = sa.unReserveInternal(res.node, res.ask) - } else { - num, err = res.app.UnReserve(res.node, res.ask) - } - if err != nil { - log.Log(log.SchedApplication).Warn("Unable to cancel reservations on node", - zap.String("victim application ID", res.app.ApplicationID), - zap.String("victim allocationKey", res.getKey()), - zap.String("required node", res.node.NodeID), - zap.Int("reservations count", num), - zap.String("application ID", sa.ApplicationID)) - return false - } else { - log.Log(log.SchedApplication).Info("Cancelled reservation on required node", - zap.String("affected application ID", res.app.ApplicationID), - zap.String("affected allocationKey", res.getKey()), - zap.String("required node", res.node.NodeID), - zap.Int("reservations count", num), - zap.String("application ID", sa.ApplicationID)) + // cleanup if the reservation does not have this node as a requirement + if res.alloc.requiredNode != "" { + continue } - // remove the reservation of the queue + thisApp := res.app.ApplicationID == sa.ApplicationID if thisApp { + num = sa.unReserveInternal(res) sa.queue.UnReserve(sa.ApplicationID, num) } else { + 
num = res.app.UnReserve(res.node, res.alloc) res.app.GetQueue().UnReserve(res.app.ApplicationID, num) } + log.Log(log.SchedApplication).Info("Cancelled reservation for required node allocation", + zap.String("triggered by appID", sa.ApplicationID), + zap.String("affected application ID", res.appID), + zap.String("affected allocationKey", res.allocKey), + zap.String("required node", res.nodeID), + zap.Int("reservations count", num)) + released += num } - return true + return released } // tryPlaceholderAllocate tries to replace a placeholder that is allocated with a real allocation @@ -1238,14 +1226,15 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, // pick the first fit and try all nodes if that fails give up var allocResult *AllocationResult if phFit != nil && reqFit != nil { + resKey := reqFit.GetAllocationKey() iterator.ForEachNode(func(node *Node) bool { if !node.IsSchedulable() { - log.Log(log.SchedApplication).Debug("skipping node for placeholder ask as state is unschedulable", - zap.String("allocationKey", reqFit.GetAllocationKey()), + log.Log(log.SchedApplication).Debug("skipping node for placeholder alloc as state is unschedulable", + zap.String("allocationKey", resKey), zap.String("node", node.NodeID)) return true } - if !node.preAllocateCheck(reqFit.GetAllocatedResource(), reservationKey(nil, sa, reqFit)) { + if !node.preAllocateCheck(reqFit.GetAllocatedResource(), resKey) { return true } // skip the node if conditions can not be satisfied @@ -1257,7 +1246,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, if !node.TryAddAllocation(reqFit) { log.Log(log.SchedApplication).Debug("Node update failed unexpectedly", zap.String("applicationID", sa.ApplicationID), - zap.Stringer("ask", reqFit), + zap.Stringer("alloc", reqFit), zap.Stringer("placeholder", phFit)) return false } @@ -1266,7 +1255,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, 
log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly", zap.Error(err)) // unwind node allocation - _ = node.RemoveAllocation(reqFit.GetAllocationKey()) + _ = node.RemoveAllocation(resKey) return false } // allocation worked: on a non placeholder node update resultType and return @@ -1297,7 +1286,7 @@ func (sa *Application) checkHeadRooms(ask *Allocation, userHeadroom *resources.R return userHeadroom.FitInMaxUndef(ask.GetAllocatedResource()) && headRoom.FitInMaxUndef(ask.GetAllocatedResource()) } -// Try a reserved allocation of an outstanding reservation +// tryReservedAllocate tries allocating an outstanding reservation func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIterator func() NodeIterator) *AllocationResult { sa.Lock() defer sa.Unlock() @@ -1306,14 +1295,14 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte // process all outstanding reservations and pick the first one that fits for _, reserve := range sa.reservations { - ask := sa.requests[reserve.askKey] + ask := sa.requests[reserve.allocKey] // sanity check and cleanup if needed if ask == nil || ask.IsAllocated() { var unreserveAsk *Allocation // if the ask was not found we need to construct one to unreserve if ask == nil { unreserveAsk = &Allocation{ - allocationKey: reserve.askKey, + allocationKey: reserve.allocKey, applicationID: sa.ApplicationID, allocLog: make(map[string]*AllocationLogEntry), } @@ -1348,17 +1337,17 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte // try this on all other nodes for _, reserve := range sa.reservations { - // Other nodes cannot be tried if the ask has a required node - ask := reserve.ask - if ask.GetRequiredNode() != "" { + // Other nodes cannot be tried if a required node is requested + alloc := reserve.alloc + if alloc.GetRequiredNode() != "" { continue } iterator := nodeIterator() if iterator != nil { - if !sa.checkHeadRooms(ask, userHeadroom, 
headRoom) { + if !sa.checkHeadRooms(alloc, userHeadroom, headRoom) { continue } - result := sa.tryNodesNoReserve(ask, iterator, reserve.nodeID) + result := sa.tryNodesNoReserve(alloc, iterator, reserve.nodeID) // have a candidate return it, including the node that was reserved if result != nil { return result @@ -1413,8 +1402,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo return false } -// Try all the nodes for a reserved request that have not been tried yet. -// This should never result in a reservation as the ask is already reserved +// tryNodesNoReserve tries all the nodes for a reserved request that have not been tried yet. +// This should never result in a reservation as the allocation is already reserved func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, reservedNode string) *AllocationResult { var allocResult *AllocationResult iterator.ForEachNode(func(node *Node) bool { @@ -1449,14 +1438,13 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *AllocationResult { var nodeToReserve *Node scoreReserved := math.Inf(1) - // check if the ask is reserved or not + // check if the alloc is reserved or not allocKey := ask.GetAllocationKey() - reservedAsks := sa.GetAskReservations(allocKey) - allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0 + reserved := sa.reservations[allocKey] var allocResult *AllocationResult var predicateErrors map[string]int iterator.ForEachNode(func(node *Node) bool { - // skip the node if the node is not valid for the ask + // skip the node if the node is not schedulable if !node.IsSchedulable() { log.Log(log.SchedApplication).Debug("skipping node for ask as state is unschedulable", zap.String("allocationKey", allocKey), @@ -1478,28 +1466,23 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat // allocation worked so return if 
result != nil { metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart) - // check if the node was reserved for this ask: if it is set the resultType and return - // NOTE: this is a safeguard as reserved nodes should never be part of the iterator - // but we have no locking - if _, ok := sa.reservations[reservationKey(node, nil, ask)]; ok { - log.Log(log.SchedApplication).Debug("allocate found reserved ask during non reserved allocate", - zap.String("appID", sa.ApplicationID), - zap.String("nodeID", node.NodeID), - zap.String("allocationKey", allocKey)) - result.ResultType = AllocatedReserved - allocResult = result - return false - } - // we could also have a different node reserved for this ask if it has pick one of - // the reserved nodes to unreserve (first one in the list) - if len(reservedAsks) > 0 { - nodeID := strings.TrimSuffix(reservedAsks[0], "|"+allocKey) - log.Log(log.SchedApplication).Debug("allocate picking reserved ask during non reserved allocate", - zap.String("appID", sa.ApplicationID), - zap.String("nodeID", nodeID), - zap.String("allocationKey", allocKey)) + // check if the alloc had a reservation: if it has set the resultType and return + if reserved != nil { + if reserved.nodeID != node.NodeID { + // we have a different node reserved for this alloc + log.Log(log.SchedApplication).Debug("allocate picking reserved alloc during non reserved allocate", + zap.String("appID", sa.ApplicationID), + zap.String("reserved nodeID", reserved.nodeID), + zap.String("allocationKey", allocKey)) + result.ReservedNodeID = reserved.nodeID + } else { + // NOTE: this is a safeguard as reserved nodes should never be part of the iterator + log.Log(log.SchedApplication).Debug("allocate found reserved alloc during non reserved allocate", + zap.String("appID", sa.ApplicationID), + zap.String("nodeID", node.NodeID), + zap.String("allocationKey", allocKey)) + } result.ResultType = AllocatedReserved - result.ReservedNodeID = nodeID allocResult = result return 
false } @@ -1509,14 +1492,14 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat } // nothing allocated should we look at a reservation? askAge := time.Since(ask.GetCreateTime()) - if allowReserve && askAge > reservationDelay { + if reserved == nil && askAge > reservationDelay { log.Log(log.SchedApplication).Debug("app reservation check", zap.String("allocationKey", allocKey), zap.Time("createTime", ask.GetCreateTime()), zap.Duration("askAge", askAge), zap.Duration("reservationDelay", reservationDelay)) score := node.GetFitInScoreForAvailableResource(ask.GetAllocatedResource()) - // Record the so-far best node to reserve + // Record the best node so-far to reserve if score < scoreReserved { scoreReserved = score nodeToReserve = node @@ -1540,7 +1523,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat zap.String("appID", sa.ApplicationID), zap.String("nodeID", nodeToReserve.NodeID), zap.String("allocationKey", allocKey), - zap.Int("reservations", len(reservedAsks))) + zap.Int("reservations", len(sa.reservations))) // skip the node if conditions can not be satisfied if nodeToReserve.preReserveConditions(ask) != nil { return nil @@ -1552,11 +1535,12 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat return nil } -// Try allocating on one specific node +// tryNode tries allocating on one specific node func (sa *Application) tryNode(node *Node, ask *Allocation) (*AllocationResult, error) { toAllocate := ask.GetAllocatedResource() + allocationKey := ask.GetAllocationKey() // create the key for the reservation - if !node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask)) { + if !node.preAllocateCheck(toAllocate, allocationKey) { // skip schedule onto node return nil, nil } @@ -1571,13 +1555,13 @@ func (sa *Application) tryNode(node *Node, ask *Allocation) (*AllocationResult, log.Log(log.SchedApplication).DPanic("queue update failed unexpectedly", zap.Error(err)) // 
revert the node update - node.RemoveAllocation(ask.GetAllocationKey()) + node.RemoveAllocation(allocationKey) return nil, nil } - // mark this ask as allocated + // mark this alloc as allocated _, err := sa.allocateAsk(ask) if err != nil { - log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly", + log.Log(log.SchedApplication).Warn("allocation of alloc failed unexpectedly", zap.Error(err)) } // all is OK, last update for the app diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 5b7eed879..c6cfca9cf 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -234,11 +234,8 @@ func TestAppReservation(t *testing.T) { if app.HasReserved() { t.Fatal("new app should not have reservations") } - if app.IsReservedOnNode("") { - t.Error("app should not have reservations for empty node ID") - } - if app.IsReservedOnNode("unknown") { - t.Error("new app should not have reservations for unknown node") + if app.NodeReservedForAsk("") != "" { + t.Error("app should not have reservations for empty ask") } queue, err := createRootQueue(nil) @@ -270,33 +267,21 @@ func TestAppReservation(t *testing.T) { // reserve that works err = app.Reserve(node, ask) assert.NilError(t, err, "reservation should not have failed") - if app.IsReservedOnNode("") { - t.Errorf("app should not have reservations for empty node ID") - } - if app.IsReservedOnNode("unknown") { - t.Error("app should not have reservations for unknown node") + if app.NodeReservedForAsk("") != "" { + t.Error("app should not have reservations for empty ask") } - if app.HasReserved() && !app.IsReservedOnNode(nodeID1) { + if app.HasReserved() && app.NodeReservedForAsk(aKey) != nodeID1 { t.Errorf("app should have reservations for node %s", nodeID1) } - // node name similarity check: chop of the last char to make sure we check the full name - similar := nodeID1[:len(nodeID1)-1] - if app.HasReserved() && 
app.IsReservedOnNode(similar) { - t.Errorf("similar app should not have reservations for node %s", similar) - } - // reserve the same reservation err = app.Reserve(node, ask) if err == nil { t.Errorf("reservation should have failed") } - // unreserve unknown node/ask - _, err = app.UnReserve(nil, nil) - if err == nil { - t.Errorf("illegal reservation release but did not fail") - } + // unreserve unknown node/alloc + assert.Equal(t, app.UnReserve(nil, nil), 0, "illegal reservation release should have returned 0") // 2nd reservation for app ask2 := newAllocationAsk("alloc-2", appID1, res) @@ -309,20 +294,16 @@ func TestAppReservation(t *testing.T) { } err = app.Reserve(node2, ask2) assert.NilError(t, err, "reservation of 2nd node should not have failed") - _, err = app.UnReserve(node2, ask2) - assert.NilError(t, err, "remove of reservation of 2nd node should not have failed") + assert.Equal(t, app.UnReserve(node2, ask2), 1, "remove of reservation of 2nd node should not have failed") // unreserve the same should fail - _, err = app.UnReserve(node2, ask2) - assert.NilError(t, err, "remove twice of reservation of 2nd node should have failed") + assert.Equal(t, app.UnReserve(node2, ask2), 0, "remove twice of reservation of 2nd node should return 0") // failure case: remove reservation from node, app still needs cleanup var num int - num, err = node.unReserve(app, ask) - assert.NilError(t, err, "un-reserve on node should not have failed") + num = node.unReserve(ask) assert.Equal(t, num, 1, "un-reserve on node should have removed reservation") - num, err = app.UnReserve(node, ask) - assert.NilError(t, err, "app has reservation should not have failed") + num = app.UnReserve(node, ask) assert.Equal(t, num, 1, "un-reserve on app should have removed reservation from app") } @@ -338,12 +319,9 @@ func TestAppAllocReservation(t *testing.T) { if app == nil || app.ApplicationID != appID1 { t.Fatalf("app create failed which should not have %v", app) } - if app.HasReserved() { - 
t.Fatal("new app should not have reservations") - } - if len(app.GetAskReservations("")) != 0 { - t.Fatal("new app should not have reservation for empty allocKey") - } + assert.Assert(t, !app.HasReserved(), "new app should not have reservations") + assert.Equal(t, len(app.GetReservations()), 0, "new app should not have reservation for empty allocKey") + queue, err := createRootQueue(nil) assert.NilError(t, err, "queue create failed") app.queue = queue @@ -360,46 +338,44 @@ func TestAppAllocReservation(t *testing.T) { assert.NilError(t, err, "ask2 should have been added to app") err = app.Reserve(node1, ask) assert.NilError(t, err, "reservation should not have failed") - if len(app.GetAskReservations("")) != 0 { + if app.reservations[""] != nil { t.Fatal("app should not have reservation for empty allocKey") } - nodeKey1 := nodeID1 + "|" + aKey - askReserved := app.GetAskReservations(aKey) - if len(askReserved) != 1 || askReserved[0] != nodeKey1 { - t.Errorf("app should have reservations for %s on %s and has not", aKey, nodeID1) + allocReserved := app.reservations[aKey] + if allocReserved == nil || allocReserved.nodeID != nodeID1 { + t.Fatalf("app should have reservations for %s on %s and has not", aKey, nodeID1) } + assert.Equal(t, len(app.GetReservations()), 1, "app should have 1 reservation") - nodeID2 := "node-2" node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10}) err = app.Reserve(node2, ask2) assert.NilError(t, err, "reservation should not have failed: error %v", err) - nodeKey2 := nodeID2 + "|" + aKey2 - askReserved = app.GetAskReservations(aKey2) - if len(askReserved) != 1 && askReserved[0] != nodeKey2 { - t.Errorf("app should have reservations for %s on %s and has not", aKey, nodeID2) + allocReserved = app.reservations[aKey2] + if allocReserved == nil || allocReserved.nodeID != nodeID2 { + t.Fatalf("app should have reservations for %s on %s and has not", aKey, nodeID2) } + assert.Equal(t, len(app.GetReservations()), 2, "app should have 2 
reservation") // check duplicate reserve: nothing should change - if app.canAskReserve(ask) { - t.Error("ask has already reserved, reserve check should have failed") - } + assert.Assert(t, app.canAllocationReserve(ask) != nil, "alloc has already reserved, reserve check should have failed") node3 := newNode("node-3", map[string]resources.Quantity{"first": 10}) err = app.Reserve(node3, ask) if err == nil { - t.Errorf("reservation should have failed") + t.Fatal("reservation should have failed") } - askReserved = app.GetAskReservations(aKey) - if len(askReserved) != 1 && askReserved[0] != nodeKey1 { - t.Errorf("app should have reservations for node %s and has not: %v", nodeID1, askReserved) + allocReserved = app.reservations[aKey] + if allocReserved == nil || allocReserved.nodeID != nodeID1 { + t.Fatalf("app should have reservations for node %s and has not: %v", nodeID1, allocReserved) } - askReserved = app.GetAskReservations(aKey2) - if len(askReserved) != 1 && askReserved[0] != nodeKey2 { - t.Errorf("app should have reservations for node %s and has not: %v", nodeID2, askReserved) + allocReserved = app.reservations[aKey2] + if allocReserved == nil || allocReserved.nodeID != nodeID2 { + t.Fatalf("app should have reservations for node %s and has not: %v", nodeID2, allocReserved) } + assert.Equal(t, len(app.GetReservations()), 2, "app should have 2 reservation") // clean up all asks and reservations - reservedAsks := app.RemoveAllocationAsk("") - if app.HasReserved() || node1.IsReserved() || node2.IsReserved() || reservedAsks != 2 { - t.Errorf("ask removal did not clean up all reservations, reserved released = %d", reservedAsks) + reservedRelease := app.RemoveAllocationAsk("") + if app.HasReserved() || node1.IsReserved() || node2.IsReserved() || reservedRelease != 2 { + t.Fatalf("ask removal did not clean up all reservations, reserved released = %d", reservedRelease) } } @@ -495,7 +471,6 @@ func TestAddAllocAsk(t *testing.T) { // test add alloc ask event noEvents := 
uint64(0) err = common.WaitForCondition(10*time.Millisecond, time.Second, func() bool { - fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents()) noEvents = eventSystem.Store.CountStoredEvents() return noEvents == 2 }) @@ -652,7 +627,7 @@ func TestRemoveReservedAllocAsk(t *testing.T) { node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) err = app.Reserve(node, ask2) assert.NilError(t, err, "reservation should not have failed") - if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() { + if app.reservations[allocKey] == nil || !node.IsReserved() { t.Fatalf("app should have reservation for %v on node", allocKey) } before := app.GetPendingResource().Clone() @@ -671,11 +646,10 @@ func TestRemoveReservedAllocAsk(t *testing.T) { err = app.Reserve(node, ask2) assert.NilError(t, err, "reservation should not have failed: error %v", err) - if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() { + if app.reservations[allocKey] == nil || !node.IsReserved() { t.Fatalf("app should have reservation for %v on node", allocKey) } - var num int - num, err = node.unReserve(app, ask2) + num := node.unReserve(ask2) assert.NilError(t, err, "un-reserve on node should not have failed") assert.Equal(t, num, 1, "un-reserve on node should have removed reservation") @@ -1954,6 +1928,235 @@ func TestCanReplace(t *testing.T) { } } +func TestTryRequiredNode(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + resMap := map[string]string{"first": "5"} + rootQ, err := createRootQueue(resMap) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, resMap) + assert.NilError(t, err, "unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + 
app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}) + alloc := newAllocation(aKey, nodeID1, allocRes) + app.AddAllocation(alloc) + node.AddAllocation(alloc) + + ask := newAllocationAsk(aKey2, appID1, allocRes) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "unexpected error when adding an ask") + err = app.Reserve(node, ask) + assert.NilError(t, err, "unexpected error when reserving ask") + + // get a small enough allocation that fits after cancelling the reservation + allocRes = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) + ask = newAllocationAsk(aKey3, appID1, allocRes) + ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "unexpected error when adding an ask") + result := app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + assert.Equal(t, result.ResultType, Allocated, "expected allocated result") + assert.Equal(t, result.CancelledReservations, 1, "expected 1 cancelled reservation") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") + // the non required node one should be cancelled + assert.Assert(t, !node.isReservedForAllocation(aKey2), "expecting no reservation for alloc-2 on node") + assert.Equal(t, app.NodeReservedForAsk(aKey2), "", "expecting no reservation for alloc-2 on node-1") +} + +func TestTryRequiredNodeReserved(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + resMap := map[string]string{"first": "5"} + rootQ, err := createRootQueue(resMap) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, resMap) + assert.NilError(t, err, 
"unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2}) + ask := newAllocationAsk(aKey2, appID1, allocRes) + ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "unexpected error when adding an ask") + err = app.Reserve(node, ask) + assert.NilError(t, err, "unexpected error when reserving ask") + + result := app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + assert.Equal(t, result.ResultType, AllocatedReserved, "expected allocated reserved result") + assert.Equal(t, result.CancelledReservations, 0, "expected no cancelled reservation") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") +} + +func TestTryRequiredNodeReserve(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + resMap := map[string]string{"first": "5"} + rootQ, err := createRootQueue(resMap) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, resMap) + assert.NilError(t, err, "unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}) + alloc := newAllocation(aKey, nodeID1, allocRes) + app.AddAllocation(alloc) + node.AddAllocation(alloc) + + ask := newAllocationAsk(aKey2, appID1, allocRes) + ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "unexpected error when adding an ask") + + result := 
app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + assert.Equal(t, result.ResultType, Reserved, "expected reserved result") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") +} + +func TestTryRequiredNodeCancel(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + resMap := map[string]string{"first": "5"} + rootQ, err := createRootQueue(resMap) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, resMap) + assert.NilError(t, err, "unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}) + alloc := newAllocation(aKey, nodeID1, allocRes) + app.AddAllocation(alloc) + node.AddAllocation(alloc) + + ask := newAllocationAsk(aKey2, appID1, allocRes) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "adding new allocation to app failed unexpected") + err = app.reserveInternal(node, ask) + assert.NilError(t, err, "reserving new allocation on app/node failed unexpected") + assert.Assert(t, node.isReservedForAllocation(aKey2), "expecting alloc reservation on node") + assert.Equal(t, app.NodeReservedForAsk(aKey2), nodeID1, "expecting app reservation on node") + + ask = newAllocationAsk(aKey3, appID1, allocRes) + ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "adding new allocation to app failed unexpected") + result := app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + 
assert.Equal(t, result.ResultType, Reserved, "expected allocated result") + assert.Equal(t, result.CancelledReservations, 1, "expected 1 cancelled reservation") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") + assert.Assert(t, !node.isReservedForAllocation(aKey2), "expecting no reservation for alloc-2 on node") + assert.Equal(t, app.NodeReservedForAsk(aKey2), "", "expecting no reservation for alloc-2 on app") +} + +func TestTryRequiredNodeAdd(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + rootQ, err := createRootQueue(map[string]string{"first": "5"}) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, map[string]string{"first": "5"}) + assert.NilError(t, err, "unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}) + alloc := newAllocation(aKey, nodeID1, allocRes) + app.AddAllocation(alloc) + node.AddAllocation(alloc) + + ask := newAllocationAsk(aKey2, appID1, allocRes) + ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "adding new allocation to app failed unexpected") + + result := app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + assert.Equal(t, result.ResultType, Reserved, "expected reserved result") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") + + // finish processing do what the context would do + err = app.reserveInternal(node, ask) + assert.NilError(t, err, "reservation processing failed unexpected") + + ask = newAllocationAsk(aKey3, appID1, allocRes) + 
ask.requiredNode = nodeID1 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "adding new allocation to app failed unexpected") + result = app.tryRequiredNode(ask, getNode) + assert.Assert(t, result != nil, "alloc expected") + assert.Assert(t, result.Request == ask, "alloc expected for the ask") + assert.Equal(t, result.ResultType, Reserved, "expected allocated result") + assert.Equal(t, result.CancelledReservations, 0, "expected no cancelled reservation") + assert.Equal(t, nodeID1, result.NodeID, "wrong node") + assert.Assert(t, node.isReservedForAllocation(aKey2), "expecting reservation for alloc-2 on node") + assert.Equal(t, app.NodeReservedForAsk(aKey2), nodeID1, "expecting reservation for alloc-2 on app") + assert.Assert(t, !node.isReservedForAllocation(aKey3), "expecting no reservation for alloc-3 on node") + assert.Equal(t, app.NodeReservedForAsk(aKey3), "", "expecting no reservation for alloc-3 on app") +} + +func TestTryRequiredNodeExists(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + nodeMap := map[string]*Node{nodeID1: node} + getNode := func(nodeID string) *Node { + return nodeMap[nodeID] + } + rootQ, err := createRootQueue(map[string]string{"first": "5"}) + assert.NilError(t, err, "unexpected error when creating root queue") + var childQ *Queue + childQ, err = createManagedQueue(rootQ, "child", false, map[string]string{"first": "5"}) + assert.NilError(t, err, "unexpected error when creating child queue") + + app := newApplication(appID1, "default", "root.child") + app.SetQueue(childQ) + childQ.applications[appID1] = app + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}) + + ask := newAllocationAsk(aKey2, appID1, allocRes) + ask.requiredNode = nodeID2 + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "adding new allocation to app failed unexpected") + + result := app.tryRequiredNode(ask, getNode) + assert.Assert(t, result == nil, "alloc not expected") +} + 
func TestTryAllocateNoRequests(t *testing.T) { node := newNode("node1", map[string]resources.Quantity{"first": 5}) nodeMap := map[string]*Node{"node1": node} @@ -3210,3 +3413,28 @@ type mockAppEventHandler struct { func (m mockAppEventHandler) HandleEvent(ev interface{}) { m.callback(ev) } + +func TestApplication_canAllocationReserve(t *testing.T) { + res := resources.NewResource() + tests := []struct { + name string + alloc *Allocation + wantErr bool + }{ + {"new", newAllocationWithKey(aKey, appID1, "", res), false}, + {"allocated", newAllocationWithKey(aKey2, appID1, nodeID1, res), true}, + {"duplicate", newAllocationWithKey(aKey3, appID1, "", res), true}, + } + app := newApplication(appID0, "default", "root.unknown") + app.reservations[aKey3] = &reservation{ + nodeID: nodeID1, + allocKey: aKey, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := app.canAllocationReserve(tt.alloc); (err != nil) != tt.wantErr { + t.Errorf("canAllocationReserve() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go index ae5971f4d..2047ff447 100644 --- a/pkg/scheduler/objects/node.go +++ b/pkg/scheduler/objects/node.go @@ -20,17 +20,17 @@ package objects import ( "fmt" - "strings" "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/events" "github.com/apache/yunikorn-core/pkg/locking" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/plugins" schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events" - "github.com/apache/yunikorn-scheduler-interface/lib/go/common" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -110,9 +110,9 @@ func (sn *Node) initializeAttribute(newAttributes map[string]string) { sn.attributes = map[string]string{} } - 
sn.Hostname = sn.attributes[common.HostName] - sn.Rackname = sn.attributes[common.RackName] - sn.Partition = sn.attributes[common.NodePartition] + sn.Hostname = sn.attributes[siCommon.HostName] + sn.Rackname = sn.attributes[siCommon.RackName] + sn.Partition = sn.attributes[siCommon.NodePartition] } // Get an attribute by name. The most used attributes can be directly accessed via the @@ -129,7 +129,7 @@ func (sn *Node) GetAttributes() map[string]string { // Get InstanceType of this node. // This is a lock free call because all attributes are considered read only func (sn *Node) GetInstanceType() string { - itype := sn.GetAttribute(common.InstanceType) + itype := sn.GetAttribute(siCommon.InstanceType) if itype != "" { return itype } @@ -521,7 +521,7 @@ func (sn *Node) preConditions(ask *Allocation, allocate bool) error { // preAllocateCheck checks if the node should be considered as a possible node to allocate on. // No updates are made this only performs a pre allocate checks -func (sn *Node) preAllocateCheck(res *resources.Resource, resKey string) bool { +func (sn *Node) preAllocateCheck(res *resources.Resource, allocationKey string) bool { // cannot allocate zero or negative resource if !resources.StrictlyGreaterThanZero(res) { log.Log(log.SchedNode).Debug("pre alloc check: requested resource is zero", @@ -530,10 +530,10 @@ func (sn *Node) preAllocateCheck(res *resources.Resource, resKey string) bool { } // check if the node is reserved for this app/alloc if sn.IsReserved() { - if !sn.isReservedForApp(resKey) { - log.Log(log.SchedNode).Debug("pre alloc check: node reserved for different app or ask", + if !sn.isReservedForAllocation(allocationKey) { + log.Log(log.SchedNode).Debug("pre alloc check: node reserved for different alloc", zap.String("nodeID", sn.NodeID), - zap.String("resKey", resKey)) + zap.String("allocationKey", allocationKey)) return false } } @@ -544,95 +544,98 @@ func (sn *Node) preAllocateCheck(res *resources.Resource, resKey string) bool { 
return sn.availableResource.FitIn(res) } -// Return if the node has been reserved by any application +// IsReserved returns true if the node has been reserved for an allocation func (sn *Node) IsReserved() bool { sn.RLock() defer sn.RUnlock() return len(sn.reservations) > 0 } -// isReservedForApp returns true if and only if the node has been reserved by the application -// NOTE: a return value of false does not mean the node is not reserved by a different app -func (sn *Node) isReservedForApp(key string) bool { +// isReservedForAllocation returns true if and only if the node has been reserved by this allocation +// NOTE: a return value of false does not mean the node is not reserved by a different allocation, use IsReserved +// to test if the node has any reservation. +func (sn *Node) isReservedForAllocation(key string) bool { if key == "" { return false } sn.RLock() defer sn.RUnlock() - if strings.Contains(key, "|") { - return sn.reservations[key] != nil - } - // make sure matches only for the whole appID - separator := key + "|" - for resKey := range sn.reservations { - if strings.HasPrefix(resKey, separator) { - return true - } - } - return false + return sn.reservations[key] != nil } -// Reserve the node for this application and ask combination, if not reserved yet. +// Reserve the node for this application and alloc combination. // The reservation is checked against the node resources. -// If the reservation fails the function returns false, if the reservation is made it returns true. +// If the reservation fails the function returns an error, if the reservation is made it returns nil. 
func (sn *Node) Reserve(app *Application, ask *Allocation) error { - defer sn.notifyListeners() sn.Lock() defer sn.Unlock() - if len(sn.reservations) > 0 { - return fmt.Errorf("node is already reserved, nodeID %s", sn.NodeID) - } appReservation := newReservation(sn, app, ask, false) - // this should really not happen just guard against panic - // either app or ask are nil + // this should really not happen just guard against panic either app or alloc are nil if appReservation == nil { log.Log(log.SchedNode).Debug("reservation creation failed unexpectedly", zap.String("nodeID", sn.NodeID), - zap.Any("app", app), - zap.Any("ask", ask)) - return fmt.Errorf("reservation creation failed app or ask are nil on nodeID %s", sn.NodeID) + zap.Stringer("app", app), + zap.Stringer("alloc", ask)) + return fmt.Errorf("reservation creation failed either app or alloc are nil on nodeID %s", sn.NodeID) + } + reqNode := ask.requiredNode != "" + if !reqNode && len(sn.reservations) > 0 { + log.Log(log.SchedNode).Warn("normal reservation on already reserved node", + zap.String("nodeID", sn.NodeID), + zap.String("new app", appReservation.appID), + zap.String("new alloc", appReservation.allocKey)) + return common.ErrorNodeAlreadyReserved + } + // allow multiple required node reservations on the same node + if reqNode { + // make sure all other reservations are for required nodes + for _, reserved := range sn.reservations { + if reserved.alloc.requiredNode == "" { + log.Log(log.SchedNode).Warn("trying to add normal reservation to node with required node reservation", + zap.String("nodeID", sn.NodeID), + zap.String("existing app", reserved.appID), + zap.String("existing alloc", reserved.allocKey), + zap.String("new app", appReservation.appID), + zap.String("new alloc", appReservation.allocKey)) + return fmt.Errorf("normal reservation: required node reservation present, nodeID %s", sn.NodeID) + } + } } // reservation must fit on the empty node if 
!sn.totalResource.FitIn(ask.GetAllocatedResource()) { log.Log(log.SchedNode).Debug("reservation does not fit on the node", zap.String("nodeID", sn.NodeID), zap.String("appID", app.ApplicationID), - zap.String("ask", ask.GetAllocationKey()), - zap.Stringer("allocationAsk", ask.GetAllocatedResource())) - return fmt.Errorf("reservation does not fit on node %s, appID %s, ask %s", sn.NodeID, app.ApplicationID, ask.GetAllocatedResource().String()) + zap.String("alloc", ask.GetAllocationKey()), + zap.Stringer("requested resources", ask.GetAllocatedResource())) + return common.ErrorNodeNotFitReserve } - sn.reservations[appReservation.getKey()] = appReservation + sn.reservations[ask.allocationKey] = appReservation sn.nodeEvents.SendReservedEvent(sn.NodeID, ask.GetAllocatedResource(), ask.GetAllocationKey()) // reservation added successfully return nil } -// unReserve the node for this application and ask combination -// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1. -// The error is set if the reservation key cannot be generated. -func (sn *Node) unReserve(app *Application, ask *Allocation) (int, error) { - defer sn.notifyListeners() +// unReserve the node for this application and alloc combination +// If the reservation does not exist or alloc is nil it returns 0 for reservations removed, +// if the reservation is removed it returns 1. 
+func (sn *Node) unReserve(alloc *Allocation) int { + if alloc == nil { + return 0 + } sn.Lock() defer sn.Unlock() - resKey := reservationKey(nil, app, ask) - if resKey == "" { - log.Log(log.SchedNode).Debug("unreserve reservation key create failed unexpectedly", - zap.String("nodeID", sn.NodeID), - zap.Any("app", app), - zap.Any("ask", ask)) - return 0, fmt.Errorf("reservation key failed app or ask are nil on nodeID %s", sn.NodeID) - } - if _, ok := sn.reservations[resKey]; ok { - delete(sn.reservations, resKey) - sn.nodeEvents.SendUnreservedEvent(sn.NodeID, ask.GetAllocatedResource(), ask.GetAllocationKey()) - return 1, nil + if _, ok := sn.reservations[alloc.allocationKey]; ok { + delete(sn.reservations, alloc.allocationKey) + sn.nodeEvents.SendUnreservedEvent(sn.NodeID, alloc.GetAllocatedResource(), alloc.GetAllocationKey()) + return 1 } // reservation was not found log.Log(log.SchedNode).Debug("reservation not found while removing from node", zap.String("nodeID", sn.NodeID), - zap.String("appID", app.ApplicationID), - zap.String("ask", ask.GetAllocationKey())) - return 0, nil + zap.String("alloc", alloc.GetAllocationKey()), + zap.String("appID", alloc.GetApplicationID())) + return 0 } // GetReservations returns all reservation made on this node diff --git a/pkg/scheduler/objects/node_collection_test.go b/pkg/scheduler/objects/node_collection_test.go index 30d73e67c..a43c49cbe 100644 --- a/pkg/scheduler/objects/node_collection_test.go +++ b/pkg/scheduler/objects/node_collection_test.go @@ -163,13 +163,8 @@ func TestSetNodeSortingPolicy(t *testing.T) { node := newNode(nodesInfo[id].nodeID, map[string]resources.Quantity{"vcore": resources.Quantity(defaultCapicity[0]), "memory": resources.Quantity(defaultCapicity[1])}) res := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(nodesInfo[id].allocatedVcore), "memory": resources.Quantity(nodesInfo[id].allocatedMem)}) alloc := newAllocation(fmt.Sprintf("test-app-%d", id+1), 
fmt.Sprintf("test-%d", id+1), res) - if ok := node.TryAddAllocation(alloc); !ok { - t.Error("Allocation error happen in node.") - } - - if err := nc.AddNode(node); err != nil { - t.Errorf("AddNode error:%s", err.Error()) - } + assert.Assert(t, node.TryAddAllocation(alloc), "Allocation error happened on node") + assert.NilError(t, nc.AddNode(node), "Adding node to collection failed") } conf := configs.PartitionConfig{ @@ -196,9 +191,8 @@ func TestSetNodeSortingPolicy(t *testing.T) { } nc.SetNodeSortingPolicy(NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights)) - iter := nc.GetNodeIterator() id := 0 - iter.ForEachNode(func(node *Node) bool { + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { assert.Equal(t, node.NodeID, tt.nodesOrder[id], "%s: NodeID wanted %s, but it got %s.", nc.GetNodeSortingPolicy().PolicyType().String(), tt.nodesOrder[id], node.NodeID) id++ return true @@ -231,9 +225,7 @@ func TestGetNodeSortingPolicy(t *testing.T) { alloc := newAllocation(fmt.Sprintf("test-app-%d", id+1), fmt.Sprintf("test-%d", id), res) node.AddAllocation(alloc) - if err := nc.AddNode(node); err != nil { - t.Errorf("AddNode error:%s", err.Error()) - } + assert.NilError(t, nc.AddNode(node), "Adding node to collection failed") } conf := configs.PartitionConfig{ @@ -260,21 +252,16 @@ func TestGetNodeSortingPolicy(t *testing.T) { } nc.SetNodeSortingPolicy(NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights)) - if ans := nc.GetNodeSortingPolicy().PolicyType().String(); ans != tt.want { - t.Errorf("got %s, want %s", ans, tt.want) - } + assert.Equal(t, nc.GetNodeSortingPolicy().PolicyType().String(), tt.want, "expected sort policy not set") - // Checking thes nodes order in iterator is after setting node policy with Default weight{vcore:1, memory:1}. - iter := nc.GetNodeIterator() + // Checking the nodes order in iterator is after setting node policy with Default weight{vcore:1, memory:1}. 
index := 0 - iter.ForEachNode(func(node *Node) bool { + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { if index >= len(tt.exceptNodeOrder) { t.Error("Wrong length of nodes in node iterator.") } - if node.NodeID != tt.exceptNodeOrder[index] { - t.Errorf("Policy: %s, got %s, want %s", nc.GetNodeSortingPolicy().PolicyType().String(), node.NodeID, tt.exceptNodeOrder[index]) - } + assert.Equal(t, node.NodeID, tt.exceptNodeOrder[index], "Policy: %s, node order wrong", nc.GetNodeSortingPolicy().PolicyType().String()) index++ return true }) @@ -292,21 +279,15 @@ func TestGetFullNodeIterator(t *testing.T) { allocName := fmt.Sprintf("alloc-%02d", i) app := newApplication(appName, "default", "root.test") ask := newAllocationAsk(allocName, appName, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(i)})) - if err := node.Reserve(app, ask); err != nil { - t.Error("Reserving failed.") - } + assert.NilError(t, node.Reserve(app, ask), "Reserving failed.") } else { res := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(i)}) alloc := newAllocation(fmt.Sprintf("test-app-%d", i), fmt.Sprintf("test-%d", i), res) - if ok := node.TryAddAllocation(alloc); !ok { - t.Error("Allocation error in node.") - } - } - if err := nc.AddNode(node); err != nil { - t.Error("Adding another node into BC failed.") + assert.Assert(t, node.TryAddAllocation(alloc), "Adding allocation to node failed unexpectedly") } + assert.NilError(t, nc.AddNode(node), "Adding another node into BC failed.") } - nodes := make([]*Node, 0) + var nodes []*Node nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool { nodes = append(nodes, node) return true @@ -328,6 +309,9 @@ func TestGetNodeIterator(t *testing.T) { {"Some nodes are reserved", []bool{false, true, false, true}, []int{1, 3}}, } + // Check order of available nodes + nsp := []string{policies.FairnessPolicy.String(), policies.BinPackingPolicy.String()} + for _, tt := range tests 
{ t.Run("There are reserved nodes in an instance of node collection.", func(t *testing.T) { nc := NewNodeCollection("test") @@ -338,47 +322,29 @@ func TestGetNodeIterator(t *testing.T) { node := newNode(nodeName, map[string]resources.Quantity{"vcore": resources.Quantity(10)}) if tt.reserved[i-1] { appName := fmt.Sprintf("app-%02d", i) - allocName := fmt.Sprintf("alloc-%02d", i) app := newApplication(appName, "default", "root.test") - ask := newAllocationAsk(allocName, appName, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(i)})) - if err := node.Reserve(app, ask); err != nil { - t.Error("Reserving failed.") - } + ask := newAllocationAsk(fmt.Sprintf("alloc-%02d", i), appName, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(i)})) + assert.NilError(t, node.Reserve(app, ask), "Reserving failed.") } else { res := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": resources.Quantity(i)}) alloc := newAllocation(fmt.Sprintf("test-app-%d", i), fmt.Sprintf("test-%d", i), res) - if ok := node.TryAddAllocation(alloc); !ok { - t.Error("Allocation error happen in node.") - } - } - - if err := nc.AddNode(node); err != nil { - t.Error("Adding another node into BC failed.") + assert.Assert(t, node.TryAddAllocation(alloc), "Adding allocation to node failed unexpectedly") } + assert.NilError(t, nc.AddNode(node), "Adding another node into BC failed.") } - // Check order of avialble nodes - NodeSortingPolicy := []string{policies.FairnessPolicy.String(), policies.BinPackingPolicy.String()} - // Fair policy - nc.SetNodeSortingPolicy(NewNodeSortingPolicy(NodeSortingPolicy[0], nil)) - iter := nc.GetNodeIterator() - if ans := nc.GetNodeSortingPolicy().PolicyType().String(); ans != NodeSortingPolicy[0] { - t.Errorf("got %s, want %s", ans, NodeSortingPolicy[0]) - } - index := 0 - iter.ForEachNode(func(node *Node) bool { - fmt.Println(node.NodeID) - return true - }) + 
nc.SetNodeSortingPolicy(NewNodeSortingPolicy(nsp[0], nil)) + assert.Equal(t, nc.GetNodeSortingPolicy().PolicyType().String(), nsp[0], "expected sort policy not set") - iter.ForEachNode(func(node *Node) bool { + index := 0 + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { if index >= len(tt.wantWithFair) { t.Errorf("Want length of nodes: %d, Get length of nodes: %d", index, len(tt.wantWithFair)) } if want := fmt.Sprintf("node-%d", tt.wantWithFair[index]); node.NodeID != want { - t.Errorf("%s with %s, Want %s, got %s.", tt.name, NodeSortingPolicy[0], want, node.NodeID) + t.Errorf("%s with %s, Want %s, got %s.", tt.name, nsp[0], want, node.NodeID) } index++ @@ -386,26 +352,87 @@ func TestGetNodeIterator(t *testing.T) { }) // Binpacking policy - nc.SetNodeSortingPolicy(NewNodeSortingPolicy(NodeSortingPolicy[1], nil)) - if ans := nc.GetNodeSortingPolicy().PolicyType().String(); ans != NodeSortingPolicy[1] { - t.Errorf("got %s, want %s", ans, NodeSortingPolicy[1]) - } + nc.SetNodeSortingPolicy(NewNodeSortingPolicy(nsp[1], nil)) + assert.Equal(t, nc.GetNodeSortingPolicy().PolicyType().String(), nsp[1], "expected sort policy not set") - iter = nc.GetNodeIterator() - DescreasingIndex := len(tt.wantWithFair) - 1 + decIndex := len(tt.wantWithFair) - 1 index = 0 - iter.ForEachNode(func(node *Node) bool { + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { if index >= len(tt.wantWithFair) { t.Errorf("Want length of nodes: %d, Get length of nodes: %d", index, len(tt.wantWithFair)) } - if want := fmt.Sprintf("node-%d", tt.wantWithFair[DescreasingIndex]); node.NodeID != want { - t.Errorf("%s with %s, want %s, got %s.", tt.name, NodeSortingPolicy[1], want, node.NodeID) + if want := fmt.Sprintf("node-%d", tt.wantWithFair[decIndex]); node.NodeID != want { + t.Errorf("%s with %s, want %s, got %s.", tt.name, nsp[1], want, node.NodeID) } index++ - DescreasingIndex-- + decIndex-- return true }) }) } } + +// TestNodeIteratorReserveUpdate reservation add or remove should 
not need a node collection update make sure it works. +// YUNIKORN-2976 removed the listener notify in Reserve and unReserve +func TestNodeIteratorReserveUpdate(t *testing.T) { + nc := NewNodeCollection("test") + count := 3 + for i := 0; i < count; i++ { + node := newNode(fmt.Sprintf("node-%d", i), map[string]resources.Quantity{"some": resources.Quantity(10)}) + assert.NilError(t, nc.AddNode(node), "Adding another node into BC failed.") + } + // first check: both iterators return all nodes + allNodes := make([]*Node, 0) + nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool { + allNodes = append(allNodes, node) + return true + }) + assert.Equal(t, len(allNodes), count, "wrong length") + + var itNodes []*Node + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { + itNodes = append(itNodes, node) + return true + }) + assert.Equal(t, len(itNodes), count, "wrong length") + + // add reservation to all nodes + app := newApplication(appID0, "default", "root.test") + for i, node := range allNodes { + ask := newAllocationAsk(fmt.Sprintf("ask-%d", i), appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"some": resources.Quantity(5)})) + app.AddAllocation(ask) + assert.NilError(t, node.Reserve(app, ask), "Reserving failed.") + } + + // full iterator returns all nodes + itNodes = nil + nc.GetFullNodeIterator().ForEachNode(func(node *Node) bool { + itNodes = append(itNodes, node) + return true + }) + assert.Equal(t, len(itNodes), count, "wrong length") + + // filtered iterator returns NO nodes + itNodes = nil + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { + itNodes = append(itNodes, node) + return true + }) + assert.Equal(t, len(itNodes), 0, "wrong length") + + // run over initial list of nodes and remove reservations. 
+ // only one reservation so just pick that one + for _, node := range allNodes { + alloc := node.GetReservations()[0].alloc + assert.Equal(t, node.unReserve(alloc), 1, "unReserve should have returned a single removal") + assert.Assert(t, !node.IsReserved(), "node should not have been reserved") + } + // filtered iterator returns all nodes again + itNodes = nil + nc.GetNodeIterator().ForEachNode(func(node *Node) bool { + itNodes = append(itNodes, node) + return true + }) + assert.Equal(t, len(itNodes), count, "wrong length") +} diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go index 27ad614da..6a725ac8d 100644 --- a/pkg/scheduler/objects/node_test.go +++ b/pkg/scheduler/objects/node_test.go @@ -105,10 +105,9 @@ func TestCheckConditions(t *testing.T) { } func TestPreAllocateCheck(t *testing.T) { - nodeID := nodeID1 resNode := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 1}) - node := newNode(nodeID, resNode.Resources) - if node == nil || node.NodeID != nodeID { + node := newNode(nodeID1, resNode.Resources) + if node == nil || node.NodeID != nodeID1 { t.Fatalf("node create failed which should not have %v", node) } @@ -132,24 +131,20 @@ func TestPreAllocateCheck(t *testing.T) { assert.Assert(t, !node.preAllocateCheck(resOther, ""), "unknown resource type should not have fitted on node") // set allocated resource - alloc := newAllocation(appID1, nodeID, resSmall) + alloc := newAllocation(appID1, nodeID1, resSmall) node.AddAllocation(alloc) assert.Assert(t, node.preAllocateCheck(resSmall, ""), "small resource should have fitted in available allocation") assert.Assert(t, !node.preAllocateCheck(resNode, ""), "node resource should not have fitted in available allocation") // check if we can allocate on a reserved node - q := map[string]resources.Quantity{"first": 0} - res := resources.NewResourceFromMap(q) - ask := newAllocationAsk(aKey, appID1, res) + ask := newAllocationAsk(aKey, appID1, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0})) app := newApplication(appID1, "default", "root.unknown") // standalone reservation unreserve returns false as app is not reserved reserve := newReservation(node, app, ask, false) - node.reservations[reserve.getKey()] = reserve - assert.Assert(t, !node.preAllocateCheck(resSmall, "app-2"), "node was reserved for different app but check passed") - assert.Assert(t, !node.preAllocateCheck(resSmall, "app-1|alloc-2"), "node was reserved for this app but not the alloc and check passed") - assert.Assert(t, node.preAllocateCheck(resSmall, appID1), "node was reserved for this app but check did not pass check") - assert.Assert(t, node.preAllocateCheck(resSmall, "app-1|alloc-1"), "node was reserved for this app/alloc but check did not pass check") + node.reservations[reserve.allocKey] = reserve + assert.Assert(t, !node.preAllocateCheck(resSmall, aKey2), "node was reserved for different app but check passed") + assert.Assert(t, node.preAllocateCheck(resSmall, aKey), "node was reserved for this app/alloc but check did not pass check") // Check if we can allocate on non scheduling node node.SetSchedulable(false) @@ -190,20 +185,14 @@ func TestNodeReservation(t *testing.T) { if node == nil || node.NodeID != nodeID1 { t.Fatalf("node create failed which should not have %v", node) } - if node.IsReserved() { - t.Fatal("new node should not have reservations") - } - if node.isReservedForApp("") { - t.Error("new node should not have reservations for empty key") - } - if node.isReservedForApp("unknown") { - t.Error("new node should not have reservations for unknown key") - } + assert.Assert(t, !node.IsReserved(), "new node should not have reservations") + assert.Assert(t, !node.isReservedForAllocation(""), "new node should not have reservations for empty key") + assert.Assert(t, !node.isReservedForAllocation("unknown"), "new node should not have reservations for unknown key") // reserve illegal request err := 
node.Reserve(nil, nil) if err == nil { - t.Errorf("illegal reservation requested but did not fail: error %v", err) + t.Fatal("illegal reservation requested but did not fail") } // too large for node @@ -212,7 +201,7 @@ func TestNodeReservation(t *testing.T) { app := newApplication(appID1, "default", "root.unknown") err = node.Reserve(app, ask) if err == nil { - t.Errorf("requested reservation does not fit in node resource but did not fail: error %v", err) + t.Fatal("requested reservation does not fit in node resource but did not fail") } // resource type not available on node @@ -221,7 +210,7 @@ func TestNodeReservation(t *testing.T) { app = newApplication(appID1, "default", "root.unknown") err = node.Reserve(app, ask) if err == nil { - t.Errorf("requested reservation does not match node resource types but did not fail: error %v", err) + t.Fatal("requested reservation does not match node resource types but did not fail") } res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) @@ -230,39 +219,29 @@ func TestNodeReservation(t *testing.T) { // reserve that works err = node.Reserve(app, ask) assert.NilError(t, err, "reservation should not have failed") - if node.isReservedForApp("") { - t.Error("node should not have reservations for empty key") - } - if node.isReservedForApp("unknown") { - t.Errorf("node should not have reservations for unknown key") - } - if node.IsReserved() && !node.isReservedForApp(appID1) { - t.Errorf("node should have reservations for app-1") - } + assert.Assert(t, !node.isReservedForAllocation(""), "node should not have reservations for empty key") + assert.Assert(t, !node.isReservedForAllocation("unknown"), "node should not have reservations for unknown key") + assert.Assert(t, node.IsReserved(), "node should have been reserved") + assert.Assert(t, node.isReservedForAllocation(aKey), "node should have reservations for alloc-1") // 2nd reservation on node - err = node.Reserve(nil, nil) + app2 := newApplication(appID2, 
"default", "root.unknown") + ask2 := newAllocationAsk("alloc-2", appID2, res) + err = node.Reserve(app2, ask2) if err == nil { - t.Errorf("reservation requested on already reserved node: error %v", err) + t.Fatal("reservation requested on already reserved node") } - // unreserve different app - _, err = node.unReserve(nil, nil) - if err == nil { - t.Errorf("illegal reservation release but did not fail: error %v", err) - } - ask2 := newAllocationAsk("alloc-2", appID2, res) - app2 := newApplication(appID2, "default", "root.unknown") - var num int - num, err = node.unReserve(app2, ask2) - assert.NilError(t, err, "un-reserve different app should have failed without error") - assert.Equal(t, num, 0, "un-reserve different app should have failed without releases") - num, err = node.unReserve(app, ask) - assert.NilError(t, err, "un-reserve should not have failed") + // unreserve different alloc + num := node.unReserve(nil) + assert.Equal(t, num, 0, "un-reserve different alloc should have failed without releases") + num = node.unReserve(ask2) + assert.Equal(t, num, 0, "un-reserve different alloc should have failed without releases") + num = node.unReserve(ask) assert.Equal(t, num, 1, "un-reserve app should have released ") } -func TestIsReservedForApp(t *testing.T) { +func TestRequiredNodeAfterReservation(t *testing.T) { node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) if node == nil || node.NodeID != nodeID1 { t.Fatalf("node create failed which should not have %v", node) @@ -270,33 +249,76 @@ func TestIsReservedForApp(t *testing.T) { if node.IsReserved() { t.Fatal("new node should not have reservations") } - - // check if we can allocate on a reserved node - q := map[string]resources.Quantity{"first": 0} - res := resources.NewResourceFromMap(q) - ask := newAllocationAsk(aKey, appID1, res) + // normal node reservation + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) + ask := newAllocationAsk(aKey, appID1, allocRes) 
app := newApplication(appID1, "default", "root.unknown") + err := node.Reserve(app, ask) + assert.NilError(t, err, "normal node reservation should not have failed") + assert.Assert(t, node.IsReserved(), "node should have been reserved") + assert.Assert(t, node.isReservedForAllocation(aKey), "node should have reservations for alloc-1") - // standalone reservation unreserve returns false as app is not reserved - reserve := newReservation(node, app, ask, false) - node.reservations[reserve.getKey()] = reserve - if node.isReservedForApp("app-2") { - t.Error("node was reserved for different app but check passed ") + ask2 := newAllocationAsk(aKey2, appID1, allocRes) + ask2.requiredNode = nodeID1 + app = newApplication(appID1, "default", "root.unknown") + // required node as 2nd reservation + err = node.Reserve(app, ask2) + if err == nil { + t.Fatalf("adding required node reservation should have failed") } - if node.isReservedForApp("app-1|alloc-2") { - t.Error("node was reserved for this app but not the alloc and check passed ") +} + +func TestMultiRequiredNodeReservation(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) + if node == nil || node.NodeID != nodeID1 { + t.Fatalf("node create failed which should not have %v", node) } - if !node.isReservedForApp(appID1) { - t.Error("node was reserved for this app but check did not passed ") + if node.IsReserved() { + t.Fatal("new node should not have reservations") } - if !node.isReservedForApp("app-1|alloc-1") { - t.Error("node was reserved for this app/alloc but check did not passed ") + + // required node reservation + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) + ask := newAllocationAsk(aKey, appID1, allocRes) + ask.requiredNode = nodeID1 + app := newApplication(appID1, "default", "root.unknown") + err := node.Reserve(app, ask) + assert.NilError(t, err, "required node reservation should not have failed") + assert.Assert(t, node.IsReserved(), "node 
should have been reserved") + assert.Assert(t, node.isReservedForAllocation(aKey), "node should have reservations for alloc-1") + + ask2 := newAllocationAsk(aKey2, appID1, allocRes) + app = newApplication(appID1, "default", "root.unknown") + // non required node as 2nd reservation + err = node.Reserve(app, ask2) + if err == nil { + t.Fatalf("adding to required node reservation should have failed") } - // app name similarity check: chop of the last char to make sure we check the full name - similar := appID1[:len(appID1)-1] - if node.isReservedForApp(similar) { - t.Errorf("similar app should not have reservations on node %s", similar) + + // required node as 2nd reservation on node + ask2.requiredNode = nodeID1 + err = node.Reserve(app, ask2) + assert.NilError(t, err, "required node reservation should not have failed") + assert.Assert(t, node.IsReserved(), "node should have been reserved") + assert.Assert(t, node.isReservedForAllocation(aKey2), "node should have reservations for alloc-2") +} + +func TestIsReservedForAllocation(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) + if node == nil || node.NodeID != nodeID1 { + t.Fatalf("node create failed which should not have %v", node) } + assert.Assert(t, !node.IsReserved(), "new node should not have reservations") + + // check if we can allocate on a reserved node + ask := newAllocationAsk(aKey, appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0})) + app := newApplication(appID1, "default", "root.unknown") + + // standalone reservation unreserve returns false as app is not reserved + reserve := newReservation(node, app, ask, false) + node.reservations[reserve.allocKey] = reserve + assert.Assert(t, !node.isReservedForAllocation(aKey2), "node was reserved for different alloc but check passed ") + assert.Assert(t, node.isReservedForAllocation(aKey), "node was reserved for this alloc but check did not passed ") } func TestAttributes(t *testing.T) { @@ -858,8 
+880,7 @@ func TestNodeEvents(t *testing.T) { assert.Equal(t, si.EventRecord_NODE_RESERVATION, event.EventChangeDetail) mockEvents.Reset() - _, err = node.unReserve(app, ask) - assert.NilError(t, err, "could not unreserve") + assert.Equal(t, node.unReserve(ask), 1, "expected the reservation to be removed") event = mockEvents.Events[0] assert.Equal(t, si.EventRecord_NODE, event.Type) assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType) diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go index 10cacea2b..c70aee4bd 100644 --- a/pkg/scheduler/objects/preemption.go +++ b/pkg/scheduler/objects/preemption.go @@ -162,7 +162,7 @@ func (p *Preemptor) initWorkingState() { // walk node iterator and track available resources per node p.iterator.ForEachNode(func(node *Node) bool { - if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) || !node.FitInNode(p.ask.GetAllocatedResource()) { + if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForAllocation(p.ask.GetAllocationKey())) || !node.FitInNode(p.ask.GetAllocatedResource()) { // node is not available, remove any potential victims from consideration delete(allocationsByNode, node.NodeID) } else { diff --git a/pkg/scheduler/objects/reservation.go b/pkg/scheduler/objects/reservation.go index 5c7f672b6..3001372d9 100644 --- a/pkg/scheduler/objects/reservation.go +++ b/pkg/scheduler/objects/reservation.go @@ -21,37 +21,36 @@ package objects import ( "go.uber.org/zap" - "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/log" ) type reservation struct { - nodeID string - appID string - askKey string - // these references must ONLY be used for ask, node and application removal otherwise + appID string + nodeID string + allocKey string + // these references must ONLY be used for alloc, node and application removal otherwise // the reservations cannot be removed and scheduling might be 
impacted. - app *Application - node *Node - ask *Allocation + app *Application + node *Node + alloc *Allocation } // The reservation inside the scheduler. A reservation object is never mutated and does not use locking. // The key depends on where the reservation was made (node or app). // appBased must be true for a reservation for an app and false for a reservation on a node -func newReservation(node *Node, app *Application, ask *Allocation, appBased bool) *reservation { - if ask == nil || app == nil || node == nil { +func newReservation(node *Node, app *Application, alloc *Allocation, appBased bool) *reservation { + if alloc == nil || app == nil || node == nil { log.Log(log.SchedReservation).Warn("Illegal reservation requested: one input is nil", zap.Stringer("node", node), zap.Stringer("app", app), - zap.Stringer("ask", ask)) + zap.Stringer("alloc", alloc)) return nil } res := &reservation{ - askKey: ask.GetAllocationKey(), - ask: ask, - app: app, - node: node, + allocKey: alloc.GetAllocationKey(), + alloc: alloc, + app: app, + node: node, } if appBased { res.nodeID = node.NodeID @@ -61,51 +60,21 @@ func newReservation(node *Node, app *Application, ask *Allocation, appBased bool return res } -func reservationKey(node *Node, app *Application, ask *Allocation) string { - if ask == nil || (app == nil && node == nil) || (app != nil && node != nil) { - log.Log(log.SchedReservation).Warn("Illegal reservation key requested", - zap.Any("node", node), - zap.Any("app", app), - zap.Any("ask", ask)) - return "" - } - if node == nil { - return ask.resKeyWithoutNode - } - key := ask.getReservationKeyForNode(node.NodeID) - if key != common.Empty { - return key - } - - key = node.NodeID + "|" + ask.GetAllocationKey() - ask.setReservationKeyForNode(node.NodeID, key) - return key -} - -func reservationKeyWithoutNode(appID, allocKey string) string { - return appID + "|" + allocKey -} - -// Return the reservation key -func (r *reservation) getKey() string { - if r.nodeID == "" { - 
return r.appID + "|" + r.askKey - } - return r.nodeID + "|" + r.askKey -} - func (r *reservation) String() string { + if r == nil { + return "nil reservation" + } if r.nodeID == "" { - return r.node.NodeID + " -> " + r.appID + "|" + r.askKey + return r.node.NodeID + " -> " + r.appID + "|" + r.allocKey } - return r.app.ApplicationID + " -> " + r.nodeID + "|" + r.askKey + return r.app.ApplicationID + " -> " + r.nodeID + "|" + r.allocKey } // GetObjects returns the objects that created the reservation. // None of the returned values will be nil unless the reservation itself is nil func (r *reservation) GetObjects() (*Node, *Application, *Allocation) { if r != nil { - return r.node, r.app, r.ask + return r.node, r.app, r.alloc } return nil, nil, nil } diff --git a/pkg/scheduler/objects/reservation_test.go b/pkg/scheduler/objects/reservation_test.go index 617ee55a5..697163329 100644 --- a/pkg/scheduler/objects/reservation_test.go +++ b/pkg/scheduler/objects/reservation_test.go @@ -33,62 +33,49 @@ func TestNewReservation(t *testing.T) { app := newApplication("app-1", "default", "root.unknown") node := newNodeRes("node-1", res) - // check the basics (failures) - reserve := newReservation(nil, nil, nil, true) - if reserve != nil { - t.Errorf("reservation with nil objects should have returned nil: %v", reserve) + tests := []struct { + name string + node *Node + app *Application + ask *Allocation + appBased bool + expected *reservation + }{ + {"nil input", nil, nil, nil, true, nil}, + {"nil alloc", node, app, nil, true, nil}, + {"nil app", node, nil, ask, true, nil}, + {"nil node", nil, app, ask, true, nil}, + {"node based", node, app, ask, false, &reservation{"app-1", "", "alloc-1", app, node, ask}}, + {"app based", node, app, ask, true, &reservation{"", "node-1", "alloc-1", app, node, ask}}, } - reserve = newReservation(node, app, nil, true) - if reserve != nil { - t.Errorf("reservation with nil ask set should have returned nil: '%v'", reserve) + for _, tt := range tests { 
+ t.Run(tt.name, func(t *testing.T) { + reserve := newReservation(tt.node, tt.app, tt.ask, tt.appBased) + if tt.expected == nil { + assert.Equal(t, tt.expected, reserve, "unexpected reservation") + } else { + assert.Equal(t, reserve.appID, tt.expected.appID, "incorrect appID") + assert.Equal(t, reserve.nodeID, tt.expected.nodeID, "incorrect node ID") + assert.Equal(t, reserve.allocKey, tt.expected.allocKey, "incorrect alloc key") + if tt.appBased { + assert.Equal(t, reserve.String(), "app-1 -> node-1|alloc-1", "incorrect string form") + } else { + assert.Equal(t, reserve.String(), "node-1 -> app-1|alloc-1", "incorrect string form") + } + } + }) } - reserve = newReservation(node, nil, ask, true) - if reserve != nil { - t.Errorf("reservation with nil app set should have returned nil: '%v'", reserve) - } - reserve = newReservation(nil, app, ask, true) - if reserve != nil { - t.Errorf("reservation with nil node set should have returned nil: '%v'", reserve) - } - - // working cases - reserve = newReservation(node, app, ask, true) - if reserve == nil { - t.Fatalf("reservation with all objects set should have returned nil: %v", reserve) - } - assert.Equal(t, reserve.getKey(), "node-1|alloc-1", "incorrect node reservation key") - assert.Equal(t, reserve.String(), "app-1 -> node-1|alloc-1", "incorrect string form") - - reserve = newReservation(node, app, ask, false) - if reserve == nil { - t.Fatalf("reservation with all objects set should have returned nil: %v", reserve) - } - assert.Equal(t, reserve.getKey(), "app-1|alloc-1", "incorrect app reservation key") - assert.Equal(t, reserve.String(), "node-1 -> app-1|alloc-1", "incorrect string form") } -func TestReservationKey(t *testing.T) { - // create the input objects - res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) - ask := newAllocationAsk("alloc-1", "app-1", res) - app := newApplication("app-1", "default", "root.unknown") - node := newNodeRes("node-1", res) - - // check the basics - reserve 
:= reservationKey(nil, nil, nil) - assert.Equal(t, reserve, "", "reservation with nil objects should have empty key") - reserve = reservationKey(node, app, nil) - assert.Equal(t, reserve, "", "reservation with nil ask set should have empty key") - reserve = reservationKey(node, app, ask) - assert.Equal(t, reserve, "", "reservation with all objects set should have empty key") - - // other cases - reserve = reservationKey(node, nil, ask) - assert.Equal(t, reserve, "node-1|alloc-1", "incorrect node reservation key") - assert.Equal(t, "node-1|alloc-1", ask.resKeyPerNode["node-1"]) - assert.Equal(t, 1, len(ask.resKeyPerNode)) - reserve = reservationKey(nil, app, ask) - assert.Equal(t, reserve, "app-1|alloc-1", "incorrect app reservation key") +func TestReservationString(t *testing.T) { + var nilReserve *reservation + defer func() { + if r := recover(); r != nil { + t.Fatal("panic on nil reservation in object test") + } + }() + str := nilReserve.String() + assert.Equal(t, "nil reservation", str, "nil reservation did not return correct string") } func TestGetObjects(t *testing.T) { diff --git a/pkg/scheduler/objects/utilities_test.go b/pkg/scheduler/objects/utilities_test.go index 97be13c24..20015a876 100644 --- a/pkg/scheduler/objects/utilities_test.go +++ b/pkg/scheduler/objects/utilities_test.go @@ -45,6 +45,7 @@ const ( appID3 = "app-3" aKey = "alloc-1" aKey2 = "alloc-2" + aKey3 = "alloc-3" nodeID1 = "node-1" nodeID2 = "node-2" instType1 = "itype-1" diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 7db7e4fe8..1375d1850 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -891,6 +891,9 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects. 
return nil } + // reservations were cancelled during the processing + pc.decReservationCount(result.CancelledReservations) + // reservation if result.ResultType == objects.Reserved { pc.reserve(app, targetNode, result.Request) @@ -949,12 +952,23 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects. // Lock free call this must be called holding the context lock func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node, ask *objects.Allocation) { appID := app.ApplicationID - // app has node already reserved cannot reserve again - if app.IsReservedOnNode(node.NodeID) { - log.Log(log.SchedPartition).Info("Application is already reserved on node", + // check if ask has reserved already, cannot have multiple reservations for one ask + nodeID := app.NodeReservedForAsk(ask.GetAllocationKey()) + // We should not see a reservation for this ask yet + // sanity check the node that is reserved: same node just be done + // different node: fix it, unreserve the old node and reserve the new one + // this is all to safeguard the system it should never happen. + if nodeID != "" { + // same nodeID we do not need to do anything + if nodeID == node.NodeID { + return + } + log.Log(log.SchedPartition).Warn("ask is already reserved on different node, fixing reservations", zap.String("appID", appID), - zap.String("nodeID", node.NodeID)) - return + zap.String("allocationKey", ask.GetAllocationKey()), + zap.String("reserved nodeID", nodeID), + zap.String("new nodeID", node.NodeID)) + pc.unReserve(app, pc.nodes.GetNode(nodeID), ask) } // all ok, add the reservation to the app, this will also reserve the node if err := app.Reserve(node, ask); err != nil { @@ -978,13 +992,7 @@ func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. 
func (pc *PartitionContext) unReserve(app *objects.Application, node *objects.Node, ask *objects.Allocation) { // remove the reservation of the app, this will also unReserve the node - var err error - var num int - if num, err = app.UnReserve(node, ask); err != nil { - log.Log(log.SchedPartition).Info("Failed to unreserve, error during allocate on the app", - zap.Error(err)) - return - } + num := app.UnReserve(node, ask) // remove the reservation of the queue appID := app.ApplicationID app.GetQueue().UnReserve(appID, num) diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 75e529eb6..673c39487 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -484,7 +484,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { } // check if updated (must be after allocate call) assert.Equal(t, 1, len(app2.GetReservations()), "app reservation should have been updated") - assert.Equal(t, 1, len(app2.GetAskReservations(allocKey2)), "ask should have been reserved") + assert.Equal(t, app2.NodeReservedForAsk(allocKey2), nodeID2, "ask should have been reserved") // try through reserved scheduling cycle this should trigger preemption result = partition.tryReservedAllocate() @@ -1626,6 +1626,10 @@ func TestRequiredNodeReservation(t *testing.T) { if result := partition.tryAllocate(); result != nil { t.Fatalf("empty cluster allocate returned allocation: %s", result) } + node := partition.nodes.GetNode(nodeID1) + if node == nil { + t.Fatal("node-1 should have been created") + } app := newApplication(appID1, "default", "root.parent.sub-leaf") res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"}) @@ -1661,7 +1665,12 @@ func TestRequiredNodeReservation(t *testing.T) { } // check if updated (must be after allocate call) assert.Equal(t, 1, len(app.GetReservations()), "app should have one reserved ask") - assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should have been reserved") + 
assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "ask should have been reserved on node-1") + assert.Assert(t, node.IsReserved(), "node should have been reserved") + reservations := node.GetReservations() + assert.Equal(t, len(reservations), 1, "node should have two reservations") + _, _, resAsk := reservations[0].GetObjects() + assert.Equal(t, resAsk.GetAllocationKey(), allocKey2, "alloc-2 should have been reserved on the node") assertLimits(t, getTestUserGroup(), res) // allocation that fits on the node should not be allocated @@ -1669,7 +1678,7 @@ func TestRequiredNodeReservation(t *testing.T) { res2, err = resources.NewResourceFromConf(map[string]string{"vcore": "1"}) assert.NilError(t, err, "failed to create resource") - ask3 := newAllocationAsk("alloc-3", appID1, res2) + ask3 := newAllocationAsk(allocKey3, appID1, res2) ask3.SetRequiredNode(nodeID1) err = app.AddAllocationAsk(ask3) assert.NilError(t, err, "failed to add ask alloc-3 to app-1") @@ -1678,13 +1687,24 @@ func TestRequiredNodeReservation(t *testing.T) { t.Fatal("allocation attempt should not have returned an allocation") } - // reservation count remains same as last try allocate should have failed to find a reservation - assert.Equal(t, 1, len(app.GetReservations()), "ask should not have been reserved, count changed") + // reservation count should be updated as tryAllocate should have added a reservation for the 3rd ask + assert.Equal(t, 2, len(app.GetReservations()), "ask should not have been reserved, count changed") + assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "alloc-2 should have been reserved on node-1") + assert.Equal(t, app.NodeReservedForAsk(allocKey3), nodeID1, "alloc-3 should have been reserved on node-1") + assert.Assert(t, node.IsReserved(), "node should have been reserved") + reservations = node.GetReservations() + assert.Equal(t, len(reservations), 2, "node should have two reservations") + _, _, resAsk = reservations[0].GetObjects() + _, _, resAsk2 := 
reservations[1].GetObjects() + if !((resAsk.GetAllocationKey() == allocKey3 && resAsk2.GetAllocationKey() == allocKey2) || + (resAsk2.GetAllocationKey() == allocKey3 && resAsk.GetAllocationKey() == allocKey2)) { + t.Fatal("missing reservation on the node") + } assertLimits(t, getTestUserGroup(), res) } // allocate ask request with required node having non daemon set reservations -func TestRequiredNodeCancelNonDSReservations(t *testing.T) { +func TestRequiredNodeCancelOtherReservations(t *testing.T) { partition := createQueuesNodes(t) if partition == nil { t.Fatal("partition create failed") @@ -1733,8 +1753,9 @@ func TestRequiredNodeCancelNonDSReservations(t *testing.T) { t.Fatal("2nd allocation did not return the correct allocation") } // check if updated (must be after allocate call) - assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") + assert.Equal(t, 1, len(app.GetReservations()), "allocation should have been reserved") assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") + assert.Equal(t, 1, partition.reservations, "partition reservations should be 1") res1, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) assert.NilError(t, err, "failed to create resource") @@ -1757,8 +1778,9 @@ func TestRequiredNodeCancelNonDSReservations(t *testing.T) { assert.Equal(t, objects.Allocated, result.ResultType, "allocation result type should have been allocated") // earlier app (app1) reservation count should be zero - assert.Equal(t, 0, len(app.GetReservations()), "ask should have been reserved") + assert.Equal(t, 0, len(app.GetReservations()), "allocation should not have been reserved") assert.Equal(t, 0, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 0") + assert.Equal(t, 0, partition.reservations, "partition reservations should be 0") } // allocate ask request with required node having daemon set reservations @@ -1787,11 +1809,11 @@ func 
TestRequiredNodeCancelDSReservations(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAsk("alloc-1", appID1, res) + ask := newAllocationAsk(allocKey, appID1, res) ask.SetRequiredNode(nodeID1) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask 1 to app") - ask = newAllocationAsk("alloc-2", appID1, res) + ask = newAllocationAsk(allocKey2, appID1, res) ask.SetRequiredNode(nodeID1) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask 2 to app") @@ -1815,6 +1837,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) { // check if updated (must be after allocate call) assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") + assert.Equal(t, 1, partition.reservations, "partition reservations should be 1") res1, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) assert.NilError(t, err, "failed to create resource") @@ -1825,18 +1848,24 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) { assert.NilError(t, err, "failed to add app-2 to partition") // required node set on ask - ask2 := newAllocationAsk("alloc-2", appID2, res1) + ask2 := newAllocationAsk(allocKey3, appID2, res1) ask2.SetRequiredNode(nodeID1) err = app1.AddAllocationAsk(ask2) - assert.NilError(t, err, "failed to add ask alloc-2 to app-1") + assert.NilError(t, err, "failed to add ask alloc-3 to app-1") + // this is a reservation handled before we get here and we get a nil result = partition.tryAllocate() if result != nil { t.Fatal("3rd allocation should not return allocation") } - // still reservation count is 1 - assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") - assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") + // check the reservation count etc for each app + 
assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "alloc-2 should be reserved on node-1") + assert.Equal(t, 1, len(app.GetReservations()), "only one alloc on app-1 should have been reserved") + assert.Equal(t, app1.NodeReservedForAsk(allocKey3), nodeID1, "alloc-3 should be reserved on node-1") + assert.Equal(t, 1, len(app1.GetReservations()), "alloc-3 on app-2 should have been reserved") + // both apps run in the same queue so just pick one app to get the queue + assert.Equal(t, 2, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 2") + assert.Equal(t, 2, partition.reservations, "partition reservations should be 2") } func TestRequiredNodeNotExist(t *testing.T) { @@ -2015,15 +2044,16 @@ func TestPreemption(t *testing.T) { // Preemption followed by a normal allocation func TestPreemptionForRequiredNodeNormalAlloc(t *testing.T) { setupUGM() - // setup the partition so we can try the real allocation + // set up the partition so we can try the real allocation partition, app := setupPreemptionForRequiredNode(t) // now try the allocation again: the normal path result := partition.tryAllocate() - if result != nil { - t.Fatal("allocations should not have returned a result") + if result == nil || result.Request == nil { + t.Fatal("allocation should have returned a result") } // check if updated (must be after allocate call) - assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") + assert.Equal(t, 0, len(app.GetReservations()), "no allocations on app should have been reserved") + assert.Equal(t, 0, partition.reservations, "no allocations on partition should have been reserved") } // Preemption followed by a reserved allocation @@ -2234,8 +2264,8 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A t.Fatal("allocation attempt should not have returned an allocation") } // check if updated (must be after allocate call) - assert.Equal(t, 1, len(app.GetReservations()), "ask should have been 
reserved") - assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should have been reserved") + assert.Equal(t, 1, len(app.GetReservations()), "application should have been reserved") + assert.Equal(t, app.NodeReservedForAsk(allocKey2), nodeID1, "allocation should have been reserved on node-1") assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}), getExpectedQueuesLimitsForPreemptionWithRequiredNode()) // try through reserved scheduling cycle this should trigger preemption @@ -2395,7 +2425,7 @@ func TestTryAllocateReserve(t *testing.T) { err = app.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res)) assert.NilError(t, err, "failed to add ask alloc-1 to app") - ask := newAllocationAsk("alloc-2", appID1, res) + ask := newAllocationAsk(allocKey2, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask alloc-2 to app") node2 := partition.GetNode(nodeID2) @@ -2403,8 +2433,8 @@ func TestTryAllocateReserve(t *testing.T) { t.Fatal("expected node-2 to be returned got nil") } partition.reserve(app, node2, ask) - if !app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-2")) == 0 { - t.Fatalf("reservation failure for ask2 and node2") + if app.NodeReservedForAsk(allocKey2) != nodeID2 { + t.Fatalf("reservation failure for alloc-2 and node-2") } // first allocation should be app-1 and alloc-2 @@ -2416,11 +2446,11 @@ func TestTryAllocateReserve(t *testing.T) { assert.Equal(t, result.ReservedNodeID, "", "node should not be set for allocated from reserved") assert.Check(t, !result.Request.HasRelease(), "released allocation should not be present") assert.Equal(t, result.Request.GetApplicationID(), appID1, "expected application app-1 to be allocated") - assert.Equal(t, result.Request.GetAllocationKey(), "alloc-2", "expected ask alloc-2 to be allocated") + assert.Equal(t, result.Request.GetAllocationKey(), allocKey2, "expected ask alloc-2 to 
be allocated") assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})) // reservations should have been removed: it is in progress - if app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-2")) != 0 { + if app.NodeReservedForAsk(allocKey2) != "" { t.Fatalf("reservation removal failure for ask2 and node2") } @@ -2462,11 +2492,11 @@ func TestTryAllocateWithReserved(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAsk("alloc-1", appID1, res) + ask := newAllocationAsk(allocKey, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask alloc-1 to app") - ask2 := newAllocationAsk("alloc-2", appID1, res) + ask2 := newAllocationAsk(allocKey2, appID1, res) err = app.AddAllocationAsk(ask2) assert.NilError(t, err, "failed to add ask alloc-2 to app") @@ -2476,8 +2506,8 @@ func TestTryAllocateWithReserved(t *testing.T) { t.Fatal("expected node-2 to be returned got nil") } partition.reserve(app, node2, ask) - if !app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-1")) == 0 { - t.Fatal("reservation failure for ask and node2") + if app.NodeReservedForAsk(allocKey) != nodeID2 { + t.Fatal("reservation failure for alloc-1 and node-2") } result := partition.tryAllocate() if result == nil || result.Request == nil { @@ -2486,7 +2516,7 @@ func TestTryAllocateWithReserved(t *testing.T) { assert.Equal(t, objects.AllocatedReserved, result.ResultType, "expected reserved allocation to be returned") assert.Equal(t, "", result.ReservedNodeID, "reserved node should be reset after processing") assert.Equal(t, 0, len(node2.GetReservationKeys()), "reservation should have been removed from node") - assert.Equal(t, false, app.IsReservedOnNode(node2.NodeID), "reservation cleanup for ask on app failed") + assert.Equal(t, "", app.NodeReservedForAsk(allocKey), "reservation cleanup for ask on app 
failed") assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000})) // node2 is unreserved now so the next one should allocate on the 2nd node (fair sharing)