Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/server/services/__tests__/anomaly-detection-job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,20 @@ vi.mock("@/server/services/anomaly-detector", () => ({
},
}));

// SC-3: control leadership so the tick guard can be exercised both ways.
// Defaults to leader so the existing tick-driven tests keep doing work.
vi.mock("@/server/services/leader-election", () => ({
isLeader: vi.fn(() => true),
}));

import { prisma } from "@/lib/prisma";
import { AnomalyDetectionService } from "@/server/services/anomaly-detection-job";
import { evaluateAllPipelines } from "@/server/services/anomaly-detector";
import { isLeader } from "@/server/services/leader-election";

const prismaMock = prisma as unknown as DeepMockProxy<PrismaClient>;
const mockEvaluateAll = evaluateAllPipelines as ReturnType<typeof vi.fn>;
const mockIsLeader = isLeader as ReturnType<typeof vi.fn>;

// ─── Tests ──────────────────────────────────────────────────────────────────

Expand All @@ -28,6 +36,7 @@ describe("AnomalyDetectionService", () => {
beforeEach(() => {
mockReset(prismaMock);
vi.clearAllMocks();
mockIsLeader.mockReturnValue(true);
vi.useFakeTimers();
// The tick now iterates orgs from prisma.organization.findMany. Default
// mock returns a single default org so existing single-org test cases
Expand Down Expand Up @@ -155,4 +164,36 @@ describe("AnomalyDetectionService", () => {
// Release the hung evaluate so the test exits cleanly.
releaseFirst([]);
});

// ── SC-3: leadership guard (de-SPOF schedulers) ──────────────────────────

describe("leadership guard", () => {
it("tick is a no-op when the instance is no longer leader", async () => {
mockIsLeader.mockReturnValue(false);
prismaMock.organization.findMany.mockResolvedValue([
{ id: "org-a" } as never,
]);

service.init();
await vi.advanceTimersByTimeAsync(60_000);

// A demoted instance must do no work: no org scan, no pipeline
// evaluation — otherwise it duplicates the new leader's anomaly runs.
expect(prismaMock.organization.findMany).not.toHaveBeenCalled();
expect(mockEvaluateAll).not.toHaveBeenCalled();
});

it("tick proceeds normally while the instance is leader", async () => {
mockIsLeader.mockReturnValue(true);
prismaMock.organization.findMany.mockResolvedValue([
{ id: "org-a" } as never,
]);

service.init();
await vi.advanceTimersByTimeAsync(60_000);

expect(prismaMock.organization.findMany).toHaveBeenCalledTimes(1);
expect(mockEvaluateAll).toHaveBeenCalledWith({ organizationId: "org-a" });
});
});
});
49 changes: 49 additions & 0 deletions src/server/services/__tests__/fleet-alert-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ vi.mock("@/server/services/cost-alert", () => ({
evaluateCostAlerts: vi.fn().mockResolvedValue(undefined),
}));

// SC-3: control leadership so the tick guard can be exercised both ways.
// Defaults to leader so the existing tick-driven tests keep doing work.
vi.mock("@/server/services/leader-election", () => ({
isLeader: vi.fn(() => true),
}));

import { prisma } from "@/lib/prisma";
import { FleetAlertService } from "@/server/services/fleet-alert-service";
import {
Expand All @@ -44,8 +50,10 @@ import {
} from "@/server/services/fleet-metrics";
import { deliverToChannels } from "@/server/services/channels";
import { getVersionDrift } from "@/server/services/drift-metrics";
import { isLeader } from "@/server/services/leader-election";

const prismaMock = prisma as unknown as DeepMockProxy<PrismaClient>;
const mockIsLeader = isLeader as ReturnType<typeof vi.fn>;

const mockGetFleetErrorRate = getFleetErrorRate as ReturnType<typeof vi.fn>;
const mockGetFleetEventVolume = getFleetEventVolume as ReturnType<typeof vi.fn>;
Expand Down Expand Up @@ -130,6 +138,8 @@ describe("FleetAlertService", () => {
mockGetFleetThroughputDrop.mockReset();
mockGetNodeLoadImbalance.mockReset();
(deliverToChannels as ReturnType<typeof vi.fn>).mockReset().mockResolvedValue([]);
mockIsLeader.mockReset();
mockIsLeader.mockReturnValue(true);

service = new FleetAlertService();
});
Expand Down Expand Up @@ -672,4 +682,43 @@ describe("FleetAlertService", () => {
expect(prismaMock.alertRule.findMany).not.toHaveBeenCalled();
});
});

// ── SC-3: leadership guard (de-SPOF schedulers) ──────────────────────────

describe("leadership guard", () => {
it("tick is a no-op when the instance is no longer leader", async () => {
mockIsLeader.mockReturnValue(false);
prismaMock.organization.findMany.mockResolvedValue([
{ id: "org-a" } as never,
]);
prismaMock.alertRule.findMany.mockResolvedValue([]);

service.start();
await vi.advanceTimersByTimeAsync(30_000);

// A demoted instance must do no work: no org scan, no rule evaluation,
// no channel delivery — otherwise it double-fires with the new leader.
expect(prismaMock.organization.findMany).not.toHaveBeenCalled();
expect(prismaMock.alertRule.findMany).not.toHaveBeenCalled();
expect(deliverToChannels).not.toHaveBeenCalled();
});

it("tick proceeds normally while the instance is leader", async () => {
mockIsLeader.mockReturnValue(true);
prismaMock.organization.findMany.mockResolvedValue([
{ id: "org-a" } as never,
]);
prismaMock.alertRule.findMany.mockResolvedValue([]);

service.start();
await vi.advanceTimersByTimeAsync(30_000);

expect(prismaMock.organization.findMany).toHaveBeenCalledTimes(1);
expect(prismaMock.alertRule.findMany).toHaveBeenCalledWith(
expect.objectContaining({
where: expect.objectContaining({ organizationId: "org-a" }),
}),
);
});
});
});
26 changes: 26 additions & 0 deletions src/server/services/__tests__/git-sync-retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,42 @@ vi.mock("@/server/services/event-alerts", () => ({
vi.mock("@/server/services/sse-broadcast", () => ({
broadcastSSE: vi.fn(),
}));
vi.mock("@/server/services/leader-election", () => ({
isLeader: vi.fn(() => true),
}));


import { prisma } from "@/lib/prisma";
import { gitSyncCommitPipeline } from "@/server/services/git-sync";
import { fireEventAlert } from "@/server/services/event-alerts";
import { GitSyncRetryService, getNextRetryAt, createGitSyncJob } from "../git-sync-retry";
import { isLeader } from "@/server/services/leader-election";

const prismaMock = prisma as unknown as DeepMockProxy<PrismaClient>;
const commitMock = vi.mocked(gitSyncCommitPipeline);
const fireAlertMock = vi.mocked(fireEventAlert);

// Keep leadership true by default so the per-tick guard never suppresses the
// existing tick tests; individual tests opt into the not-leader path.
beforeEach(() => {
vi.mocked(isLeader).mockReturnValue(true);
});

describe("GitSyncRetryService leadership guard", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(isLeader).mockReturnValue(true);
});

it("skips the tick (no work) when this instance is not the leader", async () => {
vi.mocked(isLeader).mockReturnValue(false);
const service = new GitSyncRetryService();
await (service as unknown as { tick(): Promise<void> }).tick();
expect(prismaMock.organization.findMany).not.toHaveBeenCalled();
expect(commitMock).not.toHaveBeenCalled();
});
});

