Skip to content

Commit cd004cf

Browse files
authored
refactor(session): eliminate Effect.promise roundtrips for sync MessageV2.stream (#21973)
1 parent 19ae8c8 commit cd004cf

2 files changed

Lines changed: 27 additions & 17 deletions

File tree

packages/opencode/src/session/index.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import type { Provider } from "@/provider/provider"
2929
import { Permission } from "@/permission"
3030
import { Global } from "@/global"
3131
import type { LanguageModelV2Usage } from "@ai-sdk/provider"
32-
import { Effect, Layer, Context } from "effect"
32+
import { Effect, Layer, Option, Context } from "effect"
3333
import { makeRuntime } from "@/effect/run-service"
3434

3535
export namespace Session {
@@ -352,6 +352,11 @@ export namespace Session {
352352
field: string
353353
delta: string
354354
}) => Effect.Effect<void>
355+
/** Finds the first message matching the predicate, searching newest-first. */
356+
readonly findMessage: (
357+
sessionID: SessionID,
358+
predicate: (msg: MessageV2.WithParts) => boolean,
359+
) => Effect.Effect<Option.Option<MessageV2.WithParts>>
355360
}
356361

357362
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
@@ -636,6 +641,17 @@ export namespace Session {
636641
yield* bus.publish(MessageV2.Event.PartDelta, input)
637642
})
638643

644+
/** Finds the first message matching the predicate, searching newest-first. */
645+
const findMessage = Effect.fn("Session.findMessage")(function* (
646+
sessionID: SessionID,
647+
predicate: (msg: MessageV2.WithParts) => boolean,
648+
) {
649+
for (const item of MessageV2.stream(sessionID)) {
650+
if (predicate(item)) return Option.some(item)
651+
}
652+
return Option.none<MessageV2.WithParts>()
653+
})
654+
639655
return Service.of({
640656
create,
641657
fork,
@@ -657,6 +673,7 @@ export namespace Session {
657673
updatePart,
658674
getPart,
659675
updatePartDelta,
676+
findMessage,
660677
})
661678
}),
662679
)

packages/opencode/src/session/prompt.ts

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -902,12 +902,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
902902
})
903903

904904
const lastModel = Effect.fnUntraced(function* (sessionID: SessionID) {
905-
const model = yield* Effect.promise(async () => {
906-
for await (const item of MessageV2.stream(sessionID)) {
907-
if (item.info.role === "user" && item.info.model) return item.info.model
908-
}
909-
})
910-
if (model) return model
905+
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role === "user" && !!m.info.model)
906+
if (Option.isSome(match) && match.value.info.role === "user") return match.value.info.model
911907
return yield* provider.defaultModel()
912908
})
913909

@@ -1290,16 +1286,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
12901286
},
12911287
)
12921288

1293-
const lastAssistant = (sessionID: SessionID) =>
1294-
Effect.promise(async () => {
1295-
let latest: MessageV2.WithParts | undefined
1296-
for await (const item of MessageV2.stream(sessionID)) {
1297-
latest ??= item
1298-
if (item.info.role !== "user") return item
1299-
}
1300-
if (latest) return latest
1301-
throw new Error("Impossible")
1302-
})
1289+
const lastAssistant = Effect.fnUntraced(function* (sessionID: SessionID) {
1290+
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user")
1291+
if (Option.isSome(match)) return match.value
1292+
const msgs = yield* sessions.messages({ sessionID, limit: 1 })
1293+
if (msgs.length > 0) return msgs[0]
1294+
throw new Error("Impossible")
1295+
})
13031296

13041297
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
13051298
function* (sessionID: SessionID) {

0 commit comments

Comments
 (0)