Skip to content

Commit 2fe77dd

Browse files
authored
feat(chat): added streaming from chat panel & chat deploy (#321)
* added streaming for OpenAI in chat panel
* added streaming for Anthropic
* added streaming for Cerebras + DeepSeek
* added streaming for xAI and Groq
* added streaming for chat deploy
* moved privacy to the bottom of settings modal, added layout for chat subdomain
* added logs for chat panel for OpenAI with context passed along with stream
* added proper logging for all providers in the console and in the logs
* added unit tests
* added streaming for Gemini models
* added logging formatting for chat deploy with streaming
* fix build issues
* remove extraneous logs
* fixed streaming not working with forced tool calls for all providers, fixed response formatting for streamed back responses without tool calls
1 parent 3e01258 commit 2fe77dd

33 files changed

Lines changed: 3123 additions & 424 deletions

File tree

sim/app/api/chat/[subdomain]/route.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { NextRequest } from 'next/server'
1+
import { NextRequest, NextResponse } from 'next/server'
22
import { eq } from 'drizzle-orm'
33
import { createLogger } from '@/lib/logs/console-logger'
44
import { db } from '@/db'
@@ -96,6 +96,28 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
9696
// Execute the workflow using our helper function
9797
const result = await executeWorkflowForChat(deployment.id, message)
9898

99+
// If the executor returned a ReadableStream, stream it directly to the client
100+
if (result instanceof ReadableStream) {
101+
const streamResponse = new NextResponse(result, {
102+
status: 200,
103+
headers: {
104+
'Content-Type': 'text/plain; charset=utf-8',
105+
},
106+
})
107+
return addCorsHeaders(streamResponse, request)
108+
}
109+
110+
// Handle StreamingExecution format
111+
if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) {
112+
const streamResponse = new NextResponse(result.stream as ReadableStream, {
113+
status: 200,
114+
headers: {
115+
'Content-Type': 'text/plain; charset=utf-8',
116+
},
117+
})
118+
return addCorsHeaders(streamResponse, request)
119+
}
120+
99121
// Format the result for the client
100122
// If result.content is an object, preserve it for structured handling
101123
// If it's text or another primitive, make sure it's accessible

sim/app/api/chat/utils.ts

Lines changed: 122 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import { Serializer } from '@/serializer'
1111
import { mergeSubblockState } from '@/stores/workflows/utils'
1212
import { persistExecutionLogs } from '@/lib/logs/execution-logger'
1313
import { buildTraceSpans } from '@/lib/logs/trace-spans'
14+
import { BlockLog } from '@/executor/types'
15+
16+
declare global {
17+
var __chatStreamProcessingTasks: Promise<{success: boolean, error?: any}>[] | undefined
18+
}
1419

1520
const logger = createLogger('ChatAuthUtils')
1621
const isDevelopment = process.env.NODE_ENV === 'development'
@@ -393,16 +398,108 @@ export async function executeWorkflowForChat(chatId: string, message: string) {
393398
)
394399

