11import asyncio
22import json
3+ import logging
34import math
45import os
56import shutil
910from typing import AsyncIterator , Iterable , Literal , cast
1011import warnings
1112
13+ logger = logging .getLogger (__name__ )
14+
1215import aiohttp
1316import numpy as np
1417from openai import AsyncOpenAI
@@ -97,6 +100,9 @@ async def close(self) -> None:
97100
98101 def _close (self ) -> None :
99102 for _ , service in self ._services .items ():
103+ close = getattr (service , "close" , None )
104+ if close is not None :
105+ close ()
100106 close_proxy (service )
101107
102108 async def register (
@@ -140,18 +146,39 @@ def _model_inference_name(self, model: Model, step: int | None = None) -> str:
140146
141147 # For LocalBackend, vLLM always serves LoRA adapters with @step suffix
142148 # Default to step 0 when not specified (the initial checkpoint created at registration)
143- actual_step = step if step is not None else self .__get_step (model )
144- return f"{ model .name } @{ actual_step } "
149+ if step is not None :
150+ actual_step = step
151+ elif model .name in self ._services :
152+ # In dedicated mode the service tracks which adapter vLLM has
153+ # actually loaded. Reading the filesystem would race: the
154+ # checkpoint directory appears before the HTTP reload completes.
155+ svc = self ._services [model .name ]
156+ loaded_step = getattr (svc , "_latest_step" , None )
157+ actual_step = (
158+ loaded_step if loaded_step is not None else self .__get_step (model )
159+ )
160+ else :
161+ actual_step = self .__get_step (model )
162+ name = f"{ model .name } @{ actual_step } "
163+ logger .debug (
164+ f"[BACKEND] _model_inference_name: step_arg={ step } "
165+ f"actual_step={ actual_step } -> { name } "
166+ )
167+ return name
145168
146169 async def _get_service (self , model : TrainableModel ) -> ModelService :
147170 from ..dev .get_model_config import get_model_config
171+ from ..dev .validate import is_dedicated_mode , validate_dedicated_config
148172
149173 if model .name not in self ._services :
150174 config = get_model_config (
151175 base_model = model .base_model ,
152176 output_dir = get_model_dir (model = model , art_path = self ._path ),
153177 config = model ._internal_config ,
154178 )
179+ validate_dedicated_config (config )
180+ dedicated = is_dedicated_mode (config )
181+
155182 is_tinker = config .get ("tinker_args" ) is not None
156183 if is_tinker :
157184 from ..tinker .service import TinkerService
@@ -164,13 +191,19 @@ async def _get_service(self, model: TrainableModel) -> ModelService:
164191 # When moving the service to a child process, import unsloth
165192 # early to maximize optimizations
166193 os .environ ["IMPORT_UNSLOTH" ] = "1"
194+
195+ if dedicated :
196+ os .environ ["CUDA_VISIBLE_DEVICES" ] = "," .join (
197+ str (g ) for g in config ["trainer_gpu_ids" ]
198+ )
199+
167200 self ._services [model .name ] = service_class (
168201 model_name = model .name ,
169202 base_model = model .base_model ,
170203 config = config ,
171204 output_dir = get_model_dir (model = model , art_path = self ._path ),
172205 )
173- if not self ._in_process :
206+ if not dedicated and not self ._in_process :
174207 # Kill all "model-service" processes to free up GPU memory
175208 subprocess .run (["pkill" , "-9" , "model-service" ])
176209 self ._services [model .name ] = move_to_child_process (
@@ -609,6 +642,10 @@ async def _train_model(
609642 # Still advance the step by renaming the checkpoint directory
610643 current_step = self .__get_step (model )
611644 next_step = current_step + 1
645+ logger .info (
646+ f"[BACKEND] _train_model SKIP: current_step={ current_step } "
647+ f"next_step={ next_step } (all rewards equal)"
648+ )
612649 current_checkpoint_dir = get_step_checkpoint_dir (
613650 get_model_dir (model = model , art_path = self ._path ), current_step
614651 )
@@ -623,8 +660,9 @@ async def _train_model(
623660 next_checkpoint_dir ,
624661 dirs_exist_ok = True ,
625662 )
626- print (
627- f"Advanced step from { current_step } to { next_step } (no training occurred)"
663+ logger .info (
664+ f"[BACKEND] _train_model SKIP: copied checkpoint "
665+ f"{ current_step } -> { next_step } , calling register_lora_for_step..."
628666 )
629667
630668 try :
@@ -634,6 +672,10 @@ async def _train_model(
634672 await service .register_lora_for_step ( # type: ignore[attr-defined]
635673 next_step , next_checkpoint_dir
636674 )
675+ logger .info (
676+ f"[BACKEND] _train_model SKIP: register_lora_for_step "
677+ f"completed for step { next_step } "
678+ )
637679 except ModuleNotFoundError :
638680 pass # Unsloth is not installed
639681
0 commit comments