Skip to content

Commit 072f337

Browse files
authored
Merge pull request #18053 from justinsb/gang_scheduling_with_kueue
[aiconformance] Add test for gang scheduling with kueue
2 parents df43863 + 770fba0 commit 072f337

6 files changed

Lines changed: 231 additions & 0 deletions

File tree

tests/e2e/scenarios/ai-conformance/validators/harness.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"os"
2222
"path/filepath"
23+
"sync"
2324
"testing"
2425

2526
"k8s.io/client-go/dynamic"
@@ -35,6 +36,12 @@ type ValidatorHarness struct {
3536

3637
dynamicClient dynamic.Interface
3738
restConfig *rest.Config
39+
40+
// mutex guards our mutable state
41+
mutex sync.Mutex
42+
43+
// testNamespace is a per-test namespace to use for creating resources. It is lazily initialized when TestNamespace() is called.
44+
testNamespace string
3845
}
3946

4047
// NewValidatorHarness creates a new ValidatorHarness.
@@ -70,3 +77,9 @@ func NewValidatorHarness(t *testing.T) *ValidatorHarness {
7077
func (h *ValidatorHarness) Context() context.Context {
7178
return h.t.Context()
7279
}
80+
81+
// Skip allows the test to be skipped with a message, and ensures that the skip is recorded in the output.
82+
func (h *ValidatorHarness) Skip(message string) {
83+
h.output.Skip(message)
84+
h.t.Skip(message)
85+
}

tests/e2e/scenarios/ai-conformance/validators/kube.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package validators
1818

1919
import (
20+
"fmt"
21+
"strings"
22+
"time"
23+
2024
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2125
"k8s.io/apimachinery/pkg/runtime/schema"
2226
"k8s.io/client-go/dynamic"
@@ -103,3 +107,88 @@ func (h *ValidatorHarness) ListResourceSlices() []*ResourceSlice {
103107
}
104108
return out
105109
}
110+
111+
// CRD is a wrapper around the CustomResourceDefinition type.
112+
type CRD struct {
113+
u *unstructured.Unstructured
114+
}
115+
116+
// Name returns the name of the CRD.
117+
func (d *CRD) Name() string {
118+
return d.u.GetName()
119+
}
120+
121+
var crdGVR = schema.GroupVersionResource{
122+
Group: "apiextensions.k8s.io",
123+
Version: "v1",
124+
Resource: "customresourcedefinitions",
125+
}
126+
127+
// ListCRDs lists all CRDs in the cluster.
128+
func (h *ValidatorHarness) ListCRDs() []*CRD {
129+
objectList, err := h.DynamicClient().Resource(crdGVR).List(h.Context(), metav1.ListOptions{})
130+
if err != nil {
131+
h.Fatalf("failed to list CRDs: %v", err)
132+
}
133+
var out []*CRD
134+
for i := range objectList.Items {
135+
out = append(out, &CRD{u: &objectList.Items[i]})
136+
}
137+
return out
138+
}
139+
140+
// HasCRD returns true if a CRD with the given name exists.
141+
func (h *ValidatorHarness) HasCRD(name string) bool {
142+
for _, crd := range h.ListCRDs() {
143+
if crd.Name() == name {
144+
return true
145+
}
146+
}
147+
return false
148+
}
149+
150+
var namespaceGVR = schema.GroupVersionResource{
151+
Group: "",
152+
Version: "v1",
153+
Resource: "namespaces",
154+
}
155+
156+
var namespaceGVK = schema.GroupVersionKind{
157+
Group: "",
158+
Version: "v1",
159+
Kind: "Namespace",
160+
}
161+
162+
func (h *ValidatorHarness) TestNamespace() string {
163+
h.mutex.Lock()
164+
defer h.mutex.Unlock()
165+
166+
if h.testNamespace == "" {
167+
prefix := strings.ToLower(h.t.Name())
168+
prefix = strings.ReplaceAll(prefix, "/", "-")
169+
prefix = strings.ReplaceAll(prefix, "_", "-")
170+
ns := fmt.Sprintf("%s-%d", prefix, time.Now().Unix())
171+
172+
nsObj := &unstructured.Unstructured{}
173+
nsObj.SetGroupVersionKind(namespaceGVK)
174+
nsObj.SetName(ns)
175+
176+
h.Logf("Creating test namespace %q", ns)
177+
178+
if _, err := h.DynamicClient().Resource(namespaceGVR).Create(h.Context(), nsObj, metav1.CreateOptions{}); err != nil {
179+
h.Fatalf("failed to create test namespace: %v", err)
180+
}
181+
182+
h.testNamespace = ns
183+
184+
h.t.Cleanup(func() {
185+
h.Logf("Deleting test namespace %q", ns)
186+
err := h.DynamicClient().Resource(namespaceGVR).Delete(h.Context(), ns, metav1.DeleteOptions{})
187+
if err != nil {
188+
h.Logf("failed to delete test namespace: %v", err)
189+
}
190+
})
191+
}
192+
193+
return h.testNamespace
194+
}

tests/e2e/scenarios/ai-conformance/validators/markdown.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ func (o *MarkdownOutput) Success(text string) {
4040
o.printf("✓ %s\n", text)
4141
}
4242

43+
// Skip writes a skip message to the markdown file, prefixed with a warning symbol.
44+
func (o *MarkdownOutput) Skip(message string) {
45+
o.printf("&warning; SKIPPED: %s\n", message)
46+
}
47+
4348
// Close closes the underlying file. It should be called when all output is done.
4449
func (o *MarkdownOutput) Close() error {
4550
return o.f.Close()

tests/e2e/scenarios/ai-conformance/validators/output.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ type OutputSink interface {
3232
// Success indicates a successful check, allowing the output sink to format it accordingly.
3333
Success(text string)
3434

35+
// Skip indicates that a test was skipped, allowing the output sink to format it accordingly.
36+
Skip(message string)
37+
3538
// Close closes the output sink and releases any resources.
3639
io.Closer
3740
}
@@ -64,3 +67,9 @@ func (h *ValidatorHarness) Success(format string, args ...interface{}) {
6467
h.output.Success(s)
6568
h.t.Logf("SUCCESS: "+format, args...)
6669
}
70+
71+
// RecordConformance records that a specific conformance test was passed.
72+
func (h *ValidatorHarness) RecordConformance(testName string) {
73+
// We should gather these in a structured way, but for now we'll just log them.
74+
h.Logf("Conformance %q passed", testName)
75+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dra_support
18+
19+
import (
20+
"fmt"
21+
"testing"
22+
23+
"k8s.io/kops/tests/e2e/scenarios/ai-conformance/validators"
24+
)
25+
26+
// TestGangScheduling_ViaKueue corresponds to the schedulingOrchestration/gang_scheduling scenario,
27+
// for the case that the vendor chooses to demonstrate gang scheduling support via Kueue.
28+
func TestGangScheduling_ViaKueue(t *testing.T) {
29+
// Description:
30+
// The platform must allow for the installation and successful operation of at least one gang scheduling solution that ensures all-or-nothing scheduling for distributed AI workloads (e.g. Kueue, Volcano, etc.) To be conformant, the vendor must demonstrate that their platform can successfully run at least one such solution.
31+
32+
h := validators.NewValidatorHarness(t)
33+
34+
if !h.HasCRD("localqueues.kueue.x-k8s.io") {
35+
h.Skip("Kueue CRDs not found, skipping gang scheduling test via Kueue")
36+
}
37+
38+
h.Logf("# Applying manifest")
39+
40+
jobName := "gangscheduling-kueue"
41+
{
42+
ns := h.TestNamespace()
43+
h.ShellExec(fmt.Sprintf("kubectl apply --namespace %s -f %s", ns, "testdata/gangscheduling/gangscheduling-kueue.yaml"))
44+
}
45+
46+
// Wait for Job completion
47+
{
48+
ns := h.TestNamespace()
49+
h.Logf("# Waiting for Job to complete")
50+
h.ShellExec(fmt.Sprintf("kubectl wait --namespace %s --for=condition=complete job/%s", ns, jobName))
51+
52+
}
53+
54+
h.Success("Gang scheduling via Kueue test completed successfully.")
55+
56+
h.RecordConformance("schedulingOrchestration/gang_scheduling")
57+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
apiVersion: kueue.x-k8s.io/v1beta1
2+
kind: ResourceFlavor
3+
metadata:
4+
name: default-flavor
5+
---
6+
apiVersion: kueue.x-k8s.io/v1beta1
7+
kind: ClusterQueue
8+
metadata:
9+
name: cluster-queue
10+
spec:
11+
namespaceSelector: {} # Available to all namespaces
12+
queueingStrategy: BestEffortFIFO # Default queueing strategy
13+
resourceGroups:
14+
- coveredResources: ["cpu", "memory", "ephemeral-storage"]
15+
flavors:
16+
- name: "default-flavor"
17+
resources:
18+
- name: "cpu"
19+
nominalQuota: 10
20+
- name: "memory"
21+
nominalQuota: 10Gi
22+
- name: "ephemeral-storage"
23+
nominalQuota: 10Gi
24+
---
25+
apiVersion: kueue.x-k8s.io/v1beta1
26+
kind: LocalQueue
27+
metadata:
28+
name: team1
29+
spec:
30+
clusterQueue: cluster-queue
31+
---
32+
apiVersion: batch/v1
33+
kind: Job
34+
metadata:
35+
name: gangscheduling-kueue
36+
annotations:
37+
kueue.x-k8s.io/queue-name: team1 # Point to the LocalQueue
38+
spec:
39+
ttlSecondsAfterFinished: 60 # Job will be deleted after 60 seconds
40+
parallelism: 3 # This Job will have 3 replicas running at the same time
41+
completions: 3 # This Job requires 3 completions
42+
suspend: true # Set to true to allow Kueue to control the Job when it starts
43+
template:
44+
spec:
45+
containers:
46+
- name: dummy-job
47+
image: debian:latest
48+
command: ["sleep", "10s"] # Sleep for 10 seconds
49+
resources:
50+
requests:
51+
cpu: "500m"
52+
memory: "512Mi"
53+
ephemeral-storage: "512Mi"
54+
limits:
55+
cpu: "500m"
56+
memory: "512Mi"
57+
ephemeral-storage: "512Mi"
58+
restartPolicy: Never

0 commit comments

Comments
 (0)