Skip to content

Commit af20191

Browse files
authored
feat(core): sync routes, refactor proxy, session restore, and more syncing (#22518)
1 parent 47af00b commit af20191

11 files changed

Lines changed: 1132 additions & 63 deletions

File tree

packages/opencode/src/control-plane/workspace.ts

Lines changed: 237 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import z from "zod"
22
import { setTimeout as sleep } from "node:timers/promises"
33
import { fn } from "@/util/fn"
4-
import { Database, eq } from "@/storage/db"
4+
import { Database, asc, eq } from "@/storage/db"
55
import { Project } from "@/project/project"
66
import { BusEvent } from "@/bus/bus-event"
77
import { GlobalBus } from "@/bus/global"
88
import { SyncEvent } from "@/sync"
9+
import { EventTable } from "@/sync/event.sql"
10+
import { Flag } from "@/flag/flag"
911
import { Log } from "@/util/log"
1012
import { Filesystem } from "@/util/filesystem"
1113
import { ProjectID } from "@/project/schema"
@@ -15,6 +17,11 @@ import { getAdaptor } from "./adaptors"
1517
import { WorkspaceInfo } from "./types"
1618
import { WorkspaceID } from "./schema"
1719
import { parseSSE } from "./sse"
20+
import { Session } from "@/session"
21+
import { SessionTable } from "@/session/session.sql"
22+
import { SessionID } from "@/session/schema"
23+
import { errorData } from "@/util/error"
24+
import { AppRuntime } from "@/effect/app-runtime"
1825

1926
export namespace Workspace {
2027
export const Info = WorkspaceInfo.meta({
@@ -29,6 +36,13 @@ export namespace Workspace {
2936
})
3037
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
3138

39+
const Restore = z.object({
40+
workspaceID: WorkspaceID.zod,
41+
sessionID: SessionID.zod,
42+
total: z.number().int().min(0),
43+
step: z.number().int().min(0),
44+
})
45+
3246
export const Event = {
3347
Ready: BusEvent.define(
3448
"workspace.ready",
@@ -42,6 +56,7 @@ export namespace Workspace {
4256
message: z.string(),
4357
}),
4458
),
59+
Restore: BusEvent.define("workspace.restore", Restore),
4560
Status: BusEvent.define("workspace.status", ConnectionStatus),
4661
}
4762

@@ -102,11 +117,170 @@ export namespace Workspace {
102117
return info
103118
})
104119

120+
const SessionRestoreInput = z.object({
121+
workspaceID: WorkspaceID.zod,
122+
sessionID: SessionID.zod,
123+
})
124+
125+
export const sessionRestore = fn(SessionRestoreInput, async (input) => {
126+
log.info("session restore requested", {
127+
workspaceID: input.workspaceID,
128+
sessionID: input.sessionID,
129+
})
130+
try {
131+
const space = await get(input.workspaceID)
132+
if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
133+
134+
const adaptor = await getAdaptor(space.projectID, space.type)
135+
const target = await adaptor.target(space)
136+
137+
// Need to switch the workspace of the session
138+
SyncEvent.run(Session.Event.Updated, {
139+
sessionID: input.sessionID,
140+
info: {
141+
workspaceID: input.workspaceID,
142+
},
143+
})
144+
145+
const rows = Database.use((db) =>
146+
db
147+
.select({
148+
id: EventTable.id,
149+
aggregateID: EventTable.aggregate_id,
150+
seq: EventTable.seq,
151+
type: EventTable.type,
152+
data: EventTable.data,
153+
})
154+
.from(EventTable)
155+
.where(eq(EventTable.aggregate_id, input.sessionID))
156+
.orderBy(asc(EventTable.seq))
157+
.all(),
158+
)
159+
if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
160+
161+
const all = rows
162+
163+
const size = 10
164+
const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
165+
const total = sets.length
166+
log.info("session restore prepared", {
167+
workspaceID: input.workspaceID,
168+
sessionID: input.sessionID,
169+
workspaceType: space.type,
170+
directory: space.directory,
171+
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
172+
events: all.length,
173+
batches: total,
174+
first: all[0]?.seq,
175+
last: all.at(-1)?.seq,
176+
})
177+
GlobalBus.emit("event", {
178+
directory: "global",
179+
workspace: input.workspaceID,
180+
payload: {
181+
type: Event.Restore.type,
182+
properties: {
183+
workspaceID: input.workspaceID,
184+
sessionID: input.sessionID,
185+
total,
186+
step: 0,
187+
},
188+
},
189+
})
190+
for (const [i, events] of sets.entries()) {
191+
log.info("session restore batch starting", {
192+
workspaceID: input.workspaceID,
193+
sessionID: input.sessionID,
194+
step: i + 1,
195+
total,
196+
events: events.length,
197+
first: events[0]?.seq,
198+
last: events.at(-1)?.seq,
199+
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
200+
})
201+
if (target.type === "local") {
202+
SyncEvent.replayAll(events)
203+
log.info("session restore batch replayed locally", {
204+
workspaceID: input.workspaceID,
205+
sessionID: input.sessionID,
206+
step: i + 1,
207+
total,
208+
events: events.length,
209+
})
210+
} else {
211+
const url = route(target.url, "/sync/replay")
212+
const headers = new Headers(target.headers)
213+
headers.set("content-type", "application/json")
214+
const res = await fetch(url, {
215+
method: "POST",
216+
headers,
217+
body: JSON.stringify({
218+
directory: space.directory ?? "",
219+
events,
220+
}),
221+
})
222+
if (!res.ok) {
223+
const body = await res.text()
224+
log.error("session restore batch failed", {
225+
workspaceID: input.workspaceID,
226+
sessionID: input.sessionID,
227+
step: i + 1,
228+
total,
229+
status: res.status,
230+
body,
231+
})
232+
throw new Error(
233+
`Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
234+
)
235+
}
236+
log.info("session restore batch posted", {
237+
workspaceID: input.workspaceID,
238+
sessionID: input.sessionID,
239+
step: i + 1,
240+
total,
241+
status: res.status,
242+
})
243+
}
244+
GlobalBus.emit("event", {
245+
directory: "global",
246+
workspace: input.workspaceID,
247+
payload: {
248+
type: Event.Restore.type,
249+
properties: {
250+
workspaceID: input.workspaceID,
251+
sessionID: input.sessionID,
252+
total,
253+
step: i + 1,
254+
},
255+
},
256+
})
257+
}
258+
259+
log.info("session restore complete", {
260+
workspaceID: input.workspaceID,
261+
sessionID: input.sessionID,
262+
batches: total,
263+
})
264+
265+
return {
266+
total,
267+
}
268+
} catch (err) {
269+
log.error("session restore failed", {
270+
workspaceID: input.workspaceID,
271+
sessionID: input.sessionID,
272+
error: errorData(err),
273+
})
274+
throw err
275+
}
276+
})
277+
105278
export function list(project: Project.Info) {
106279
const rows = Database.use((db) =>
107280
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
108281
)
109282
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
283+
110284
for (const space of spaces) startSync(space)
111285
return spaces
112286
}
@@ -120,13 +294,25 @@ export namespace Workspace {
120294
})
121295

122296
export const remove = fn(WorkspaceID.zod, async (id) => {
297+
const sessions = Database.use((db) =>
298+
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
299+
)
300+
for (const session of sessions) {
301+
await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
302+
}
303+
123304
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
305+
124306
if (row) {
125307
stopSync(id)
126308

127309
const info = fromRow(row)
128-
const adaptor = await getAdaptor(info.projectID, row.type)
129-
adaptor.remove(info)
310+
try {
311+
const adaptor = await getAdaptor(info.projectID, row.type)
312+
await adaptor.remove(info)
313+
} catch (err) {
314+
log.error("adaptor not available when removing workspace", { type: row.type })
315+
}
130316
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
131317
return info
132318
}
@@ -156,51 +342,81 @@ export namespace Workspace {
156342

157343
const log = Log.create({ service: "workspace-sync" })
158344

159-
async function workspaceEventLoop(space: Info, signal: AbortSignal) {
160-
log.info("starting sync: " + space.id)
345+
function route(url: string | URL, path: string) {
346+
const next = new URL(url)
347+
next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
348+
next.search = ""
349+
next.hash = ""
350+
return next
351+
}
161352

353+
async function syncWorkspace(space: Info, signal: AbortSignal) {
162354
while (!signal.aborted) {
163-
log.info("connecting to sync: " + space.id)
355+
log.info("connecting to global sync", { workspace: space.name })
164356

165-
setStatus(space.id, "connecting")
166357
const adaptor = await getAdaptor(space.projectID, space.type)
167358
const target = await adaptor.target(space)
168359

169360
if (target.type === "local") return
170361

171-
const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
172-
setStatus(space.id, "error", String(err))
362+
const res = await fetch(route(target.url, "/global/event"), {
363+
method: "GET",
364+
headers: target.headers,
365+
signal,
366+
}).catch((err: unknown) => {
367+
setStatus(space.id, "error")
368+
369+
log.info("failed to connect to global sync", {
370+
workspace: space.name,
371+
error: err,
372+
})
173373
return undefined
174374
})
175-
if (!res || !res.ok || !res.body) {
176-
log.info("failed to connect to sync: " + res?.status)
177375

178-
setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
376+
if (!res || !res.ok || !res.body) {
377+
log.info("failed to connect to global sync", { workspace: space.name })
378+
setStatus(space.id, "error")
179379
await sleep(1000)
180380
continue
181381
}
382+
383+
log.info("global sync connected", { workspace: space.name })
182384
setStatus(space.id, "connected")
183-
await parseSSE(res.body, signal, (evt) => {
184-
const event = evt as SyncEvent.SerializedEvent
185385

386+
await parseSSE(res.body, signal, (evt: any) => {
186387
try {
187-
if (!event.type.startsWith("server.")) {
188-
SyncEvent.replay(event)
388+
if (!("payload" in evt)) return
389+
390+
if (evt.payload.type === "sync") {
391+
// This name -> type is temporary
392+
SyncEvent.replay({ ...evt.payload, type: evt.payload.name } as SyncEvent.SerializedEvent)
189393
}
394+
395+
GlobalBus.emit("event", {
396+
directory: evt.directory,
397+
project: evt.project,
398+
workspace: space.id,
399+
payload: evt.payload,
400+
})
190401
} catch (err) {
191-
log.warn("failed to replay sync event", {
402+
log.info("failed to replay global event", {
192403
workspaceID: space.id,
193404
error: err,
194405
})
195406
}
196407
})
408+
409+
log.info("disconnected from global sync: " + space.id)
197410
setStatus(space.id, "disconnected")
198-
log.info("disconnected to sync: " + space.id)
199-
await sleep(250)
411+
412+
// TODO: Implement exponential backoff
413+
await sleep(1000)
200414
}
201415
}
202416

203417
function startSync(space: Info) {
418+
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
419+
204420
if (space.type === "worktree") {
205421
void Filesystem.exists(space.directory!).then((exists) => {
206422
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
@@ -213,9 +429,9 @@ export namespace Workspace {
213429
aborts.set(space.id, abort)
214430
setStatus(space.id, "disconnected")
215431

216-
void workspaceEventLoop(space, abort.signal).catch((error) => {
432+
void syncWorkspace(space, abort.signal).catch((error) => {
217433
setStatus(space.id, "error", String(error))
218-
log.warn("workspace sync listener failed", {
434+
log.warn("workspace listener failed", {
219435
workspaceID: space.id,
220436
error,
221437
})

packages/opencode/src/server/instance/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { ConfigRoutes } from "./config"
2323
import { ExperimentalRoutes } from "./experimental"
2424
import { ProviderRoutes } from "./provider"
2525
import { EventRoutes } from "./event"
26+
import { SyncRoutes } from "./sync"
2627
import { WorkspaceRouterMiddleware } from "./middleware"
2728
import { AppRuntime } from "@/effect/app-runtime"
2829

@@ -37,6 +38,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono =>
3738
.route("/permission", PermissionRoutes())
3839
.route("/question", QuestionRoutes())
3940
.route("/provider", ProviderRoutes())
41+
.route("/sync", SyncRoutes())
4042
.route("/", FileRoutes())
4143
.route("/", EventRoutes())
4244
.route("/mcp", McpRoutes())

0 commit comments

Comments
 (0)