Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export {
captureFrame,
captureFrameToBuffer,
captureFrameToBufferPipelined,
captureFramesBatchPipelined,
writeCapturedFrame,
discardWarmupCapture,
getCompositionDuration,
Expand Down
237 changes: 237 additions & 0 deletions packages/engine/src/services/drawElementService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -832,3 +832,240 @@ export async function produceDrawElementFrame(

return { encodeResult };
}

/**
* P6 prototype (HF_DE_BATCH): batch-produce N consecutive frames in ONE CDP
* round-trip. In-page loop per frame: `__hf.seek(t)` → paint-wait (tick toggle +
* canvas `paint` event) → drawElementImage composite → createImageBitmap →
* postMessage to the encode worker. Bitmaps are posted per-frame (encode starts
* immediately); only the CDP protocol round-trips are amortized N-fold.
* Micro-pipeline inside the batch: frame i+1's seek/paint-wait overlaps frame
* i's createImageBitmap (the canvas is only redrawn after i's bitmap resolves).
*
* macOS-GPU sync path only (the worker-encode gate guarantees this at the call
* site). On an in-page failure at frame k, frames < k are already at the worker
* (their promises resolve normally); pending entries for frames >= k are
* rejected here and `failedAt` tells the caller to re-capture k.. via the
* per-frame path (which owns the screenshot-fallback semantics).
*/
export async function produceDrawElementFrameBatch(
page: Page,
times: number[],
width: number,
height: number,
quality = 80,
): Promise<{ encodeResults: Array<Promise<Buffer>>; failedAt: number | null; error?: string }> {
const state = workerEncodeStates.get(page);
if (!state) {
throw new Error(
"drawElement worker encode not initialized; call initDrawElementWorkerEncode first",
);
}

const fids: number[] = [];
const encodeResults: Array<Promise<Buffer>> = [];
for (let i = 0; i < times.length; i++) {
const frameId = ++state.nextId;
fids.push(frameId);
const p = new Promise<Buffer>((resolve, reject) => {
const timer = setTimeout(() => {
if (state.pending.delete(frameId)) {
reject(new Error(`drawElement worker encode timed out (frame ${frameId})`));
}
}, 30_000);
state.pending.set(frameId, {
resolve: (b) => {
clearTimeout(timer);
resolve(b);
},
reject: (e) => {
clearTimeout(timer);
reject(e);
},
});
});
void p.catch(() => {}); // same orphan-rejection guard as produceDrawElementFrame
encodeResults.push(p);
}

const outcome = await page.evaluate(
async ({
frames,
w,
h,
q,
}: {
frames: Array<{ t: number; fid: number }>;
w: number;
h: number;
q: number;
}): Promise<{ failedAt: number | null; error?: string }> => {
const canvas = document.getElementById("__hf_de_canvas") as HTMLCanvasElement | null;
const root = document.querySelector("[data-composition-id]") as HTMLElement | null;
if (!canvas || !root) return { failedAt: 0, error: "drawElement canvas not initialized" };
const ctx = canvas.getContext("2d");
if (!ctx) return { failedAt: 0, error: "drawElement: 2d context unavailable" };

type AccelWindow = Window & {
__hf_accel_canvases?: HTMLCanvasElement[];
__hf3d?: { update: () => void };
__hf?: { seek?: (t: number) => void };
__HF_ROOT_PROPS__?: boolean;
__HF_ROOT_BASE_OPACITY__?: number;
__hfEncWorker?: Worker;
};
const aw = window as AccelWindow;

const waitPaint = (): Promise<void> =>
new Promise((res) => {
let done = false;
const settle = () => {
if (done) return;
done = true;
canvas.removeEventListener("paint", settle);
res();
};
canvas.addEventListener("paint", settle);
const tick = document.getElementById("__hf_de_tick");
if (tick) {
tick.style.backgroundColor =
tick.style.backgroundColor === "rgb(0, 0, 0)" ? "rgb(1, 1, 1)" : "rgb(0, 0, 0)";
}
setTimeout(settle, 250);
});

let prevBitmap: Promise<void> = Promise.resolve();
let prevBitmapIdx = -1;
const errMsg = (e: unknown) => (e instanceof Error ? e.message : String(e));

for (let i = 0; i < frames.length; i++) {
const frame = frames[i];
if (!frame) return { failedAt: i, error: "batch frame missing" };
const { t, fid } = frame;
try {
if (aw.__hf && typeof aw.__hf.seek === "function") aw.__hf.seek(t);
aw.__hf3d?.update();
const accel = (aw.__hf_accel_canvases ?? []).filter((c) => root.contains(c));
for (const c of accel) {
if (c.style.visibility !== "hidden") c.style.visibility = "hidden";
}
await waitPaint();
// Wait for the previous frame's bitmap before overwriting the canvas.
try {
await prevBitmap;
} catch (e) {
return { failedAt: prevBitmapIdx, error: errMsg(e) };
}

ctx.clearRect(0, 0, w, h);
let bg = "";
for (let el = root.parentElement; el; el = el.parentElement) {
const c = getComputedStyle(el).backgroundColor;
if (c && c !== "transparent" && c !== "rgba(0, 0, 0, 0)") {
bg = c;
break;
}
}
ctx.fillStyle = bg || "#fff";
ctx.fillRect(0, 0, w, h);
const rootRect = root.getBoundingClientRect();
for (const c of accel) {
if (c.hasAttribute("data-hf-3d")) continue;
const r = c.getBoundingClientRect();
try {
ctx.drawImage(c, r.left - rootRect.left, r.top - rootRect.top, r.width, r.height);
} catch {
// skip
}
}
// Root compositor-applied opacity/transform correction — mirrors
// produceDrawElementFrame (see its comment).
let appliedAlpha = false;
let appliedTransform = false;
if (aw.__HF_ROOT_PROPS__) {
try {
const rcs = getComputedStyle(root);
const baseOp = aw.__HF_ROOT_BASE_OPACITY__ ?? 1;
const curOp = parseFloat(rcs.opacity);
if (baseOp > 0.001 && Number.isFinite(curOp)) {
const ratio = curOp / baseOp;
if (Math.abs(ratio - 1) > 0.002) {
ctx.globalAlpha = Math.max(0, Math.min(1, ratio));
appliedAlpha = true;
}
}
const curTransform = rcs.transform;
if (curTransform && curTransform !== "none") {
const m = new DOMMatrix(curTransform);
const origin = rcs.transformOrigin.split(" ");
const ox = parseFloat(origin[0] ?? "0") || 0;
const oy = parseFloat(origin[1] ?? "0") || 0;
ctx.translate(ox, oy);
ctx.transform(m.a, m.b, m.c, m.d, m.e, m.f);
ctx.translate(-ox, -oy);
appliedTransform = true;
}
} catch {
/* leave context unchanged → uncorrected (no worse than before) */
}
}
(
ctx as unknown as { drawElementImage(el: Element, x: number, y: number): void }
).drawElementImage(root, 0, 0);
if (appliedAlpha) ctx.globalAlpha = 1;
if (appliedTransform) ctx.setTransform(1, 0, 0, 1, 0, 0);
for (const c of accel) {
if (!c.hasAttribute("data-hf-3d")) continue;
const r = c.getBoundingClientRect();
try {
ctx.drawImage(c, r.left - rootRect.left, r.top - rootRect.top, r.width, r.height);
} catch {
// skip
}
}

prevBitmapIdx = i;
prevBitmap = createImageBitmap(canvas).then((bmp) => {
if (!aw.__hfEncWorker) {
bmp.close();
throw new Error("drawElement: encode worker not initialized");
}
aw.__hfEncWorker.postMessage({ bmp, id: fid, w, h, q: q / 100 }, [bmp]);
});
} catch (e) {
try {
await prevBitmap;
} catch {
/* prior frame's failure surfaces via its own pending timeout path */
}
return { failedAt: i, error: errMsg(e) };
}
}
try {
await prevBitmap;
} catch (e) {
return { failedAt: prevBitmapIdx, error: errMsg(e) };
}
return { failedAt: null };
},
{ frames: times.map((t, i) => ({ t, fid: fids[i] ?? 0 })), w: width, h: height, q: quality },
);

if (outcome.failedAt !== null) {
// Frames >= failedAt never reached the worker — reject their pendings now
// so nothing waits 30s on the watchdog.
for (let k = outcome.failedAt; k < fids.length; k++) {
const fid = fids[k];
if (fid === undefined) continue;
const entry = state.pending.get(fid);
if (entry) {
state.pending.delete(fid);
entry.reject(
new Error(`drawElement batch produce failed at frame ${k}: ${outcome.error ?? "?"}`),
);
}
}
}

return { encodeResults, failedAt: outcome.failedAt, error: outcome.error };
}
68 changes: 68 additions & 0 deletions packages/engine/src/services/frameCapture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
initDrawElementWorkerEncode,
cleanupDrawElementWorkerEncode,
produceDrawElementFrame,
produceDrawElementFrameBatch,
} from "./drawElementService.js";
import { initThreeDProjection, detectCssEffectRisk } from "./threeDProjection.js";
import { DEFAULT_CONFIG, type EngineConfig } from "../config.js";
Expand Down Expand Up @@ -2496,6 +2497,73 @@ export async function captureFrameToBufferPipelined(
}
}

