Skip to content

Commit 37883a9

Browse files
authored
refactor(core): add full http proxy and change workspace adaptor interface (anomalyco#21239)
1 parent 3c31d04 commit 37883a9

5 files changed

Lines changed: 182 additions & 20 deletions

File tree

packages/opencode/src/control-plane/adaptors/worktree.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ export const WorktreeAdaptor: Adaptor = {
3232
const config = Config.parse(info)
3333
await Worktree.remove({ directory: config.directory })
3434
},
35-
async fetch(_info, _input: RequestInfo | URL, _init?: RequestInit) {
36-
throw new Error("fetch not implemented")
35+
target(info) {
36+
const config = Config.parse(info)
37+
return {
38+
type: "local",
39+
directory: config.directory,
40+
}
3741
},
3842
}

packages/opencode/src/control-plane/types.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,20 @@ export const WorkspaceInfo = z.object({
1313
})
1414
export type WorkspaceInfo = z.infer<typeof WorkspaceInfo>
1515

16+
export type Target =
17+
| {
18+
type: "local"
19+
directory: string
20+
}
21+
| {
22+
type: "remote"
23+
url: string | URL
24+
headers?: HeadersInit
25+
}
26+
1627
export type Adaptor = {
1728
configure(input: WorkspaceInfo): WorkspaceInfo | Promise<WorkspaceInfo>
18-
create(input: WorkspaceInfo, from?: WorkspaceInfo): Promise<void>
29+
create(config: WorkspaceInfo, from?: WorkspaceInfo): Promise<void>
1930
remove(config: WorkspaceInfo): Promise<void>
20-
fetch(config: WorkspaceInfo, input: RequestInfo | URL, init?: RequestInit): Promise<Response>
31+
target(config: WorkspaceInfo): Target | Promise<Target>
2132
}

packages/opencode/src/control-plane/workspace.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,31 @@ export namespace Workspace {
116116
async function workspaceEventLoop(space: Info, stop: AbortSignal) {
117117
while (!stop.aborted) {
118118
const adaptor = await getAdaptor(space.type)
119-
const res = await adaptor.fetch(space, "/event", { method: "GET", signal: stop }).catch(() => undefined)
120-
if (!res || !res.ok || !res.body) {
119+
const target = await Promise.resolve(adaptor.target(space))
120+
121+
if (target.type === "local") {
122+
return
123+
}
124+
125+
const baseURL = String(target.url).replace(/\/?$/, "/")
126+
127+
const res = await fetch(new URL(baseURL + "/event"), {
128+
method: "GET",
129+
signal: stop,
130+
})
131+
132+
if (!res.ok || !res.body) {
121133
await sleep(1000)
122134
continue
123135
}
136+
124137
await parseSSE(res.body, stop, (event) => {
125138
GlobalBus.emit("event", {
126139
directory: space.id,
127140
payload: event,
128141
})
129142
})
143+
130144
// Wait 250ms and retry if SSE connection fails
131145
await sleep(250)
132146
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import type { Target } from "@/control-plane/types"
2+
import { lazy } from "@/util/lazy"
3+
import { Hono } from "hono"
4+
import { upgradeWebSocket } from "hono/bun"
5+
6+
const hop = new Set([
7+
"connection",
8+
"keep-alive",
9+
"proxy-authenticate",
10+
"proxy-authorization",
11+
"proxy-connection",
12+
"te",
13+
"trailer",
14+
"transfer-encoding",
15+
"upgrade",
16+
"host",
17+
])
18+
19+
type Msg = string | ArrayBuffer | Uint8Array
20+
21+
function headers(req: Request, extra?: HeadersInit) {
22+
const out = new Headers(req.headers)
23+
for (const key of hop) out.delete(key)
24+
out.delete("x-opencode-directory")
25+
out.delete("x-opencode-workspace")
26+
if (!extra) return out
27+
for (const [key, value] of new Headers(extra).entries()) {
28+
out.set(key, value)
29+
}
30+
return out
31+
}
32+
33+
function protocols(req: Request) {
34+
const value = req.headers.get("sec-websocket-protocol")
35+
if (!value) return []
36+
return value
37+
.split(",")
38+
.map((item) => item.trim())
39+
.filter(Boolean)
40+
}
41+
42+
function socket(url: string | URL) {
43+
const next = new URL(url)
44+
if (next.protocol === "http:") next.protocol = "ws:"
45+
if (next.protocol === "https:") next.protocol = "wss:"
46+
return next.toString()
47+
}
48+
49+
function send(ws: { send(data: string | ArrayBuffer | Uint8Array): void }, data: any) {
50+
if (data instanceof Blob) {
51+
return data.arrayBuffer().then((x) => ws.send(x))
52+
}
53+
return ws.send(data)
54+
}
55+
56+
const app = lazy(() =>
57+
new Hono().get(
58+
"/__workspace_ws",
59+
upgradeWebSocket((c) => {
60+
const url = c.req.header("x-opencode-proxy-url")
61+
const queue: Msg[] = []
62+
let remote: WebSocket | undefined
63+
return {
64+
onOpen(_, ws) {
65+
if (!url) {
66+
ws.close(1011, "missing proxy target")
67+
return
68+
}
69+
remote = new WebSocket(url, protocols(c.req.raw))
70+
remote.binaryType = "arraybuffer"
71+
remote.onopen = () => {
72+
for (const item of queue) remote?.send(item)
73+
queue.length = 0
74+
}
75+
remote.onmessage = (event) => {
76+
send(ws, event.data)
77+
}
78+
remote.onerror = () => {
79+
ws.close(1011, "proxy error")
80+
}
81+
remote.onclose = (event) => {
82+
ws.close(event.code, event.reason)
83+
}
84+
},
85+
onMessage(event) {
86+
const data = event.data
87+
if (typeof data !== "string" && !(data instanceof Uint8Array) && !(data instanceof ArrayBuffer)) return
88+
if (remote?.readyState === WebSocket.OPEN) {
89+
remote.send(data)
90+
return
91+
}
92+
queue.push(data)
93+
},
94+
onClose(event) {
95+
remote?.close(event.code, event.reason)
96+
},
97+
}
98+
}),
99+
),
100+
)
101+
102+
export namespace ServerProxy {
103+
export function http(target: Extract<Target, { type: "remote" }>, req: Request) {
104+
return fetch(
105+
new Request(target.url, {
106+
method: req.method,
107+
headers: headers(req, target.headers),
108+
body: req.method === "GET" || req.method === "HEAD" ? undefined : req.body,
109+
redirect: "manual",
110+
signal: req.signal,
111+
}),
112+
)
113+
}
114+
115+
export function websocket(target: Extract<Target, { type: "remote" }>, req: Request, env: unknown) {
116+
const url = new URL(req.url)
117+
url.pathname = "/__workspace_ws"
118+
url.search = ""
119+
const next = new Headers(req.headers)
120+
next.set("x-opencode-proxy-url", socket(target.url))
121+
return app().fetch(
122+
new Request(url, {
123+
method: req.method,
124+
headers: next,
125+
signal: req.signal,
126+
}),
127+
env as never,
128+
)
129+
}
130+
}

packages/opencode/src/server/router.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { UpgradeWebSocket } from "hono/ws"
33
import { getAdaptor } from "@/control-plane/adaptors"
44
import { WorkspaceID } from "@/control-plane/schema"
55
import { Workspace } from "@/control-plane/workspace"
6+
import { ServerProxy } from "./proxy"
67
import { lazy } from "@/util/lazy"
78
import { Filesystem } from "@/util/filesystem"
89
import { Instance } from "@/project/instance"
@@ -41,7 +42,7 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
4142
)
4243

4344
const url = new URL(c.req.url)
44-
const workspaceParam = url.searchParams.get("workspace")
45+
const workspaceParam = url.searchParams.get("workspace") || c.req.header("x-opencode-workspace")
4546

4647
// TODO: If session is being routed, force it to lookup the
4748
// project/workspace
@@ -68,35 +69,37 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
6869
})
6970
}
7071

71-
// Handle local workspaces directly so we can pass env to `fetch`,
72-
// necessary for websocket upgrades
73-
if (workspace.type === "worktree") {
72+
const adaptor = await getAdaptor(workspace.type)
73+
const target = await adaptor.target(workspace)
74+
75+
if (target.type === "local") {
7476
return Instance.provide({
75-
directory: workspace.directory!,
77+
directory: target.directory,
7678
init: InstanceBootstrap,
7779
async fn() {
7880
return routes().fetch(c.req.raw, c.env)
7981
},
8082
})
8183
}
8284

83-
// Remote workspaces
84-
8585
if (local(c.req.method, url.pathname)) {
8686
// No instance provided because we are serving cached data; there
8787
// is no instance to work with
8888
return routes().fetch(c.req.raw, c.env)
8989
}
9090

91-
const adaptor = await getAdaptor(workspace.type)
91+
if (c.req.header("upgrade")?.toLowerCase() === "websocket") {
92+
return ServerProxy.websocket(target, c.req.raw, c.env)
93+
}
94+
9295
const headers = new Headers(c.req.raw.headers)
9396
headers.delete("x-opencode-workspace")
9497

95-
return adaptor.fetch(workspace, `${url.pathname}${url.search}`, {
96-
method: c.req.method,
97-
body: c.req.method === "GET" || c.req.method === "HEAD" ? undefined : await c.req.raw.arrayBuffer(),
98-
signal: c.req.raw.signal,
99-
headers,
100-
})
98+
return ServerProxy.http(
99+
target,
100+
new Request(c.req.raw, {
101+
headers,
102+
}),
103+
)
101104
}
102105
}

0 commit comments

Comments
 (0)