diff --git a/src/server/routers/__tests__/release-canary.test.ts b/src/server/routers/__tests__/release-canary.test.ts index 27301c5b..d5d8a42f 100644 --- a/src/server/routers/__tests__/release-canary.test.ts +++ b/src/server/routers/__tests__/release-canary.test.ts @@ -41,9 +41,19 @@ vi.mock("@/server/services/audit", () => ({ writeAuditLog: vi.fn().mockResolvedValue({}), })); +vi.mock("@/server/services/lake/replay", () => ({ + getReplayJob: vi.fn(), +})); + +vi.mock("@/server/services/lake/replay-validation", () => ({ + evaluateReplayValidation: vi.fn(), +})); + import { prisma } from "@/lib/prisma"; import { canaryReleaseRouter } from "@/server/routers/release/canary"; import { stagedRolloutService } from "@/server/services/staged-rollout"; +import { getReplayJob } from "@/server/services/lake/replay"; +import { evaluateReplayValidation } from "@/server/services/lake/replay-validation"; const prismaMock = prisma as unknown as DeepMockProxy; const stagedRolloutServiceMock = stagedRolloutService as unknown as { @@ -131,6 +141,128 @@ describe("release.canary router", () => { expect(result).toEqual({ success: true }); expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1"); }); + + it("blocks the broaden when replay validation FAILs and force is not set", async () => { + const caller = t.createCallerFactory(appRouter)({ + session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } }, + userRole: "ADMIN", + teamId: "team-1", + }); + + prismaMock.release.findFirst.mockResolvedValueOnce({ + id: "rollout-1", + pipelineId: "pipe-1", + pipeline: { environmentId: "env-1" }, + } as never); + vi.mocked(getReplayJob).mockResolvedValueOnce({ + id: "job-1", + targetPipelineId: "pipe-1", + startedAt: NOW, + completedAt: NOW, + } as never); + vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({ + verdict: "FAIL", + slis: [{ metric: "error_rate", status: "breached", value: 0.2, threshold: 0.05, condition: "lt" }], + window: { 
from: NOW.toISOString(), to: NOW.toISOString() }, + }); + + await expect( + caller.release.canary.broaden({ rolloutId: "rollout-1", replayJobId: "job-1" }), + ).rejects.toThrow(/Replay validation failed/); + expect(stagedRolloutServiceMock.broadenRollout).not.toHaveBeenCalled(); + }); + + it("allows a forced broaden over a FAILed validation and records the override", async () => { + const caller = t.createCallerFactory(appRouter)({ + session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } }, + userRole: "ADMIN", + teamId: "team-1", + }); + + prismaMock.release.findFirst.mockResolvedValueOnce({ + id: "rollout-1", + pipelineId: "pipe-1", + pipeline: { environmentId: "env-1" }, + } as never); + vi.mocked(getReplayJob).mockResolvedValueOnce({ + id: "job-1", + targetPipelineId: "pipe-1", + startedAt: NOW, + completedAt: NOW, + } as never); + vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({ + verdict: "FAIL", + slis: [{ metric: "error_rate", status: "breached", value: 0.2, threshold: 0.05, condition: "lt" }], + window: { from: NOW.toISOString(), to: NOW.toISOString() }, + }); + stagedRolloutServiceMock.broadenRollout.mockResolvedValueOnce(undefined); + + const result = await caller.release.canary.broaden({ + rolloutId: "rollout-1", + replayJobId: "job-1", + force: true, + }); + + expect(result).toEqual({ success: true, replayValidation: { verdict: "FAIL", overridden: true } }); + expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1"); + }); + + it("broadens when replay validation PASSes and records the verdict", async () => { + const caller = t.createCallerFactory(appRouter)({ + session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } }, + userRole: "ADMIN", + teamId: "team-1", + }); + + prismaMock.release.findFirst.mockResolvedValueOnce({ + id: "rollout-1", + pipelineId: "pipe-1", + pipeline: { environmentId: "env-1" }, + } as never); + vi.mocked(getReplayJob).mockResolvedValueOnce({ + id: 
"job-1", + targetPipelineId: "pipe-1", + startedAt: NOW, + completedAt: NOW, + } as never); + vi.mocked(evaluateReplayValidation).mockResolvedValueOnce({ + verdict: "PASS", + slis: [{ metric: "error_rate", status: "met", value: 0.0, threshold: 0.05, condition: "lt" }], + window: { from: NOW.toISOString(), to: NOW.toISOString() }, + }); + stagedRolloutServiceMock.broadenRollout.mockResolvedValueOnce(undefined); + + const result = await caller.release.canary.broaden({ rolloutId: "rollout-1", replayJobId: "job-1" }); + + expect(result).toEqual({ success: true, replayValidation: { verdict: "PASS", overridden: false } }); + expect(stagedRolloutServiceMock.broadenRollout).toHaveBeenCalledWith("rollout-1"); + }); + + it("rejects a replay job that targets a different pipeline", async () => { + const caller = t.createCallerFactory(appRouter)({ + session: { user: { id: "user-1", email: "test@test.com", name: "Test User" } }, + userRole: "ADMIN", + teamId: "team-1", + }); + + prismaMock.release.findFirst.mockResolvedValueOnce({ + id: "rollout-1", + pipelineId: "pipe-1", + pipeline: { environmentId: "env-1" }, + } as never); + vi.mocked(getReplayJob).mockResolvedValueOnce({ + id: "job-1", + targetPipelineId: "other-pipe", + startedAt: NOW, + completedAt: NOW, + } as never); + + await expect( + caller.release.canary.broaden({ rolloutId: "rollout-1", replayJobId: "job-1" }), + ).rejects.toThrow(/does not target this rollout/); + expect(vi.mocked(evaluateReplayValidation)).not.toHaveBeenCalled(); + expect(stagedRolloutServiceMock.broadenRollout).not.toHaveBeenCalled(); + }); }); describe("rollback", () => { diff --git a/src/server/routers/release/canary.ts b/src/server/routers/release/canary.ts index 2440557d..7ea338dc 100644 --- a/src/server/routers/release/canary.ts +++ b/src/server/routers/release/canary.ts @@ -4,6 +4,8 @@ import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; import { stagedRolloutService } from 
"@/server/services/staged-rollout";
 import { writeAuditLog } from "@/server/services/audit";
+import { getReplayJob } from "@/server/services/lake/replay";
+import { evaluateReplayValidation } from "@/server/services/lake/replay-validation";
 
 export const canaryReleaseRouter = router({
   create: protectedProcedure
@@ -62,7 +64,16 @@ export const canaryReleaseRouter = router({
     }),
 
   broaden: protectedProcedure
-    .input(z.object({ rolloutId: z.string() }))
+    .input(
+      z.object({
+        rolloutId: z.string(),
+        /** Optional NF-6 gate: a completed replay whose target is this
+         * rollout's pipeline. When present, a FAILED error-budget verdict
+         * blocks the broaden unless `force` is set. Absent → no gate. */
+        replayJobId: z.string().optional(),
+        force: z.boolean().optional(),
+      }),
+    )
     .use(withTeamAccess("EDITOR"))
     .mutation(async ({ input, ctx }) => {
       // Fetch rollout to get pipelineId for audit log
@@ -71,6 +82,44 @@ export const canaryReleaseRouter = router({
         select: { pipelineId: true, pipeline: { select: { environmentId: true } } },
       });
 
+      // NF-6: gate canary -> full-fleet broaden on the candidate's replay
+      // error-budget when the caller opts in. With no replayJobId this is a
+      // no-op and broaden behaves exactly as before.
+      let replayValidation: { verdict: string; overridden: boolean } | null = null;
+      if (input.replayJobId !== undefined) {
+        // Fail closed: once the caller opts into the gate, a missing rollout
+        // must not fall through to an ungated broaden.
+        if (!rollout) {
+          throw new TRPCError({ code: "NOT_FOUND", message: "Rollout not found" });
+        }
+        const job = await getReplayJob({ orgId: ctx.organizationId, jobId: input.replayJobId });
+        if (!job || job.targetPipelineId !== rollout.pipelineId) {
+          throw new TRPCError({
+            code: "BAD_REQUEST",
+            message: "Replay job does not target this rollout's pipeline",
+          });
+        }
+        const result = await evaluateReplayValidation({
+          targetPipelineId: job.targetPipelineId,
+          startedAt: job.startedAt,
+          completedAt: job.completedAt,
+        });
+        // Only a FAIL blocks, and only when not forced; PASS and NO_DATA
+        // proceed. A forced FAIL is reported (and audited) as overridden.
+        const overridden = result.verdict === "FAIL" && input.force === true;
+        if (result.verdict === "FAIL" && !overridden) {
+          const breached = result.slis
+            .filter((s) => s.status === "breached")
+            .map((s) => s.metric)
+            .join(", ");
+          throw new TRPCError({
+            code: "PRECONDITION_FAILED",
+            message: `Replay validation failed${breached ? ` (breached: ${breached})` : ""}. Re-run the canary replay or pass force to override.`,
+          });
+        }
+        replayValidation = { verdict: result.verdict, overridden };
+      }
+
       await stagedRolloutService.broadenRollout(input.rolloutId);
 
       if (rollout) {
@@ -83,6 +132,7 @@ export const canaryReleaseRouter = router({
           metadata: {
             timestamp: new Date().toISOString(),
             rolloutId: input.rolloutId,
+            ...(replayValidation ? { replayValidation } : {}),
           },
           teamId: (ctx as Record<string, unknown>).teamId as string | null ?? null,
           environmentId: rollout.pipeline.environmentId,
@@ -92,7 +142,7 @@ export const canaryReleaseRouter = router({
         }).catch(() => {});
       }
 
-      return { success: true };
+      return replayValidation ?
{ success: true, replayValidation } : { success: true }; }), rollback: protectedProcedure diff --git a/src/server/routers/replay.ts b/src/server/routers/replay.ts index 693894a9..c7fe8e81 100644 --- a/src/server/routers/replay.ts +++ b/src/server/routers/replay.ts @@ -12,6 +12,7 @@ import { ReplayError, type ReplayFilter, } from "@/server/services/lake/replay"; +import { evaluateReplayValidation } from "@/server/services/lake/replay-validation"; /** * VectorFlow Lake — replay / rehydration router (A4). @@ -134,6 +135,25 @@ export const replayRouter = router({ return job; }), + /** Score a completed replay against the TARGET pipeline's SLIs over the + * replay window — the promotion-gate signal (NF-6). The job's target must be + * `pipelineId`: the verdict is about the candidate the events were + * re-injected into, not the source they were read from. */ + validate: protectedProcedure + .input(z.object({ pipelineId: z.string(), jobId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input, ctx }) => { + const job = await getReplayJob({ orgId: ctx.organizationId, jobId: input.jobId }); + if (!job || job.targetPipelineId !== input.pipelineId) { + throw new TRPCError({ code: "NOT_FOUND", message: "Replay job not found" }); + } + return evaluateReplayValidation({ + targetPipelineId: job.targetPipelineId, + startedAt: job.startedAt, + completedAt: job.completedAt, + }); + }), + /** Cancel an in-flight replay job (EDITOR; audited). The job must reference * `pipelineId`. Leaves the progress counters consistent. 
*/ cancel: protectedProcedure diff --git a/src/server/services/__tests__/sli-evaluator.test.ts b/src/server/services/__tests__/sli-evaluator.test.ts index 0dff2501..35ed2072 100644 --- a/src/server/services/__tests__/sli-evaluator.test.ts +++ b/src/server/services/__tests__/sli-evaluator.test.ts @@ -75,4 +75,21 @@ describe("evaluatePipelineHealth", () => { }), ]); }); + + it("reports a dark pipeline (empty metric window) as degraded", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([ + makeSli({ metric: "throughput_floor", condition: "gt", threshold: 10, windowMinutes: 5 }), + ]); + prismaMock.pipelineMetric.aggregate.mockResolvedValue({ + _sum: { eventsIn: null, errorsTotal: null, eventsDiscarded: null }, + _count: 0, + } as never); + + const result = await evaluatePipelineHealth("pipeline-1"); + + expect(result.status).toBe("degraded"); + expect(result.slis[0]).toEqual( + expect.objectContaining({ metric: "throughput_floor", status: "breached", value: 0 }), + ); + }); }); diff --git a/src/server/services/lake/__tests__/replay-validation.test.ts b/src/server/services/lake/__tests__/replay-validation.test.ts new file mode 100644 index 00000000..1ab44b64 --- /dev/null +++ b/src/server/services/lake/__tests__/replay-validation.test.ts @@ -0,0 +1,147 @@ +import { vi, describe, it, expect, beforeEach } from "vitest"; +import { mockDeep, mockReset, type DeepMockProxy } from "vitest-mock-extended"; +import type { PrismaClient } from "@/generated/prisma"; + +vi.mock("@/lib/prisma", () => { + const __pm = mockDeep(); + return { prisma: __pm, basePrisma: __pm, adminPrisma: __pm }; +}); + +import { prisma } from "@/lib/prisma"; +import { + evaluateReplayValidation, + REPLAY_GATED_METRICS, +} from "@/server/services/lake/replay-validation"; + +const prismaMock = prisma as unknown as DeepMockProxy; + +const FROM = new Date("2026-03-01T00:00:00Z"); +const TO = new Date("2026-03-01T01:00:00Z"); + +function makeSli(overrides: { metric: string; condition?: 
string; threshold?: number }) { + return { + id: `sli-${overrides.metric}`, + pipelineId: "p1", + metric: overrides.metric, + condition: overrides.condition ?? "lt", + threshold: overrides.threshold ?? 0.05, + windowMinutes: 5, + enabled: true, + createdAt: new Date(), + }; +} + +function aggregateResult(sum: { + eventsIn: number | null; + errorsTotal?: number; + eventsDiscarded?: number; +}, count: number) { + return { + _sum: { + eventsIn: sum.eventsIn === null ? null : BigInt(sum.eventsIn), + errorsTotal: BigInt(sum.errorsTotal ?? 0), + eventsDiscarded: BigInt(sum.eventsDiscarded ?? 0), + }, + _count: count, + } as never; +} + +describe("evaluateReplayValidation", () => { + beforeEach(() => { + mockReset(prismaMock); + }); + + it("returns NO_DATA without touching the DB when the replay has no window", async () => { + const result = await evaluateReplayValidation({ + targetPipelineId: "p1", + startedAt: null, + completedAt: null, + }); + + expect(result).toEqual({ verdict: "NO_DATA", slis: [], window: null }); + expect(prismaMock.pipelineSli.findMany).not.toHaveBeenCalled(); + expect(prismaMock.pipelineMetric.aggregate).not.toHaveBeenCalled(); + }); + + it("PASSes when the candidate meets its SLIs over the replay window", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([makeSli({ metric: "error_rate" })] as never); + // 10 / 1000 = 1% < 5% threshold → met + prismaMock.pipelineMetric.aggregate.mockResolvedValue( + aggregateResult({ eventsIn: 1000, errorsTotal: 10 }, 3), + ); + + const result = await evaluateReplayValidation({ + targetPipelineId: "p1", + startedAt: FROM, + completedAt: TO, + }); + + expect(result.verdict).toBe("PASS"); + expect(result.window).toEqual({ from: FROM.toISOString(), to: TO.toISOString() }); + // Scored strictly over the replay window, not a rolling one. 
+ expect(prismaMock.pipelineMetric.aggregate).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ timestamp: { gte: FROM, lte: TO } }), + }), + ); + }); + + it("FAILs when an SLI is breached over the replay window", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([makeSli({ metric: "error_rate" })] as never); + // 200 / 1000 = 20% > 5% threshold → breached + prismaMock.pipelineMetric.aggregate.mockResolvedValue( + aggregateResult({ eventsIn: 1000, errorsTotal: 200 }, 3), + ); + + const result = await evaluateReplayValidation({ + targetPipelineId: "p1", + startedAt: FROM, + completedAt: TO, + }); + + expect(result.verdict).toBe("FAIL"); + expect(result.slis[0]).toEqual( + expect.objectContaining({ metric: "error_rate", status: "breached" }), + ); + }); + + it("returns NO_DATA when the target has no replay-applicable SLIs", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([] as never); + + const result = await evaluateReplayValidation({ + targetPipelineId: "p1", + startedAt: FROM, + completedAt: TO, + }); + + expect(result.verdict).toBe("NO_DATA"); + expect(result.slis).toEqual([]); + expect(prismaMock.pipelineMetric.aggregate).not.toHaveBeenCalled(); + }); + + it("returns NO_DATA when no metrics landed in the replay window", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([makeSli({ metric: "error_rate" })] as never); + prismaMock.pipelineMetric.aggregate.mockResolvedValue(aggregateResult({ eventsIn: null }, 0)); + + const result = await evaluateReplayValidation({ + targetPipelineId: "p1", + startedAt: FROM, + completedAt: TO, + }); + + expect(result.verdict).toBe("NO_DATA"); + }); + + it("only scores replay-applicable SLI metrics (excludes throughput_floor)", async () => { + prismaMock.pipelineSli.findMany.mockResolvedValue([] as never); + + await evaluateReplayValidation({ targetPipelineId: "p1", startedAt: FROM, completedAt: TO }); + + 
expect(prismaMock.pipelineSli.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ metric: { in: [...REPLAY_GATED_METRICS] } }), + }), + ); + expect(REPLAY_GATED_METRICS).not.toContain("throughput_floor"); + }); +}); diff --git a/src/server/services/lake/replay-validation.ts b/src/server/services/lake/replay-validation.ts new file mode 100644 index 00000000..577f764d --- /dev/null +++ b/src/server/services/lake/replay-validation.ts @@ -0,0 +1,91 @@ +import { prisma } from "@/lib/prisma"; +import { + evaluateSliOverWindow, + rollUpSliStatus, + type SliResult, + type SliStatus, +} from "@/server/services/sli-evaluator"; + +/** + * Replay-driven promotion validation (NF-6). + * + * A replay re-injects a bounded, immutable sample of past lake events into a + * candidate pipeline (the canary). This module scores how the candidate + * behaved *over exactly that sample* against the pipeline's own SLIs, so a + * promotion can be gated on a real "prove the new config is safe" signal + * rather than a deploy-and-pray rollout. + * + * The score reuses the pipeline's configured `PipelineSli` thresholds as the + * error budget — there is no second, parallel notion of "acceptable" — but + * evaluates them strictly over the replay's `[startedAt, completedAt]` window + * (via {@link evaluateSliOverWindow}) instead of a rolling trailing window. + */ + +/** PASS = every scored SLI met; FAIL = at least one breached; NO_DATA = nothing + * scorable (no replay window, no applicable SLIs, or no metrics in window). */ +export type ReplayVerdict = "PASS" | "FAIL" | "NO_DATA"; + +export interface ReplayValidationResult { + verdict: ReplayVerdict; + /** Per-SLI breakdown (empty when there was nothing to score). */ + slis: SliResult[]; + /** The replay window the SLIs were scored over, or null when absent. */ + window: { from: string; to: string } | null; +} + +/** + * SLI metrics meaningful to score over a bounded replay sample. 
+ * + * `throughput_floor` is deliberately excluded: it is a time-rate gate, and a + * replay's wall-clock duration is an artifact of the agent's polling cadence, + * not of real traffic — so events/second over a replay window carries no + * signal. `error_rate` / `discard_rate` are ratios and `latency_mean` an + * average; all three are independent of the window's duration. + */ +export const REPLAY_GATED_METRICS = ["error_rate", "discard_rate", "latency_mean"] as const; + +const VERDICT_BY_STATUS: Record = { + healthy: "PASS", + degraded: "FAIL", + no_data: "NO_DATA", +}; + +/** + * Score a completed replay against its target pipeline's SLIs. + * + * Returns `NO_DATA` (never a spurious PASS/FAIL) when the replay has no + * window yet, the target has no replay-applicable SLIs, or no metrics landed + * in the window — a gate built on this result must treat `NO_DATA` as + * "no opinion", not as approval. + */ +export async function evaluateReplayValidation(args: { + targetPipelineId: string; + startedAt: Date | null; + completedAt: Date | null; +}): Promise { + const { targetPipelineId, startedAt, completedAt } = args; + + // A replay only carries signal once it has actually run a window of events. 
+  if (!startedAt || !completedAt) {
+    return { verdict: "NO_DATA", slis: [], window: null };
+  }
+
+  const sliDefs = await prisma.pipelineSli.findMany({
+    where: {
+      pipelineId: targetPipelineId,
+      enabled: true,
+      metric: { in: [...REPLAY_GATED_METRICS] },
+    },
+  });
+
+  const slis: SliResult[] = [];
+  for (const sli of sliDefs) {
+    slis.push(await evaluateSliOverWindow(targetPipelineId, sli, startedAt, completedAt));
+  }
+
+  return {
+    verdict: VERDICT_BY_STATUS[rollUpSliStatus(slis)],
+    slis,
+    window: { from: startedAt.toISOString(), to: completedAt.toISOString() },
+  };
+}
diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts
index c2dd632f..8ffaf8a9 100644
--- a/src/server/services/sli-evaluator.ts
+++ b/src/server/services/sli-evaluator.ts
@@ -1,4 +1,5 @@
 import { prisma } from "@/lib/prisma";