/**
* P6 prototype (HF_DE_BATCH): capture N consecutive frames in one CDP
* round-trip via {@link produceDrawElementFrameBatch}. The caller pre-plans the
* batch (consecutive frame indices, none static-dedup'd, none opt-in
* boundary-screenshot). On a mid-batch in-page failure the remaining frames are
* re-captured through {@link captureFrameToBufferPipelined}, which owns the
* per-frame screenshot-fallback semantics — so failure behavior is identical to
* the unbatched path, just discovered at batch granularity.
*/
export async function captureFramesBatchPipelined(
session: CaptureSession,
frameIndices: number[],
times: number[],
): Promise<Array<{ frameIndex: number; encodeResult: Promise<Buffer> }>> {
const { page, options } = session;
if (!session.isInitialized) {
throw new Error("[FrameCapture] Session not initialized");
}
const startTime = Date.now();
const fps = fpsToNumber(options.fps);
const quantized = times.map((t) => quantizeTimeToFrame(t, fps));

const { encodeResults, failedAt, error } = await produceDrawElementFrameBatch(
page,
quantized,
options.width,
options.height,
options.quality ?? 80,
);

const okCount = failedAt === null ? frameIndices.length : failedAt;
const elapsed = Date.now() - startTime;
session.capturePerf.frames += okCount;
// Round-trips are fused — attribute the whole batch to produce time.
session.capturePerf.screenshotMs += elapsed;
session.capturePerf.totalMs += elapsed;

const results: Array<{ frameIndex: number; encodeResult: Promise<Buffer> }> = [];
for (let i = 0; i < okCount; i++) {
const frameIndex = frameIndices[i];
const encodeResult = encodeResults[i];
if (frameIndex === undefined || !encodeResult) break;
results.push({ frameIndex, encodeResult });
}

if (failedAt !== null) {
console.log(
`[engine] fast capture: batch produce failed at frame ` +
`${frameIndices[failedAt] ?? "?"} (${error ?? "?"}); ` +
`re-capturing ${frameIndices.length - failedAt} frame(s) per-frame`,
);
for (let i = failedAt; i < frameIndices.length; i++) {
const frameIndex = frameIndices[i];
const time = times[i];
if (frameIndex === undefined || time === undefined) break;
const { encodeResult } = await captureFrameToBufferPipelined(session, frameIndex, time);
results.push({ frameIndex, encodeResult });
}
}

// Task B: retain the last encode result so a following static frame can reuse it.
const last = results[results.length - 1];
if (session.staticFrames && last) session.lastEncodeResult = last.encodeResult;

return results;
}

