Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cd31855
moved quota preemption out from tryallocate
adityadtu5 Apr 1, 2026
979b90c
running quota preemption in its own cycle
adityadtu5 Apr 1, 2026
035d2e0
fixed return type
adityadtu5 Apr 1, 2026
1defa06
formatting
adityadtu5 Apr 1, 2026
59203b5
fixed recursive calls and linting issue
adityadtu5 Apr 1, 2026
b7de992
addressed some initial review comments
adityadtu5 Apr 6, 2026
aeb44b2
addressed review comments
adityadtu5 Apr 6, 2026
bbec286
addressed review comments
adityadtu5 Apr 7, 2026
dc22934
fixed race conditions
adityadtu5 Apr 7, 2026
7406f1b
fixed formatting
adityadtu5 Apr 7, 2026
6e8bf47
fixed test cases
adityadtu5 Apr 7, 2026
76b8c30
added test cases
adityadtu5 Apr 7, 2026
275ee62
removed unnecessary case
adityadtu5 Apr 8, 2026
e10f3b1
added test case for trypreemptioninternal
adityadtu5 Apr 8, 2026
1c8d875
Merge branch 'master' into YUNIKORN-3249
adityadtu5 Apr 8, 2026
433b98f
fixed formatting
adityadtu5 Apr 8, 2026
b0cdc39
added test cases for trigger preemption
adityadtu5 Apr 8, 2026
47a68fb
addressed minor comments
adityadtu5 Apr 9, 2026
dfcd67a
fixed broken tests
adityadtu5 Apr 9, 2026
9c38f21
removed unnecessary time backing
adityadtu5 Apr 9, 2026
b09eb4b
fixed comments, test cases as per review comments
adityadtu5 Apr 9, 2026
de6051d
removed redundant test case
adityadtu5 Apr 9, 2026
fdcb689
improved existing test cases with asserting on exact allocations pree…
adityadtu5 Apr 9, 2026
222d1dc
removed redundant test cases
adityadtu5 Apr 10, 2026
e4a9f4b
changed log level to debug
adityadtu5 Apr 13, 2026
f85c302
addressed review comments
adityadtu5 Apr 13, 2026
da1a69b
fixed formatting
adityadtu5 Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
.DS_Store
/tools/
/build/
Expand Down
71 changes: 39 additions & 32 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,29 +503,32 @@ func (sq *Queue) ResetPreemptionTime() {
defer sq.Unlock()
sq.quotaPreemptionStartTime = time.Time{}
sq.quotaPreemptionDelay = 0
sq.isQuotaPreemptionRunning = false
}

// shouldTriggerPreemption returns true if quota preemption should be triggered based on the settings and the
// current time.
func (sq *Queue) shouldTriggerPreemption() bool {
sq.RLock()
defer sq.RUnlock()
// tryAcquirePreemption atomically checks all preconditions for quota preemption and marks the queue as running.
// This prevents concurrent goroutines from both passing the check before either sets the flag (TOCTOU).
// Returns true only if all conditions are met and the flag was successfully acquired.
// The caller MUST call setQuotaPreemptionState(false) when done.
func (sq *Queue) tryAcquirePreemption() bool {
sq.Lock()
defer sq.Unlock()
// dynamic queues do not support quota preemption
if !sq.isManaged {
return false
}
// already in progress do not run again
if sq.isQuotaPreemptionRunning {
// already in progress: do not run again
if !sq.isManaged || sq.isQuotaPreemptionRunning {
return false
}
// usage is below max: no need to trigger. Happens if the queue drops below the new max when pods stop.
// Should clean up, but we have just a read lock...
if sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) {
sq.quotaPreemptionStartTime = time.Time{}
return false
}
// trigger if the time is right
return !sq.quotaPreemptionStartTime.IsZero() && time.Now().After(sq.quotaPreemptionStartTime)
if sq.quotaPreemptionStartTime.IsZero() || time.Now().Before(sq.quotaPreemptionStartTime) {
return false
}
sq.isQuotaPreemptionRunning = true
return true
}

// setQuotaPreemptionState set or clear the running state for quota preemption. When done the start time is also cleared
Expand Down Expand Up @@ -1658,25 +1661,7 @@ func (sq *Queue) canRunApp(appID string) bool {
// resources are skipped.
// Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped.
// Lock free call this all locks are taken when needed in called functions
func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption bool) *AllocationResult {
if quotaPreemption && sq.shouldTriggerPreemption() {
go func() {
log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources",
zap.String("queueName", sq.GetQueuePath()),
zap.Stringer("maxResource", sq.cloneMaxResource()))
preemptor := NewQuotaPreemptor(sq)
preemptor.tryPreemption()
}()
// when we trigger for a parent queue do not trigger for the children just yet. At least wait until the next
// scheduling cycle.
quotaPreemption = false
}
// if quota preemption is running for this queue we do not want to trigger for any of the children.
// we do a top-down approach: parent first and when done we check the children
// there could be a child quota preemption running already
if sq.getQuotaPreemptionRunning() {
quotaPreemption = false
}
func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption bool) *AllocationResult {
if sq.IsLeafQueue() {
// get the headroom
headRoom := sq.getHeadRoom()
Expand Down Expand Up @@ -1713,7 +1698,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
} else {
// process the child queues (filters out queues without pending requests)
for _, child := range sq.sortQueues() {
result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption, quotaPreemption)
result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption)
if result != nil {
return result
}
Expand All @@ -1722,6 +1707,28 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N
return nil
}

