Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/server/routers/__tests__/release-canary.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,19 @@ vi.mock("@/server/services/audit", () => ({
writeAuditLog: vi.fn().mockResolvedValue({}),
}));

vi.mock("@/server/services/lake/replay", () => ({
getReplayJob: vi.fn(),
}));

vi.mock("@/server/services/lake/replay-validation", () => ({
evaluateReplayValidation: vi.fn(),
}));

import { prisma } from "@/lib/prisma";
import { canaryReleaseRouter } from "@/server/routers/release/canary";
import { stagedRolloutService } from "@/server/services/staged-rollout";
import { getReplayJob } from "@/server/services/lake/replay";
import { evaluateReplayValidation } from "@/server/services/lake/replay-validation";

const prismaMock = prisma as unknown as DeepMockProxy<PrismaClient>;
const stagedRolloutServiceMock = stagedRolloutService as unknown as {
Expand Down Expand Up @@ -131,6 +141,128 @@ describe("release.canary router", () => {
expect(result).toEqual({ success: true });
expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1");
});

it("blocks the broaden when replay validation FAILs and force is not set", async () => {
  // Arrange: an ADMIN caller scoped to team-1.
  const createCaller = t.createCallerFactory(appRouter);
  const caller = createCaller({
    session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } },
    userRole: "ADMIN",
    teamId: "team-1",
  });

  // The rollout lookup and the replay job both point at pipe-1, so the gate
  // proceeds to scoring the replay.
  const rolloutRow = {
    id: "rollout-1",
    pipelineId: "pipe-1",
    pipeline: { environmentId: "env-1" },
  };
  const replayJob = {
    id: "job-1",
    targetPipelineId: "pipe-1",
    startedAt: NOW,
    completedAt: NOW,
  };
  const nowIso = NOW.toISOString();

  prismaMock.release.findFirst.mockResolvedValueOnce(rolloutRow as never);
  vi.mocked(getReplayJob).mockResolvedValueOnce(replayJob as never);
  // Scoring reports a breached error_rate SLI, i.e. a FAIL verdict.
  vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({
    verdict: "FAIL",
    slis: [{ metric: "error_rate", status: "breached", value: 0.2, threshold: 0.05, condition: "lt" }],
    window: { from: nowIso, to: nowIso },
  });

  // Act: no `force` flag is supplied.
  const attempt = caller.release.canary.broaden({ rolloutId: "rollout-1", replayJobId: "job-1" });

  // Assert: the mutation rejects and the rollout is never broadened.
  await expect(attempt).rejects.toThrow(/Replay validation failed/);
  expect(stagedRolloutServiceMock.broadenRollout).not.toHaveBeenCalled();
});

it("allows a forced broaden over a FAILed validation and records the override", async () => {
  // Arrange: an ADMIN caller scoped to team-1.
  const createCaller = t.createCallerFactory(appRouter);
  const caller = createCaller({
    session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } },
    userRole: "ADMIN",
    teamId: "team-1",
  });

  // Rollout and replay job agree on pipe-1.
  const rolloutRow = {
    id: "rollout-1",
    pipelineId: "pipe-1",
    pipeline: { environmentId: "env-1" },
  };
  const replayJob = {
    id: "job-1",
    targetPipelineId: "pipe-1",
    startedAt: NOW,
    completedAt: NOW,
  };
  const nowIso = NOW.toISOString();

  prismaMock.release.findFirst.mockResolvedValueOnce(rolloutRow as never);
  vi.mocked(getReplayJob).mockResolvedValueOnce(replayJob as never);
  // Scoring still FAILs; only the `force` flag below lets the broaden through.
  vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({
    verdict: "FAIL",
    slis: [{ metric: "error_rate", status: "breached", value: 0.2, threshold: 0.05, condition: "lt" }],
    window: { from: nowIso, to: nowIso },
  });
  stagedRolloutServiceMock.broadenRollout.mockResolvedValueOnce(undefined);

  // Act: same request as the blocked case, plus `force: true`.
  const broadenInput = { rolloutId: "rollout-1", replayJobId: "job-1", force: true };
  const result = await caller.release.canary.broaden(broadenInput);

  // Assert: the response carries the FAIL verdict flagged as overridden, and
  // the rollout service was asked to broaden.
  expect(result).toEqual({ success: true, replayValidation: { verdict: "FAIL", overridden: true } });
  expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1");
});