/**
* Type of the "inner capture" function consumed by
* {@link discardWarmupCapture}. Matches the real `captureFrameCore` signature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
type StreamingEncoder,
captureFrameToBuffer,
captureFrameToBufferPipelined,
captureFramesBatchPipelined,
closeCaptureSession,
createCaptureSession,
createFrameReorderBuffer,
Expand Down Expand Up @@ -167,6 +168,59 @@ async function runWorkerEncodePipelineLoop(
);
};

// P6 (HF_DE_BATCH=N, N>1): capture runs of consecutive frames in ONE CDP
// round-trip each (in-page seek+paint+draw loop), amortizing protocol latency
// N-fold. Batches break at static-dedup frames (skipped via lastEncodeResult
// reuse — order-dependent) and at opt-in boundary-screenshot frames; those go
// through the per-frame path unchanged. onBeforeCapture (video frame
// injection) needs a node-side hook per frame → no batching.
const batchN = Math.floor(Number(process.env.HF_DE_BATCH ?? "0"));
if (batchN > 1 && !session.onBeforeCapture) {
const frameTime = (f: number) => (f * job.config.fps.den) / job.config.fps.num;
const drainBatch = async (batch: Array<{ idx: number; encodeResult: Promise<Buffer> }>) => {
for (const item of batch) {
assertNotAborted();
const buf = await item.encodeResult;
await reorderBuffer.waitForFrame(item.idx);
ensureFrameWritten(await currentEncoder.writeFrame(buf), item.idx, currentEncoder);
reorderBuffer.advanceTo(item.idx + 1);
job.framesRendered = item.idx + 1;
updateJobStatus(
job,
"rendering",
`Streaming frame ${item.idx + 1}/${totalFrames}`,
Math.round(25 + ((item.idx + 1) / totalFrames) * 55),
onProgress,
);
}
};
const boundarySS = process.env.HF_FAST_CAPTURE_BOUNDARY_SS === "true";
const batchable = (f: number) =>
!session.staticFrames?.has(f) && !(boundarySS && session.clipBoundaryFrames?.has(f));
let prevBatch: Array<{ idx: number; encodeResult: Promise<Buffer> }> = [];
let i = 0;
while (i < totalFrames) {
assertNotAborted();
if (batchable(i)) {
const idxs: number[] = [];
while (idxs.length < batchN && i < totalFrames && batchable(i)) {
idxs.push(i);
i++;
}
const results = await captureFramesBatchPipelined(session, idxs, idxs.map(frameTime));
await drainBatch(prevBatch);
prevBatch = results.map((r) => ({ idx: r.frameIndex, encodeResult: r.encodeResult }));
} else {
const { encodeResult } = await captureFrameToBufferPipelined(session, i, frameTime(i));
await drainBatch(prevBatch);
prevBatch = [{ idx: i, encodeResult }];
i++;
}
}
await drainBatch(prevBatch);
return;
}

// On abort/throw the just-produced frame's encode is still in flight and never
// awaited (it isn't `prev` yet); cleanupDrawElementWorkerEncode rejects it on
// close. produceDrawElementFrame attaches a no-op catch to every encodeResult
Expand Down
Loading