395400
// Create and execute the workflow - mimicking use-workflow-execution.ts
396-
const executor = new Executor(
397-
serializedWorkflow,
398-
processedBlockStates,
399-
decryptedEnvVars,
400-
{ input: message },
401-
workflowVariables
402-
)
403-
401+
const executor = new Executor({
402+
workflow: serializedWorkflow,
403+
currentBlockStates: processedBlockStates,
404+
envVarValues: decryptedEnvVars,
405+
workflowInput: { input: message },
406+
workflowVariables,
407+
contextExtensions: {
408+
// Always request streaming – the executor will downgrade gracefully if unsupported
409+
stream: true,
410+
selectedOutputIds: outputBlockIds,
411+
edges: edges.map((e: any) => ({ source: e.source, target: e.target })),
412+
},
413+
})
414+
404415
// Execute and capture the result
405416
const result = await executor.execute(workflowId)
417+
418+
// If the executor returned a ReadableStream, forward it directly for streaming
419+
if (result instanceof ReadableStream) {
420+
return result
421+
}
422+
423+
// Handle StreamingExecution format (combined stream + execution data)
424+
if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) {
425+
// We need to stream the response to the client while *also* capturing the full
426+
// content so that we can persist accurate logs once streaming completes.
427+
428+
// Duplicate the original stream – one copy goes to the client, the other we read
429+
// server-side for log enrichment.
430+
const [clientStream, loggingStream] = (result.stream as ReadableStream).tee()
431+
432+
// Kick off background processing to read the stream and persist enriched logs
433+
const processingPromise = (async () => {
434+
try {
435+
// The stream is only used to properly drain it and prevent memory leaks
436+
// All the execution data is already provided from the agent handler
437+
// through the X-Execution-Data header
438+
await drainStream(loggingStream)
439+
440+
// No need to wait for a processing promise
441+
// The execution-logger.ts will handle token estimation
442+
443+
// We can use the execution data as-is since it's already properly structured
444+
const executionData = result.execution as any
445+
446+
// Before persisting, clean up any response objects with zero tokens in agent blocks
447+
// This prevents confusion in the console logs
448+
if (executionData.logs && Array.isArray(executionData.logs)) {
449+
executionData.logs.forEach((log: BlockLog) => {
450+
if (log.blockType === 'agent' && log.output?.response) {
451+
const response = log.output.response;
452+
453+
// Check for zero tokens that will be estimated later
454+
if (response.tokens &&
455+
(!response.tokens.completion || response.tokens.completion === 0) &&
456+
(!response.toolCalls || !response.toolCalls.list || response.toolCalls.list.length === 0)) {
457+
458+
// Remove tokens from console display to avoid confusion
459+
// They'll be properly estimated in the execution logger
460+
delete response.tokens;
461+
}
462+
}
463+
});
464+
}
465+
466+
// Build trace spans and persist
467+
const { traceSpans, totalDuration } = buildTraceSpans(executionData)
468+
const enrichedResult = {
469+
...executionData,
470+
traceSpans,
471+
totalDuration,
472+
}
473+
474+
const executionId = uuidv4()
475+
await persistExecutionLogs(workflowId, executionId, enrichedResult, 'chat')
476+
logger.debug(`[${requestId}] Persisted execution logs for streaming chat with ID: ${executionId}`)
477+
478+
return { success: true }
479+
} catch (error) {
480+
logger.error(`[${requestId}] Failed to persist streaming chat execution logs:`, error)
481+
return { success: false, error }
482+
} finally {
483+
// Ensure the stream is properly closed even if an error occurs
484+
try {
485+
const controller = new AbortController()
486+
const signal = controller.signal
487+
controller.abort()
488+
} catch (cleanupError) {
489+
logger.debug(`[${requestId}] Error during stream cleanup: ${cleanupError}`)
490+
}
491+
}
492+
})()
493+
494+
// Register this processing promise with a global handler or tracker if needed
495+
// This allows the background task to be monitored or waited for in testing
496+
if (typeof global.__chatStreamProcessingTasks !== 'undefined') {
497+
global.__chatStreamProcessingTasks.push(processingPromise as Promise<{success: boolean, error?: any}>)
498+
}
499+
500+
// Return the client-facing stream
501+
return clientStream
502+
}
406503

407504
// Mark as chat execution in metadata
408505
if (result) {
@@ -412,7 +509,7 @@ export async function executeWorkflowForChat(chatId: string, message: string) {
412509
}
413510
}
414511

415-
// Persist execution logs using the 'chat' trigger type
512+
// Persist execution logs using the 'chat' trigger type for non-streaming results
416513
try {
417514
// Build trace spans to enrich the logs (same as in use-workflow-execution.ts)
418515
const { traceSpans, totalDuration } = buildTraceSpans(result)
@@ -543,4 +640,20 @@ export async function executeWorkflowForChat(chatId: string, message: string) {
543640
type: 'workflow'
544641
}
545642
}
643+
}
644+
645+
/**
 * Read a stream to completion, discarding every chunk.
 *
 * Acquires a reader on `stream`, pulls chunks until the stream reports
 * `done`, and ignores their contents. The reader's lock is released in a
 * `finally` block, so it is given back even when a read rejects; in that
 * case the rejection propagates to the caller.
 *
 * @param stream - The stream to consume.
 * @returns A promise that resolves once the stream has been read to its end.
 */
