From cd318552c09b3b78ba8ff53127b58590e3c39113 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 1 Apr 2026 18:31:06 +0530 Subject: [PATCH 01/26] moved quota preemption out from tryallocate --- pkg/scheduler/objects/queue.go | 32 +++++++------------ pkg/scheduler/objects/queue_test.go | 2 +- pkg/scheduler/objects/quota_preemptor.go | 2 +- pkg/scheduler/objects/quota_preemptor_test.go | 6 ++-- pkg/scheduler/partition.go | 2 +- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 37792fb24..35d60b499 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1630,25 +1630,7 @@ func (sq *Queue) canRunApp(appID string) bool { // resources are skipped. // Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped. // Lock free call this all locks are taken when needed in called functions -func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption bool) *AllocationResult { - if quotaPreemption && sq.shouldTriggerPreemption() { - go func() { - log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", - zap.String("queueName", sq.GetQueuePath()), - zap.Stringer("maxResource", sq.cloneMaxResource())) - preemptor := NewQuotaPreemptor(sq) - preemptor.tryPreemption() - }() - // when we trigger for a parent queue do not trigger for the children just yet. At least wait until the next - // scheduling cycle. - quotaPreemption = false - } - // if quota preemption is running for this queue we do not want to trigger for any of the children. - // we do a top-down approach: parent first and when done we check the children - // there could be a child quota preemption running already - if sq.getQuotaPreemptionRunning() { - quotaPreemption = false - } +func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption bool) *AllocationResult { if sq.IsLeafQueue() { // get the headroom headRoom := sq.getHeadRoom() @@ -1685,7 +1667,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N } else { // process the child queues (filters out queues without pending requests) for _, child := range sq.sortQueues() { - result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption, quotaPreemption) + result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption) if result != nil { return result } @@ -1694,6 +1676,16 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N return nil } +func (sq *Queue) TryQuotaPreemption() { + if sq.shouldTriggerPreemption() { + log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", + zap.String("queueName", sq.GetQueuePath()), + zap.Stringer("maxResource", sq.cloneMaxResource())) + preemptor := NewQuotaPreemptor(sq) + preemptor.tryQuotaPreemption() + } +} + // TryPlaceholderAllocate tries to replace a placeholders with a real allocation. // This only gets called if there is a pending request on this queue or its children. // This is a depth first algorithm: descend into the depth of the queue tree first. Child queues are sorted based on diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index dd00d46e8..7cb01f6ec 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2451,7 +2451,7 @@ func TestQuotaPreemptionSettings(t *testing.T) { // Wait till delay expires to let trigger preemption automatically time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond) assert.Equal(t, parent.shouldTriggerPreemption(), tc.timeChange, "preemption should get trigger for set delay") - parent.TryAllocate(nil, nil, nil, false, true) + parent.TryAllocate(nil, nil, nil, false) time.Sleep(50 * time.Millisecond) diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index 1eedf827c..abc3c85af 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -50,7 +50,7 @@ func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext { } } -func (qpc *QuotaPreemptionContext) tryPreemption() { +func (qpc *QuotaPreemptionContext) tryQuotaPreemption() { // Get Preemptable Resource qpc.setPreemptableResources() diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index d3dff3ed3..3257608db 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -203,7 +203,7 @@ func TestQuotaChangeTryPreemption(t *testing.T) { leaf.guaranteedResource = tc.guaranteed preemptor := NewQuotaPreemptor(tc.queue) preemptor.allocations = asks - preemptor.tryPreemption() + preemptor.tryQuotaPreemption() assert.Equal(t, len(preemptor.allocations), tc.totalExpectedVictims) var victimsCount int for _, a := range asks { @@ -314,7 +314,7 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { leaf.maxResource = tc.newMax leaf.guaranteedResource = tc.guaranteed preemptor := NewQuotaPreemptor(tc.queue) - preemptor.tryPreemption() + preemptor.tryQuotaPreemption() assert.Equal(t, len(preemptor.allocations), v.totalExpectedVictims) var victimsCount int for _, a := range asks { @@ -574,7 +574,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { tc.queue.maxResource = tc.newMax tc.queue.guaranteedResource = tc.newMax preemptor := NewQuotaPreemptor(tc.queue) - preemptor.tryPreemption() + preemptor.tryQuotaPreemption() victimsCount := 0 for _, asks := range tc.victims { for _, a := range asks { diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 82c398e0a..ddc2e29e7 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -819,7 +819,7 @@ func (pc *PartitionContext) tryAllocate() *objects.AllocationResult { return nil } // try allocating from the root down - result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled(), pc.IsQuotaPreemptionEnabled()) + result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled()) if result != nil { return pc.allocate(result) } From 979b90c6f404778357e9b878da525608037cbb0f Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 1 Apr 2026 18:47:05 +0530 Subject: [PATCH 02/26] running quota preemption in its own cycle --- pkg/scheduler/scheduler.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a47a5be4e..f1f76009e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -70,6 +70,7 @@ func (s *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule if !manualSchedule { go s.internalSchedule() go s.internalInspectOutstandingRequests() + go s.internalQuotaPreemption() } } @@ -106,6 +107,20 @@ func (s *Scheduler) internalInspectOutstandingRequests() { } } +func (s *Scheduler) internalQuotaPreemption() { + for { + select { + case <-s.stop: + return + case <-time.After(time.Second): + if s.triggerQuotaPreemption(){ + log.Log(log.Scheduler).Info("Running quota preemption") + } + } + } +} + + // Implement methods for Scheduler events func (s *Scheduler) HandleEvent(ev interface{}) { enqueueAndCheckFull(s.pendingEvents, ev) @@ -162,6 +177,16 @@ func (s *Scheduler) registerActivity() { } } +func (s *Scheduler) triggerQuotaPreemption() { + for _, psc := range s.clusterContext.GetPartitionMapClone() { + if psc.quotaPreemptionEnabled { + log.Log(log.Scheduler).Info("Triggering quota preemption", + zap.String("partition", psc.Name)) + psc.root.TryQuotaPreemption() + } + } +} + // inspect on the outstanding requests for each of the queues, // update request state accordingly to shim if needed. // this function filters out all outstanding requests that being From 035d2e03f31d2976c1e21bc6a03306d8b356c82d Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 1 Apr 2026 22:46:22 +0530 Subject: [PATCH 03/26] fixed return type --- pkg/scheduler/scheduler.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f1f76009e..617619ff4 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -114,7 +114,9 @@ func (s *Scheduler) internalQuotaPreemption() { return case <-time.After(time.Second): if s.triggerQuotaPreemption(){ - log.Log(log.Scheduler).Info("Running quota preemption") + log.Log(log.Scheduler).Info("Quota preemption completed") + }else{ + log.Log(log.Scheduler).Debug("No quota preemption needed at this time") } } } @@ -177,14 +179,17 @@ func (s *Scheduler) registerActivity() { } } -func (s *Scheduler) triggerQuotaPreemption() { +func (s *Scheduler) triggerQuotaPreemption() bool{ + quotaPreemptionTried := false for _, psc := range s.clusterContext.GetPartitionMapClone() { if psc.quotaPreemptionEnabled { log.Log(log.Scheduler).Info("Triggering quota preemption", zap.String("partition", psc.Name)) psc.root.TryQuotaPreemption() + quotaPreemptionTried = true } } + return quotaPreemptionTried } // inspect on the outstanding requests for each of the queues, From 1defa06a9977f11911c98747405c6f13e690a649 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 1 Apr 2026 22:49:54 +0530 Subject: [PATCH 04/26] formatting --- pkg/scheduler/scheduler.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 617619ff4..78c130f3f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -113,16 +113,15 @@ func (s *Scheduler) internalQuotaPreemption() { case <-s.stop: return case <-time.After(time.Second): - if s.triggerQuotaPreemption(){ + if s.triggerQuotaPreemption() { log.Log(log.Scheduler).Info("Quota preemption completed") - }else{ + } else { log.Log(log.Scheduler).Debug("No quota preemption needed at this time") } } } } - // Implement methods for Scheduler events func (s *Scheduler) HandleEvent(ev interface{}) { enqueueAndCheckFull(s.pendingEvents, ev) @@ -179,7 +178,7 @@ func (s *Scheduler) registerActivity() { } } -func (s *Scheduler) triggerQuotaPreemption() bool{ +func (s *Scheduler) triggerQuotaPreemption() bool { quotaPreemptionTried := false for _, psc := range s.clusterContext.GetPartitionMapClone() { if psc.quotaPreemptionEnabled { From 59203b5a78be5a0e44122fbb505553ac261293e0 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 1 Apr 2026 23:12:34 +0530 Subject: [PATCH 05/26] fixed recursive calls and linting issue --- pkg/scheduler/objects/queue.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 35d60b499..5bef3aaa1 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1683,6 +1683,16 @@ func (sq *Queue) TryQuotaPreemption() { zap.Stringer("maxResource", sq.cloneMaxResource())) preemptor := NewQuotaPreemptor(sq) preemptor.tryQuotaPreemption() + // if quota preemption is running for this queue we do not want to trigger for any of the children. + // we do a top-down approach: parent first and when done we check the children + // there could be a child quota preemption running already + if !sq.getQuotaPreemptionRunning() && !sq.IsLeafQueue() { + for _, child := range sq.sortQueues() { + log.Log(log.Scheduler).Info("Triggering quota preemption for child queue", + zap.String("queue", child.GetQueuePath())) + child.TryQuotaPreemption() + } + } } } From b7de9924d18f0b225859a70232764bd313d5b7d8 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Mon, 6 Apr 2026 12:28:24 +0530 Subject: [PATCH 06/26] addressed some initial review comments --- pkg/scheduler/objects/queue.go | 8 +++++--- pkg/scheduler/scheduler.go | 8 ++------ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 5bef3aaa1..29a71e2da 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1687,10 +1687,12 @@ func (sq *Queue) TryQuotaPreemption() { // we do a top-down approach: parent first and when done we check the children // there could be a child quota preemption running already if !sq.getQuotaPreemptionRunning() && !sq.IsLeafQueue() { - for _, child := range sq.sortQueues() { - log.Log(log.Scheduler).Info("Triggering quota preemption for child queue", + for _, child := range sq.GetCopyOfChildren() { + if child.IsRunning() { + log.Log(log.Scheduler).Info("Triggering quota preemption for child queue", zap.String("queue", child.GetQueuePath())) - child.TryQuotaPreemption() + child.TryQuotaPreemption() + } } } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 78c130f3f..11ec25339 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -113,11 +113,7 @@ func (s *Scheduler) internalQuotaPreemption() { case <-s.stop: return case <-time.After(time.Second): - if s.triggerQuotaPreemption() { - log.Log(log.Scheduler).Info("Quota preemption completed") - } else { - log.Log(log.Scheduler).Debug("No quota preemption needed at this time") - } + s.triggerQuotaPreemption() } } } @@ -181,7 +177,7 @@ func (s *Scheduler) registerActivity() { func (s *Scheduler) triggerQuotaPreemption() bool { quotaPreemptionTried := false for _, psc := range s.clusterContext.GetPartitionMapClone() { - if psc.quotaPreemptionEnabled { + if psc.IsQuotaPreemptionEnabled() { log.Log(log.Scheduler).Info("Triggering quota preemption", zap.String("partition", psc.Name)) psc.root.TryQuotaPreemption() From aeb44b28b622ed129dc33bf0985b46de12d4648c Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Mon, 6 Apr 2026 12:34:31 +0530 Subject: [PATCH 07/26] addressed review comments --- pkg/scheduler/objects/queue.go | 2 +- pkg/scheduler/objects/quota_preemptor.go | 2 +- pkg/scheduler/objects/quota_preemptor_test.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 29a71e2da..5a3aa15e6 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1682,7 +1682,7 @@ func (sq *Queue) TryQuotaPreemption() { zap.String("queueName", sq.GetQueuePath()), zap.Stringer("maxResource", sq.cloneMaxResource())) preemptor := NewQuotaPreemptor(sq) - preemptor.tryQuotaPreemption() + preemptor.tryPreemption() // if quota preemption is running for this queue we do not want to trigger for any of the children. // we do a top-down approach: parent first and when done we check the children // there could be a child quota preemption running already diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index abc3c85af..1eedf827c 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -50,7 +50,7 @@ func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext { } } -func (qpc *QuotaPreemptionContext) tryQuotaPreemption() { +func (qpc *QuotaPreemptionContext) tryPreemption() { // Get Preemptable Resource qpc.setPreemptableResources() diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index 3257608db..d3dff3ed3 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -203,7 +203,7 @@ func TestQuotaChangeTryPreemption(t *testing.T) { leaf.guaranteedResource = tc.guaranteed preemptor := NewQuotaPreemptor(tc.queue) preemptor.allocations = asks - preemptor.tryQuotaPreemption() + preemptor.tryPreemption() assert.Equal(t, len(preemptor.allocations), tc.totalExpectedVictims) var victimsCount int for _, a := range asks { @@ -314,7 +314,7 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { leaf.maxResource = tc.newMax leaf.guaranteedResource = tc.guaranteed preemptor := NewQuotaPreemptor(tc.queue) - preemptor.tryQuotaPreemption() + preemptor.tryPreemption() assert.Equal(t, len(preemptor.allocations), v.totalExpectedVictims) var victimsCount int for _, a := range asks { @@ -574,7 +574,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { tc.queue.maxResource = tc.newMax tc.queue.guaranteedResource = tc.newMax preemptor := NewQuotaPreemptor(tc.queue) - preemptor.tryQuotaPreemption() + preemptor.tryPreemption() victimsCount := 0 for _, asks := range tc.victims { for _, a := range asks { From bbec286e30dfd8a5c948fe33f8ebcf8bfcb3a725 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Tue, 7 Apr 2026 15:25:33 +0530 Subject: [PATCH 08/26] addressed review comments --- pkg/scheduler/objects/queue.go | 23 ++++++++++------------- pkg/scheduler/scheduler.go | 5 +---- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 5a3aa15e6..4f715359c 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -510,9 +510,9 @@ func (sq *Queue) shouldTriggerPreemption() bool { return false } // usage is below max: no need to trigger. Happens if the queue drops below the new max when pods stop. - // Should clean up, but we have just a read lock... + // The stale start time will be cleared by setPreemptionTime on the next config update, or by + // setQuotaPreemptionState after any preemption run completes. if sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) { - sq.quotaPreemptionStartTime = time.Time{} return false } // trigger if the time is right @@ -1683,17 +1683,14 @@ func (sq *Queue) TryQuotaPreemption() { zap.Stringer("maxResource", sq.cloneMaxResource())) preemptor := NewQuotaPreemptor(sq) preemptor.tryPreemption() - // if quota preemption is running for this queue we do not want to trigger for any of the children. - // we do a top-down approach: parent first and when done we check the children - // there could be a child quota preemption running already - if !sq.getQuotaPreemptionRunning() && !sq.IsLeafQueue() { - for _, child := range sq.GetCopyOfChildren() { - if child.IsRunning() { - log.Log(log.Scheduler).Info("Triggering quota preemption for child queue", - zap.String("queue", child.GetQueuePath())) - child.TryQuotaPreemption() - } - } + return // no need to check for sub-tree children when we trigger for parent queue + } + if sq.getQuotaPreemptionRunning() { + return + } + if !sq.IsLeafQueue() { + for _, child := range sq.GetCopyOfChildren() { + child.TryQuotaPreemption() } } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 11ec25339..e4d2f7cee 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -174,17 +174,14 @@ func (s *Scheduler) registerActivity() { } } -func (s *Scheduler) triggerQuotaPreemption() bool { - quotaPreemptionTried := false +func (s *Scheduler) triggerQuotaPreemption() { for _, psc := range s.clusterContext.GetPartitionMapClone() { if psc.IsQuotaPreemptionEnabled() { log.Log(log.Scheduler).Info("Triggering quota preemption", zap.String("partition", psc.Name)) psc.root.TryQuotaPreemption() - quotaPreemptionTried = true } } - return quotaPreemptionTried } // inspect on the outstanding requests for each of the queues, From dc22934436f9e429eedf4412899c7b92a9c77d15 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Tue, 7 Apr 2026 16:00:48 +0530 Subject: [PATCH 09/26] fixed race conditions --- pkg/scheduler/objects/queue.go | 45 +++++++++++++----------- pkg/scheduler/objects/quota_preemptor.go | 10 +++--- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 4f715359c..1481db6d8 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -496,27 +496,29 @@ func (sq *Queue) setPreemptionTime(oldMaxResource *resources.Resource, oldDelay } } -// shouldTriggerPreemption returns true if quota preemption should be triggered based on the settings and the -// current time. -func (sq *Queue) shouldTriggerPreemption() bool { - sq.RLock() - defer sq.RUnlock() +// tryAcquirePreemption atomically checks all preconditions for quota preemption and marks the queue as running. +// This prevents concurrent goroutines from both passing the check before either sets the flag (TOCTOU). +// Returns true only if all conditions are met and the flag was successfully acquired. +// The caller MUST call setQuotaPreemptionState(false) when done. +func (sq *Queue) tryAcquirePreemption() bool { + sq.Lock() + defer sq.Unlock() // dynamic queues do not support quota preemption - if !sq.isManaged { - return false - } - // already in progress do not run again - if sq.isQuotaPreemptionRunning { + // already in progress: do not run again + if !sq.isManaged || sq.isQuotaPreemptionRunning { return false } // usage is below max: no need to trigger. Happens if the queue drops below the new max when pods stop. - // The stale start time will be cleared by setPreemptionTime on the next config update, or by - // setQuotaPreemptionState after any preemption run completes. if sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) { + sq.quotaPreemptionStartTime = time.Time{} return false } // trigger if the time is right - return !sq.quotaPreemptionStartTime.IsZero() && time.Now().After(sq.quotaPreemptionStartTime) + if sq.quotaPreemptionStartTime.IsZero() || time.Now().Before(sq.quotaPreemptionStartTime) { + return false + } + sq.isQuotaPreemptionRunning = true + return true } // setQuotaPreemptionState set or clear the running state for quota preemption. When done the start time is also cleared @@ -1677,13 +1679,16 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N } func (sq *Queue) TryQuotaPreemption() { - if sq.shouldTriggerPreemption() { - log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", - zap.String("queueName", sq.GetQueuePath()), - zap.Stringer("maxResource", sq.cloneMaxResource())) - preemptor := NewQuotaPreemptor(sq) - preemptor.tryPreemption() - return // no need to check for sub-tree children when we trigger for parent queue + if sq.tryAcquirePreemption() { + go func() { + defer sq.setQuotaPreemptionState(false) + log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", + zap.String("queueName", sq.GetQueuePath()), + zap.Stringer("maxResource", sq.cloneMaxResource())) + preemptor := NewQuotaPreemptor(sq) + preemptor.tryPreemption() + }() // fire preemption in a separate go routine to avoid blocking the quotapreemption loop + return // no need to run quota preemption for sub-tree children when we quota preemption is running for parent queue } if sq.getQuotaPreemptionRunning() { return diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index 1eedf827c..f05e2c6d9 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -72,6 +72,8 @@ func (qpc *QuotaPreemptionContext) tryPreemption() { } } +// this MUST always be run in top-down manner. +// This assumes that parent queue will not call this for leaf queues if quota preemption is already running for parent queue. func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change preemption for leaf queue", zap.String("leaf queue", qpc.queue.GetQueuePath()), @@ -80,8 +82,8 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { zap.Stringer("actual allocated resource", qpc.allocatedResource), zap.Stringer("preemptable resource distribution", qpc.preemptableResource), ) - // quota change preemption has started, so mark the flag - qpc.queue.setQuotaPreemptionState(true) + // isQuotaPreemptionRunning is already set atomically by tryAcquirePreemption + // before this function is called; do not set it again here. // Filter the allocations qpc.filterAllocations() @@ -91,9 +93,7 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { // Preempt the victims qpc.preemptVictims() - - // quota change preemption has ended, so mark the flag - qpc.queue.setQuotaPreemptionState(false) + // isQuotaPreemptionRunning is cleared by the caller (via defer setQuotaPreemptionState(false)). } // getChildQueuesPreemptableResource Compute leaf queue's preemptable resource distribution from the parent's preemptable resource. From 7406f1bed4ff8c22b29a036eeaac7ac514690cde Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Tue, 7 Apr 2026 16:10:44 +0530 Subject: [PATCH 10/26] fixed formatting --- pkg/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e4d2f7cee..7759215d0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -174,7 +174,7 @@ func (s *Scheduler) registerActivity() { } } -func (s *Scheduler) triggerQuotaPreemption() { +func (s *Scheduler) triggerQuotaPreemption() { for _, psc := range s.clusterContext.GetPartitionMapClone() { if psc.IsQuotaPreemptionEnabled() { log.Log(log.Scheduler).Info("Triggering quota preemption", From 6e8bf47968a8cf3bc01b06a601e3845d56855d19 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Tue, 7 Apr 2026 16:18:05 +0530 Subject: [PATCH 11/26] fixed test cases --- pkg/scheduler/objects/queue_test.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 7cb01f6ec..136934ae6 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2450,13 +2450,18 @@ func TestQuotaPreemptionSettings(t *testing.T) { parent.UpdateQueueProperties(oldMax) // Wait till delay expires to let trigger preemption automatically time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond) - assert.Equal(t, parent.shouldTriggerPreemption(), tc.timeChange, "preemption should get trigger for set delay") + triggered := parent.tryAcquirePreemption() + assert.Equal(t, triggered, tc.timeChange, "preemption should get trigger for set delay") + // Simulate preemption completing: clears isQuotaPreemptionRunning and quotaPreemptionStartTime. + if triggered { + parent.setQuotaPreemptionState(false) + } parent.TryAllocate(nil, nil, nil, false) time.Sleep(50 * time.Millisecond) // since preemption settings are reset, preemption should not be triggerred again during the next check - assert.Equal(t, parent.shouldTriggerPreemption(), false) + assert.Equal(t, parent.tryAcquirePreemption(), false) }) } } @@ -2535,12 +2540,17 @@ func TestShouldTriggerPreemption(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.queue.shouldTriggerPreemption(), tc.preconditionResult) + result := tc.queue.tryAcquirePreemption() + assert.Equal(t, result, tc.preconditionResult) + // Reset the flag if acquired so it doesn't affect other test cases. + if result { + tc.queue.setQuotaPreemptionState(false) + } }) } - // special case: reset time if usage below max + // special case: reset time if usage below max (StrictlyGreaterThanOrEqualsOnlyExisting clears the start time) usageNotMatchingMaxQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour) - assert.Assert(t, !usageNotMatchingMaxQueue.shouldTriggerPreemption(), "preemption should not be triggered") + assert.Assert(t, !usageNotMatchingMaxQueue.tryAcquirePreemption(), "preemption should not be triggered") assert.Assert(t, usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should be reset") } From 76b8c304ad3b5330b2d92a660e6bacfb7141dc3c Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Tue, 7 Apr 2026 16:55:09 +0530 Subject: [PATCH 12/26] added test cases --- pkg/scheduler/objects/queue_test.go | 47 ++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 136934ae6..c4a839f3d 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2427,6 +2427,23 @@ func TestQuotaPreemptionSettings(t *testing.T) { }, Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, }, 0, true}, + {"parent queue preemption is running, leaf should not be allowed", + map[string]string{"memory": "500"}, + configs.QueueConfig{ + Resources: configs.Resources{ + Max: map[string]string{"memory": "100"}, + }, + Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, + Queues: []configs.QueueConfig{ + { + Name: "leaf", + Resources: configs.Resources{ + Max: map[string]string{"memory": "100"}, + }, + Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, + }, + }, + }, 50 * time.Millisecond, true}, } var oldMax *resources.Resource @@ -2438,7 +2455,7 @@ func TestQuotaPreemptionSettings(t *testing.T) { expectedMax, err = resources.NewResourceFromConf(tc.maxRes) assert.NilError(t, err, "resource creation failed") } - parent, err = createManagedQueue(root, "parent", false, tc.maxRes) + parent, err = createManagedQueue(root, "parent", true, tc.maxRes) assert.NilError(t, err, "failed to create basic queue: %v", err) oldMax, err = parent.ApplyConf(tc.conf) assert.NilError(t, err, "failed to apply conf: %v", err) @@ -2448,6 +2465,20 @@ func TestQuotaPreemptionSettings(t *testing.T) { parent.allocatedResource = resources.Multiply(oldMax, 2) parent.quotaPreemptionDelay = tc.oldDelay parent.UpdateQueueProperties(oldMax) + + var leafQ *Queue + if len(tc.conf.Queues) > 0 { + leaf := tc.conf.Queues[0].Name + if leaf != "" { + leafQ, err = createManagedQueue(parent, leaf, false, tc.maxRes) + assert.NilError(t, err, "failed to create leaf queue: %v", err) + oldMax, err = leafQ.ApplyConf(tc.conf.Queues[0]) + leafQ.allocatedResource = resources.Multiply(oldMax, 2) + leafQ.quotaPreemptionDelay = tc.oldDelay + leafQ.UpdateQueueProperties(oldMax) + } + } + // Wait till delay expires to let trigger preemption automatically time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond) triggered := parent.tryAcquirePreemption() @@ -2456,7 +2487,6 @@ func TestQuotaPreemptionSettings(t *testing.T) { if triggered { parent.setQuotaPreemptionState(false) } - parent.TryAllocate(nil, nil, nil, false) time.Sleep(50 * time.Millisecond) @@ -2466,7 +2496,7 @@ func TestQuotaPreemptionSettings(t *testing.T) { } } -func TestShouldTriggerPreemption(t *testing.T) { +func TestTryAcquirePreemption(t *testing.T) { parentConfig := configs.QueueConfig{ Name: "parent", Parent: true, @@ -2477,7 +2507,7 @@ func TestShouldTriggerPreemption(t *testing.T) { parent, err := NewConfiguredQueue(parentConfig, nil, false, nil) assert.NilError(t, err) parent.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) - parent.quotaPreemptionStartTime = time.Now() + parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) leafRes := configs.Resources{ Max: map[string]string{"memory": "1000"}, @@ -2525,6 +2555,14 @@ func TestShouldTriggerPreemption(t *testing.T) { assert.NilError(t, err) usageNotMatchingMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}) + futureStartQueue, err := NewConfiguredQueue(configs.QueueConfig{ + Name: "leaf-future-start-time", + Resources: leafRes, + }, parent, false, nil) + assert.NilError(t, err) + futureStartQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) + futureStartQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour) + testCases := []struct { name string queue *Queue @@ -2535,6 +2573,7 @@ func TestShouldTriggerPreemption(t *testing.T) { {"preemption running", alreadyPreemptionRunning, false}, {"usage exceeded max, no start time", usageExceededMaxQueue, false}, {"usage exceeded max, start time set", parent, true}, + {"usage exceeded max, start time in future", futureStartQueue, false}, {"usage equals max resources", usageEqualsMaxQueue, false}, {"usage res not matching max", usageNotMatchingMaxQueue, false}, } From 275ee6223c5a19747bd70abafa5c7a930c183da9 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 8 Apr 2026 18:31:36 +0530 Subject: [PATCH 13/26] removed unnecessary case --- pkg/scheduler/objects/queue_test.go | 35 ++--------------------------- 1 file changed, 2 insertions(+), 33 deletions(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index c4a839f3d..07ce26e41 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2427,23 +2427,6 @@ func TestQuotaPreemptionSettings(t *testing.T) { }, Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, }, 0, true}, - {"parent queue preemption is running, leaf should not be allowed", - map[string]string{"memory": "500"}, - configs.QueueConfig{ - Resources: configs.Resources{ - Max: map[string]string{"memory": "100"}, - }, - Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, - Queues: []configs.QueueConfig{ - { - Name: "leaf", - Resources: configs.Resources{ - Max: map[string]string{"memory": "100"}, - }, - Properties: map[string]string{configs.QuotaPreemptionDelay: "50ms"}, - }, - }, - }, 50 * time.Millisecond, true}, } var oldMax *resources.Resource @@ -2455,7 +2438,7 @@ func TestQuotaPreemptionSettings(t *testing.T) { expectedMax, err = resources.NewResourceFromConf(tc.maxRes) assert.NilError(t, err, "resource creation failed") } - parent, err = createManagedQueue(root, "parent", true, tc.maxRes) + parent, err = createManagedQueue(root, "parent", false, tc.maxRes) assert.NilError(t, err, "failed to create basic queue: %v", err) oldMax, err = parent.ApplyConf(tc.conf) assert.NilError(t, err, "failed to apply conf: %v", err) @@ -2465,20 +2448,6 @@ func TestQuotaPreemptionSettings(t *testing.T) { parent.allocatedResource = resources.Multiply(oldMax, 2) parent.quotaPreemptionDelay = tc.oldDelay parent.UpdateQueueProperties(oldMax) - - var leafQ *Queue - if len(tc.conf.Queues) > 0 { - leaf := tc.conf.Queues[0].Name - if leaf != "" { - leafQ, err = createManagedQueue(parent, leaf, false, tc.maxRes) - assert.NilError(t, err, "failed to create leaf queue: %v", err) - oldMax, err = leafQ.ApplyConf(tc.conf.Queues[0]) - leafQ.allocatedResource = resources.Multiply(oldMax, 2) - leafQ.quotaPreemptionDelay = tc.oldDelay - leafQ.UpdateQueueProperties(oldMax) - } - } - // Wait till delay expires to let trigger preemption automatically time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond) triggered := parent.tryAcquirePreemption() @@ -2507,7 +2476,7 @@ func TestTryAcquirePreemption(t *testing.T) { parent, err := NewConfiguredQueue(parentConfig, nil, false, nil) assert.NilError(t, err) parent.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) - parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) + parent.quotaPreemptionStartTime = time.Now() leafRes := configs.Resources{ Max: map[string]string{"memory": "1000"}, From e10f3b155666f3c6f01ea171beed2e3aa1ab7ece Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 8 Apr 2026 19:20:20 +0530 Subject: [PATCH 14/26] added test case for trypreemptioninternal --- pkg/scheduler/objects/queue_test.go | 30 +++ pkg/scheduler/objects/quota_preemptor.go | 2 +- pkg/scheduler/objects/quota_preemptor_test.go | 184 ++++++++++++++++++ 3 files changed, 215 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 07ce26e41..7947e37fd 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2532,6 +2532,16 @@ func TestTryAcquirePreemption(t *testing.T) { futureStartQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) futureStartQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour) + // nil maxResource: StrictlyGreaterThanOrEqualsOnlyExisting(nil, alloc) -> false (won't clear start time) + // so a past start time will still trigger preemption. + nilMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{ + Name: "leaf-nil-max", + // Resources intentionally not set -> maxResource == nil + }, parent, false, nil) + assert.NilError(t, err) + nilMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}) + nilMaxQueue.quotaPreemptionStartTime = time.Now().Add(-time.Second) + testCases := []struct { name string queue *Queue @@ -2545,6 +2555,7 @@ func TestTryAcquirePreemption(t *testing.T) { {"usage exceeded max, start time in future", futureStartQueue, false}, {"usage equals max resources", usageEqualsMaxQueue, false}, {"usage res not matching max", usageNotMatchingMaxQueue, false}, + {"nil max resource with usage and past start time", nilMaxQueue, true}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -2560,6 +2571,25 @@ func TestTryAcquirePreemption(t *testing.T) { usageNotMatchingMaxQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour) assert.Assert(t, !usageNotMatchingMaxQueue.tryAcquirePreemption(), "preemption should not be triggered") assert.Assert(t, usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should be reset") + + // start time is also cleared when usage equals max (within-quota side effect check). + usageEqualsMaxQueue.quotaPreemptionStartTime = time.Now().Add(-time.Second) + assert.Assert(t, !usageEqualsMaxQueue.tryAcquirePreemption(), "preemption must not fire when usage equals max") + assert.Assert(t, usageEqualsMaxQueue.quotaPreemptionStartTime.IsZero(), "start time must be cleared when usage equals max") + + // successful acquisition sets isQuotaPreemptionRunning to true. + // re-arm parent: table test called setQuotaPreemptionState(false) which cleared the start time. + parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) + assert.Assert(t, parent.tryAcquirePreemption(), "acquisition should succeed after re-arming") + assert.Assert(t, parent.isQuotaPreemptionRunning, "isQuotaPreemptionRunning must be true after acquisition") + parent.setQuotaPreemptionState(false) + + // after release, start time is cleared → immediate re-acquisition is blocked. + assert.Assert(t, !parent.tryAcquirePreemption(), "re-acquisition must fail when start time is cleared after release") + // re-arming the start time makes re-acquisition possible again. + parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) + assert.Assert(t, parent.tryAcquirePreemption(), "re-acquisition must succeed after re-arming start time") + parent.setQuotaPreemptionState(false) } func TestNewConfiguredQueue(t *testing.T) { diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index f05e2c6d9..31c267eee 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -72,7 +72,7 @@ func (qpc *QuotaPreemptionContext) tryPreemption() { } } -// this MUST always be run in top-down manner. +// this MUST always be run in top-down manner starting from the root queue. // This assumes that parent queue will not call this for leaf queues if quota preemption is already running for parent queue. func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change preemption for leaf queue", diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index d3dff3ed3..10162e198 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -19,6 +19,7 @@ package objects import ( + "sort" "testing" "time" @@ -592,6 +593,189 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { } } +// TestTryPreemptionInternal tests tryPreemptionInternal which filters, sorts, and preempts victims +// based on the preemptable resource that must be set before calling this function. +func TestTryPreemptionInternal(t *testing.T) { + node := NewNode(&si.NodeInfo{ + NodeID: "node", + Attributes: nil, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{"first": {Value: 500}}, + }, + }) + + leaf, err := NewConfiguredQueue(configs.QueueConfig{ + Name: "leaf", + }, nil, false, nil) + assert.NilError(t, err) + + testCases := []struct { + name string + preemptableResource *resources.Resource + maxResource *resources.Resource + guaranteedResource *resources.Resource + victims []*Allocation + expectedPreemptedKeys []string + }{ + { + name: "nil preemptable resource - no victims preempted", + preemptableResource: nil, + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{ + createVictim(t, "a1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + }, + expectedPreemptedKeys: []string{}, + }, + { + name: "empty victim list - nothing to preempt", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{}, + expectedPreemptedKeys: []string{}, + }, + { + name: "single victim fits preemptable resource - preempted", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{ + createVictim(t, "b1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + }, + expectedPreemptedKeys: []string{"b1"}, + }, + { + name: "victim resource exceeds preemptable - skipped", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{ + createVictim(t, "c1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + }, + expectedPreemptedKeys: []string{}, + }, + { + // After sorting by resource share (smaller first): d2(5/20=0.25) < d3(10/20=0.5) == d1(10/20=0.5). + // d3 and d1 have equal share; sort is stable but the comparator returns true for both + // directions on equal elements, so d3 ends up before d1 in practice. + // Iteration: d2(total=5) kept, d3(total=15, ==preemptable) kept, d1(total=25 > 15) skipped. + name: "multiple victims - only enough to cover preemptable resource preempted", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), + guaranteedResource: nil, + victims: []*Allocation{ + createVictim(t, "d1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + createVictim(t, "d2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})), + createVictim(t, "d3", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + }, + expectedPreemptedKeys: []string{"d2", "d3"}, + }, + { + // e1 has a required node and is excluded by filterAllocations; only e2 passes the filter. + name: "required-node allocations skipped during filter", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: func() []*Allocation { + a := createVictim(t, "e1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + a.SetRequiredNode("node") + b := createVictim(t, "e2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + return []*Allocation{a, b} + }(), + expectedPreemptedKeys: []string{"e2"}, + }, + { + // f1 is already preempted before the call so it is excluded by filterAllocations; + // f2 is newly preempted by tryPreemptionInternal. Both are reported as preempted + // because f1 retains its pre-existing preempted state throughout. + name: "already preempted allocations skipped during filter", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: func() []*Allocation { + a := createVictim(t, "f1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + err := a.MarkPreempted() + assert.NilError(t, err) + b := createVictim(t, "f2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + return []*Allocation{a, b} + }(), + expectedPreemptedKeys: []string{"f1", "f2"}, + }, + { + // g1 is released before the call so it is excluded by filterAllocations; + // only g2 is preempted by tryPreemptionInternal. + name: "already released allocations skipped during filter", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: func() []*Allocation { + a := createVictim(t, "g1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + err := a.SetReleased(true) + assert.NilError(t, err) + b := createVictim(t, "g2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) + return []*Allocation{a, b} + }(), + expectedPreemptedKeys: []string{"g2"}, + }, + { + // Best-effort mode (guaranteed==max): all victims have distinct shares, + // sorted ascending: h4(2/10=0.2) < h1(3/10=0.3) < h3(5/10=0.5) < h2(6/10=0.6). + // Iteration: h4(total=2) kept, h1(total=5) kept, h3(total=10, ==preemptable) kept, + // h2(total=16 > 10) skipped. + name: "best-effort mode: guaranteed equals max - preempt as close as possible", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + victims: []*Allocation{ + createVictim(t, "h1", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})), + createVictim(t, "h2", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6})), + createVictim(t, "h3", node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})), + createVictim(t, "h4", node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})), + }, + expectedPreemptedKeys: []string{"h4", "h1", "h3"}, + }, + { + name: "no matching resource type in preemptable - no victims filtered", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 100}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{ + createVictim(t, "i1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), + }, + expectedPreemptedKeys: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + leaf.maxResource = tc.maxResource + leaf.guaranteedResource = tc.guaranteedResource + asks := tc.victims + assignAllocationsToQueue(asks, leaf) + + preemptor := NewQuotaPreemptor(leaf) + preemptor.preemptableResource = tc.preemptableResource + + preemptor.tryPreemptionInternal() + + preemptedKeys := make([]string, 0) + for _, a := range asks { + if a.IsPreempted() { + preemptedKeys = append(preemptedKeys, a.GetAllocationKey()) + } + } + sort.Strings(preemptedKeys) + expectedKeys := append([]string{}, tc.expectedPreemptedKeys...) + sort.Strings(expectedKeys) + assert.DeepEqual(t, preemptedKeys, expectedKeys) + + removeAllocationAsks(node, asks) + resetQueue(leaf) + }) + } +} + // createQueueSetups Creates a queue hierarchy // Queue Structure: // parent From 433b98f5365d65ab61f6d51e2b6b468719b458c8 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 8 Apr 2026 20:48:17 +0530 Subject: [PATCH 15/26] fixed formatting --- pkg/scheduler/objects/quota_preemptor_test.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index 81840a253..4c2545b60 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -20,7 +20,7 @@ package objects import ( "sort" - "strconv" + "strconv" "strings" "testing" "time" @@ -625,11 +625,11 @@ func TestTryPreemptionInternal(t *testing.T) { assert.NilError(t, err) testCases := []struct { - name string - preemptableResource *resources.Resource - maxResource *resources.Resource - guaranteedResource *resources.Resource - victims []*Allocation + name string + preemptableResource *resources.Resource + maxResource *resources.Resource + guaranteedResource *resources.Resource + victims []*Allocation expectedPreemptedKeys []string }{ { @@ -643,11 +643,11 @@ func TestTryPreemptionInternal(t *testing.T) { expectedPreemptedKeys: []string{}, }, { - name: "empty victim list - nothing to preempt", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{}, + name: "empty victim list - nothing to preempt", + preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), + guaranteedResource: nil, + victims: []*Allocation{}, expectedPreemptedKeys: []string{}, }, { @@ -788,7 +788,7 @@ func TestTryPreemptionInternal(t *testing.T) { removeAllocationAsks(node, asks) resetQueue(leaf) }) - } + } } func assertQuotaPreemptionEvent(t *testing.T, victims int, results string, records []*si.EventRecord) { From b0cdc39e350b94a6496e240cabb6455225f4cafd Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Wed, 8 Apr 2026 21:30:29 +0530 Subject: [PATCH 16/26] added test cases for trigger preemption --- pkg/scheduler/scheduler_test.go | 168 ++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d3108dbae..3c9ee6404 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -18,11 +18,15 @@ limitations under the License. package scheduler import ( + "strconv" "testing" + "time" "gotest.tools/v3/assert" + "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" "github.com/apache/yunikorn-core/pkg/scheduler/objects" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -91,3 +95,167 @@ func TestInspectOutstandingRequests(t *testing.T) { assert.Equal(t, 0, noRequests) assert.Assert(t, resources.IsZero(totalResources), "total resource is not zero: %v", totalResources) } + +// TestTriggerQuotaPreemption_QuotaPreemptionDisabled verifies that triggerQuotaPreemption is a no-op +// when quota preemption is disabled on the partition. +func TestTriggerQuotaPreemption_QuotaPreemptionDisabled(t *testing.T) { + scheduler := NewScheduler() + // newBasePartition creates a partition with quota preemption disabled (default) + partition, err := newBasePartition() + assert.NilError(t, err, "unable to create partition: %v", err) + scheduler.clusterContext.partitions["test"] = partition + + _, testHandler := newApplicationWithHandler(appID1, "default", "root.default") + + // quota preemption is not enabled; triggerQuotaPreemption should be a no-op + scheduler.triggerQuotaPreemption() + time.Sleep(200 * time.Millisecond) + + // no release events expected + for _, event := range testHandler.GetEvents() { + if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { + t.Fatal("unexpected release event when quota preemption is disabled") + } + } +} + +// TestTriggerQuotaPreemption_QuotaPreemptionEnabled verifies that triggerQuotaPreemption fires +// preemption and produces release events when the queue is over max and the preemption delay has elapsed. +func TestTriggerQuotaPreemption_QuotaPreemptionEnabled(t *testing.T) { + scheduler := NewScheduler() + partition := createQuotaPreemptionQueuesNodes(t) + scheduler.clusterContext.partitions["test"] = partition + + app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") + err := partition.AddApplication(app) + assert.NilError(t, err) + + maxAllocs := 5 + for i := 1; i <= maxAllocs; i++ { + res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) + assert.NilError(t, resErr) + alloc := si.Allocation{ + AllocationKey: "ask-key-" + strconv.Itoa(i), + ApplicationID: appID1, + NodeID: nodeID1, + ResourcePerAlloc: res.ToProto(), + } + _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) + assert.NilError(t, allocErr) + assert.Check(t, allocCreated, "alloc should have been created") + } + + // lower the max so the queue is now over quota — use updateQueues so ApplyConf calls setPreemptionTime + root := partition.GetQueue("root") + assert.Assert(t, root != nil, "root queue not found") + newLeafConf := []configs.QueueConfig{ + createLeafQueueConfig( + map[string]string{"memory": "10", "vcore": "5"}, + map[string]string{configs.QuotaPreemptionDelay: "1s"}, + ), + } + err = partition.updateQueues(newLeafConf, root) + assert.NilError(t, err, "failed to update queue config") + + // wait for the preemption delay (configured as 1s) to elapse + time.Sleep(1100 * time.Millisecond) + + scheduler.triggerQuotaPreemption() + + // wait for the async preemption goroutine to complete and events to propagate + time.Sleep(300 * time.Millisecond) + + releaseEventCount := 0 + for _, event := range testHandler.GetEvents() { + if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { + releaseEventCount++ + } + } + assert.Check(t, releaseEventCount > 0, "expected at least one release event from quota preemption, got 0") +} + +// TestTriggerQuotaPreemption_NoPartitions verifies that triggerQuotaPreemption handles an empty +// partition map cleanly without panicking. +func TestTriggerQuotaPreemption_NoPartitions(t *testing.T) { + scheduler := NewScheduler() + // clusterContext has no partitions; call should be a no-op + scheduler.triggerQuotaPreemption() +} + +// TestTriggerQuotaPreemption_UsageBelowMax verifies that triggerQuotaPreemption does not trigger +// preemption when the queue usage is within the configured max resource limit. +func TestTriggerQuotaPreemption_UsageBelowMax(t *testing.T) { + scheduler := NewScheduler() + partition := createQuotaPreemptionQueuesNodes(t) + scheduler.clusterContext.partitions["test"] = partition + + app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") + err := partition.AddApplication(app) + assert.NilError(t, err) + + // allocate well within the configured max (max is vcore:10, allocate vcore:4) + for i := 1; i <= 2; i++ { + res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) + assert.NilError(t, resErr) + alloc := si.Allocation{ + AllocationKey: "ask-key-" + strconv.Itoa(i), + ApplicationID: appID1, + NodeID: nodeID1, + ResourcePerAlloc: res.ToProto(), + } + _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) + assert.NilError(t, allocErr) + assert.Check(t, allocCreated) + } + + time.Sleep(1100 * time.Millisecond) + scheduler.triggerQuotaPreemption() + time.Sleep(300 * time.Millisecond) + + for _, event := range testHandler.GetEvents() { + if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { + t.Fatal("unexpected release event when usage is below max") + } + } +} + +// TestTriggerQuotaPreemption_PreemptionDelayNotElapsed verifies that no preemption occurs when +// the queue is over quota but the configured preemption delay has not elapsed yet. +func TestTriggerQuotaPreemption_PreemptionDelayNotElapsed(t *testing.T) { + scheduler := NewScheduler() + partition := createQuotaPreemptionQueuesNodes(t) + scheduler.clusterContext.partitions["test"] = partition + + app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") + err := partition.AddApplication(app) + assert.NilError(t, err) + + maxAllocs := 5 + for i := 1; i <= maxAllocs; i++ { + res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) + assert.NilError(t, resErr) + alloc := si.Allocation{ + AllocationKey: "ask-key-" + strconv.Itoa(i), + ApplicationID: appID1, + NodeID: nodeID1, + ResourcePerAlloc: res.ToProto(), + } + _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) + assert.NilError(t, allocErr) + assert.Check(t, allocCreated) + } + + leaf := partition.GetQueue("root.leaf") + assert.Assert(t, leaf != nil, "leaf queue not found") + leaf.SetMaxResource(resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5})) + + // trigger immediately — the 1s delay has NOT elapsed + scheduler.triggerQuotaPreemption() + time.Sleep(200 * time.Millisecond) + + for _, event := range testHandler.GetEvents() { + if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { + t.Fatal("unexpected release event before preemption delay elapsed") + } + } +} From 47a68fb076b06c8c7acbbdeb2ea74e4d5a190b9b Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 10:42:13 +0530 Subject: [PATCH 17/26] addressed minor comments --- pkg/scheduler/objects/quota_preemptor.go | 4 ++-- pkg/scheduler/objects/quota_preemptor_test.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index f817319a1..6b115f015 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -120,6 +120,8 @@ func (qpc *QuotaPreemptionContext) tryPreemption() { // this MUST always be run in top-down manner starting from the root queue. // This assumes that parent queue will not call this for leaf queues if quota preemption is already running for parent queue. +// Use tryAcquirePreemption to safely mark the quota preemption running for queue before calling this function. +// isQuotaPreemptionRunning is already set atomically by tryAcquirePreemption before this function is called. func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change preemption for leaf queue", zap.String("leaf queue", qpc.queue.GetQueuePath()), @@ -128,8 +130,6 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { zap.Stringer("actual allocated resource", qpc.allocatedResource), zap.Stringer("preemptable resource distribution", qpc.preemptableResource), ) - // isQuotaPreemptionRunning is already set atomically by tryAcquirePreemption - // before this function is called; do not set it again here. // Filter the allocations qpc.filterAllocations() diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index 4c2545b60..316778ef9 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -610,6 +610,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { // TestTryPreemptionInternal tests tryPreemptionInternal which filters, sorts, and preempts victims // based on the preemptable resource that must be set before calling this function. +// nolint:funlen func TestTryPreemptionInternal(t *testing.T) { node := NewNode(&si.NodeInfo{ NodeID: "node", From dfcd67a48e0103b8bea1f79871394a19a8ad196e Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 13:51:08 +0530 Subject: [PATCH 18/26] fixed broken tests --- .gitignore | 1 + pkg/scheduler/partition_test.go | 10 ++++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index f8e4061ec..ee7b0038f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode .DS_Store /tools/ /build/ diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 33bb49d42..9d79bd84e 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -3746,7 +3746,8 @@ func TestUpdateAllocationWithQuotaPreemption(t *testing.T) { // delay so that preemption delay of 1 sec expires time.Sleep(1100 * time.Millisecond) - result := partition.tryAllocate() + // call quota preemption on root always + partition.root.TryQuotaPreemption() // delay so that events are sent out time.Sleep(100 * time.Millisecond) @@ -3763,8 +3764,6 @@ func TestUpdateAllocationWithQuotaPreemption(t *testing.T) { } } assert.Equal(t, eventsCount, tt.releasedEvents, "unexpected release events count") - } else { - assert.Equal(t, result.ResultType, objects.Allocated) } leaf.ResetPreemptionTime() partition.removeApplication(appID1) @@ -3940,7 +3939,8 @@ func TestUpdateAllocationWithAskAndQuotaPreemption(t *testing.T) { // delay so that preemption delay of 1 sec expires time.Sleep(1100 * time.Millisecond) - result := partition.tryAllocate() + // call quota preemption on root always + partition.root.TryQuotaPreemption() // delay so that events are sent out time.Sleep(100 * time.Millisecond) @@ -3957,8 +3957,6 @@ func TestUpdateAllocationWithAskAndQuotaPreemption(t *testing.T) { } } assert.Equal(t, eventsCount, tt.releasedEvents, "unexpected release events count") - } else { - assert.Equal(t, result.ResultType, objects.Allocated) } leaf.ResetPreemptionTime() partition.removeApplication(appID1) From 9c38f2186bad8f000af268003251939e10c55892 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 14:14:30 +0530 Subject: [PATCH 19/26] removed unnecessary time backing --- pkg/scheduler/objects/queue_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 4ad403063..9a64bb961 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2541,7 +2541,7 @@ func TestTryAcquirePreemption(t *testing.T) { }, parent, false, nil) assert.NilError(t, err) nilMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}) - nilMaxQueue.quotaPreemptionStartTime = time.Now().Add(-time.Second) + nilMaxQueue.quotaPreemptionStartTime = time.Now() testCases := []struct { name string @@ -2574,13 +2574,13 @@ func TestTryAcquirePreemption(t *testing.T) { assert.Assert(t, usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should be reset") // start time is also cleared when usage equals max (within-quota side effect check). - usageEqualsMaxQueue.quotaPreemptionStartTime = time.Now().Add(-time.Second) + usageEqualsMaxQueue.quotaPreemptionStartTime = time.Now() assert.Assert(t, !usageEqualsMaxQueue.tryAcquirePreemption(), "preemption must not fire when usage equals max") assert.Assert(t, usageEqualsMaxQueue.quotaPreemptionStartTime.IsZero(), "start time must be cleared when usage equals max") // successful acquisition sets isQuotaPreemptionRunning to true. // re-arm parent: table test called setQuotaPreemptionState(false) which cleared the start time. - parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) + parent.quotaPreemptionStartTime = time.Now() assert.Assert(t, parent.tryAcquirePreemption(), "acquisition should succeed after re-arming") assert.Assert(t, parent.isQuotaPreemptionRunning, "isQuotaPreemptionRunning must be true after acquisition") parent.setQuotaPreemptionState(false) @@ -2588,7 +2588,7 @@ func TestTryAcquirePreemption(t *testing.T) { // after release, start time is cleared → immediate re-acquisition is blocked. assert.Assert(t, !parent.tryAcquirePreemption(), "re-acquisition must fail when start time is cleared after release") // re-arming the start time makes re-acquisition possible again. - parent.quotaPreemptionStartTime = time.Now().Add(-time.Second) + parent.quotaPreemptionStartTime = time.Now() assert.Assert(t, parent.tryAcquirePreemption(), "re-acquisition must succeed after re-arming start time") parent.setQuotaPreemptionState(false) } From b09eb4bc7712d40019302d3b97d4917321942ad6 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 15:55:36 +0530 Subject: [PATCH 20/26] fixed comments, test cases as per review comments --- pkg/scheduler/objects/queue_test.go | 11 +++-------- pkg/scheduler/objects/quota_preemptor.go | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 9a64bb961..508fcc780 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2451,16 +2451,11 @@ func TestQuotaPreemptionSettings(t *testing.T) { parent.UpdateQueueProperties(oldMax) // Wait till delay expires to let trigger preemption automatically time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond) + assert.Equal(t, parent.isQuotaPreemptionRunning, false, "quotaPreemptionRunning flag should be false initially") triggered := parent.tryAcquirePreemption() assert.Equal(t, triggered, tc.timeChange, "preemption should get trigger for set delay") - // Simulate preemption completing: clears isQuotaPreemptionRunning and quotaPreemptionStartTime. - if triggered { - parent.setQuotaPreemptionState(false) - } - - time.Sleep(50 * time.Millisecond) - - // since preemption settings are reset, preemption should not be triggerred again during the next check + assert.Equal(t, parent.isQuotaPreemptionRunning, triggered, "quotaPreemptionRunning flag should be true if preemption is triggered") + // preemption should not be triggerred again during the next check assert.Equal(t, parent.tryAcquirePreemption(), false) }) } diff --git a/pkg/scheduler/objects/quota_preemptor.go b/pkg/scheduler/objects/quota_preemptor.go index 6b115f015..da050b164 100644 --- a/pkg/scheduler/objects/quota_preemptor.go +++ b/pkg/scheduler/objects/quota_preemptor.go @@ -122,6 +122,7 @@ func (qpc *QuotaPreemptionContext) tryPreemption() { // This assumes that parent queue will not call this for leaf queues if quota preemption is already running for parent queue. // Use tryAcquirePreemption to safely mark the quota preemption running for queue before calling this function. // isQuotaPreemptionRunning is already set atomically by tryAcquirePreemption before this function is called. +// isQuotaPreemptionRunning SHOULD be cleared by the caller (via setQuotaPreemptionState(false)). func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change preemption for leaf queue", zap.String("leaf queue", qpc.queue.GetQueuePath()), @@ -139,7 +140,6 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() { // Preempt the victims qpc.preemptVictims() - // isQuotaPreemptionRunning is cleared by the caller (via defer setQuotaPreemptionState(false)). } // getChildQueuesPreemptableResource Compute leaf queue's preemptable resource distribution from the parent's preemptable resource. From de6051daf98d85c07fa4b2ecf82751a9e39a9d1e Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 16:17:26 +0530 Subject: [PATCH 21/26] removed redundant test case --- pkg/scheduler/objects/quota_preemptor_test.go | 184 ------------------ 1 file changed, 184 deletions(-) diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index 316778ef9..d7e17ec8b 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -608,190 +608,6 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { } } -// TestTryPreemptionInternal tests tryPreemptionInternal which filters, sorts, and preempts victims -// based on the preemptable resource that must be set before calling this function. -// nolint:funlen -func TestTryPreemptionInternal(t *testing.T) { - node := NewNode(&si.NodeInfo{ - NodeID: "node", - Attributes: nil, - SchedulableResource: &si.Resource{ - Resources: map[string]*si.Quantity{"first": {Value: 500}}, - }, - }) - - leaf, err := NewConfiguredQueue(configs.QueueConfig{ - Name: "leaf", - }, nil, false, nil) - assert.NilError(t, err) - - testCases := []struct { - name string - preemptableResource *resources.Resource - maxResource *resources.Resource - guaranteedResource *resources.Resource - victims []*Allocation - expectedPreemptedKeys []string - }{ - { - name: "nil preemptable resource - no victims preempted", - preemptableResource: nil, - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{ - createVictim(t, "a1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - }, - expectedPreemptedKeys: []string{}, - }, - { - name: "empty victim list - nothing to preempt", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{}, - expectedPreemptedKeys: []string{}, - }, - { - name: "single victim fits preemptable resource - preempted", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{ - createVictim(t, "b1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - }, - expectedPreemptedKeys: []string{"b1"}, - }, - { - name: "victim resource exceeds preemptable - skipped", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{ - createVictim(t, "c1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - }, - expectedPreemptedKeys: []string{}, - }, - { - // After sorting by resource share (smaller first): d2(5/20=0.25) < d3(10/20=0.5) == d1(10/20=0.5). - // d3 and d1 have equal share; sort is stable but the comparator returns true for both - // directions on equal elements, so d3 ends up before d1 in practice. - // Iteration: d2(total=5) kept, d3(total=15, ==preemptable) kept, d1(total=25 > 15) skipped. - name: "multiple victims - only enough to cover preemptable resource preempted", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), - guaranteedResource: nil, - victims: []*Allocation{ - createVictim(t, "d1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - createVictim(t, "d2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})), - createVictim(t, "d3", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - }, - expectedPreemptedKeys: []string{"d2", "d3"}, - }, - { - // e1 has a required node and is excluded by filterAllocations; only e2 passes the filter. - name: "required-node allocations skipped during filter", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: func() []*Allocation { - a := createVictim(t, "e1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - a.SetRequiredNode("node") - b := createVictim(t, "e2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - return []*Allocation{a, b} - }(), - expectedPreemptedKeys: []string{"e2"}, - }, - { - // f1 is already preempted before the call so it is excluded by filterAllocations; - // f2 is newly preempted by tryPreemptionInternal. Both are reported as preempted - // because f1 retains its pre-existing preempted state throughout. - name: "already preempted allocations skipped during filter", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: func() []*Allocation { - a := createVictim(t, "f1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - err := a.MarkPreempted() - assert.NilError(t, err) - b := createVictim(t, "f2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - return []*Allocation{a, b} - }(), - expectedPreemptedKeys: []string{"f1", "f2"}, - }, - { - // g1 is released before the call so it is excluded by filterAllocations; - // only g2 is preempted by tryPreemptionInternal. - name: "already released allocations skipped during filter", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: func() []*Allocation { - a := createVictim(t, "g1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - err := a.SetReleased(true) - assert.NilError(t, err) - b := createVictim(t, "g2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) - return []*Allocation{a, b} - }(), - expectedPreemptedKeys: []string{"g2"}, - }, - { - // Best-effort mode (guaranteed==max): all victims have distinct shares, - // sorted ascending: h4(2/10=0.2) < h1(3/10=0.3) < h3(5/10=0.5) < h2(6/10=0.6). - // Iteration: h4(total=2) kept, h1(total=5) kept, h3(total=10, ==preemptable) kept, - // h2(total=16 > 10) skipped. - name: "best-effort mode: guaranteed equals max - preempt as close as possible", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - victims: []*Allocation{ - createVictim(t, "h1", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3})), - createVictim(t, "h2", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6})), - createVictim(t, "h3", node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})), - createVictim(t, "h4", node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})), - }, - expectedPreemptedKeys: []string{"h4", "h1", "h3"}, - }, - { - name: "no matching resource type in preemptable - no victims filtered", - preemptableResource: resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 100}), - maxResource: resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), - guaranteedResource: nil, - victims: []*Allocation{ - createVictim(t, "i1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})), - }, - expectedPreemptedKeys: []string{}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - leaf.maxResource = tc.maxResource - leaf.guaranteedResource = tc.guaranteedResource - asks := tc.victims - assignAllocationsToQueue(asks, leaf) - - preemptor := NewQuotaPreemptor(leaf) - preemptor.preemptableResource = tc.preemptableResource - - preemptor.tryPreemptionInternal() - - preemptedKeys := make([]string, 0) - for _, a := range asks { - if a.IsPreempted() { - preemptedKeys = append(preemptedKeys, a.GetAllocationKey()) - } - } - sort.Strings(preemptedKeys) - expectedKeys := append([]string{}, tc.expectedPreemptedKeys...) - sort.Strings(expectedKeys) - assert.DeepEqual(t, preemptedKeys, expectedKeys) - - removeAllocationAsks(node, asks) - resetQueue(leaf) - }) - } -} - func assertQuotaPreemptionEvent(t *testing.T, victims int, results string, records []*si.EventRecord) { recordsLen := len(records) if victims > 0 { From fdcb6894d98c8389c1f688615e49b540fe174d01 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Thu, 9 Apr 2026 17:38:58 +0530 Subject: [PATCH 22/26] improved existing test cases with asserting on exact allocations preempted --- pkg/scheduler/objects/quota_preemptor_test.go | 134 ++++++++++-------- 1 file changed, 75 insertions(+), 59 deletions(-) diff --git a/pkg/scheduler/objects/quota_preemptor_test.go b/pkg/scheduler/objects/quota_preemptor_test.go index d7e17ec8b..ad40db850 100644 --- a/pkg/scheduler/objects/quota_preemptor_test.go +++ b/pkg/scheduler/objects/quota_preemptor_test.go @@ -163,7 +163,8 @@ func TestQuotaChangeTryPreemption(t *testing.T) { shortfallVictims := make([]*Allocation, 0) suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + // ask2 uses {first:8} (smaller than ask1's {first:10}) so it is deterministically sorted first and preempted + suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))) oversizedVictims = append(oversizedVictims, createVictim(t, "ask21", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}))) oversizedVictims = append(oversizedVictims, createVictim(t, "ask3", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) @@ -174,7 +175,7 @@ func TestQuotaChangeTryPreemption(t *testing.T) { shortfallVictims = append(shortfallVictims, createVictim(t, "ask5", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))) shortfallVictims = append(shortfallVictims, createVictim(t, "ask51", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6}))) - shortfallVictims = append(shortfallVictims, createVictim(t, "ask52", node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))) + shortfallVictims = append(shortfallVictims, createVictim(t, "ask52", node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}))) shortfallVictims = append(shortfallVictims, createVictim(t, "ask53", node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4}))) notSuitableVictims = append(notSuitableVictims, createVictim(t, "ask6", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) @@ -184,6 +185,11 @@ func TestQuotaChangeTryPreemption(t *testing.T) { preemptable := newMax guaranteed := preemptable lowerGuaranteed := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) + // ask2={first:8} (< ask1={first:10}) ensures ask2 has a smaller resource share and is deterministically sorted first + suitablePreemptable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + // ask52={first:3} (< ask53={first:4} < ask5={first:5}) ensures a unique sort order for best-effort cases + bestEffortPreemptable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + bestEffortClaimedResource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 7}) testCases := []struct { name string queue *Queue @@ -194,16 +200,16 @@ func TestQuotaChangeTryPreemption(t *testing.T) { victims []*Allocation claimedResource *resources.Resource totalExpectedVictims int - expectedVictimsCount int + preemptedKeys []string }{ - {"no victims available", leaf, oldMax, newMax, nil, preemptable, []*Allocation{}, nil, 0, 0}, - {"suitable victims available", leaf, oldMax, newMax, nil, preemptable, suitableVictims, preemptable, 2, 1}, - {"victims available but none is suitable ", leaf, oldMax, newMax, nil, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}), notSuitableVictims, nil, 1, 0}, - {"skip over sized victims", leaf, oldMax, newMax, nil, preemptable, oversizedVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 2, 1}, - {"guaranteed not set", leaf, oldMax, newMax, nil, preemptable, overflowVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, 1}, - {"guaranteed set but lower than max", leaf, oldMax, newMax, lowerGuaranteed, preemptable, overflowVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, 1}, - {"best effort - guaranteed set and equals max", leaf, oldMax, newMax, guaranteed, preemptable, shortfallVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 4, 2}, - {"best effort - guaranteed set, max not set earlier but now", leaf, nil, newMax, guaranteed, preemptable, shortfallVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 4, 2}, + {"no victims available", leaf, oldMax, newMax, nil, preemptable, []*Allocation{}, nil, 0, []string{}}, + {"suitable victims available", leaf, oldMax, newMax, nil, suitablePreemptable, suitableVictims, suitablePreemptable, 2, []string{"ask2"}}, + {"victims available but none is suitable ", leaf, oldMax, newMax, nil, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}), notSuitableVictims, nil, 1, []string{}}, + {"skip over sized victims", leaf, oldMax, newMax, nil, preemptable, oversizedVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 2, []string{"ask21"}}, + {"guaranteed not set", leaf, oldMax, newMax, nil, preemptable, overflowVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, []string{"ask4"}}, + {"guaranteed set but lower than max", leaf, oldMax, newMax, lowerGuaranteed, preemptable, overflowVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, []string{"ask4"}}, + {"best effort - guaranteed set and equals max", leaf, oldMax, newMax, guaranteed, bestEffortPreemptable, shortfallVictims, bestEffortClaimedResource, 4, []string{"ask52", "ask53"}}, + {"best effort - guaranteed set, max not set earlier but now", leaf, nil, newMax, guaranteed, bestEffortPreemptable, shortfallVictims, bestEffortClaimedResource, 4, []string{"ask52", "ask53"}}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -217,16 +223,10 @@ func TestQuotaChangeTryPreemption(t *testing.T) { preemptor.allocations = asks preemptor.tryPreemption() assert.Equal(t, len(preemptor.allocations), tc.totalExpectedVictims) - var victimsCount int - for _, a := range asks { - if a.IsPreempted() { - victimsCount++ - } - } - assert.Equal(t, victimsCount, tc.expectedVictimsCount) + assertPreemptedAllocationKeys(t, asks, tc.preemptedKeys) time.Sleep(500 * time.Millisecond) - assertQuotaPreemptionEvent(t, tc.totalExpectedVictims, "Quota Preemption results summary: preemptable resources: "+tc.preemptableResource.String()+", claimed resources: "+tc.claimedResource.String()+", selected victims: "+strconv.Itoa(tc.totalExpectedVictims)+", preempted victims: "+strconv.Itoa(tc.expectedVictimsCount), eventSystem.Store.CollectEvents()) + assertQuotaPreemptionEvent(t, tc.totalExpectedVictims, "Quota Preemption results summary: preemptable resources: "+tc.preemptableResource.String()+", claimed resources: "+tc.claimedResource.String()+", selected victims: "+strconv.Itoa(tc.totalExpectedVictims)+", preempted victims: "+strconv.Itoa(len(tc.preemptedKeys)), eventSystem.Store.CollectEvents()) removeAllocationAsks(node, asks) resetQueue(leaf) @@ -255,7 +255,8 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { oversizedVictims := make([]*Allocation, 0) suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}))) - suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}))) + // ask2 uses {first:9, second:9} (smaller than ask1 in both dimensions) to ensure deterministic sort order + suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, "second": 9}))) oversizedVictims = append(oversizedVictims, createVictim(t, "ask21", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, "second": 10}))) oversizedVictims = append(oversizedVictims, createVictim(t, "ask3", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11, "second": 10}))) @@ -274,7 +275,7 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { type test struct { allocs []*Allocation totalExpectedVictims int - expectedVictimsCount int + preemptedKeys []string } testCases := []struct { @@ -287,37 +288,37 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { }{ {"oversized victims available with extra resource types", leaf, oldMax, newMax, nil, []test{ - {oversizedVictims, 2, 1}, + {oversizedVictims, 2, []string{"ask21"}}, }, }, {"suitable victims available with extra resource types other than defined in max", leaf, oldMax, newMax, nil, []test{ - {suitableVictims, 2, 1}, + {suitableVictims, 2, []string{"ask2"}}, }, }, {"suitable victims available with extra resource types other than defined in max", leaf, nil, newMax, nil, []test{ - {suitableVictims, 2, 1}, + {suitableVictims, 2, []string{"ask2"}}, }, }, {"suitable victims available with extra resource types other than defined in guaranteed", leaf, nil, newMax, lowerGuaranteed, []test{ - {suitableVictims, 2, 1}, + {suitableVictims, 2, []string{"ask2"}}, }, }, {"suitable victims available - different res types, adding new res type in max", leaf, oldMax, newMaxWithNewResTypes, nil, []test{ - {suitableVictims, 2, 1}, + {suitableVictims, 2, []string{"ask2"}}, }, }, {"suitable victims available - different res types, removing existing res type from max", leaf, oldMax, newMaxWithRemovedResTypes, nil, []test{ - {suitableVictims, 2, 1}, + {suitableVictims, 2, []string{"ask2"}}, }, }, {"overflow victims available with extra resource types other than defined in guaranteed and vice versa", leaf, oldMax, newMax, lowerGuaranteedWithNewResTypes, []test{ - {overflowVictims, 3, 1}, + {overflowVictims, 3, []string{"ask4"}}, }, }, } @@ -334,13 +335,7 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { preemptor := NewQuotaPreemptor(tc.queue) preemptor.tryPreemption() assert.Equal(t, len(preemptor.allocations), v.totalExpectedVictims) - var victimsCount int - for _, a := range asks { - if a.IsPreempted() { - victimsCount++ - } - } - assert.Equal(t, victimsCount, v.expectedVictimsCount) + assertPreemptedAllocationKeys(t, asks, v.preemptedKeys) removeAllocationAsks(node, asks) resetQueue(leaf) } @@ -513,9 +508,10 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { leaf111WithParentG, leaf12WithParentG, leaf211WithParentG, leaf22WithParentG, leaf4WithParentG := createQueueSetups(t, parent2, configs.Resources{}, configs.Resources{}) var suitableVictims, notSuitableVictims, suitableVictims1, suitableVictims2, suitableVictims3 []*Allocation - suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + // ask1({first:9}), ask2({first:10}), ask3({first:11}) — distinct sizes summing to 30 for deterministic sort (smaller preempted first) + suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}))) suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims = append(suitableVictims, createVictim(t, "ask3", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + suitableVictims = append(suitableVictims, createVictim(t, "ask3", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) notSuitableVictims = append(notSuitableVictims, createVictim(t, "ask3_1", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) leafGVictims, leafGNotSuitableVictims, leafVictims, leafVictimsWithParentG := make(map[*Queue][]*Allocation), make(map[*Queue][]*Allocation), make(map[*Queue][]*Allocation), make(map[*Queue][]*Allocation) @@ -523,22 +519,25 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { leafGNotSuitableVictims[leaf111G] = notSuitableVictims leafVictims[leaf111] = suitableVictims - suitableVictims1 = append(suitableVictims1, createVictim(t, "ask4", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims1 = append(suitableVictims1, createVictim(t, "ask5", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + // ask4({first:9}), ask5({first:11}) — distinct sizes summing to 20 for deterministic sort + suitableVictims1 = append(suitableVictims1, createVictim(t, "ask4", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}))) + suitableVictims1 = append(suitableVictims1, createVictim(t, "ask5", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) leafGVictims[leaf12G] = suitableVictims1 leafVictims[leaf12] = suitableVictims1 - suitableVictims2 = append(suitableVictims2, createVictim(t, "ask6", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims2 = append(suitableVictims2, createVictim(t, "ask7", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + // ask6({first:8})..ask10({first:12}) — distinct sizes summing to 50 for deterministic sort + suitableVictims2 = append(suitableVictims2, createVictim(t, "ask6", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))) + suitableVictims2 = append(suitableVictims2, createVictim(t, "ask7", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}))) suitableVictims2 = append(suitableVictims2, createVictim(t, "ask8", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims2 = append(suitableVictims2, createVictim(t, "ask9", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims2 = append(suitableVictims2, createVictim(t, "ask10", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + suitableVictims2 = append(suitableVictims2, createVictim(t, "ask9", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) + suitableVictims2 = append(suitableVictims2, createVictim(t, "ask10", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12}))) leafGVictims[leaf211G] = suitableVictims2 leafVictims[leaf211] = suitableVictims2 - suitableVictims3 = append(suitableVictims3, createVictim(t, "ask11", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + // ask11({first:9}), ask12({first:10}), ask13({first:11}) — distinct sizes summing to 30 for deterministic sort + suitableVictims3 = append(suitableVictims3, createVictim(t, "ask11", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}))) suitableVictims3 = append(suitableVictims3, createVictim(t, "ask12", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) - suitableVictims3 = append(suitableVictims3, createVictim(t, "ask13", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))) + suitableVictims3 = append(suitableVictims3, createVictim(t, "ask13", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))) leafGVictims[leaf22G] = suitableVictims3 leafVictims[leaf22] = suitableVictims3 @@ -567,13 +566,16 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { victims map[*Queue][]*Allocation claimedResources *resources.Resource totalVictims int - expectedVictims int + preemptedKeys []string }{ - {"Guaranteed set on one side of queue hierarchy - suitable victims available", parent, oldMax, newMax, leafGVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 110}), 13, 11}, - {"Guaranteed set on one side of queue hierarchy - victims available but none suitable", parent, oldMax, newMax, leafGNotSuitableVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 110}), 1, 0}, - {"Guaranteed set not set on any queue - suitable victims available", parent1, oldMax, newMax, leafVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 90}), 14, 9}, - {"Guaranteed set only on parent queue but not on any child queues underneath - suitable victims available", parent2, oldMax1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), leafVictimsWithParentG, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10, 5}, - {"Guaranteed set only on parent queue but not on any child queues underneath - suitable victims available", parent2, oldMax1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), leafVictimsWithParentG, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10, 5}, + // claimed = ask1(9)+ask2(10) + ask4(9) + ask6(8)+ask7(9)+ask8(10)+ask9(11)+ask10(12) + ask11(9)+ask12(10)+ask13(11) = 19+9+50+30 = 108 + {"Guaranteed set on one side of queue hierarchy - suitable victims available", parent, oldMax, newMax, leafGVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 108}), 13, []string{"ask1", "ask2", "ask4", "ask6", "ask7", "ask8", "ask9", "ask10", "ask11", "ask12", "ask13"}}, + {"Guaranteed set on one side of queue hierarchy - victims available but none suitable", parent, oldMax, newMax, leafGNotSuitableVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 110}), 1, []string{}}, + // claimed = ask1(9)+ask2(10) + ask4(9) + ask6(8)+ask7(9)+ask8(10)+ask9(11) + ask11(9)+ask12(10) = 19+9+38+19 = 85 + {"Guaranteed set not set on any queue - suitable victims available", parent1, oldMax, newMax, leafVictims, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85}), 14, []string{"ask1", "ask2", "ask4", "ask6", "ask7", "ask8", "ask9", "ask11", "ask12"}}, + // each leaf preempts the smaller victim (ask_even = {first:12}), claimed = 5 × 12 = 60 + {"Guaranteed set only on parent queue but not on any child queues underneath - suitable victims available", parent2, oldMax1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}), leafVictimsWithParentG, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10, []string{"ask16", "ask18", "ask20", "ask22", "ask24"}}, + {"Guaranteed set only on parent queue but not on any child queues underneath - suitable victims available", parent2, oldMax1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), leafVictimsWithParentG, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10, []string{"ask16", "ask18", "ask20", "ask22", "ask24"}}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -586,17 +588,13 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { preemptableResource := resources.SubOnlyExisting(tc.newMax, tc.queue.allocatedResource) preemptor := NewQuotaPreemptor(tc.queue) preemptor.tryPreemption() - victimsCount := 0 + allAllocs := make([]*Allocation, 0) for _, asks := range tc.victims { - for _, a := range asks { - if a.IsPreempted() { - victimsCount++ - } - } + allAllocs = append(allAllocs, asks...) } - assert.Equal(t, victimsCount, tc.expectedVictims) + assertPreemptedAllocationKeys(t, allAllocs, tc.preemptedKeys) time.Sleep(500 * time.Millisecond) - assertQuotaPreemptionEvent(t, tc.expectedVictims, "Quota Preemption results summary: preemptable resources: "+resources.Multiply(preemptableResource, -1).String()+", claimed resources: "+tc.claimedResources.String()+", selected victims: "+strconv.Itoa(tc.totalVictims)+", preempted victims: "+strconv.Itoa(tc.expectedVictims), eventSystem.Store.CollectEvents()) + assertQuotaPreemptionEvent(t, len(tc.preemptedKeys), "Quota Preemption results summary: preemptable resources: "+resources.Multiply(preemptableResource, -1).String()+", claimed resources: "+tc.claimedResources.String()+", selected victims: "+strconv.Itoa(tc.totalVictims)+", preempted victims: "+strconv.Itoa(len(tc.preemptedKeys)), eventSystem.Store.CollectEvents()) for _, v := range tc.victims { removeAllocationAsks(node, v) } @@ -608,6 +606,24 @@ func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { } } +func assertPreemptedAllocationKeys(t *testing.T, allocations []*Allocation, expectedKeys []string) { + t.Helper() + preemptedKeys := make([]string, 0) + for _, a := range allocations { + if a.IsPreempted() { + preemptedKeys = append(preemptedKeys, a.GetAllocationKey()) + } + } + sort.Strings(preemptedKeys) + expected := make([]string, len(expectedKeys)) + copy(expected, expectedKeys) + sort.Strings(expected) + assert.Equal(t, len(preemptedKeys), len(expected), "preempted allocation keys count mismatch") + for i := range preemptedKeys { + assert.Equal(t, preemptedKeys[i], expected[i], "unexpected preempted allocation key") + } +} + func assertQuotaPreemptionEvent(t *testing.T, victims int, results string, records []*si.EventRecord) { recordsLen := len(records) if victims > 0 { From 222d1dc69e3addec5b57b2d703ecdede93f0a427 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Fri, 10 Apr 2026 09:33:26 +0530 Subject: [PATCH 23/26] removed redundant test cases --- pkg/scheduler/scheduler_test.go | 78 --------------------------------- 1 file changed, 78 deletions(-) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 3c9ee6404..3f2930f75 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -181,81 +181,3 @@ func TestTriggerQuotaPreemption_NoPartitions(t *testing.T) { // clusterContext has no partitions; call should be a no-op scheduler.triggerQuotaPreemption() } - -// TestTriggerQuotaPreemption_UsageBelowMax verifies that triggerQuotaPreemption does not trigger -// preemption when the queue usage is within the configured max resource limit. -func TestTriggerQuotaPreemption_UsageBelowMax(t *testing.T) { - scheduler := NewScheduler() - partition := createQuotaPreemptionQueuesNodes(t) - scheduler.clusterContext.partitions["test"] = partition - - app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") - err := partition.AddApplication(app) - assert.NilError(t, err) - - // allocate well within the configured max (max is vcore:10, allocate vcore:4) - for i := 1; i <= 2; i++ { - res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) - assert.NilError(t, resErr) - alloc := si.Allocation{ - AllocationKey: "ask-key-" + strconv.Itoa(i), - ApplicationID: appID1, - NodeID: nodeID1, - ResourcePerAlloc: res.ToProto(), - } - _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) - assert.NilError(t, allocErr) - assert.Check(t, allocCreated) - } - - time.Sleep(1100 * time.Millisecond) - scheduler.triggerQuotaPreemption() - time.Sleep(300 * time.Millisecond) - - for _, event := range testHandler.GetEvents() { - if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { - t.Fatal("unexpected release event when usage is below max") - } - } -} - -// TestTriggerQuotaPreemption_PreemptionDelayNotElapsed verifies that no preemption occurs when -// the queue is over quota but the configured preemption delay has not elapsed yet. -func TestTriggerQuotaPreemption_PreemptionDelayNotElapsed(t *testing.T) { - scheduler := NewScheduler() - partition := createQuotaPreemptionQueuesNodes(t) - scheduler.clusterContext.partitions["test"] = partition - - app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") - err := partition.AddApplication(app) - assert.NilError(t, err) - - maxAllocs := 5 - for i := 1; i <= maxAllocs; i++ { - res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) - assert.NilError(t, resErr) - alloc := si.Allocation{ - AllocationKey: "ask-key-" + strconv.Itoa(i), - ApplicationID: appID1, - NodeID: nodeID1, - ResourcePerAlloc: res.ToProto(), - } - _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) - assert.NilError(t, allocErr) - assert.Check(t, allocCreated) - } - - leaf := partition.GetQueue("root.leaf") - assert.Assert(t, leaf != nil, "leaf queue not found") - leaf.SetMaxResource(resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5})) - - // trigger immediately — the 1s delay has NOT elapsed - scheduler.triggerQuotaPreemption() - time.Sleep(200 * time.Millisecond) - - for _, event := range testHandler.GetEvents() { - if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { - t.Fatal("unexpected release event before preemption delay elapsed") - } - } -} From e4a9f4b8c43725f7d2fd7210ec864283ff4abfce Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Mon, 13 Apr 2026 12:17:51 +0530 Subject: [PATCH 24/26] changed log level to debug --- pkg/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7759215d0..ece83dacb 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -177,7 +177,7 @@ func (s *Scheduler) registerActivity() { func (s *Scheduler) triggerQuotaPreemption() { for _, psc := range s.clusterContext.GetPartitionMapClone() { if psc.IsQuotaPreemptionEnabled() { - log.Log(log.Scheduler).Info("Triggering quota preemption", + log.Log(log.Scheduler).Debug("Triggering quota preemption", zap.String("partition", psc.Name)) psc.root.TryQuotaPreemption() } From f85c3028bfa0ac31f8138bc5cda17fcf10809cc8 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Mon, 13 Apr 2026 16:07:34 +0530 Subject: [PATCH 25/26] addressed review comments --- pkg/scheduler/objects/queue.go | 1 + pkg/scheduler/objects/queue_test.go | 2 +- pkg/scheduler/scheduler_test.go | 148 +++++++++++++--------------- 3 files changed, 69 insertions(+), 82 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 85446e74e..9fac11843 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -503,6 +503,7 @@ func (sq *Queue) ResetPreemptionTime() { defer sq.Unlock() sq.quotaPreemptionStartTime = time.Time{} sq.quotaPreemptionDelay = 0 + sq.isQuotaPreemptionRunning = false } // tryAcquirePreemption atomically checks all preconditions for quota preemption and marks the queue as running. diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 508fcc780..4718ae293 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2559,7 +2559,7 @@ func TestTryAcquirePreemption(t *testing.T) { assert.Equal(t, result, tc.preconditionResult) // Reset the flag if acquired so it doesn't affect other test cases. if result { - tc.queue.setQuotaPreemptionState(false) + tc.queue.ResetPreemptionTime() } }) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 3f2930f75..2aacb5666 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -96,88 +96,74 @@ func TestInspectOutstandingRequests(t *testing.T) { assert.Assert(t, resources.IsZero(totalResources), "total resource is not zero: %v", totalResources) } -// TestTriggerQuotaPreemption_QuotaPreemptionDisabled verifies that triggerQuotaPreemption is a no-op -// when quota preemption is disabled on the partition. -func TestTriggerQuotaPreemption_QuotaPreemptionDisabled(t *testing.T) { - scheduler := NewScheduler() - // newBasePartition creates a partition with quota preemption disabled (default) - partition, err := newBasePartition() - assert.NilError(t, err, "unable to create partition: %v", err) - scheduler.clusterContext.partitions["test"] = partition - - _, testHandler := newApplicationWithHandler(appID1, "default", "root.default") - - // quota preemption is not enabled; triggerQuotaPreemption should be a no-op - scheduler.triggerQuotaPreemption() - time.Sleep(200 * time.Millisecond) - - // no release events expected - for _, event := range testHandler.GetEvents() { - if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { - t.Fatal("unexpected release event when quota preemption is disabled") - } +// TestTriggerQuotaPreemption verifies the behavior of triggerQuotaPreemption in two scenarios: +// disabled (no-op) and enabled (releases fired after the preemption delay elapses). +// The only difference between the two cases is the PartitionPreemptionConfig.QuotaPreemptionEnabled +// flag; all other setup is identical. +func TestTriggerQuotaPreemption(t *testing.T) { + testCases := []struct { + name string + quotaPreemptionEnabled bool + expectedReleaseCount int // 0 means none; >0 means at-least-one + }{ + {"quota preemption disabled", false, 0}, + {"quota preemption enabled", true, 1}, } -} - -// TestTriggerQuotaPreemption_QuotaPreemptionEnabled verifies that triggerQuotaPreemption fires -// preemption and produces release events when the queue is over max and the preemption delay has elapsed. -func TestTriggerQuotaPreemption_QuotaPreemptionEnabled(t *testing.T) { - scheduler := NewScheduler() - partition := createQuotaPreemptionQueuesNodes(t) - scheduler.clusterContext.partitions["test"] = partition - app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") - err := partition.AddApplication(app) - assert.NilError(t, err) - - maxAllocs := 5 - for i := 1; i <= maxAllocs; i++ { - res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) - assert.NilError(t, resErr) - alloc := si.Allocation{ - AllocationKey: "ask-key-" + strconv.Itoa(i), - ApplicationID: appID1, - NodeID: nodeID1, - ResourcePerAlloc: res.ToProto(), - } - _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) - assert.NilError(t, allocErr) - assert.Check(t, allocCreated, "alloc should have been created") - } - - // lower the max so the queue is now over quota — use updateQueues so ApplyConf calls setPreemptionTime - root := partition.GetQueue("root") - assert.Assert(t, root != nil, "root queue not found") - newLeafConf := []configs.QueueConfig{ - createLeafQueueConfig( - map[string]string{"memory": "10", "vcore": "5"}, - map[string]string{configs.QuotaPreemptionDelay: "1s"}, - ), + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + scheduler := NewScheduler() + partition := createQuotaPreemptionQueuesNodes(t) + // override the partition-level flag to match the test case + partition.quotaPreemptionEnabled = tc.quotaPreemptionEnabled + scheduler.clusterContext.partitions["test"] = partition + + app, testHandler := newApplicationWithHandler(appID1, "default", "root.leaf") + err := partition.AddApplication(app) + assert.NilError(t, err) + + // add allocations so queue usage is above its limit + for i := 1; i <= 5; i++ { + res, resErr := resources.NewResourceFromConf(map[string]string{"vcore": "2"}) + assert.NilError(t, resErr) + alloc := si.Allocation{ + AllocationKey: "ask-key-" + strconv.Itoa(i), + ApplicationID: appID1, + NodeID: nodeID1, + ResourcePerAlloc: res.ToProto(), + } + _, allocCreated, allocErr := partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc)) + assert.NilError(t, allocErr) + assert.Check(t, allocCreated, "alloc should have been created") + } + + // lower the max so the queue is now over quota; delay is always set + root := partition.GetQueue("root") + assert.Assert(t, root != nil, "root queue not found") + newLeafConf := []configs.QueueConfig{ + createLeafQueueConfig( + map[string]string{"memory": "10", "vcore": "5"}, + map[string]string{configs.QuotaPreemptionDelay: "1s"}, + ), + } + err = partition.updateQueues(newLeafConf, root) + assert.NilError(t, err, "failed to update queue config") + + // wait for the preemption delay to elapse (no-op for disabled case) + time.Sleep(1100 * time.Millisecond) + + scheduler.triggerQuotaPreemption() + + // wait for any async preemption goroutine to complete + time.Sleep(300 * time.Millisecond) + + releaseEventCount := 0 + for _, event := range testHandler.GetEvents() { + if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { + releaseEventCount++ + } + } + assert.Equal(t, releaseEventCount, tc.expectedReleaseCount, "expected release event does not match actual count") + }) } - err = partition.updateQueues(newLeafConf, root) - assert.NilError(t, err, "failed to update queue config") - - // wait for the preemption delay (configured as 1s) to elapse - time.Sleep(1100 * time.Millisecond) - - scheduler.triggerQuotaPreemption() - - // wait for the async preemption goroutine to complete and events to propagate - time.Sleep(300 * time.Millisecond) - - releaseEventCount := 0 - for _, event := range testHandler.GetEvents() { - if _, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { - releaseEventCount++ - } - } - assert.Check(t, releaseEventCount > 0, "expected at least one release event from quota preemption, got 0") -} - -// TestTriggerQuotaPreemption_NoPartitions verifies that triggerQuotaPreemption handles an empty -// partition map cleanly without panicking. -func TestTriggerQuotaPreemption_NoPartitions(t *testing.T) { - scheduler := NewScheduler() - // clusterContext has no partitions; call should be a no-op - scheduler.triggerQuotaPreemption() } From da1a69b336fa8758358df1ed7ee19b7b78b04280 Mon Sep 17 00:00:00 2001 From: Aditya Maheshwari Date: Mon, 13 Apr 2026 16:29:15 +0530 Subject: [PATCH 26/26] fixed formatting --- pkg/scheduler/scheduler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 2aacb5666..19eb7ecfe 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -102,9 +102,9 @@ func TestInspectOutstandingRequests(t *testing.T) { // flag; all other setup is identical. func TestTriggerQuotaPreemption(t *testing.T) { testCases := []struct { - name string - quotaPreemptionEnabled bool - expectedReleaseCount int // 0 means none; >0 means at-least-one + name string + quotaPreemptionEnabled bool + expectedReleaseCount int // 0 means none; >0 means at-least-one }{ {"quota preemption disabled", false, 0}, {"quota preemption enabled", true, 1},