
Commit 3067cc1

test: Add dedicated LocalBackend smoke coverage
1 parent 6a67282 commit 3067cc1

File tree

1 file changed: +184 -0 lines changed

@@ -0,0 +1,184 @@
"""Dedicated LocalBackend smoke test for PipelineTrainer."""

import asyncio
import os
import tempfile
import uuid

import openai
import pytest

# Skip the whole module when the GPU stack is not importable.
torch = pytest.importorskip("torch")
pytest.importorskip("vllm")

import art
from art.local import LocalBackend
from art.pipeline_trainer import PipelineTrainer

DEFAULT_BASE_MODEL = "Qwen/Qwen3-0.6B"
DEFAULT_GPU_MEMORY_UTILIZATION = 0.2
DEFAULT_MAX_MODEL_LEN = 2048
DEFAULT_MAX_SEQ_LENGTH = 2048


def get_base_model() -> str:
    return os.environ.get("BASE_MODEL", DEFAULT_BASE_MODEL)


def get_safe_gpu_memory_utilization() -> float:
    """Clamp the requested vLLM memory fraction to what both GPUs can spare."""
    requested = float(
        os.environ.get(
            "ART_TEST_GPU_MEMORY_UTILIZATION",
            str(DEFAULT_GPU_MEMORY_UTILIZATION),
        )
    )
    min_free_gib = float(os.environ.get("ART_TEST_MIN_FREE_GPU_GIB", "8"))
    free_ratios: list[float] = []
    for device in (0, 1):
        free_bytes, total_bytes = torch.cuda.mem_get_info(device)
        free_gib = free_bytes / (1024**3)
        if free_gib < min_free_gib:
            pytest.skip(
                "Insufficient free GPU memory for dedicated LocalBackend smoke test: "
                f"GPU {device} has {free_gib:.1f} GiB free < {min_free_gib:.1f} GiB required."
            )
        free_ratios.append(free_bytes / total_bytes)
    # Never exceed 80% of the scarcest GPU's free fraction, with a 2% floor.
    return max(0.02, min(requested, min(free_ratios) * 0.8))


def get_dedicated_vllm_test_config() -> art.dev.InternalModelConfig:
    """Pin the trainer to GPU 0 and the vLLM inference engine to GPU 1."""
    return {
        "trainer_gpu_ids": [0],
        "inference_gpu_ids": [1],
        "engine_args": {
            "gpu_memory_utilization": get_safe_gpu_memory_utilization(),
            "max_model_len": int(
                os.environ.get("ART_TEST_MAX_MODEL_LEN", str(DEFAULT_MAX_MODEL_LEN))
            ),
            "max_num_seqs": 8,
            "enforce_eager": True,
        },
        "init_args": {
            "max_seq_length": int(
                os.environ.get("ART_TEST_MAX_SEQ_LENGTH", str(DEFAULT_MAX_SEQ_LENGTH))
            ),
        },
    }


def reward_for_answer(text: str) -> float:
    """Keyword-based reward so the smoke test needs no judge model."""
    content = text.lower()
    if "yes" in content:
        return 1.0
    if "no" in content:
        return 0.5
    if "maybe" in content:
        return 0.25
    return 0.0


async def assert_chat_logprobs(
    client: openai.AsyncOpenAI,
    model_name: str,
) -> None:
    """Check that the named checkpoint still serves completions with logprobs."""
    completion = await client.chat.completions.create(
        messages=[{"role": "user", "content": "Say hello."}],
        model=model_name,
        max_tokens=8,
        timeout=60,
        logprobs=True,
        top_logprobs=0,
    )
    assert completion.choices[0].logprobs is not None


@pytest.mark.skipif(
    not torch.cuda.is_available() or torch.cuda.device_count() < 2,
    reason="Need at least 2 CUDA GPUs for dedicated LocalBackend PipelineTrainer test",
)
async def test_pipeline_trainer_local_backend_dedicated_smoke() -> None:
    model_name = f"test-pipeline-local-dedicated-{uuid.uuid4().hex[:8]}"
    prompts = [
        "Say yes",
        "Say no",
        "Say maybe",
        "Say hello",
        "Say yes again",
        "Say no again",
    ]
    # rollout_fn closes over `client`, which is assigned after registration.
    client: openai.AsyncOpenAI | None = None

    async def rollout_fn(
        model: art.TrainableModel,
        scenario: dict[str, str],
        _config: None,
    ) -> art.TrajectoryGroup:
        await asyncio.sleep(0.2)
        messages: art.Messages = [{"role": "user", "content": scenario["prompt"]}]
        assert client is not None
        completion = await client.chat.completions.create(
            messages=messages,
            model=model.get_inference_name(),
            max_tokens=10,
            timeout=60,
            temperature=1,
            n=2,
            logprobs=True,
            top_logprobs=0,
        )
        return art.TrajectoryGroup(
            [
                art.Trajectory(
                    messages_and_choices=[*messages, choice],
                    reward=reward_for_answer(choice.message.content or ""),
                )
                for choice in completion.choices
            ]
        )

    async def scenario_iter():
        for prompt in prompts:
            yield {"prompt": prompt}

    with tempfile.TemporaryDirectory() as tmpdir:
        async with LocalBackend(path=tmpdir) as backend:
            model = art.TrainableModel(
                name=model_name,
                project="integration-tests",
                base_model=get_base_model(),
                _internal_config=get_dedicated_vllm_test_config(),
            )
            try:
                await model.register(backend)
                client = model.openai_client()
                trainer = PipelineTrainer(
                    model=model,
                    backend=backend,
                    rollout_fn=rollout_fn,
                    scenarios=scenario_iter(),
                    config=None,
                    num_rollout_workers=2,
                    min_batch_size=1,
                    max_batch_size=1,
                    max_steps=2,
                    loss_fn="cispo",
                    eval_fn=None,
                )

                await trainer.train()

                latest_step = await model.get_step()
                assert latest_step >= 2

                # Both the step-0 checkpoint and the latest one should serve.
                await assert_chat_logprobs(client, model.get_inference_name(step=0))
                await assert_chat_logprobs(
                    client, model.get_inference_name(step=latest_step)
                )

                model_ids = [m.id async for m in client.models.list()]
                assert f"{model.name}@0" in model_ids
                assert f"{model.name}@{latest_step}" in model_ids
            finally:
                if client is not None:
                    await client.close()
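The only non-obvious arithmetic in the new file is the clamp at the end of get_safe_gpu_memory_utilization. As a worked example, here is a minimal standalone sketch of that rule; safe_utilization is a hypothetical stand-in written for illustration, and the free-memory ratios are made up:

import math


def safe_utilization(requested: float, free_ratios: list[float]) -> float:
    # Same rule as in the test: cap the request at 80% of the scarcest
    # GPU's free fraction, and never drop below the 0.02 floor.
    return max(0.02, min(requested, min(free_ratios) * 0.8))


# Plenty of headroom on both GPUs: the requested 0.2 passes through.
assert safe_utilization(0.2, [0.5, 0.3]) == 0.2

# GPU 1 is only 10% free: 0.1 * 0.8 = 0.08, so the request is clamped down.
assert math.isclose(safe_utilization(0.2, [0.5, 0.1]), 0.08)

# A nearly full GPU: 0.01 * 0.8 = 0.008 falls below the floor, so 0.02 wins.
assert safe_utilization(0.2, [0.9, 0.01]) == 0.02

The same cap can also be steered from the environment through ART_TEST_GPU_MEMORY_UTILIZATION and ART_TEST_MIN_FREE_GPU_GIB, which the test reads before falling back to its defaults.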
