Skip to content

Commit a147ad6

Browse files
authored
feat(shared): add Effect-idiomatic file lock (EffectFlock) (#22681)
1 parent ac2fa66 commit a147ad6

3 files changed

Lines changed: 730 additions & 0 deletions

File tree

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
import path from "path"
2+
import os from "os"
3+
import { randomUUID } from "crypto"
4+
import { Context, Effect, Function, Layer, Option, Schedule, Schema } from "effect"
5+
import type { FileSystem, Scope } from "effect"
6+
import type { PlatformError } from "effect/PlatformError"
7+
import { AppFileSystem } from "../filesystem"
8+
import { Global } from "../global"
9+
import { Hash } from "./hash"
10+
11+
export namespace EffectFlock {
12+
// ---------------------------------------------------------------------------
13+
// Errors
14+
// ---------------------------------------------------------------------------
15+
16+
export class LockTimeoutError extends Schema.TaggedErrorClass<LockTimeoutError>()("LockTimeoutError", {
17+
key: Schema.String,
18+
}) {}
19+
20+
export class LockCompromisedError extends Schema.TaggedErrorClass<LockCompromisedError>()("LockCompromisedError", {
21+
detail: Schema.String,
22+
}) {}
23+
24+
class ReleaseError extends Schema.TaggedErrorClass<ReleaseError>()("ReleaseError", {
25+
detail: Schema.String,
26+
cause: Schema.optional(Schema.Defect),
27+
}) {
28+
override get message() {
29+
return this.detail
30+
}
31+
}
32+
33+
/** Internal: signals "lock is held, retry later". Never leaks to callers. */
34+
class NotAcquired extends Schema.TaggedErrorClass<NotAcquired>()("NotAcquired", {}) {}
35+
36+
export type LockError = LockTimeoutError | LockCompromisedError
37+
38+
// ---------------------------------------------------------------------------
39+
// Timing (baked in — no caller ever overrides these)
40+
// ---------------------------------------------------------------------------
41+
42+
const STALE_MS = 60_000
43+
const TIMEOUT_MS = 5 * 60_000
44+
const BASE_DELAY_MS = 100
45+
const MAX_DELAY_MS = 2_000
46+
const HEARTBEAT_MS = Math.max(100, Math.floor(STALE_MS / 3))
47+
48+
const retrySchedule = Schedule.exponential(BASE_DELAY_MS, 1.7).pipe(
49+
Schedule.either(Schedule.spaced(MAX_DELAY_MS)),
50+
Schedule.jittered,
51+
Schedule.while((meta) => meta.elapsed < TIMEOUT_MS),
52+
)
53+
54+
// ---------------------------------------------------------------------------
55+
// Lock metadata schema
56+
// ---------------------------------------------------------------------------
57+
58+
const LockMetaJson = Schema.fromJsonString(
59+
Schema.Struct({
60+
token: Schema.String,
61+
pid: Schema.Number,
62+
hostname: Schema.String,
63+
createdAt: Schema.String,
64+
}),
65+
)
66+
67+
const decodeMeta = Schema.decodeUnknownSync(LockMetaJson)
68+
const encodeMeta = Schema.encodeSync(LockMetaJson)
69+
70+
// ---------------------------------------------------------------------------
71+
// Service
72+
// ---------------------------------------------------------------------------
73+
74+
export interface Interface {
75+
readonly acquire: (key: string, dir?: string) => Effect.Effect<void, LockError, Scope.Scope>
76+
readonly withLock: {
77+
(key: string, dir?: string): <A, E, R>(body: Effect.Effect<A, E, R>) => Effect.Effect<A, E | LockError, R>
78+
<A, E, R>(body: Effect.Effect<A, E, R>, key: string, dir?: string): Effect.Effect<A, E | LockError, R>
79+
}
80+
}
81+
82+
export class Service extends Context.Service<Service, Interface>()("EffectFlock") {}
83+
84+
// ---------------------------------------------------------------------------
85+
// Layer
86+
// ---------------------------------------------------------------------------
87+
88+
function wall() {
89+
return performance.timeOrigin + performance.now()
90+
}
91+
92+
const mtimeMs = (info: FileSystem.File.Info) => Option.getOrElse(info.mtime, () => new Date(0)).getTime()
93+
94+
const isPathGone = (e: PlatformError) => e.reason._tag === "NotFound" || e.reason._tag === "Unknown"
95+
96+
export const layer: Layer.Layer<Service, never, Global.Service | AppFileSystem.Service> = Layer.effect(
97+
Service,
98+
Effect.gen(function* () {
99+
const global = yield* Global.Service
100+
const fs = yield* AppFileSystem.Service
101+
const lockRoot = path.join(global.state, "locks")
102+
const hostname = os.hostname()
103+
const ensuredDirs = new Set<string>()
104+
105+
// -- helpers (close over fs) --
106+
107+
const safeStat = (file: string) =>
108+
fs.stat(file).pipe(
109+
Effect.catchIf(isPathGone, () => Effect.void),
110+
Effect.orDie,
111+
)
112+
113+
const forceRemove = (target: string) => fs.remove(target, { recursive: true }).pipe(Effect.ignore)
114+
115+
/** Atomic mkdir — returns true if created, false if already exists, dies on other errors. */
116+
const atomicMkdir = (dir: string) =>
117+
fs.makeDirectory(dir, { mode: 0o700 }).pipe(
118+
Effect.as(true),
119+
Effect.catchIf(
120+
(e) => e.reason._tag === "AlreadyExists",
121+
() => Effect.succeed(false),
122+
),
123+
Effect.orDie,
124+
)
125+
126+
/** Write with exclusive create — compromised error if file already exists. */
127+
const exclusiveWrite = (filePath: string, content: string, lockDir: string, detail: string) =>
128+
fs.writeFileString(filePath, content, { flag: "wx" }).pipe(
129+
Effect.catch(() =>
130+
Effect.gen(function* () {
131+
yield* forceRemove(lockDir)
132+
return yield* new LockCompromisedError({ detail })
133+
}),
134+
),
135+
)
136+
137+
const cleanStaleBreaker = Effect.fnUntraced(function* (breakerPath: string) {
138+
const bs = yield* safeStat(breakerPath)
139+
if (bs && wall() - mtimeMs(bs) > STALE_MS) yield* forceRemove(breakerPath)
140+
return false
141+
})
142+
143+
const ensureDir = Effect.fnUntraced(function* (dir: string) {
144+
if (ensuredDirs.has(dir)) return
145+
yield* fs.makeDirectory(dir, { recursive: true }).pipe(Effect.orDie)
146+
ensuredDirs.add(dir)
147+
})
148+
149+
const isStale = Effect.fnUntraced(function* (lockDir: string, heartbeatPath: string, metaPath: string) {
150+
const now = wall()
151+
152+
const hb = yield* safeStat(heartbeatPath)
153+
if (hb) return now - mtimeMs(hb) > STALE_MS
154+
155+
const meta = yield* safeStat(metaPath)
156+
if (meta) return now - mtimeMs(meta) > STALE_MS
157+
158+
const dir = yield* safeStat(lockDir)
159+
if (!dir) return false
160+
161+
return now - mtimeMs(dir) > STALE_MS
162+
})
163+
164+
// -- single lock attempt --
165+
166+
type Handle = { token: string; metaPath: string; heartbeatPath: string; lockDir: string }
167+
168+
const tryAcquireLockDir = Effect.fn("EffectFlock.tryAcquire")(function* (lockDir: string) {
169+
const token = randomUUID()
170+
const metaPath = path.join(lockDir, "meta.json")
171+
const heartbeatPath = path.join(lockDir, "heartbeat")
172+
173+
// Atomic mkdir — the POSIX lock primitive
174+
const created = yield* atomicMkdir(lockDir)
175+
176+
if (!created) {
177+
if (!(yield* isStale(lockDir, heartbeatPath, metaPath))) return yield* new NotAcquired()
178+
179+
// Stale — race for breaker ownership
180+
const breakerPath = lockDir + ".breaker"
181+
182+
const claimed = yield* fs.makeDirectory(breakerPath, { mode: 0o700 }).pipe(
183+
Effect.as(true),
184+
Effect.catchIf(
185+
(e) => e.reason._tag === "AlreadyExists",
186+
() => cleanStaleBreaker(breakerPath),
187+
),
188+
Effect.catchIf(isPathGone, () => Effect.succeed(false)),
189+
Effect.orDie,
190+
)
191+
192+
if (!claimed) return yield* new NotAcquired()
193+
194+
// We own the breaker — double-check staleness, nuke, recreate
195+
const recreated = yield* Effect.gen(function* () {
196+
if (!(yield* isStale(lockDir, heartbeatPath, metaPath))) return false
197+
yield* forceRemove(lockDir)
198+
return yield* atomicMkdir(lockDir)
199+
}).pipe(Effect.ensuring(forceRemove(breakerPath)))
200+
201+
if (!recreated) return yield* new NotAcquired()
202+
}
203+
204+
// We own the lock dir — write heartbeat + meta with exclusive create
205+
yield* exclusiveWrite(heartbeatPath, "", lockDir, "heartbeat already existed")
206+
207+
const metaJson = encodeMeta({ token, pid: process.pid, hostname, createdAt: new Date().toISOString() })
208+
yield* exclusiveWrite(metaPath, metaJson, lockDir, "meta.json already existed")
209+
210+
return { token, metaPath, heartbeatPath, lockDir } satisfies Handle
211+
})
212+
213+
// -- retry wrapper (preserves Handle type) --
214+
215+
const acquireHandle = (lockfile: string, key: string): Effect.Effect<Handle, LockError> =>
216+
tryAcquireLockDir(lockfile).pipe(
217+
Effect.retry({
218+
while: (err) => err._tag === "NotAcquired",
219+
schedule: retrySchedule,
220+
}),
221+
Effect.catchTag("NotAcquired", () => Effect.fail(new LockTimeoutError({ key }))),
222+
)
223+
224+
// -- release --
225+
226+
const release = (handle: Handle) =>
227+
Effect.gen(function* () {
228+
const raw = yield* fs.readFileString(handle.metaPath).pipe(
229+
Effect.catch((err) => {
230+
if (isPathGone(err)) return Effect.die(new ReleaseError({ detail: "metadata missing" }))
231+
return Effect.die(err)
232+
}),
233+
)
234+
235+
const parsed = yield* Effect.try({
236+
try: () => decodeMeta(raw),
237+
catch: (cause) => new ReleaseError({ detail: "metadata invalid", cause }),
238+
}).pipe(Effect.orDie)
239+
240+
if (parsed.token !== handle.token) return yield* Effect.die(new ReleaseError({ detail: "token mismatch" }))
241+
242+
yield* forceRemove(handle.lockDir)
243+
})
244+
245+
// -- build service --
246+
247+
const acquire = Effect.fn("EffectFlock.acquire")(function* (key: string, dir?: string) {
248+
const lockDir = dir ?? lockRoot
249+
yield* ensureDir(lockDir)
250+
251+
const lockfile = path.join(lockDir, Hash.fast(key) + ".lock")
252+
253+
// acquireRelease: acquire is uninterruptible, release is guaranteed
254+
const handle = yield* Effect.acquireRelease(acquireHandle(lockfile, key), (handle) => release(handle))
255+
256+
// Heartbeat fiber — scoped, so it's interrupted before release runs
257+
yield* fs
258+
.utimes(handle.heartbeatPath, new Date(), new Date())
259+
.pipe(Effect.ignore, Effect.repeat(Schedule.spaced(HEARTBEAT_MS)), Effect.forkScoped)
260+
})
261+
262+
const withLock: Interface["withLock"] = Function.dual(
263+
(args) => Effect.isEffect(args[0]),
264+
<A, E, R>(body: Effect.Effect<A, E, R>, key: string, dir?: string): Effect.Effect<A, E | LockError, R> =>
265+
Effect.scoped(
266+
Effect.gen(function* () {
267+
yield* acquire(key, dir)
268+
return yield* body
269+
}),
270+
),
271+
)
272+
273+
return Service.of({ acquire, withLock })
274+
}),
275+
)
276+
277+
export const live = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
278+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import fs from "fs/promises"
2+
import path from "path"
3+
import os from "os"
4+
import { Effect, Layer } from "effect"
5+
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
6+
import { EffectFlock } from "@opencode-ai/shared/util/effect-flock"
7+
import { Global } from "@opencode-ai/shared/global"
8+
9+
type Msg = {
10+
key: string
11+
dir: string
12+
holdMs?: number
13+
ready?: string
14+
active?: string
15+
done?: string
16+
}
17+
18+
function sleep(ms: number) {
19+
return new Promise<void>((resolve) => setTimeout(resolve, ms))
20+
}
21+
22+
const msg: Msg = JSON.parse(process.argv[2]!)
23+
24+
const testGlobal = Layer.succeed(
25+
Global.Service,
26+
Global.Service.of({
27+
home: os.homedir(),
28+
data: os.tmpdir(),
29+
cache: os.tmpdir(),
30+
config: os.tmpdir(),
31+
state: os.tmpdir(),
32+
bin: os.tmpdir(),
33+
log: os.tmpdir(),
34+
}),
35+
)
36+
37+
const testLayer = EffectFlock.layer.pipe(Layer.provide(testGlobal), Layer.provide(AppFileSystem.defaultLayer))
38+
39+
async function job() {
40+
if (msg.ready) await fs.writeFile(msg.ready, String(process.pid))
41+
if (msg.active) await fs.writeFile(msg.active, String(process.pid), { flag: "wx" })
42+
43+
try {
44+
if (msg.holdMs && msg.holdMs > 0) await sleep(msg.holdMs)
45+
if (msg.done) await fs.appendFile(msg.done, "1\n")
46+
} finally {
47+
if (msg.active) await fs.rm(msg.active, { force: true })
48+
}
49+
}
50+
51+
await Effect.runPromise(
52+
Effect.gen(function* () {
53+
const flock = yield* EffectFlock.Service
54+
yield* flock.withLock(
55+
Effect.promise(() => job()),
56+
msg.key,
57+
msg.dir,
58+
)
59+
}).pipe(Effect.provide(testLayer)),
60+
).catch((err) => {
61+
const text = err instanceof Error ? (err.stack ?? err.message) : String(err)
62+
process.stderr.write(text)
63+
process.exit(1)
64+
})

0 commit comments

Comments
 (0)