func (sq *Queue) TryQuotaPreemption() {
Comment thread
adityadtu5 marked this conversation as resolved.
if sq.tryAcquirePreemption() {
go func() {
defer sq.setQuotaPreemptionState(false)
log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources",
zap.String("queueName", sq.GetQueuePath()),
zap.Stringer("maxResource", sq.cloneMaxResource()))
preemptor := NewQuotaPreemptor(sq)
preemptor.tryPreemption()
}() // fire preemption in a separate go routine to avoid blocking the quotapreemption loop
return // no need to run quota preemption for sub-tree children when we quota preemption is running for parent queue
}
if sq.getQuotaPreemptionRunning() {
return
}
if !sq.IsLeafQueue() {
for _, child := range sq.GetCopyOfChildren() {
child.TryQuotaPreemption()
}
}
}

// TryPlaceholderAllocate tries to replace a placeholders with a real allocation.
// This only gets called if there is a pending request on this queue or its children.
// This is a depth first algorithm: descend into the depth of the queue tree first. Child queues are sorted based on
Expand Down
65 changes: 54 additions & 11 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2451,18 +2451,17 @@ func TestQuotaPreemptionSettings(t *testing.T) {
parent.UpdateQueueProperties(oldMax)
// Wait till delay expires to let trigger preemption automatically
time.Sleep(parent.quotaPreemptionDelay + 50*time.Millisecond)
assert.Equal(t, parent.shouldTriggerPreemption(), tc.timeChange, "preemption should get trigger for set delay")
parent.TryAllocate(nil, nil, nil, false, true)

time.Sleep(50 * time.Millisecond)

// since preemption settings are reset, preemption should not be triggered again during the next check
assert.Equal(t, parent.shouldTriggerPreemption(), false)
assert.Equal(t, parent.isQuotaPreemptionRunning, false, "quotaPreemptionRunning flag should be false initially")
triggered := parent.tryAcquirePreemption()
assert.Equal(t, triggered, tc.timeChange, "preemption should get trigger for set delay")
assert.Equal(t, parent.isQuotaPreemptionRunning, triggered, "quotaPreemptionRunning flag should be true if preemption is triggered")
// preemption should not be triggered again during the next check
assert.Equal(t, parent.tryAcquirePreemption(), false)
})
}
}

func TestShouldTriggerPreemption(t *testing.T) {
func TestTryAcquirePreemption(t *testing.T) {
parentConfig := configs.QueueConfig{
Name: "parent",
Parent: true,
Expand Down Expand Up @@ -2521,6 +2520,24 @@ func TestShouldTriggerPreemption(t *testing.T) {
assert.NilError(t, err)
usageNotMatchingMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})

futureStartQueue, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf-future-start-time",
Resources: leafRes,
}, parent, false, nil)
assert.NilError(t, err)
futureStartQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000})
futureStartQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour)

// nil maxResource: StrictlyGreaterThanOrEqualsOnlyExisting(nil, alloc) -> false (won't clear start time)
// so a past start time will still trigger preemption.
nilMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf-nil-max",
// Resources intentionally not set -> maxResource == nil
}, parent, false, nil)
assert.NilError(t, err)
nilMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
nilMaxQueue.quotaPreemptionStartTime = time.Now()