describe("getNextRetryAt", () => {
it("returns 30s delay for attempt 0", () => {
const result = getNextRetryAt(0);
Expand Down
11 changes: 10 additions & 1 deletion src/server/services/anomaly-detection-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {
evaluateAllPipelines,
ANOMALY_CONFIG,
} from "@/server/services/anomaly-detector";
import { infoLog, errorLog } from "@/lib/logger";
import { debugLog, infoLog, errorLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";

// ─── Constants ──────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -59,6 +60,14 @@ export class AnomalyDetectionService {
* tenant's analysis (or failure) does not stall another's.
*/
private async tick(): Promise<void> {
// SC-3: re-check leadership each tick. A demoted leader's setInterval keeps
// firing for up to one TTL (~15s) after Redis renewals fail; without this
// guard the old + new leader both run anomaly analysis, duplicating events.
// Guard only — the timer stays so it resumes if leadership is re-acquired.
if (!isLeader()) {
debugLog("anomaly-detection", "Skipping tick — instance is no longer leader");
return;
}
if (this.tickInFlight) {
infoLog(
"anomaly-detection",
Expand Down
12 changes: 12 additions & 0 deletions src/server/services/backup-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getOrgSettings } from "@/lib/org-settings";
import { debugLog, infoLog, errorLog } from "@/lib/logger";
import { createBackup, runRetentionCleanup, runOrphanCleanup } from "./backup";
import { fireEventAlert } from "./event-alerts";
import { isLeader } from "@/server/services/leader-election";

/**
* Per-organization backup scheduler.
Expand Down Expand Up @@ -98,6 +99,17 @@ function scheduleJobForOrg(
}

const task = cron.schedule(cronExpression, async () => {
// SC-3: a demoted leader's cron tasks keep firing for up to one TTL (~15s)
// after Redis renewals fail. Re-check leadership at the top of the per-org
// callback so a demoted instance skips the backup instead of racing the new
// leader (duplicate backups). Guard only — the cron task is left registered.
if (!isLeader()) {
debugLog(
"backup-scheduler",
`Skipping scheduled backup for org=${organizationId} — instance is no longer leader`,
);
return;
}
infoLog(
"backup-scheduler",
`Starting scheduled backup for org=${organizationId}`,
Expand Down
10 changes: 10 additions & 0 deletions src/server/services/cost-optimizer-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
cleanupExpiredRecommendations,
} from "@/server/services/cost-recommendations";
import { generateAiRecommendations } from "@/server/services/cost-optimizer-ai";
import { isLeader } from "@/server/services/leader-election";

/**
* Cost-optimizer scheduler — single global cron tick fans out across orgs.
Expand Down Expand Up @@ -86,6 +87,15 @@ function scheduleJob(
}

const task = cron.schedule(cronExpression, async () => {
// SC-3: both the daily and continuous passes route through this callback.
// A demoted leader's cron tasks keep firing for up to one TTL (~15s) after
// Redis renewals fail, so re-check leadership here — a demoted instance
// skips the run instead of racing the new leader (duplicate analysis runs).
// Guard only — the cron task is left registered.
if (!isLeader()) {
debugLog("cost-optimizer", `Skipping ${label} — instance is no longer leader`);
return;
}
infoLog("cost-optimizer", `Starting ${label}...`);
try {
await run();
Expand Down
12 changes: 11 additions & 1 deletion src/server/services/fleet-alert-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import type { LoadImbalanceResult, ThroughputDropDetail } from "@/server/service
import { getVersionDrift } from "@/server/services/drift-metrics";
import { checkCertificateExpiry } from "@/server/services/cert-expiry-checker";
import { evaluateCostAlerts } from "@/server/services/cost-alert";
import { infoLog, errorLog } from "@/lib/logger";
import { debugLog, infoLog, errorLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";

// Re-export the constant for downstream use (e.g. T03 validation)
export { FLEET_METRICS } from "@/server/services/alert-evaluator";
Expand Down Expand Up @@ -83,6 +84,15 @@ export class FleetAlertService {
* their own queries.
*/
private async tick(): Promise<void> {
// SC-3: a demoted leader's timers keep firing for up to one TTL (~15s)
// after Redis renewals fail. Without this guard the old + new leader both
// evaluate every rule, double-firing alerts. Re-check leadership each tick
// so a demoted instance becomes a no-op (we keep the timer, not tear it
// down, so it resumes cleanly if it re-acquires leadership).
if (!isLeader()) {
debugLog("fleet-alert", "Skipping tick — instance is no longer leader");
return;
}
if (this.tickInFlight) {
infoLog(
"fleet-alert",
Expand Down
5 changes: 5 additions & 0 deletions src/server/services/git-sync-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { gitSyncCommitPipeline, gitSyncDeletePipeline } from "@/server/services/
import { fireEventAlert } from "@/server/services/event-alerts";
import { loadOrgDataKeyCiphertext } from "@/server/services/crypto-v3-callsite";
import { broadcastSSE } from "@/server/services/sse-broadcast";
import { isLeader } from "@/server/services/leader-election";

// --- Constants ---

Expand Down Expand Up @@ -61,6 +62,10 @@ export class GitSyncRetryService {
* Single tick: iterate orgs and process retries per org.
*/
private async tick(): Promise<void> {
if (!isLeader()) {
debugLog("git-sync-retry", "Skipping tick — instance is no longer leader");
return;
}
if (this.tickInFlight) {
infoLog(
"git-sync-retry",
Expand Down
11 changes: 10 additions & 1 deletion src/server/services/lake/lake-alerts.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { adminPrisma, prisma } from "@/lib/prisma";
import { withOrgTx } from "@/lib/with-org-tx";
import { runWithOrgContext } from "@/lib/org-context";
import { infoLog, errorLog } from "@/lib/logger";
import { debugLog, infoLog, errorLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";
import type { LakeAlertRule } from "@/generated/prisma";
import { isLakeEnabled } from "./clickhouse";
import { aggregateValue, type LakeAggFunction } from "./lake-query";
Expand Down Expand Up @@ -279,6 +280,14 @@ let timer: NodeJS.Timeout | null = null;
let tickInFlight = false;

async function tick(): Promise<void> {
// SC-3: re-check leadership each tick. A demoted leader's setInterval keeps
// firing for up to one TTL (~15s) after Redis renewals fail; without this
// guard the old + new leader both evaluate lake alert rules, double-firing.
// Guard only — the timer stays so it resumes if leadership is re-acquired.
if (!isLeader()) {
debugLog("lake-alert", "Skipping tick — instance is no longer leader");
return;
}
if (tickInFlight) return; // setInterval does not skip overlapping callbacks
tickInFlight = true;
try {
Expand Down
7 changes: 6 additions & 1 deletion src/server/services/metrics-rollup.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { adminPrisma } from "@/lib/prisma";
import { withOrgTx } from "@/lib/with-org-tx";
import { infoLog, errorLog } from "@/lib/logger";
import { infoLog, errorLog, debugLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";

/**
* Long-retention metric rollups (B5).
Expand Down Expand Up @@ -602,6 +603,10 @@ const ROLLUP_INTERVAL_MS = HOUR_MS;
let timer: ReturnType<typeof setInterval> | null = null;

async function runRollupTick(): Promise<void> {
if (!isLeader()) {
debugLog("metrics-rollup", "Skipping tick — instance is no longer leader");
return;
}
for (const granularity of ["HOUR", "DAY"] as const) {
try {
const result = await rollupMetrics({ granularity });
Expand Down
12 changes: 11 additions & 1 deletion src/server/services/retry-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { getDriver } from "@/server/services/channels";
import { decryptChannelConfig } from "@/server/services/channel-secrets";
import type { ChannelPayload } from "@/server/services/channels/types";
import { deliverOutboundWebhook, isPermanentFailure } from "@/server/services/outbound-webhook";
import { infoLog, errorLog } from "@/lib/logger";
import { debugLog, infoLog, errorLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";

// ─── Constants ──────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -51,6 +52,15 @@ export class RetryService {
* tracking wrappers, which handle backoff scheduling automatically.
*/
async processRetries(): Promise<void> {
// SC-3: a demoted leader's setInterval keeps firing for up to one TTL
// (~15s) after Redis renewals fail. Re-check leadership so a demoted
// instance doesn't re-deliver alerts/webhooks the new leader is already
// retrying (duplicate deliveries). Covers processOutboundRetries too (it
// runs below). Guard only — the timer stays so it resumes on re-acquisition.
if (!isLeader()) {
debugLog("retry-service", "Skipping poll — instance is no longer leader");
return;
}
let dueRetries;
try {
dueRetries = await prisma.deliveryAttempt.findMany({
Expand Down
9 changes: 9 additions & 0 deletions src/server/services/telemetry-scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import cron, { type ScheduledTask } from "node-cron";
import { sendTelemetryHeartbeat } from "./telemetry-sender";
import { debugLog } from "@/lib/logger";
import { isLeader } from "@/server/services/leader-election";

// Daily at 03:42 UTC. Jitter across instances happens naturally because each
// VF install runs at a different wall clock when the cron crosses the boundary.
Expand All @@ -10,6 +12,13 @@ let task: ScheduledTask | null = null;
export function initTelemetryScheduler(): void {
if (task) return;
task = cron.schedule(DAILY_CRON, async () => {
// SC-3: a demoted leader's cron keeps firing for up to one TTL (~15s) after
// Redis renewals fail. Re-check leadership so only the current leader sends
// the telemetry heartbeat. Guard only — the cron task is left registered.
if (!isLeader()) {
debugLog("telemetry", "Skipping heartbeat — instance is no longer leader");
return;
}
try {
await sendTelemetryHeartbeat();
} catch (err) {
Expand Down
Loading