Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/server/services/lake/__tests__/replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,56 @@ describe("nextReplayBatch", () => {
expect(result).toMatchObject({ status: "COMPLETED", done: true, replayedEvents: BigInt(10) });
});

it("marks the job FAILED with a reason when the drained replay served fewer than totalEvents (NF-6)", async () => {
// Estimated 10 events at create, but the window drains after only 3 (short
// read) → cumulative 3 < 10 → FAILED with a reason, not a silent COMPLETED.
prismaMock.replayJob.findFirst.mockResolvedValue(
jobFixture({ status: "RUNNING", replayedEvents: BigInt(0), totalEvents: BigInt(10), startedAt: FROM }) as never,
);
lakeQueryMock.mockResolvedValueOnce([lakeEvent("a"), lakeEvent("b"), lakeEvent("c")]);
prismaMock.replayJob.updateMany.mockResolvedValue({ count: 1 } as never);

const result = await nextReplayBatch({ orgId: ORG, targetPipelineId: "tgt", batchSize: 5 });

expect(prismaMock.replayJob.updateMany).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
status: "FAILED",
replayedEvents: BigInt(3),
completedAt: expect.any(Date),
error: expect.stringContaining("3 of 10"),
}),
}),
);
expect(result).toMatchObject({ status: "FAILED", done: true, replayedEvents: BigInt(3) });
// The final partial batch is still handed back — those events are real lake rows.
expect(result?.events).toHaveLength(3);
});

it("marks COMPLETED and clears the error when the drained replay met or exceeded totalEvents (NF-6)", async () => {
// totalEvents was under-counted at create (the lake grew afterwards); the
// window drains at 3 >= 2 → COMPLETED, with `error` explicitly cleared.
prismaMock.replayJob.findFirst.mockResolvedValue(
jobFixture({ status: "RUNNING", replayedEvents: BigInt(0), totalEvents: BigInt(2), startedAt: FROM }) as never,
);
lakeQueryMock.mockResolvedValueOnce([lakeEvent("a"), lakeEvent("b"), lakeEvent("c")]);
prismaMock.replayJob.updateMany.mockResolvedValue({ count: 1 } as never);

const result = await nextReplayBatch({ orgId: ORG, targetPipelineId: "tgt", batchSize: 5 });

expect(prismaMock.replayJob.updateMany).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
status: "COMPLETED",
replayedEvents: BigInt(3),
completedAt: expect.any(Date),
error: null,
}),
}),
);
expect(result).toMatchObject({ status: "COMPLETED", done: true, replayedEvents: BigInt(3) });
});

it("completes immediately on an empty window (totalEvents 0)", async () => {
prismaMock.replayJob.findFirst.mockResolvedValue(
jobFixture({ status: "PENDING", replayedEvents: BigInt(0), totalEvents: BigInt(0) }) as never,
Expand Down
29 changes: 25 additions & 4 deletions src/server/services/lake/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ export interface ReplayBatch {
/** Cursor after this batch (BigInt — total events served so far). */
replayedEvents: bigint;
totalEvents: bigint;
/** True once the window is drained and the job flipped to COMPLETED. */
/** True once the window is drained and the job reached a terminal state —
* COMPLETED on a full replay, FAILED on a short one (see nextReplayBatch). */
done: boolean;
events: ReplayEvent[];
}
Expand Down Expand Up @@ -331,7 +332,8 @@ export async function createReplayJob(args: {
/**
* Agent-pull primitive. Find the org's oldest active (PENDING|RUNNING) job for
* `targetPipelineId`, serve the next bounded window, advance the cursor, and
* flip status: PENDING→RUNNING on the first pull, →COMPLETED once drained.
* flip status: PENDING→RUNNING on the first pull, then on drain →COMPLETED when
* the full `totalEvents` were served or →FAILED on a shortfall (NF-6).
* Returns `null` when no active job exists (the route answers 204).
*
* The cursor read+advance happen in one org transaction, and the advancing
Expand Down Expand Up @@ -378,12 +380,31 @@ export async function nextReplayBatch(args: {
// estimate from create time, and trusting it would silently drop events if
// the window were still receiving writes. The cost is one final empty pull.
const done = fetched < batchSize;
const status: ReplayStatus = done ? REPLAY_STATUS.COMPLETED : REPLAY_STATUS.RUNNING;
// NF-6: validate completion. A drained replay is COMPLETED only when it
// served at least the `totalEvents` counted at create time; a shortfall
// means events that existed at create time never made it into the replay
// (a lake gap, TTL eviction, or a window that lost rows) — so the job is
// FAILED with a reason instead of a silent partial COMPLETED. An over-count
// (the lake grew after create) still satisfies `>=` and completes cleanly.
let status: ReplayStatus;
let failureReason: string | null = null;
if (!done) {
status = REPLAY_STATUS.RUNNING;
} else if (replayedEvents >= job.totalEvents) {
status = REPLAY_STATUS.COMPLETED;
} else {
status = REPLAY_STATUS.FAILED;
failureReason = `Replay drained after ${replayedEvents} of ${job.totalEvents} expected events`;
}
const now = new Date();

const data: Prisma.ReplayJobUpdateManyMutationInput = { status, replayedEvents };
if (!job.startedAt) data.startedAt = now;
if (done) data.completedAt = now;
if (done) {
data.completedAt = now;
// Stamp the failure reason on a short replay; null clears it on a clean finish.
data.error = failureReason;
}

// Guard on the job still being active: if it was cancelled/completed
// concurrently, do not resurrect it or hand out the batch.
Expand Down
Loading