Skip to content

Commit

Permalink
fix: if the capability cpu or memory is not specified in the hierarch…
Browse files Browse the repository at this point in the history
…ical queue, it will be set to the corresponding value of the parent queue

Signed-off-by: JesseStutler <[email protected]>
  • Loading branch information
JesseStutler committed Dec 25, 2024
1 parent b0c1a56 commit 08dcdb4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 27 deletions.
22 changes: 12 additions & 10 deletions pkg/scheduler/api/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,16 @@ type ScalarResource struct {

// BuildResourceList builds resource list object
func BuildResourceList(cpu string, memory string, scalarResources ...ScalarResource) v1.ResourceList {
resourceList := v1.ResourceList{
v1.ResourceCPU: resource.MustParse(cpu),
v1.ResourceMemory: resource.MustParse(memory),
resourceList := v1.ResourceList{}

if len(cpu) > 0 {
resourceList[v1.ResourceCPU] = resource.MustParse(cpu)
}

if len(memory) > 0 {
resourceList[v1.ResourceMemory] = resource.MustParse(memory)
}

for _, scalar := range scalarResources {
resourceList[v1.ResourceName(scalar.Name)] = resource.MustParse(scalar.Value)
}
Expand All @@ -107,13 +113,9 @@ func BuildResourceList(cpu string, memory string, scalarResources ...ScalarResou

// BuildResourceListWithGPU builds resource list with GPU
func BuildResourceListWithGPU(cpu string, memory string, GPU string, scalarResources ...ScalarResource) v1.ResourceList {
resourceList := v1.ResourceList{
v1.ResourceCPU: resource.MustParse(cpu),
v1.ResourceMemory: resource.MustParse(memory),
GPUResourceName: resource.MustParse(GPU),
}
for _, scalar := range scalarResources {
resourceList[v1.ResourceName(scalar.Name)] = resource.MustParse(scalar.Value)
resourceList := BuildResourceList(cpu, memory, scalarResources...)
if len(GPU) > 0 {
resourceList[GPUResourceName] = resource.MustParse(GPU)
}

return resourceList
Expand Down
37 changes: 21 additions & 16 deletions pkg/scheduler/plugins/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,13 @@ func (cp *capacityPlugin) buildHierarchicalQueueAttrs(ssn *framework.Session) bo
attr.name, attr.allocated.String(), attr.request.String(), attr.inqueue.String(), attr.elastic.String())
}

// init root queue realCapability/capability/deserved as cp.totalResource
rootQueueAttr := cp.queueOpts[api.QueueID(cp.rootQueue)]
rootQueueAttr.capability = cp.totalResource
rootQueueAttr.realCapability = cp.totalResource
rootQueueAttr.deserved = cp.totalResource
// Check the hierarchical structure of queues
err := cp.checkHierarchicalQueue(cp.queueOpts[api.QueueID(cp.rootQueue)])
err := cp.checkHierarchicalQueue(rootQueueAttr)
if err != nil {
klog.Errorf("Failed to check queue's hierarchical structure, error: %v", err)
return false
Expand Down Expand Up @@ -586,21 +591,16 @@ func (cp *capacityPlugin) newQueueAttr(queue *api.QueueInfo) *queueAttr {
parents: make([]api.QueueID, 0),
children: make(map[api.QueueID]*queueAttr),

deserved: api.NewResource(queue.Queue.Spec.Deserved),
allocated: api.EmptyResource(),
request: api.EmptyResource(),
elastic: api.EmptyResource(),
inqueue: api.EmptyResource(),
guarantee: api.EmptyResource(),
deserved: api.NewResource(queue.Queue.Spec.Deserved),
allocated: api.EmptyResource(),
request: api.EmptyResource(),
elastic: api.EmptyResource(),
inqueue: api.EmptyResource(),
guarantee: api.EmptyResource(),
capability: api.EmptyResource(),
}
if len(queue.Queue.Spec.Capability) != 0 {
attr.capability = api.NewResource(queue.Queue.Spec.Capability)
if attr.capability.MilliCPU <= 0 {
attr.capability.MilliCPU = math.MaxFloat64
}
if attr.capability.Memory <= 0 {
attr.capability.Memory = math.MaxFloat64
}
}

if len(queue.Queue.Spec.Guarantee.Resource) != 0 {
Expand Down Expand Up @@ -644,17 +644,22 @@ func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error {
for _, childAttr := range attr.children {
totalDeserved.Add(childAttr.deserved)
totalGuarantee.Add(childAttr.guarantee)
// if the user does not set CPU or memory in capability, we set the value to be the same as parent(we do not consider the situation where the user sets CPU or memory<=0)
if childAttr.capability.MilliCPU <= 0 {
childAttr.capability.MilliCPU = attr.capability.MilliCPU
}
if childAttr.capability.Memory <= 0 {
childAttr.capability.Memory = attr.capability.Memory
}
// Check if the parent queue's capability is less than the child queue's capability
if attr.capability != nil && childAttr.capability != nil && attr.capability.LessPartly(childAttr.capability, api.Zero) {
if attr.capability.LessPartly(childAttr.capability, api.Zero) {
return fmt.Errorf("queue <%s> capability is less than its child queue <%s>", attr.name, childAttr.name)
}
}

if attr.name == cp.rootQueue {
attr.guarantee = totalGuarantee
cp.totalGuarantee = totalGuarantee
attr.realCapability = cp.totalResource
attr.deserved = cp.totalResource
}

for _, childAttr := range attr.children {
Expand Down
23 changes: 22 additions & 1 deletion pkg/scheduler/plugins/capacity/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func Test_capacityPlugin_OnSessionOpenWithHierarchy(t *testing.T) {
// podgroup
pg1 := util.BuildPodGroup("pg1", "ns1", "q11", 1, nil, schedulingv1beta1.PodGroupInqueue)
// queue
root := buildQueueWithParents("root", "", api.BuildResourceList("8", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), api.BuildResourceList("8", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...))
root := buildQueueWithParents("root", "", nil, nil)
queue1 := buildQueueWithParents("q1", "root", nil, api.BuildResourceList("4", "4Gi"))
queue2 := buildQueueWithParents("q2", "root", nil, api.BuildResourceList("4", "4Gi"))
queue11 := buildQueueWithParents("q11", "q1", nil, api.BuildResourceList("1", "1Gi"))
Expand Down Expand Up @@ -383,6 +383,15 @@ func Test_capacityPlugin_OnSessionOpenWithHierarchy(t *testing.T) {
pg5 := util.BuildPodGroup("pg5", "ns1", "q31", 1, nil, schedulingv1beta1.PodGroupRunning)
pg6 := util.BuildPodGroup("pg6", "ns1", "q32", 1, nil, schedulingv1beta1.PodGroupInqueue)

// resources for test case 4
// queue
queue5 := buildQueueWithParents("q5", "root", nil, api.BuildResourceList("", "4Gi", []api.ScalarResource{}...))
queue51 := buildQueueWithParents("q51", "q5", nil, api.BuildResourceList("", "2Gi", []api.ScalarResource{}...))
// podgroup
pg7 := util.BuildPodGroup("pg7", "ns1", "q51", 1, nil, schedulingv1beta1.PodGroupRunning)
// pod
p9 := util.BuildPod("ns1", "p9", "", corev1.PodPending, api.BuildResourceList("1", ""), "pg7", make(map[string]string), map[string]string{})

tests := []uthelper.TestCommonStruct{
{
Name: "case0: Pod allocatable when queue is leaf queue",
Expand Down Expand Up @@ -432,6 +441,18 @@ func Test_capacityPlugin_OnSessionOpenWithHierarchy(t *testing.T) {
ExpectEvicted: []string{"ns1/p7"},
ExpectEvictNum: 1,
},
{
Name: "case4: If the capability cpu or memory is not specified, the value should be inherited from parent queue",
Plugins: plugins,
Pods: []*corev1.Pod{p9},
Nodes: []*corev1.Node{n1},
PodGroups: []*schedulingv1beta1.PodGroup{pg7},
Queues: []*schedulingv1beta1.Queue{root, queue5, queue51},
ExpectBindMap: map[string]string{
"ns1/p9": "n1",
},
ExpectBindsNum: 1,
},
}

tiers := []conf.Tier{
Expand Down

0 comments on commit 08dcdb4

Please sign in to comment.