async function drainStream(stream: ReadableStream): Promise<void> {
  const reader = stream.getReader()
  try {
    let finished = false
    while (!finished) {
      // Only the completion flag matters; the chunk payload is dropped.
      const chunk = await reader.read()
      finished = chunk.done
    }
  } finally {
    reader.releaseLock()
  }
}

sim/app/api/providers/route.ts

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { NextRequest, NextResponse } from 'next/server'
22
import { createLogger } from '@/lib/logs/console-logger'
33
import { executeProviderRequest } from '@/providers'
44
import { getApiKey } from '@/providers/utils'
5+
import { StreamingExecution } from '@/executor/types'
56

67
const logger = createLogger('ProvidersAPI')
78

@@ -24,6 +25,7 @@ export async function POST(request: NextRequest) {
2425
apiKey,
2526
responseFormat,
2627
workflowId,
28+
stream,
2729
} = body
2830

2931
let finalApiKey: string
@@ -48,8 +50,90 @@ export async function POST(request: NextRequest) {
4850
apiKey: finalApiKey,
4951
responseFormat,
5052
workflowId,
53+
stream,
5154
})
5255

56+
// Check if the response is a StreamingExecution
57+
if (response && typeof response === 'object' && 'stream' in response && 'execution' in response) {
58+
const streamingExec = response as StreamingExecution
59+
logger.info('Received StreamingExecution from provider')
60+
61+
// Extract the stream and execution data
62+
const stream = streamingExec.stream
63+
const executionData = streamingExec.execution
64+
65+
// Attach the execution data as a custom header
66+
// We need to safely serialize the execution data to avoid circular references
67+
let executionDataHeader
68+
try {
69+
// Create a safe version of execution data with the most important fields
70+
const safeExecutionData = {
71+
success: executionData.success,
72+
output: {
73+
response: {
74+
// Sanitize content to remove non-ASCII characters that would cause ByteString errors
75+
content: executionData.output?.response?.content
76+
? String(executionData.output.response.content).replace(/[\u0080-\uFFFF]/g, '')
77+
: '',
78+
model: executionData.output?.response?.model,
79+
tokens: executionData.output?.response?.tokens || {
80+
prompt: 0,
81+
completion: 0,
82+
total: 0
83+
},
84+
// Sanitize any potential Unicode characters in tool calls
85+
toolCalls: executionData.output?.response?.toolCalls
86+
? sanitizeToolCalls(executionData.output.response.toolCalls)
87+
: undefined,
88+
providerTiming: executionData.output?.response?.providerTiming,
89+
cost: executionData.output?.response?.cost,
90+
}
91+
},
92+
error: executionData.error,
93+
logs: [], // Strip logs from header to avoid encoding issues
94+
metadata: {
95+
startTime: executionData.metadata?.startTime,
96+
endTime: executionData.metadata?.endTime,
97+
duration: executionData.metadata?.duration
98+
},
99+
isStreaming: true, // Always mark streaming execution data as streaming
100+
blockId: executionData.logs?.[0]?.blockId,
101+
blockName: executionData.logs?.[0]?.blockName,
102+
blockType: executionData.logs?.[0]?.blockType,
103+
}
104+
executionDataHeader = JSON.stringify(safeExecutionData)
105+
} catch (error) {
106+
logger.error('Failed to serialize execution data:', error)
107+
executionDataHeader = JSON.stringify({
108+
success: executionData.success,
109+
error: 'Failed to serialize full execution data'
110+
})
111+
}
112+
113+
// Return the stream with execution data in a header
114+
return new Response(stream, {
115+
headers: {
116+
'Content-Type': 'text/event-stream',
117+
'Cache-Control': 'no-cache',
118+
'Connection': 'keep-alive',
119+
'X-Execution-Data': executionDataHeader
120+
},
121+
})
122+
}
123+
124+
// Check if the response is a ReadableStream for streaming
125+
if (response instanceof ReadableStream) {
126+
logger.info('Streaming response from provider')
127+
return new Response(response, {
128+
headers: {
129+
'Content-Type': 'text/event-stream',
130+
'Cache-Control': 'no-cache',
131+
'Connection': 'keep-alive',
132+
},
133+
})
134+
}
135+
136+
// Return regular JSON response for non-streaming
53137
return NextResponse.json(response)
54138
} catch (error) {
55139
logger.error('Provider request failed:', error)
@@ -59,3 +143,89 @@ export async function POST(request: NextRequest) {
59143
)
60144
}
61145
}
146+
147+
/**
 * Sanitize a collection of tool calls by running `sanitizeToolCall` over
 * each entry.
 *
 * Two shapes are recognised, checked in this order:
 *  1. an object carrying an array in its `list` property — a shallow copy is
 *     returned with `list` replaced by the sanitized entries and every other
 *     property carried over as is;
 *  2. a bare array — a new array of sanitized entries is returned.
 *
 * Any other value (including `null` / `undefined`) is returned unchanged.
 */