it("broadens when replay validation PASSes and records the verdict", async () => {
  // Arrange: an ADMIN caller scoped to team-1.
  const createCaller = t.createCallerFactory(appRouter);
  const caller = createCaller({
    session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } },
    userRole: "ADMIN",
    teamId: "team-1",
  });

  // Rollout and replay job agree on pipe-1.
  const rolloutRow = {
    id: "rollout-1",
    pipelineId: "pipe-1",
    pipeline: { environmentId: "env-1" },
  };
  const replayJob = {
    id: "job-1",
    targetPipelineId: "pipe-1",
    startedAt: NOW,
    completedAt: NOW,
  };
  const nowIso = NOW.toISOString();

  prismaMock.release.findFirst.mockResolvedValueOnce(rolloutRow as never);
  vi.mocked(getReplayJob).mockResolvedValueOnce(replayJob as never);
  // Scoring reports the error_rate SLI as met, i.e. a PASS verdict.
  vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({
    verdict: "PASS",
    slis: [{ metric: "error_rate", status: "met", value: 0.0, threshold: 0.05, condition: "lt" }],
    window: { from: nowIso, to: nowIso },
  });
  stagedRolloutServiceMock.broadenRollout.mockResolvedValueOnce(undefined);

  // Act: no `force` needed on a passing verdict.
  const broadenInput = { rolloutId: "rollout-1", replayJobId: "job-1" };
  const result = await caller.release.canary.broaden(broadenInput);

  // Assert: the verdict is echoed back un-overridden and the rollout broadened.
  expect(result).toEqual({ success: true, replayValidation: { verdict: "PASS", overridden: false } });
  expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1");
});

it("rejects a replay job that targets a different pipeline", async () => {
  // Arrange: an ADMIN caller scoped to team-1.
  const createCaller = t.createCallerFactory(appRouter);
  const caller = createCaller({
    session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } },
    userRole: "ADMIN",
    teamId: "team-1",
  });

  // The rollout lives on pipe-1 but the replay job was aimed at other-pipe.
  const rolloutRow = {
    id: "rollout-1",
    pipelineId: "pipe-1",
    pipeline: { environmentId: "env-1" },
  };
  const mismatchedJob = {
    id: "job-1",
    targetPipelineId: "other-pipe",
    startedAt: NOW,
    completedAt: NOW,
  };

  prismaMock.release.findFirst.mockResolvedValueOnce(rolloutRow as never);
  vi.mocked(getReplayJob).mockResolvedValueOnce(mismatchedJob as never);

  // Act
  const attempt = caller.release.canary.broaden({ rolloutId: "rollout-1", replayJobId: "job-1" });

  // Assert: rejected on the pipeline mismatch, before any scoring or broadening.
  await expect(attempt).rejects.toThrow(/does not target this rollout/);
  expect(vi.mocked(evaluateReplayValidation)).not.toHaveBeenCalled();
  expect(stagedRolloutServiceMock.broadenRollout).not.toHaveBeenCalled();
});
});

