@@ -103,6 +103,13 @@ export namespace SessionPrompt {
103103 const state = yield * SessionRunState . Service
104104 const revert = yield * SessionRevert . Service
105105
106+ const run = {
107+ promise : < A , E > ( effect : Effect . Effect < A , E > ) =>
108+ Effect . runPromise ( effect . pipe ( Effect . provide ( EffectLogger . layer ) ) ) ,
109+ fork : < A , E > ( effect : Effect . Effect < A , E > ) =>
110+ Effect . runFork ( effect . pipe ( Effect . provide ( EffectLogger . layer ) ) ) ,
111+ }
112+
106113 const cancel = Effect . fn ( "SessionPrompt.cancel" ) ( function * ( sessionID : SessionID ) {
107114 yield * elog . info ( "cancel" , { sessionID } )
108115 yield * state . cancel ( sessionID )
@@ -358,7 +365,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
358365 agent : input . agent . name ,
359366 messages : input . messages ,
360367 metadata : ( val ) =>
361- Effect . runPromise (
368+ run . promise (
362369 input . processor . updateToolCall ( options . toolCallId , ( match ) => {
363370 if ( ! [ "running" , "pending" ] . includes ( match . state . status ) ) return match
364371 return {
@@ -374,14 +381,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
374381 } ) ,
375382 ) ,
376383 ask : ( req ) =>
377- Effect . runPromise (
378- permission . ask ( {
384+ permission
385+ . ask ( {
379386 ...req ,
380387 sessionID : input . session . id ,
381388 tool : { messageID : input . processor . message . id , callID : options . toolCallId } ,
382389 ruleset : Permission . merge ( input . agent . permission , input . session . permission ?? [ ] ) ,
383- } ) ,
384- ) ,
390+ } )
391+ . pipe ( Effect . orDie ) ,
385392 } )
386393
387394 for ( const item of yield * registry . tools ( {
@@ -395,15 +402,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
395402 description : item . description ,
396403 inputSchema : jsonSchema ( schema as any ) ,
397404 execute ( args , options ) {
398- return Effect . runPromise (
405+ return run . promise (
399406 Effect . gen ( function * ( ) {
400407 const ctx = context ( args , options )
401408 yield * plugin . trigger (
402409 "tool.execute.before" ,
403410 { tool : item . id , sessionID : ctx . sessionID , callID : ctx . callID } ,
404411 { args } ,
405412 )
406- const result = yield * Effect . promise ( ( ) => item . execute ( args , ctx ) )
413+ const result = yield * item . execute ( args , ctx )
407414 const output = {
408415 ...result ,
409416 attachments : result . attachments ?. map ( ( attachment ) => ( {
@@ -436,15 +443,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
436443 const transformed = ProviderTransform . schema ( input . model , schema )
437444 item . inputSchema = jsonSchema ( transformed )
438445 item . execute = ( args , opts ) =>
439- Effect . runPromise (
446+ run . promise (
440447 Effect . gen ( function * ( ) {
441448 const ctx = context ( args , opts )
442449 yield * plugin . trigger (
443450 "tool.execute.before" ,
444451 { tool : key , sessionID : ctx . sessionID , callID : opts . toolCallId } ,
445452 { args } ,
446453 )
447- yield * Effect . promise ( ( ) => ctx . ask ( { permission : key , metadata : { } , patterns : [ "*" ] , always : [ "*" ] } ) )
454+ yield * ctx . ask ( { permission : key , metadata : { } , patterns : [ "*" ] , always : [ "*" ] } )
448455 const result : Awaited < ReturnType < NonNullable < typeof execute > > > = yield * Effect . promise ( ( ) =>
449456 execute ( args , opts ) ,
450457 )
@@ -576,45 +583,46 @@ NOTE: At any point in time through this workflow you should feel free to ask the
576583 }
577584
578585 let error : Error | undefined
579- const result = yield * Effect . promise ( ( signal ) =>
580- taskTool
581- . execute ( taskArgs , {
582- agent : task . agent ,
583- messageID : assistantMessage . id ,
584- sessionID,
585- abort : signal ,
586- callID : part . callID ,
587- extra : { bypassAgentCheck : true , promptOps } ,
588- messages : msgs ,
589- metadata ( val : { title ?: string ; metadata ?: Record < string , any > } ) {
590- return Effect . runPromise (
591- Effect . gen ( function * ( ) {
592- part = yield * sessions . updatePart ( {
593- ...part ,
594- type : "tool" ,
595- state : { ...part . state , ...val } ,
596- } satisfies MessageV2 . ToolPart )
597- } ) ,
598- )
599- } ,
600- ask ( req : any ) {
601- return Effect . runPromise (
602- permission . ask ( {
603- ...req ,
604- sessionID,
605- ruleset : Permission . merge ( taskAgent . permission , session . permission ?? [ ] ) ,
606- } ) ,
607- )
608- } ,
609- } )
610- . catch ( ( e ) => {
611- error = e instanceof Error ? e : new Error ( String ( e ) )
586+ const taskAbort = new AbortController ( )
587+ const result = yield * taskTool
588+ . execute ( taskArgs , {
589+ agent : task . agent ,
590+ messageID : assistantMessage . id ,
591+ sessionID,
592+ abort : taskAbort . signal ,
593+ callID : part . callID ,
594+ extra : { bypassAgentCheck : true , promptOps } ,
595+ messages : msgs ,
596+ metadata ( val : { title ?: string ; metadata ?: Record < string , any > } ) {
597+ return run . promise (
598+ Effect . gen ( function * ( ) {
599+ part = yield * sessions . updatePart ( {
600+ ...part ,
601+ type : "tool" ,
602+ state : { ...part . state , ...val } ,
603+ } satisfies MessageV2 . ToolPart )
604+ } ) ,
605+ )
606+ } ,
607+ ask : ( req : any ) =>
608+ permission
609+ . ask ( {
610+ ...req ,
611+ sessionID,
612+ ruleset : Permission . merge ( taskAgent . permission , session . permission ?? [ ] ) ,
613+ } )
614+ . pipe ( Effect . orDie ) ,
615+ } )
616+ . pipe (
617+ Effect . catchCause ( ( cause ) => {
618+ const defect = Cause . squash ( cause )
619+ error = defect instanceof Error ? defect : new Error ( String ( defect ) )
612620 log . error ( "subtask execution failed" , { error, agent : task . agent , description : task . description } )
613- return undefined
621+ return Effect . void
614622 } ) ,
615- ) . pipe (
616- Effect . onInterrupt ( ( ) =>
617- Effect . gen ( function * ( ) {
623+ Effect . onInterrupt ( ( ) =>
624+ Effect . gen ( function * ( ) {
625+ taskAbort . abort ( )
618626 assistantMessage . finish = "tool-calls"
619627 assistantMessage . time . completed = Date . now ( )
620628 yield * sessions . updateMessage ( assistantMessage )
@@ -630,9 +638,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
630638 } ,
631639 } satisfies MessageV2 . ToolPart )
632640 }
633- } ) ,
634- ) ,
635- )
641+ } ) ) ,
642+ )
636643
637644 const attachments = result ?. attachments ?. map ( ( attachment ) => ( {
638645 ...attachment ,
@@ -855,7 +862,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
855862 output += chunk
856863 if ( part . state . status === "running" ) {
857864 part . state . metadata = { output, description : "" }
858- void Effect . runFork ( sessions . updatePart ( part ) )
865+ void run . fork ( sessions . updatePart ( part ) )
859866 }
860867 } ) ,
861868 )
@@ -1037,19 +1044,21 @@ NOTE: At any point in time through this workflow you should feel free to ask the
10371044 if ( yield * fsys . isDir ( filepath ) ) part . mime = "application/x-directory"
10381045
10391046 const { read } = yield * registry . named ( )
1040- const execRead = ( args : Parameters < typeof read . execute > [ 0 ] , extra ?: Tool . Context [ "extra" ] ) =>
1041- Effect . promise ( ( signal : AbortSignal ) =>
1042- read . execute ( args , {
1047+ const execRead = ( args : Parameters < typeof read . execute > [ 0 ] , extra ?: Tool . Context [ "extra" ] ) => {
1048+ const controller = new AbortController ( )
1049+ return read
1050+ . execute ( args , {
10431051 sessionID : input . sessionID ,
1044- abort : signal ,
1052+ abort : controller . signal ,
10451053 agent : input . agent ! ,
10461054 messageID : info . id ,
10471055 extra : { bypassCwdCheck : true , ...extra } ,
10481056 messages : [ ] ,
1049- metadata : async ( ) => { } ,
1050- ask : async ( ) => { } ,
1051- } ) ,
1052- )
1057+ metadata : ( ) => { } ,
1058+ ask : ( ) => Effect . void ,
1059+ } )
1060+ . pipe ( Effect . onInterrupt ( ( ) => Effect . sync ( ( ) => controller . abort ( ) ) ) )
1061+ }
10531062
10541063 if ( part . mime === "text/plain" ) {
10551064 let offset : number | undefined
@@ -1655,9 +1664,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
16551664 } )
16561665
16571666 const promptOps : TaskPromptOps = {
1658- cancel : ( sessionID ) => Effect . runFork ( cancel ( sessionID ) ) ,
1659- resolvePromptParts : ( template ) => Effect . runPromise ( resolvePromptParts ( template ) ) ,
1660- prompt : ( input ) => Effect . runPromise ( prompt ( input ) ) ,
1667+ cancel : ( sessionID ) => run . fork ( cancel ( sessionID ) ) ,
1668+ resolvePromptParts : ( template ) => run . promise ( resolvePromptParts ( template ) ) ,
1669+ prompt : ( input ) => run . promise ( prompt ( input ) ) ,
16611670 }
16621671
16631672 return Service . of ( {
0 commit comments