function sanitizeToolCalls(toolCalls: any) {
  const isObjectLike = Boolean(toolCalls) && typeof toolCalls === 'object'

  // Wrapper shape: copy the wrapper, then swap in the sanitized list.
  if (isObjectLike && Array.isArray(toolCalls.list)) {
    const wrapper = { ...toolCalls }
    wrapper.list = toolCalls.list.map(sanitizeToolCall)
    return wrapper
  }

  // Bare array of calls, or something that is not a tool-call collection.
  return Array.isArray(toolCalls) ? toolCalls.map(sanitizeToolCall) : toolCalls
}
166+
167+
/**
 * Return a sanitized shallow copy of a single tool call, with non-ASCII
 * characters (code units U+0080–U+FFFF) removed from its text.
 *
 * - `name` and `error` are stripped when they are strings.
 * - `input`, `arguments`, `output` and `result` are stripped when they are
 *   strings (e.g. serialized arguments or a plain-text result) and are
 *   passed through `sanitizeObject` when they are objects or arrays.
 * - All other properties are copied over untouched.
 *
 * Values that are not objects (including `null` / `undefined`) are returned
 * as is. The input tool call is never mutated.
 */
function sanitizeToolCall(toolCall: any) {
  if (!toolCall || typeof toolCall !== 'object') return toolCall

  const nonAscii = /[\u0080-\uFFFF]/g

  // Work on a copy so the caller's execution data stays intact
  const sanitized = { ...toolCall }

  // Plain text fields
  for (const field of ['name', 'error']) {
    if (typeof sanitized[field] === 'string') {
      sanitized[field] = sanitized[field].replace(nonAscii, '')
    }
  }

  // Payload fields may be structured data or an already-serialized string;
  // both forms must be cleaned, otherwise non-ASCII text in a string payload
  // slips through unsanitized.
  for (const field of ['input', 'arguments', 'output', 'result']) {
    const value = sanitized[field]
    if (typeof value === 'string') {
      sanitized[field] = value.replace(nonAscii, '')
    } else if (value && typeof value === 'object') {
      sanitized[field] = sanitizeObject(value)
    }
  }

  return sanitized
}
206+
207+
/**
 * Recursively remove non-ASCII characters (code units U+0080–U+FFFF) from
 * every string reachable inside a value.
 *
 * - Strings are returned with the non-ASCII code units removed.
 * - Arrays are mapped element by element, so string elements are sanitized
 *   as well as nested objects and arrays.
 * - Other objects are rebuilt from `Object.entries`, sanitizing each value;
 *   property names are kept as they are.
 * - Anything else (numbers, booleans, `null`, `undefined`) is returned
 *   unchanged.
 *
 * The input is never mutated; arrays and objects are returned as new copies.
 */
function sanitizeObject(obj: any): any {
  // Strings are handled here so that array elements and nested values share
  // one code path (previously string array elements were skipped).
  if (typeof obj === 'string') {
    return obj.replace(/[\u0080-\uFFFF]/g, '')
  }

  if (!obj || typeof obj !== 'object') return obj

  // Handle arrays
  if (Array.isArray(obj)) {
    return obj.map((item) => sanitizeObject(item))
  }

  // Handle objects
  const result: any = {}
  for (const [key, value] of Object.entries(obj)) {
    result[key] = sanitizeObject(value)
  }

  return result
}

0 commit comments

Comments
 (0)