diff --git a/src/server/services/lake/__tests__/replay.test.ts b/src/server/services/lake/__tests__/replay.test.ts index 5c597682..6a84b726 100644 --- a/src/server/services/lake/__tests__/replay.test.ts +++ b/src/server/services/lake/__tests__/replay.test.ts @@ -298,6 +298,56 @@ describe("nextReplayBatch", () => { expect(result).toMatchObject({ status: "COMPLETED", done: true, replayedEvents: BigInt(10) }); }); + it("marks the job FAILED with a reason when the drained replay served fewer than totalEvents (NF-6)", async () => { + // Estimated 10 events at create, but the window drains after only 3 (short + // read) → cumulative 3 < 10 → FAILED with a reason, not a silent COMPLETED. + prismaMock.replayJob.findFirst.mockResolvedValue( + jobFixture({ status: "RUNNING", replayedEvents: BigInt(0), totalEvents: BigInt(10), startedAt: FROM }) as never, + ); + lakeQueryMock.mockResolvedValueOnce([lakeEvent("a"), lakeEvent("b"), lakeEvent("c")]); + prismaMock.replayJob.updateMany.mockResolvedValue({ count: 1 } as never); + + const result = await nextReplayBatch({ orgId: ORG, targetPipelineId: "tgt", batchSize: 5 }); + + expect(prismaMock.replayJob.updateMany).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + status: "FAILED", + replayedEvents: BigInt(3), + completedAt: expect.any(Date), + error: expect.stringContaining("3 of 10"), + }), + }), + ); + expect(result).toMatchObject({ status: "FAILED", done: true, replayedEvents: BigInt(3) }); + // The final partial batch is still handed back — those events are real lake rows. + expect(result?.events).toHaveLength(3); + }); + + it("marks COMPLETED and clears the error when the drained replay met or exceeded totalEvents (NF-6)", async () => { + // totalEvents was under-counted at create (the lake grew afterwards); the + // window drains at 3 >= 2 → COMPLETED, with `error` explicitly cleared. 
+ prismaMock.replayJob.findFirst.mockResolvedValue( + jobFixture({ status: "RUNNING", replayedEvents: BigInt(0), totalEvents: BigInt(2), startedAt: FROM }) as never, + ); + lakeQueryMock.mockResolvedValueOnce([lakeEvent("a"), lakeEvent("b"), lakeEvent("c")]); + prismaMock.replayJob.updateMany.mockResolvedValue({ count: 1 } as never); + + const result = await nextReplayBatch({ orgId: ORG, targetPipelineId: "tgt", batchSize: 5 }); + + expect(prismaMock.replayJob.updateMany).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + status: "COMPLETED", + replayedEvents: BigInt(3), + completedAt: expect.any(Date), + error: null, + }), + }), + ); + expect(result).toMatchObject({ status: "COMPLETED", done: true, replayedEvents: BigInt(3) }); + }); + it("completes immediately on an empty window (totalEvents 0)", async () => { prismaMock.replayJob.findFirst.mockResolvedValue( jobFixture({ status: "PENDING", replayedEvents: BigInt(0), totalEvents: BigInt(0) }) as never, diff --git a/src/server/services/lake/replay.ts b/src/server/services/lake/replay.ts index 84b1194a..c69397e7 100644 --- a/src/server/services/lake/replay.ts +++ b/src/server/services/lake/replay.ts @@ -103,7 +103,8 @@ export interface ReplayBatch { /** Cursor after this batch (BigInt — total events served so far). */ replayedEvents: bigint; totalEvents: bigint; - /** True once the window is drained and the job flipped to COMPLETED. */ + /** True once the window is drained and the job reached a terminal state — + * COMPLETED on a full replay, FAILED on a short one (see nextReplayBatch). */ done: boolean; events: ReplayEvent[]; } @@ -331,7 +332,8 @@ export async function createReplayJob(args: { /** * Agent-pull primitive. Find the org's oldest active (PENDING|RUNNING) job for * `targetPipelineId`, serve the next bounded window, advance the cursor, and - * flip status: PENDING→RUNNING on the first pull, →COMPLETED once drained. 
+ * flip status: PENDING→RUNNING on the first pull, then on drain →COMPLETED when + * the full `totalEvents` were served or →FAILED on a shortfall (NF-6). * Returns `null` when no active job exists (the route answers 204). * * The cursor read+advance happen in one org transaction, and the advancing @@ -378,12 +380,31 @@ export async function nextReplayBatch(args: { // estimate from create time, and trusting it would silently drop events if // the window were still receiving writes. The cost is one final empty pull. const done = fetched < batchSize; - const status: ReplayStatus = done ? REPLAY_STATUS.COMPLETED : REPLAY_STATUS.RUNNING; + // NF-6: validate completion. A drained replay is COMPLETED only when it + // served at least the `totalEvents` counted at create time; a shortfall + // means events that existed at create time never made it into the replay + // (a lake gap, TTL eviction, or a window that lost rows) — so the job is + // FAILED with a reason instead of a silent partial COMPLETED. Serving MORE than + // `totalEvents` (the lake grew after create) still satisfies `>=` and completes cleanly. + let status: ReplayStatus; + let failureReason: string | null = null; + if (!done) { + status = REPLAY_STATUS.RUNNING; + } else if (replayedEvents >= job.totalEvents) { + status = REPLAY_STATUS.COMPLETED; + } else { + status = REPLAY_STATUS.FAILED; + failureReason = `Replay drained after ${replayedEvents} of ${job.totalEvents} expected events`; + } const now = new Date(); const data: Prisma.ReplayJobUpdateManyMutationInput = { status, replayedEvents }; if (!job.startedAt) data.startedAt = now; - if (done) data.completedAt = now; + if (done) { + data.completedAt = now; + // Stamp the failure reason on a short replay; null clears it on a clean finish. + data.error = failureReason; + } // Guard on the job still being active: if it was cancelled/completed // concurrently, do not resurrect it or hand out the batch.