Skip to content

Commit 8b6c1ae

Browse files
committed
feat: stream backend installation progress to CLI users
Signed-off-by: Dorin Geman <dorin.geman@docker.com>
1 parent a5efa9f commit 8b6c1ae

File tree

3 files changed

+77
-4
lines changed

3 files changed

+77
-4
lines changed

cmd/cli/desktop/desktop.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,12 +479,58 @@ func (c *Client) ChatWithMessagesContext(ctx context.Context, model string, conv
479479
TotalTokens int `json:"total_tokens"`
480480
}
481481

482-
// Detect streaming vs non-streaming response via Content-Type header
482+
// Use a buffered reader so we can consume server-sent progress
483+
// lines (e.g. "Installing vllm-metal backend...") that arrive
484+
// before the actual SSE or JSON inference response.
485+
br := bufio.NewReader(resp.Body)
486+
487+
// Consume any plain-text progress lines that precede the real
488+
// response. We peek ahead: if the next non-empty content starts
489+
// with '{' (JSON) or "data:" / ":" (SSE), the progress section
490+
// is over and we fall through to normal processing.
491+
for {
492+
peek, err := br.Peek(1)
493+
if err != nil {
494+
break
495+
}
496+
// JSON object or SSE stream — stop consuming progress lines.
497+
if peek[0] == '{' || peek[0] == ':' {
498+
break
499+
}
500+
line, err := br.ReadString('\n')
501+
if err != nil && line == "" {
502+
break
503+
}
504+
line = strings.TrimRight(line, "\r\n")
505+
if line == "" {
506+
continue
507+
}
508+
// SSE data line — stop, let the normal SSE parser handle it.
509+
if strings.HasPrefix(line, "data:") {
510+
// Put the line back by chaining a reader with the rest.
511+
br = bufio.NewReader(io.MultiReader(
512+
strings.NewReader(line+"\n"),
513+
br,
514+
))
515+
break
516+
}
517+
// Progress message — print to stderr.
518+
fmt.Fprintln(os.Stderr, line)
519+
}
520+
521+
// Detect streaming vs non-streaming response. Because server-sent
522+
// progress lines may have been flushed before the Content-Type was
523+
// set, we also peek at the body content to detect SSE.
483524
isStreaming := strings.HasPrefix(resp.Header.Get("Content-Type"), "text/event-stream")
525+
if !isStreaming {
526+
if peek, err := br.Peek(5); err == nil {
527+
isStreaming = strings.HasPrefix(string(peek), "data:")
528+
}
529+
}
484530

485531
if !isStreaming {
486532
// Non-streaming JSON response
487-
body, err := io.ReadAll(resp.Body)
533+
body, err := io.ReadAll(br)
488534
if err != nil {
489535
return assistantResponse.String(), fmt.Errorf("error reading response body: %w", err)
490536
}
@@ -506,7 +552,7 @@ func (c *Client) ChatWithMessagesContext(ctx context.Context, model string, conv
506552
}
507553
} else {
508554
// SSE streaming response - process line by line
509-
scanner := bufio.NewScanner(resp.Body)
555+
scanner := bufio.NewScanner(br)
510556

511557
for scanner.Scan() {
512558
// Check if context was cancelled

pkg/inference/scheduling/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const (
1313
// enough to encompass any real-world request but also small enough to avoid
1414
// DoS attacks.
1515
maximumOpenAIInferenceRequestSize = 10 * 1024 * 1024
16+
17+
// modelCLIUserAgentPrefix is the user-agent prefix set by the model CLI.
18+
modelCLIUserAgentPrefix = "docker-model-cli/"
1619
)
1720

1821
// trimRequestPathToOpenAIRoot trims a request path to start at the first

pkg/inference/scheduling/http_handler.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,28 @@ func (h *HTTPHandler) handleOpenAIInference(w http.ResponseWriter, r *http.Reque
198198
backend = h.scheduler.selectBackendForModel(model, backend, request.Model)
199199
}
200200

201+
// If a deferred backend needs on-demand installation and the request
202+
// comes from the model CLI, stream progress messages so the user sees
203+
// what is happening while the download runs.
204+
autoInstall := h.scheduler.installer.deferredBackends[backend.Name()] &&
205+
!h.scheduler.installer.isInstalled(backend.Name()) &&
206+
strings.Contains(r.UserAgent(), modelCLIUserAgentPrefix)
207+
if autoInstall {
208+
fmt.Fprintf(w, "Installing %s backend...\n", backend.Name())
209+
if f, ok := w.(http.Flusher); ok {
210+
f.Flush()
211+
}
212+
}
213+
201214
// Wait for the corresponding backend installation to complete or fail. We
202215
// don't allow any requests to be scheduled for a backend until it has
203216
// completed installation.
204217
if err := h.scheduler.installer.wait(r.Context(), backend.Name()); err != nil {
205-
if errors.Is(err, ErrBackendNotFound) {
218+
if autoInstall {
219+
// Headers are already sent (200 OK) from the progress
220+
// line, so we can only write the error as plain text.
221+
fmt.Fprintf(w, "backend installation failed: %v\n", err)
222+
} else if errors.Is(err, ErrBackendNotFound) {
206223
http.Error(w, err.Error(), http.StatusNotFound)
207224
} else if errors.Is(err, errInstallerNotStarted) {
208225
http.Error(w, err.Error(), http.StatusServiceUnavailable)
@@ -222,6 +239,13 @@ func (h *HTTPHandler) handleOpenAIInference(w http.ResponseWriter, r *http.Reque
222239
return
223240
}
224241

242+
if autoInstall {
243+
fmt.Fprintf(w, "%s backend installed successfully\n", backend.Name())
244+
if f, ok := w.(http.Flusher); ok {
245+
f.Flush()
246+
}
247+
}
248+
225249
modelID := h.scheduler.modelManager.ResolveID(request.Model)
226250

227251
// Request a runner to execute the request and defer its release.

0 commit comments

Comments (0)