testCases := []struct {
name string
queue *Queue
Expand All @@ -2531,18 +2548,44 @@ func TestShouldTriggerPreemption(t *testing.T) {
{"preemption running", alreadyPreemptionRunning, false},
{"usage exceeded max, no start time", usageExceededMaxQueue, false},
{"usage exceeded max, start time set", parent, true},
{"usage exceeded max, start time in future", futureStartQueue, false},
{"usage equals max resources", usageEqualsMaxQueue, false},
{"usage res not matching max", usageNotMatchingMaxQueue, false},
{"nil max resource with usage and past start time", nilMaxQueue, true},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.queue.shouldTriggerPreemption(), tc.preconditionResult)
result := tc.queue.tryAcquirePreemption()
assert.Equal(t, result, tc.preconditionResult)
// Reset the flag if acquired so it doesn't affect other test cases.
if result {
tc.queue.ResetPreemptionTime()
}
})
}
// special case: reset time if usage below max
// special case: reset time if usage below max (tryAcquirePreemption clears the start time when
// maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(allocatedResource) holds)
usageNotMatchingMaxQueue.quotaPreemptionStartTime = time.Now().Add(time.Hour)
assert.Assert(t, !usageNotMatchingMaxQueue.shouldTriggerPreemption(), "preemption should not be triggered")
assert.Assert(t, !usageNotMatchingMaxQueue.tryAcquirePreemption(), "preemption should not be triggered")
assert.Assert(t, usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should be reset")

// start time is also cleared when usage equals max (within-quota side effect check).
usageEqualsMaxQueue.quotaPreemptionStartTime = time.Now()
assert.Assert(t, !usageEqualsMaxQueue.tryAcquirePreemption(), "preemption must not fire when usage equals max")
assert.Assert(t, usageEqualsMaxQueue.quotaPreemptionStartTime.IsZero(), "start time must be cleared when usage equals max")

// successful acquisition sets isQuotaPreemptionRunning to true.
// re-arm parent: the table test called ResetPreemptionTime() which cleared the start time.
parent.quotaPreemptionStartTime = time.Now()
assert.Assert(t, parent.tryAcquirePreemption(), "acquisition should succeed after re-arming")
assert.Assert(t, parent.isQuotaPreemptionRunning, "isQuotaPreemptionRunning must be true after acquisition")
parent.setQuotaPreemptionState(false)

// after release, start time is cleared → immediate re-acquisition is blocked.
assert.Assert(t, !parent.tryAcquirePreemption(), "re-acquisition must fail when start time is cleared after release")
// re-arming the start time makes re-acquisition possible again.
parent.quotaPreemptionStartTime = time.Now()
assert.Assert(t, parent.tryAcquirePreemption(), "re-acquisition must succeed after re-arming start time")
parent.setQuotaPreemptionState(false)
}

func TestNewConfiguredQueue(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/objects/quota_preemptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func (qpc *QuotaPreemptionContext) tryPreemption() {
}
}

// tryPreemptionInternal runs quota change preemption for a leaf queue.
// This MUST always be run in a top-down manner starting from the root queue.
// It assumes that a parent queue will not call this for its leaf queues if quota preemption is already running
// for the parent queue.
// Use tryAcquirePreemption to safely mark quota preemption as running for the queue before calling this function:
// isQuotaPreemptionRunning is set atomically by tryAcquirePreemption before this function is called.
// isQuotaPreemptionRunning SHOULD be cleared by the caller (via setQuotaPreemptionState(false)) when done.
func (qpc *QuotaPreemptionContext) tryPreemptionInternal() {
log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change preemption for leaf queue",
zap.String("leaf queue", qpc.queue.GetQueuePath()),
Expand All @@ -126,8 +131,6 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() {
zap.Stringer("actual allocated resource", qpc.allocatedResource),
zap.Stringer("preemptable resource distribution", qpc.preemptableResource),
)
// quota change preemption has started, so mark the flag
qpc.queue.setQuotaPreemptionState(true)

// Filter the allocations
qpc.filterAllocations()
Expand All @@ -137,9 +140,6 @@ func (qpc *QuotaPreemptionContext) tryPreemptionInternal() {

// Preempt the victims
qpc.preemptVictims()

// quota change preemption has ended, so mark the flag
qpc.queue.setQuotaPreemptionState(false)
}

// getChildQueuesPreemptableResource Compute leaf queue's preemptable resource distribution from the parent's preemptable resource.
Expand Down
Loading
Loading