diff --git a/src/server/services/__tests__/anomaly-detection-job.test.ts b/src/server/services/__tests__/anomaly-detection-job.test.ts index f242f1e3..7fe23605 100644 --- a/src/server/services/__tests__/anomaly-detection-job.test.ts +++ b/src/server/services/__tests__/anomaly-detection-job.test.ts @@ -13,12 +13,20 @@ vi.mock("@/server/services/anomaly-detector", () => ({ }, })); +// SC-3: control leadership so the tick guard can be exercised both ways. +// Defaults to leader so the existing tick-driven tests keep doing work. +vi.mock("@/server/services/leader-election", () => ({ + isLeader: vi.fn(() => true), +})); + import { prisma } from "@/lib/prisma"; import { AnomalyDetectionService } from "@/server/services/anomaly-detection-job"; import { evaluateAllPipelines } from "@/server/services/anomaly-detector"; +import { isLeader } from "@/server/services/leader-election"; const prismaMock = prisma as unknown as DeepMockProxy; const mockEvaluateAll = evaluateAllPipelines as ReturnType; +const mockIsLeader = isLeader as ReturnType; // ─── Tests ────────────────────────────────────────────────────────────────── @@ -28,6 +36,7 @@ describe("AnomalyDetectionService", () => { beforeEach(() => { mockReset(prismaMock); vi.clearAllMocks(); + mockIsLeader.mockReturnValue(true); vi.useFakeTimers(); // The tick now iterates orgs from prisma.organization.findMany. Default // mock returns a single default org so existing single-org test cases @@ -155,4 +164,36 @@ describe("AnomalyDetectionService", () => { // Release the hung evaluate so the test exits cleanly. releaseFirst([]); }); + + // ── SC-3: leadership guard (de-SPOF schedulers) ────────────────────────── + + describe("leadership guard", () => { + it("tick is a no-op when the instance is no longer leader", async () => { + mockIsLeader.mockReturnValue(false); + prismaMock.organization.findMany.mockResolvedValue([ + { id: "org-a" } as never, + ]); + + service.init(); + await vi.advanceTimersByTimeAsync(60_000); + + // A demoted instance must do no work: no org scan, no pipeline + // evaluation — otherwise it duplicates the new leader's anomaly runs. + expect(prismaMock.organization.findMany).not.toHaveBeenCalled(); + expect(mockEvaluateAll).not.toHaveBeenCalled(); + }); + + it("tick proceeds normally while the instance is leader", async () => { + mockIsLeader.mockReturnValue(true); + prismaMock.organization.findMany.mockResolvedValue([ + { id: "org-a" } as never, + ]); + + service.init(); + await vi.advanceTimersByTimeAsync(60_000); + + expect(prismaMock.organization.findMany).toHaveBeenCalledTimes(1); + expect(mockEvaluateAll).toHaveBeenCalledWith({ organizationId: "org-a" }); + }); + }); }); diff --git a/src/server/services/__tests__/fleet-alert-service.test.ts b/src/server/services/__tests__/fleet-alert-service.test.ts index 214d41d0..389e11e9 100644 --- a/src/server/services/__tests__/fleet-alert-service.test.ts +++ b/src/server/services/__tests__/fleet-alert-service.test.ts @@ -33,6 +33,12 @@ vi.mock("@/server/services/cost-alert", () => ({ evaluateCostAlerts: vi.fn().mockResolvedValue(undefined), })); +// SC-3: control leadership so the tick guard can be exercised both ways. +// Defaults to leader so the existing tick-driven tests keep doing work. +vi.mock("@/server/services/leader-election", () => ({ + isLeader: vi.fn(() => true), +})); + import { prisma } from "@/lib/prisma"; import { FleetAlertService } from "@/server/services/fleet-alert-service"; import { @@ -44,8 +50,10 @@ import { } from "@/server/services/fleet-metrics"; import { deliverToChannels } from "@/server/services/channels"; import { getVersionDrift } from "@/server/services/drift-metrics"; +import { isLeader } from "@/server/services/leader-election"; const prismaMock = prisma as unknown as DeepMockProxy; +const mockIsLeader = isLeader as ReturnType; const mockGetFleetErrorRate = getFleetErrorRate as ReturnType; const mockGetFleetEventVolume = getFleetEventVolume as ReturnType; @@ -130,6 +138,8 @@ describe("FleetAlertService", () => { mockGetFleetThroughputDrop.mockReset(); mockGetNodeLoadImbalance.mockReset(); (deliverToChannels as ReturnType).mockReset().mockResolvedValue([]); + mockIsLeader.mockReset(); + mockIsLeader.mockReturnValue(true); service = new FleetAlertService(); }); @@ -672,4 +682,43 @@ describe("FleetAlertService", () => { expect(prismaMock.alertRule.findMany).not.toHaveBeenCalled(); }); }); + + // ── SC-3: leadership guard (de-SPOF schedulers) ────────────────────────── + + describe("leadership guard", () => { + it("tick is a no-op when the instance is no longer leader", async () => { + mockIsLeader.mockReturnValue(false); + prismaMock.organization.findMany.mockResolvedValue([ + { id: "org-a" } as never, + ]); + prismaMock.alertRule.findMany.mockResolvedValue([]); + + service.start(); + await vi.advanceTimersByTimeAsync(30_000); + + // A demoted instance must do no work: no org scan, no rule evaluation, + // no channel delivery — otherwise it double-fires with the new leader. + expect(prismaMock.organization.findMany).not.toHaveBeenCalled(); + expect(prismaMock.alertRule.findMany).not.toHaveBeenCalled(); + expect(deliverToChannels).not.toHaveBeenCalled(); + }); + + it("tick proceeds normally while the instance is leader", async () => { + mockIsLeader.mockReturnValue(true); + prismaMock.organization.findMany.mockResolvedValue([ + { id: "org-a" } as never, + ]); + prismaMock.alertRule.findMany.mockResolvedValue([]); + + service.start(); + await vi.advanceTimersByTimeAsync(30_000); + + expect(prismaMock.organization.findMany).toHaveBeenCalledTimes(1); + expect(prismaMock.alertRule.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ organizationId: "org-a" }), + }), + ); + }); + }); }); diff --git a/src/server/services/__tests__/git-sync-retry.test.ts b/src/server/services/__tests__/git-sync-retry.test.ts index 69e2be9a..3d248314 100644 --- a/src/server/services/__tests__/git-sync-retry.test.ts +++ b/src/server/services/__tests__/git-sync-retry.test.ts @@ -23,16 +23,42 @@ vi.mock("@/server/services/event-alerts", () => ({ vi.mock("@/server/services/sse-broadcast", () => ({ broadcastSSE: vi.fn(), })); +vi.mock("@/server/services/leader-election", () => ({ + isLeader: vi.fn(() => true), +})); + import { prisma } from "@/lib/prisma"; import { gitSyncCommitPipeline } from "@/server/services/git-sync"; import { fireEventAlert } from "@/server/services/event-alerts"; import { GitSyncRetryService, getNextRetryAt, createGitSyncJob } from "../git-sync-retry"; +import { isLeader } from "@/server/services/leader-election"; const prismaMock = prisma as unknown as DeepMockProxy; const commitMock = vi.mocked(gitSyncCommitPipeline); const fireAlertMock = vi.mocked(fireEventAlert); +// Keep leadership true by default so the per-tick guard never suppresses the +// existing tick tests; individual tests opt into the not-leader path. +beforeEach(() => { + vi.mocked(isLeader).mockReturnValue(true); +}); + +describe("GitSyncRetryService leadership guard", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(isLeader).mockReturnValue(true); + }); + + it("skips the tick (no work) when this instance is not the leader", async () => { + vi.mocked(isLeader).mockReturnValue(false); + const service = new GitSyncRetryService(); + await (service as unknown as { tick(): Promise }).tick(); + expect(prismaMock.organization.findMany).not.toHaveBeenCalled(); + expect(commitMock).not.toHaveBeenCalled(); + }); +}); + describe("getNextRetryAt", () => { it("returns 30s delay for attempt 0", () => { const result = getNextRetryAt(0); diff --git a/src/server/services/anomaly-detection-job.ts b/src/server/services/anomaly-detection-job.ts index c45a3c12..fe83595c 100644 --- a/src/server/services/anomaly-detection-job.ts +++ b/src/server/services/anomaly-detection-job.ts @@ -4,7 +4,8 @@ import { evaluateAllPipelines, ANOMALY_CONFIG, } from "@/server/services/anomaly-detector"; -import { infoLog, errorLog } from "@/lib/logger"; +import { debugLog, infoLog, errorLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; // ─── Constants ────────────────────────────────────────────────────────────── @@ -59,6 +60,14 @@ export class AnomalyDetectionService { * tenant's analysis (or failure) does not stall another's. */ private async tick(): Promise { + // SC-3: re-check leadership each tick. A demoted leader's setInterval keeps + // firing for up to one TTL (~15s) after Redis renewals fail; without this + // guard the old + new leader both run anomaly analysis, duplicating events. + // Guard only — the timer stays so it resumes if leadership is re-acquired. + if (!isLeader()) { + debugLog("anomaly-detection", "Skipping tick — instance is no longer leader"); + return; + } if (this.tickInFlight) { infoLog( "anomaly-detection", diff --git a/src/server/services/backup-scheduler.ts b/src/server/services/backup-scheduler.ts index f2886c29..a103e369 100644 --- a/src/server/services/backup-scheduler.ts +++ b/src/server/services/backup-scheduler.ts @@ -5,6 +5,7 @@ import { getOrgSettings } from "@/lib/org-settings"; import { debugLog, infoLog, errorLog } from "@/lib/logger"; import { createBackup, runRetentionCleanup, runOrphanCleanup } from "./backup"; import { fireEventAlert } from "./event-alerts"; +import { isLeader } from "@/server/services/leader-election"; /** * Per-organization backup scheduler. @@ -98,6 +99,17 @@ function scheduleJobForOrg( } const task = cron.schedule(cronExpression, async () => { + // SC-3: a demoted leader's cron tasks keep firing for up to one TTL (~15s) + // after Redis renewals fail. Re-check leadership at the top of the per-org + // callback so a demoted instance skips the backup instead of racing the new + // leader (duplicate backups). Guard only — the cron task is left registered. + if (!isLeader()) { + debugLog( + "backup-scheduler", + `Skipping scheduled backup for org=${organizationId} — instance is no longer leader`, + ); + return; + } infoLog( "backup-scheduler", `Starting scheduled backup for org=${organizationId}`, diff --git a/src/server/services/cost-optimizer-scheduler.ts b/src/server/services/cost-optimizer-scheduler.ts index cf07da58..ef6c0bff 100644 --- a/src/server/services/cost-optimizer-scheduler.ts +++ b/src/server/services/cost-optimizer-scheduler.ts @@ -8,6 +8,7 @@ import { cleanupExpiredRecommendations, } from "@/server/services/cost-recommendations"; import { generateAiRecommendations } from "@/server/services/cost-optimizer-ai"; +import { isLeader } from "@/server/services/leader-election"; /** * Cost-optimizer scheduler — single global cron tick fans out across orgs. @@ -86,6 +87,15 @@ function scheduleJob( } const task = cron.schedule(cronExpression, async () => { + // SC-3: both the daily and continuous passes route through this callback. + // A demoted leader's cron tasks keep firing for up to one TTL (~15s) after + // Redis renewals fail, so re-check leadership here — a demoted instance + // skips the run instead of racing the new leader (duplicate analysis runs). + // Guard only — the cron task is left registered. + if (!isLeader()) { + debugLog("cost-optimizer", `Skipping ${label} — instance is no longer leader`); + return; + } infoLog("cost-optimizer", `Starting ${label}...`); try { await run(); diff --git a/src/server/services/fleet-alert-service.ts b/src/server/services/fleet-alert-service.ts index 02160116..3c0e9507 100644 --- a/src/server/services/fleet-alert-service.ts +++ b/src/server/services/fleet-alert-service.ts @@ -17,7 +17,8 @@ import type { LoadImbalanceResult, ThroughputDropDetail } from "@/server/service import { getVersionDrift } from "@/server/services/drift-metrics"; import { checkCertificateExpiry } from "@/server/services/cert-expiry-checker"; import { evaluateCostAlerts } from "@/server/services/cost-alert"; -import { infoLog, errorLog } from "@/lib/logger"; +import { debugLog, infoLog, errorLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; // Re-export the constant for downstream use (e.g. T03 validation) export { FLEET_METRICS } from "@/server/services/alert-evaluator"; @@ -83,6 +84,15 @@ export class FleetAlertService { * their own queries. */ private async tick(): Promise { + // SC-3: a demoted leader's timers keep firing for up to one TTL (~15s) + // after Redis renewals fail. Without this guard the old + new leader both + // evaluate every rule, double-firing alerts. Re-check leadership each tick + // so a demoted instance becomes a no-op (we keep the timer, not tear it + // down, so it resumes cleanly if it re-acquires leadership). + if (!isLeader()) { + debugLog("fleet-alert", "Skipping tick — instance is no longer leader"); + return; + } if (this.tickInFlight) { infoLog( "fleet-alert", diff --git a/src/server/services/git-sync-retry.ts b/src/server/services/git-sync-retry.ts index 194f6ee6..4b79bc31 100644 --- a/src/server/services/git-sync-retry.ts +++ b/src/server/services/git-sync-retry.ts @@ -5,6 +5,7 @@ import { gitSyncCommitPipeline, gitSyncDeletePipeline } from "@/server/services/ import { fireEventAlert } from "@/server/services/event-alerts"; import { loadOrgDataKeyCiphertext } from "@/server/services/crypto-v3-callsite"; import { broadcastSSE } from "@/server/services/sse-broadcast"; +import { isLeader } from "@/server/services/leader-election"; // --- Constants --- @@ -61,6 +62,10 @@ export class GitSyncRetryService { * Single tick: iterate orgs and process retries per org. */ private async tick(): Promise { + if (!isLeader()) { + debugLog("git-sync-retry", "Skipping tick — instance is no longer leader"); + return; + } if (this.tickInFlight) { infoLog( "git-sync-retry", diff --git a/src/server/services/lake/lake-alerts.ts b/src/server/services/lake/lake-alerts.ts index 72d52b75..ba6fdff2 100644 --- a/src/server/services/lake/lake-alerts.ts +++ b/src/server/services/lake/lake-alerts.ts @@ -1,7 +1,8 @@ import { adminPrisma, prisma } from "@/lib/prisma"; import { withOrgTx } from "@/lib/with-org-tx"; import { runWithOrgContext } from "@/lib/org-context"; -import { infoLog, errorLog } from "@/lib/logger"; +import { debugLog, infoLog, errorLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; import type { LakeAlertRule } from "@/generated/prisma"; import { isLakeEnabled } from "./clickhouse"; import { aggregateValue, type LakeAggFunction } from "./lake-query"; @@ -279,6 +280,14 @@ let timer: NodeJS.Timeout | null = null; let tickInFlight = false; async function tick(): Promise { + // SC-3: re-check leadership each tick. A demoted leader's setInterval keeps + // firing for up to one TTL (~15s) after Redis renewals fail; without this + // guard the old + new leader both evaluate lake alert rules, double-firing. + // Guard only — the timer stays so it resumes if leadership is re-acquired. + if (!isLeader()) { + debugLog("lake-alert", "Skipping tick — instance is no longer leader"); + return; + } if (tickInFlight) return; // setInterval does not skip overlapping callbacks tickInFlight = true; try { diff --git a/src/server/services/metrics-rollup.ts b/src/server/services/metrics-rollup.ts index c42688cd..e894cff5 100644 --- a/src/server/services/metrics-rollup.ts +++ b/src/server/services/metrics-rollup.ts @@ -1,6 +1,7 @@ import { adminPrisma } from "@/lib/prisma"; import { withOrgTx } from "@/lib/with-org-tx"; -import { infoLog, errorLog } from "@/lib/logger"; +import { infoLog, errorLog, debugLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; /** * Long-retention metric rollups (B5). @@ -602,6 +603,10 @@ const ROLLUP_INTERVAL_MS = HOUR_MS; let timer: ReturnType | null = null; async function runRollupTick(): Promise { + if (!isLeader()) { + debugLog("metrics-rollup", "Skipping tick — instance is no longer leader"); + return; + } for (const granularity of ["HOUR", "DAY"] as const) { try { const result = await rollupMetrics({ granularity }); diff --git a/src/server/services/retry-service.ts b/src/server/services/retry-service.ts index d560029f..f9883727 100644 --- a/src/server/services/retry-service.ts +++ b/src/server/services/retry-service.ts @@ -7,7 +7,8 @@ import { getDriver } from "@/server/services/channels"; import { decryptChannelConfig } from "@/server/services/channel-secrets"; import type { ChannelPayload } from "@/server/services/channels/types"; import { deliverOutboundWebhook, isPermanentFailure } from "@/server/services/outbound-webhook"; -import { infoLog, errorLog } from "@/lib/logger"; +import { debugLog, infoLog, errorLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; // ─── Constants ────────────────────────────────────────────────────────────── @@ -51,6 +52,15 @@ export class RetryService { * tracking wrappers, which handle backoff scheduling automatically. */ async processRetries(): Promise { + // SC-3: a demoted leader's setInterval keeps firing for up to one TTL + // (~15s) after Redis renewals fail. Re-check leadership so a demoted + // instance doesn't re-deliver alerts/webhooks the new leader is already + // retrying (duplicate deliveries). Covers processOutboundRetries too (it + // runs below). Guard only — the timer stays so it resumes on re-acquisition. + if (!isLeader()) { + debugLog("retry-service", "Skipping poll — instance is no longer leader"); + return; + } let dueRetries; try { dueRetries = await prisma.deliveryAttempt.findMany({ diff --git a/src/server/services/telemetry-scheduler.ts b/src/server/services/telemetry-scheduler.ts index cd257157..275ba727 100644 --- a/src/server/services/telemetry-scheduler.ts +++ b/src/server/services/telemetry-scheduler.ts @@ -1,5 +1,7 @@ import cron, { type ScheduledTask } from "node-cron"; import { sendTelemetryHeartbeat } from "./telemetry-sender"; +import { debugLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; // Daily at 03:42 UTC. Jitter across instances happens naturally because each // VF install runs at a different wall clock when the cron crosses the boundary. @@ -10,6 +12,13 @@ let task: ScheduledTask | null = null; export function initTelemetryScheduler(): void { if (task) return; task = cron.schedule(DAILY_CRON, async () => { + // SC-3: a demoted leader's cron keeps firing for up to one TTL (~15s) after + // Redis renewals fail. Re-check leadership so only the current leader sends + // the telemetry heartbeat. Guard only — the cron task is left registered. + if (!isLeader()) { + debugLog("telemetry", "Skipping heartbeat — instance is no longer leader"); + return; + } try { await sendTelemetryHeartbeat(); } catch (err) {