Skip to content

Commit 75b96c2

Browse files
committed
Fix data race on wsPool lazy init and minor issues
- Initialize wsPool eagerly in NewClient instead of lazily in createWebSocketStream to eliminate a potential data race when concurrent goroutines both see wsPool==nil - Downgrade WebSocket→SSE fallback log from Error to Warn since this is an intentional graceful degradation, not an unexpected error - Close HTTP response body defensively in dialWebSocket on handshake failure to prevent a potential resource leak Assisted-By: docker-agent
1 parent e4f454c commit 75b96c2

2 files changed

Lines changed: 20 additions & 12 deletions

File tree

pkg/model/provider/openai/client.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type Client struct {
3737

3838
clientFn func(context.Context) (*openai.Client, error)
3939

40-
// wsPool is lazily initialized when transport=websocket is configured.
40+
// wsPool is initialized in NewClient when transport=websocket is configured.
4141
// It maintains a persistent WebSocket connection across requests.
4242
wsPool *wsPool
4343
}
@@ -145,14 +145,24 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
145145

146146
slog.Debug("OpenAI client created successfully", "model", cfg.Model)
147147

148-
return &Client{
148+
client := &Client{
149149
Config: base.Config{
150150
ModelConfig: *cfg,
151151
ModelOptions: globalOptions,
152152
Env: env,
153153
},
154154
clientFn: clientFn,
155-
}, nil
155+
}
156+
157+
// Pre-create the WebSocket pool when the transport is configured.
158+
// The pool is cheap (no connections opened until the first Stream call)
159+
// and eager init avoids a data race on the lazy path.
160+
if getTransport(cfg) == "websocket" && globalOptions.Gateway() == "" {
161+
baseURL := cmp.Or(cfg.BaseURL, "https://api.openai.com/v1")
162+
client.wsPool = newWSPool(httpToWSURL(baseURL), client.buildWSHeaderFn())
163+
}
164+
165+
return client, nil
156166
}
157167

158168
// Close releases resources held by the client, including any pooled WebSocket
@@ -413,7 +423,7 @@ func (c *Client) CreateResponseStream(
413423
if transport == "websocket" && c.ModelOptions.Gateway() == "" {
414424
stream, err := c.createWebSocketStream(ctx, params)
415425
if err != nil {
416-
slog.Error("WebSocket stream failed, falling back to SSE", "error", err)
426+
slog.Warn("WebSocket stream failed, falling back to SSE", "error", err)
417427
// Fall through to SSE below.
418428
} else {
419429
slog.Debug("OpenAI responses WebSocket stream created successfully", "model", c.ModelConfig.Model)
@@ -436,19 +446,14 @@ func (c *Client) CreateResponseStream(
436446
return newResponseStreamAdapter(stream, trackUsage), nil
437447
}
438448

439-
// createWebSocketStream initializes (or reuses) a WebSocket connection and
440-
// sends the response.create message, returning a responseEventStream.
449+
// createWebSocketStream sends a request over the pre-initialized WebSocket
450+
// pool, returning a responseEventStream.
441451
func (c *Client) createWebSocketStream(
442452
ctx context.Context,
443453
params responses.ResponseNewParams,
444454
) (responseEventStream, error) {
445455
if c.wsPool == nil {
446-
// Lazy-init the pool on first WebSocket call.
447-
baseURL := cmp.Or(c.ModelConfig.BaseURL, "https://api.openai.com/v1")
448-
wsURL := httpToWSURL(baseURL)
449-
450-
headerFn := c.buildWSHeaderFn()
451-
c.wsPool = newWSPool(wsURL, headerFn)
456+
return nil, errors.New("websocket pool not initialized")
452457
}
453458

454459
return c.wsPool.Stream(ctx, params)

pkg/model/provider/openai/ws_stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ func dialWebSocket(
9797
conn, resp, err := dialer.DialContext(ctx, wsURL, headers)
9898
if err != nil {
9999
if resp != nil {
100+
if resp.Body != nil {
101+
_ = resp.Body.Close()
102+
}
100103
slog.Error("WebSocket handshake failed",
101104
"status", resp.StatusCode,
102105
"error", err)

0 commit comments

Comments
 (0)