describe("rollback", () => {
Expand Down
47 changes: 45 additions & 2 deletions src/server/routers/release/canary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { router, protectedProcedure, withTeamAccess } from "@/trpc/init";
import { prisma } from "@/lib/prisma";
import { stagedRolloutService } from "@/server/services/staged-rollout";
import { writeAuditLog } from "@/server/services/audit";
import { getReplayJob } from "@/server/services/lake/replay";
import { evaluateReplayValidation } from "@/server/services/lake/replay-validation";

export const canaryReleaseRouter = router({
create: protectedProcedure
Expand Down Expand Up @@ -62,7 +64,16 @@ export const canaryReleaseRouter = router({
}),

broaden: protectedProcedure
.input(z.object({ rolloutId: z.string() }))
.input(
z.object({
rolloutId: z.string(),
/** Optional NF-6 gate: a completed replay whose target is this
* rollout's pipeline. When present, a FAILED error-budget verdict
* blocks the broaden unless `force` is set. Absent → no gate. */
replayJobId: z.string().optional(),
force: z.boolean().optional(),
}),
)
.use(withTeamAccess("EDITOR"))
.mutation(async ({ input, ctx }) => {
// Fetch rollout to get pipelineId for audit log
Expand All @@ -71,6 +82,37 @@ export const canaryReleaseRouter = router({
select: { pipelineId: true, pipeline: { select: { environmentId: true } } },
});

// NF-6: gate canary -> full-fleet broaden on the candidate's replay
// error-budget when the caller opts in. With no replayJobId this is a
// no-op and broaden behaves exactly as before.
let replayValidation: { verdict: string; overridden: boolean } | null = null;
if (input.replayJobId) {
  // Fail closed: the caller explicitly asked for the gate, so a rollout that
  // could not be resolved must not silently skip validation and fall through
  // to the broaden below.
  if (!rollout) {
    throw new TRPCError({ code: "NOT_FOUND", message: "Rollout not found" });
  }

  // A missing job and a job aimed at another pipeline are both rejected with
  // the same error.
  const job = await getReplayJob({ orgId: ctx.organizationId, jobId: input.replayJobId });
  if (!job || job.targetPipelineId !== rollout.pipelineId) {
    throw new TRPCError({
      code: "BAD_REQUEST",
      message: "Replay job does not target this rollout's pipeline",
    });
  }

  // NOTE(review): the job's completion state is not checked here; this
  // presumably relies on evaluateReplayValidation handling an unfinished
  // window — confirm.
  const result = await evaluateReplayValidation({
    targetPipelineId: job.targetPipelineId,
    startedAt: job.startedAt,
    completedAt: job.completedAt,
  });

  // `force` only matters for a FAIL verdict; any other verdict is never
  // reported as overridden.
  const overridden = result.verdict === "FAIL" && input.force === true;
  if (result.verdict === "FAIL" && !overridden) {
    // Name the breached SLIs in the error so the caller can see what failed.
    const breached = result.slis
      .filter((s) => s.status === "breached")
      .map((s) => s.metric)
      .join(", ");
    throw new TRPCError({
      code: "PRECONDITION_FAILED",
      message: `Replay validation failed${breached ? ` (breached: ${breached})` : ""}. Re-run the canary replay or pass force to override.`,
    });
  }
  replayValidation = { verdict: result.verdict, overridden };
}

await stagedRolloutService.broadenRollout(input.rolloutId);

if (rollout) {
Expand All @@ -83,6 +125,7 @@ export const canaryReleaseRouter = router({
metadata: {
timestamp: new Date().toISOString(),
rolloutId: input.rolloutId,
...(replayValidation ? { replayValidation } : {}),
},
teamId: (ctx as Record<string, unknown>).teamId as string | null ?? null,
environmentId: rollout.pipeline.environmentId,
Expand All @@ -92,7 +135,7 @@ export const canaryReleaseRouter = router({
}).catch(() => {});
}

return { success: true };
return replayValidation ? { success: true, replayValidation } : { success: true };
}),

rollback: protectedProcedure
Expand Down
20 changes: 20 additions & 0 deletions src/server/routers/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ReplayError,
type ReplayFilter,
} from "@/server/services/lake/replay";
import { evaluateReplayValidation } from "@/server/services/lake/replay-validation";

/**
* VectorFlow Lake — replay / rehydration router (A4).
Expand Down Expand Up @@ -134,6 +135,25 @@ export const replayRouter = router({
return job;
}),

/**
 * Promotion-gate signal (NF-6): evaluate a replay job against its TARGET
 * pipeline over the job's start/completion window.
 *
 * The job's target must equal `pipelineId`, because the verdict describes the
 * candidate the events were re-injected into, not the source they were read
 * from. A missing job and a job aimed at a different pipeline are both
 * reported as NOT_FOUND.
 */
validate: protectedProcedure
  .input(z.object({ pipelineId: z.string(), jobId: z.string() }))
  .use(withTeamAccess("VIEWER"))
  .query(async ({ input, ctx }) => {
    const { pipelineId, jobId } = input;
    const notFound = () => new TRPCError({ code: "NOT_FOUND", message: "Replay job not found" });

    const replayJob = await getReplayJob({ orgId: ctx.organizationId, jobId });
    if (!replayJob) {
      throw notFound();
    }
    if (replayJob.targetPipelineId !== pipelineId) {
      throw notFound();
    }

    const { targetPipelineId, startedAt, completedAt } = replayJob;
    return evaluateReplayValidation({ targetPipelineId, startedAt, completedAt });
  }),

/** Cancel an in-flight replay job (EDITOR; audited). The job must reference
* `pipelineId`. Leaves the progress counters consistent. */
cancel: protectedProcedure
Expand Down
17 changes: 17 additions & 0 deletions src/server/services/__tests__/sli-evaluator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,21 @@ describe("evaluatePipelineHealth", () => {
}),
]);
});

it("reports a dark pipeline (empty metric window) as degraded", async () => {
  // Arrange: a single throughput-floor SLI (> 10 over a 5-minute window).
  const throughputFloor = makeSli({
    metric: "throughput_floor",
    condition: "gt",
    threshold: 10,
    windowMinutes: 5,
  });
  prismaMock.pipelineSli.findMany.mockResolvedValue([throughputFloor]);

  // The metric aggregate comes back empty: null sums and a zero row count.
  const emptyAggregate = {
    _sum: { eventsIn: null, errorsTotal: null, eventsDiscarded: null },
    _count: 0,
  };
  prismaMock.pipelineMetric.aggregate.mockResolvedValue(emptyAggregate as never);

  // Act
  const { status, slis } = await evaluatePipelineHealth("pipeline-1");

  // Assert: the SLI is scored as breached with a value of 0, degrading health.
  expect(status).toBe("degraded");
  const expectedSli = expect.objectContaining({
    metric: "throughput_floor",
    status: "breached",
    value: 0,
  });
  expect(slis[0]).toEqual(expectedSli);
});
});
Loading
Loading