+import type { PipelineSli } from "@/generated/prisma";
 
 const AGGREGATE_PIPELINE_METRIC_FILTER = {
   componentId: null,
@@ -15,6 +16,131 @@ export interface SliResult {
   condition: string;
 }
 
+/** The subset of a `PipelineSli` row this evaluator reads. Accepting a
+ * structural shape (not the full Prisma model) keeps the function unit-
+ * testable with a plain fixture. */
+export type SliDefinition = Pick<
+  PipelineSli,
+  "metric" | "threshold" | "condition" | "windowMinutes"
+>;
+
+/**
+ * Evaluate a single SLI for a pipeline over an explicit time window.
+ *
+ * `until` omitted → open-ended window `[since, now)`, used by the rolling
+ * health check. Both bounds set → a fixed `[since, until]` window, used by
+ * replay validation to score a pipeline strictly over the events a replay
+ * re-injected (so a healthy pipeline isn't dragged down — or propped up — by
+ * traffic outside the replay).
+ *
+ * Returns `no_data` when the window holds no metric rows, or when a rate
+ * metric (`error_rate` / `discard_rate`) has zero throughput — a ratio over
+ * zero events carries no signal and must not be scored as a breach.
+ *
+ * `throughput_floor` is events/second over the queried span: `until - since`
+ * for a bounded window, `sli.windowMinutes` for the rolling one.
+ */
+export async function evaluateSliOverWindow(
+  pipelineId: string,
+  sli: SliDefinition,
+  since: Date,
+  until?: Date,
+  /** Verdict for a window with zero metric rows. Rolling health treats a dark
+   * pipeline as `breached` — its documented contract, mirrored by
+   * batch-health.ts / fleet-metrics.ts. Replay validation uses `no_data`: an
+   * unscored window is "no opinion", never a failure. */
+  emptyWindowStatus: "breached" | "no_data" = "no_data",
+): Promise<SliResult> {
+  const timestamp = until ? { gte: since, lte: until } : { gte: since };
+  const noData: SliResult = {
+    metric: sli.metric,
+    status: "no_data",
+    value: null,
+    threshold: sli.threshold,
+    condition: sli.condition,
+  };
+
+  // Use aggregate to avoid transferring all metric rows to the application
+  const agg = await prisma.pipelineMetric.aggregate({
+    where: { pipelineId, ...AGGREGATE_PIPELINE_METRIC_FILTER, timestamp },
+    _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true },
+    _count: true,
+  });
+
+  if (agg._count === 0) {
+    return emptyWindowStatus === "breached"
+      ? { metric: sli.metric, status: "breached", value: 0, threshold: sli.threshold, condition: sli.condition }
+      : noData;
+  }
+
+  let value: number;
+  const totalEventsIn = Number(agg._sum.eventsIn ?? 0);
+
+  // For rate-based metrics, zero throughput means no meaningful signal
+  if (totalEventsIn === 0 && (sli.metric === "error_rate" || sli.metric === "discard_rate")) {
+    return noData;
+  }
+
+  switch (sli.metric) {
+    case "error_rate": {
+      const totalErrors = Number(agg._sum.errorsTotal ?? 0);
+      value = totalErrors / totalEventsIn;
+      break;
+    }
+    case "discard_rate": {
+      const totalDiscarded = Number(agg._sum.eventsDiscarded ?? 0);
+      value = totalDiscarded / totalEventsIn;
+      break;
+    }
+    case "throughput_floor": {
+      // Rate over the span actually queried: the fixed `[since, until]`
+      // window when bounded, else the SLI's own rolling window.
+      const windowSeconds = until
+        ? (until.getTime() - since.getTime()) / 1000
+        : sli.windowMinutes * 60;
+      // A zero-length (or inverted) window has no rate to score.
+      if (windowSeconds <= 0) return noData;
+      value = totalEventsIn / windowSeconds;
+      break;
+    }
+    case "latency_mean": {
+      const latencyAgg = await prisma.pipelineMetric.aggregate({
+        where: {
+          pipelineId,
+          ...AGGREGATE_PIPELINE_METRIC_FILTER,
+          timestamp,
+          latencyMeanMs: { not: null },
+        },
+        _avg: { latencyMeanMs: true },
+        _count: true,
+      });
+      if (latencyAgg._count === 0) return noData;
+      value = latencyAgg._avg.latencyMeanMs ?? 0;
+      break;
+    }
+    default:
+      // Unrecognised metric names are scored as 0 (pre-existing behaviour).
+      value = 0;
+  }
+
+  const met = sli.condition === "lt" ? value < sli.threshold : value > sli.threshold;
+  return {
+    metric: sli.metric,
+    status: met ? "met" : "breached",
+    value,
+    threshold: sli.threshold,
+    condition: sli.condition,
+  };
+}
+
+/** Roll per-SLI results up to a single status: `no_data` when nothing scored,
+ * `healthy` when every scored SLI is met, `degraded` if any breached. */
+export function rollUpSliStatus(results: SliResult[]): SliStatus {
+  const evaluated = results.filter((r) => r.status !== "no_data");
+  if (evaluated.length === 0) return "no_data";
+  return evaluated.every((r) => r.status === "met") ? "healthy" : "degraded";
+}
+
 export async function evaluatePipelineHealth(pipelineId: string): Promise<{
   status: SliStatus;
   slis: SliResult[];
@@ -26,104 +152,11 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{
   if (sliDefs.length === 0) return { status: "no_data", slis: [] };
 
   const results: SliResult[] = [];
-
   for (const sli of sliDefs) {
+    // Each rolling SLI scores its own trailing window.
const since = new Date(Date.now() - sli.windowMinutes * 60_000); - - // Use aggregate to avoid transferring all metric rows to the application - const agg = await prisma.pipelineMetric.aggregate({ - where: { pipelineId, ...AGGREGATE_PIPELINE_METRIC_FILTER, timestamp: { gte: since } }, - _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true }, - _count: true, - }); - - if (agg._count === 0) { - results.push({ - metric: sli.metric, - status: "breached", - value: 0, - threshold: sli.threshold, - condition: sli.condition, - }); - continue; - } - - let value: number; - const totalEventsIn = Number(agg._sum.eventsIn ?? 0); - - // For rate-based metrics, zero throughput means no meaningful signal - if (totalEventsIn === 0 && (sli.metric === "error_rate" || sli.metric === "discard_rate")) { - results.push({ - metric: sli.metric, - status: "no_data", - value: null, - threshold: sli.threshold, - condition: sli.condition, - }); - continue; - } - - switch (sli.metric) { - case "error_rate": { - const totalErrors = Number(agg._sum.errorsTotal ?? 0); - value = totalErrors / totalEventsIn; - break; - } - case "discard_rate": { - const totalDiscarded = Number(agg._sum.eventsDiscarded ?? 0); - value = totalDiscarded / totalEventsIn; - break; - } - case "throughput_floor": { - const windowSeconds = sli.windowMinutes * 60; - value = totalEventsIn / windowSeconds; - break; - } - case "latency_mean": { - const latencyAgg = await prisma.pipelineMetric.aggregate({ - where: { - pipelineId, - ...AGGREGATE_PIPELINE_METRIC_FILTER, - timestamp: { gte: since }, - latencyMeanMs: { not: null }, - }, - _avg: { latencyMeanMs: true }, - _count: true, - }); - if (latencyAgg._count === 0) { - results.push({ - metric: sli.metric, - status: "no_data", - value: null, - threshold: sli.threshold, - condition: sli.condition, - }); - continue; - } - value = latencyAgg._avg.latencyMeanMs ?? 0; - break; - } - default: - value = 0; - } - - const met = - sli.condition === "lt" ? 
value < sli.threshold : value > sli.threshold; - results.push({ - metric: sli.metric, - status: met ? "met" : "breached", - value, - threshold: sli.threshold, - condition: sli.condition, - }); + results.push(await evaluateSliOverWindow(pipelineId, sli, since, undefined, "breached")); } - const evaluated = results.filter((r) => r.status !== "no_data"); - const overallStatus: SliStatus = - evaluated.length === 0 - ? "no_data" - : evaluated.every((r) => r.status === "met") - ? "healthy" - : "degraded"; - return { status: overallStatus, slis: results }; + return { status: rollUpSliStatus(results), slis: results }; }