1- import { Cause , Effect , Layer , ServiceMap } from "effect"
1+ import { Cause , Deferred , Effect , Layer , ServiceMap } from "effect"
22import * as Stream from "effect/Stream"
33import { Agent } from "@/agent/agent"
44import { Bus } from "@/bus"
@@ -18,6 +18,7 @@ import { SessionStatus } from "./status"
1818import { SessionSummary } from "./summary"
1919import type { Provider } from "@/provider/provider"
2020import { Question } from "@/question"
21+ import { errorMessage } from "@/util/error"
2122import { isRecord } from "@/util/record"
2223
2324export namespace SessionProcessor {
@@ -30,7 +31,19 @@ export namespace SessionProcessor {
3031
3132 export interface Handle {
3233 readonly message : MessageV2 . Assistant
33- readonly partFromToolCall : ( toolCallID : string ) => MessageV2 . ToolPart | undefined
34+ readonly updateToolCall : (
35+ toolCallID : string ,
36+ update : ( part : MessageV2 . ToolPart ) => MessageV2 . ToolPart ,
37+ ) => Effect . Effect < MessageV2 . ToolPart | undefined >
38+ readonly completeToolCall : (
39+ toolCallID : string ,
40+ output : {
41+ title : string
42+ metadata : Record < string , any >
43+ output : string
44+ attachments ?: MessageV2 . FilePart [ ]
45+ } ,
46+ ) => Effect . Effect < void >
3447 readonly process : ( streamInput : LLM . StreamInput ) => Effect . Effect < Result >
3548 }
3649
@@ -44,8 +57,15 @@ export namespace SessionProcessor {
4457 readonly create : ( input : Input ) => Effect . Effect < Handle >
4558 }
4659
60+ type ToolCall = {
61+ partID : MessageV2 . ToolPart [ "id" ]
62+ messageID : MessageV2 . ToolPart [ "messageID" ]
63+ sessionID : MessageV2 . ToolPart [ "sessionID" ]
64+ done : Deferred . Deferred < void >
65+ }
66+
4767 interface ProcessorContext extends Input {
48- toolcalls : Record < string , MessageV2 . ToolPart >
68+ toolcalls : Record < string , ToolCall >
4969 shouldBreak : boolean
5070 snapshot : string | undefined
5171 blocked : boolean
@@ -108,6 +128,88 @@ export namespace SessionProcessor {
108128 aborted,
109129 } )
110130
131+ const settleToolCall = Effect . fn ( "SessionProcessor.settleToolCall" ) ( function * ( toolCallID : string ) {
132+ const done = ctx . toolcalls [ toolCallID ] ?. done
133+ delete ctx . toolcalls [ toolCallID ]
134+ if ( done ) yield * Deferred . succeed ( done , undefined ) . pipe ( Effect . ignore )
135+ } )
136+
137+ const readToolCall = Effect . fn ( "SessionProcessor.readToolCall" ) ( function * ( toolCallID : string ) {
138+ const call = ctx . toolcalls [ toolCallID ]
139+ if ( ! call ) return
140+ const part = yield * session . getPart ( {
141+ partID : call . partID ,
142+ messageID : call . messageID ,
143+ sessionID : call . sessionID ,
144+ } )
145+ if ( ! part || part . type !== "tool" ) {
146+ delete ctx . toolcalls [ toolCallID ]
147+ return
148+ }
149+ return { call, part }
150+ } )
151+
152+ const updateToolCall = Effect . fn ( "SessionProcessor.updateToolCall" ) ( function * (
153+ toolCallID : string ,
154+ update : ( part : MessageV2 . ToolPart ) => MessageV2 . ToolPart ,
155+ ) {
156+ const match = yield * readToolCall ( toolCallID )
157+ if ( ! match ) return
158+ const part = yield * session . updatePart ( update ( match . part ) )
159+ ctx . toolcalls [ toolCallID ] = {
160+ ...match . call ,
161+ partID : part . id ,
162+ messageID : part . messageID ,
163+ sessionID : part . sessionID ,
164+ }
165+ return part
166+ } )
167+
168+ const completeToolCall = Effect . fn ( "SessionProcessor.completeToolCall" ) ( function * (
169+ toolCallID : string ,
170+ output : {
171+ title : string
172+ metadata : Record < string , any >
173+ output : string
174+ attachments ?: MessageV2 . FilePart [ ]
175+ } ,
176+ ) {
177+ const match = yield * readToolCall ( toolCallID )
178+ if ( ! match || match . part . state . status !== "running" ) return
179+ yield * session . updatePart ( {
180+ ...match . part ,
181+ state : {
182+ status : "completed" ,
183+ input : match . part . state . input ,
184+ output : output . output ,
185+ metadata : output . metadata ,
186+ title : output . title ,
187+ time : { start : match . part . state . time . start , end : Date . now ( ) } ,
188+ attachments : output . attachments ,
189+ } ,
190+ } )
191+ yield * settleToolCall ( toolCallID )
192+ } )
193+
194+ const failToolCall = Effect . fn ( "SessionProcessor.failToolCall" ) ( function * ( toolCallID : string , error : unknown ) {
195+ const match = yield * readToolCall ( toolCallID )
196+ if ( ! match || match . part . state . status !== "running" ) return false
197+ yield * session . updatePart ( {
198+ ...match . part ,
199+ state : {
200+ status : "error" ,
201+ input : match . part . state . input ,
202+ error : errorMessage ( error ) ,
203+ time : { start : match . part . state . time . start , end : Date . now ( ) } ,
204+ } ,
205+ } )
206+ if ( error instanceof Permission . RejectedError || error instanceof Question . RejectedError ) {
207+ ctx . blocked = ctx . shouldBreak
208+ }
209+ yield * settleToolCall ( toolCallID )
210+ return true
211+ } )
212+
111213 const handleEvent = Effect . fn ( "SessionProcessor.handleEvent" ) ( function * ( value : StreamEvent ) {
112214 switch ( value . type ) {
113215 case "start" :
@@ -154,8 +256,8 @@ export namespace SessionProcessor {
154256 if ( ctx . assistantMessage . summary ) {
155257 throw new Error ( `Tool call not allowed while generating summary: ${ value . toolName } ` )
156258 }
157- ctx . toolcalls [ value . id ] = yield * session . updatePart ( {
158- id : ctx . toolcalls [ value . id ] ?. id ?? PartID . ascending ( ) ,
259+ const part = yield * session . updatePart ( {
260+ id : ctx . toolcalls [ value . id ] ?. partID ?? PartID . ascending ( ) ,
159261 messageID : ctx . assistantMessage . id ,
160262 sessionID : ctx . assistantMessage . sessionID ,
161263 type : "tool" ,
@@ -164,6 +266,12 @@ export namespace SessionProcessor {
164266 state : { status : "pending" , input : { } , raw : "" } ,
165267 metadata : value . providerExecuted ? { providerExecuted : true } : undefined ,
166268 } satisfies MessageV2 . ToolPart )
269+ ctx . toolcalls [ value . id ] = {
270+ done : yield * Deferred . make < void > ( ) ,
271+ partID : part . id ,
272+ messageID : part . messageID ,
273+ sessionID : part . sessionID ,
274+ }
167275 return
168276
169277 case "tool-input-delta" :
@@ -176,14 +284,7 @@ export namespace SessionProcessor {
176284 if ( ctx . assistantMessage . summary ) {
177285 throw new Error ( `Tool call not allowed while generating summary: ${ value . toolName } ` )
178286 }
179- const pointer = ctx . toolcalls [ value . toolCallId ]
180- const match = yield * session . getPart ( {
181- partID : pointer . id ,
182- messageID : pointer . messageID ,
183- sessionID : pointer . sessionID ,
184- } )
185- if ( ! match || match . type !== "tool" ) return
186- ctx . toolcalls [ value . toolCallId ] = yield * session . updatePart ( {
287+ yield * updateToolCall ( value . toolCallId , ( match ) => ( {
187288 ...match ,
188289 tool : value . toolName ,
189290 state : {
@@ -195,7 +296,7 @@ export namespace SessionProcessor {
195296 metadata : match . metadata ?. providerExecuted
196297 ? { ...value . providerMetadata , providerExecuted : true }
197298 : value . providerMetadata ,
198- } satisfies MessageV2 . ToolPart )
299+ } ) )
199300
200301 const parts = MessageV2 . parts ( ctx . assistantMessage . id )
201302 const recentParts = parts . slice ( - DOOM_LOOP_THRESHOLD )
@@ -226,41 +327,12 @@ export namespace SessionProcessor {
226327 }
227328
228329 case "tool-result" : {
229- const match = ctx . toolcalls [ value . toolCallId ]
230- if ( ! match || match . state . status !== "running" ) return
231- yield * session . updatePart ( {
232- ...match ,
233- state : {
234- status : "completed" ,
235- input : value . input ?? match . state . input ,
236- output : value . output . output ,
237- metadata : value . output . metadata ,
238- title : value . output . title ,
239- time : { start : match . state . time . start , end : Date . now ( ) } ,
240- attachments : value . output . attachments ,
241- } ,
242- } )
243- delete ctx . toolcalls [ value . toolCallId ]
330+ yield * completeToolCall ( value . toolCallId , value . output )
244331 return
245332 }
246333
247334 case "tool-error" : {
248- const match = ctx . toolcalls [ value . toolCallId ]
249- if ( ! match || match . state . status !== "running" ) return
250-
251- yield * session . updatePart ( {
252- ...match ,
253- state : {
254- status : "error" ,
255- input : value . input ?? match . state . input ,
256- error : value . error instanceof Error ? value . error . message : String ( value . error ) ,
257- time : { start : match . state . time . start , end : Date . now ( ) } ,
258- } ,
259- } )
260- if ( value . error instanceof Permission . RejectedError || value . error instanceof Question . RejectedError ) {
261- ctx . blocked = ctx . shouldBreak
262- }
263- delete ctx . toolcalls [ value . toolCallId ]
335+ yield * failToolCall ( value . toolCallId , value . error )
264336 return
265337 }
266338
@@ -413,7 +485,16 @@ export namespace SessionProcessor {
413485 }
414486 ctx . reasoningMap = { }
415487
416- for ( const part of Object . values ( ctx . toolcalls ) ) {
488+ yield * Effect . forEach (
489+ Object . values ( ctx . toolcalls ) ,
490+ ( call ) => Deferred . await ( call . done ) . pipe ( Effect . timeout ( "250 millis" ) , Effect . ignore ) ,
491+ { concurrency : "unbounded" } ,
492+ )
493+
494+ for ( const toolCallID of Object . keys ( ctx . toolcalls ) ) {
495+ const match = yield * readToolCall ( toolCallID )
496+ if ( ! match ) continue
497+ const part = match . part
417498 const end = Date . now ( )
418499 const metadata = "metadata" in part . state && isRecord ( part . state . metadata ) ? part . state . metadata : { }
419500 yield * session . updatePart ( {
@@ -503,9 +584,8 @@ export namespace SessionProcessor {
503584 get message ( ) {
504585 return ctx . assistantMessage
505586 } ,
506- partFromToolCall ( toolCallID : string ) {
507- return ctx . toolcalls [ toolCallID ]
508- } ,
587+ updateToolCall,
588+ completeToolCall,
509589 process,
510590 } satisfies Handle
511591 } )
0 commit comments