Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/instrumentation.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,19 @@ export async function registerNodeInstrumentation() {
errorLog("instrumentation", "Failed to initialize lake alert scheduler", error);
}

try {
// VectorFlow Lake: retention sweeper. Enforces each dataset's effective
// coldDays drop horizon (per-policy, not just the table default TTL).
// Leader-gated and a no-op when the lake is disabled (same contract as the
// alert scheduler). Best-effort — a scheduler init hiccup never blocks boot.
const { initLakeRetentionScheduler } = await import(
"@/server/services/lake/lake-retention"
);
initLakeRetentionScheduler();
} catch (error) {
errorLog("instrumentation", "Failed to initialize lake retention scheduler", error);
}

try {
const { importLegacyBackups } = await import("@/server/services/backup");
const result = await importLegacyBackups();
Expand Down
9 changes: 9 additions & 0 deletions src/server/routers/lake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { withAudit } from "@/server/middleware/audit";
import { prisma } from "@/lib/prisma";
import type { Prisma } from "@/generated/prisma";
import { isLakeEnabled } from "@/server/services/lake/clickhouse";
import { evaluateLakeQuota } from "@/server/services/lake/lake-quota";
import {
LAKE_ALERT_COMPARATORS,
testFireLakeAlertRule,
Expand Down Expand Up @@ -213,6 +214,14 @@ export const lakeRouter = router({
* no team gate. */
status: protectedProcedure.query(() => ({ enabled: isLakeEnabled() })),

/** Per-org Lake byte-quota status — a read-only badge surface (mirrors the
* agents/pipelines quota badges). Unlimited in OSS (the default provider);
* a commercial tier provider returns a finite ceiling + over-quota flag.
* Org is resolved from the caller's session (`ctx.organizationId`), never
* from input, so it carries no tenant-id input and reads only the caller's
* own org. */
quotaStatus: protectedProcedure.query(({ ctx }) => evaluateLakeQuota(ctx.organizationId)),

/** Scheduled threshold alerts over lake datasets. */
alert: lakeAlertRouter,

Expand Down
5 changes: 5 additions & 0 deletions src/server/services/lake/__tests__/lake-catalog.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import type { MetricsDataPoint, PreviousSnapshot } from "@/server/services/metrics-ingest";

// @clickhouse/client is an optional native dep absent from some dev installs;
// stub it so the real clickhouse.ts loads (isLakeEnabled reads env only — this
// suite toggles VF_LAKE_CLICKHOUSE_URL rather than mocking the module).
vi.mock("@clickhouse/client", () => ({ createClient: vi.fn() }));

// Shared prisma mock, hoisted so the vi.mock factories can reference it.
const { prismaMock } = vi.hoisted(() => ({
prismaMock: {
Expand Down
150 changes: 150 additions & 0 deletions src/server/services/lake/__tests__/lake-quota.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";

// Hoisted prisma mock. The lakeDataset surface includes mutation methods so we
// can assert the quota path NEVER drops/rewrites data (it is a soft signal).
const { prismaMock } = vi.hoisted(() => ({
prismaMock: {
lakeDataset: {
aggregate: vi.fn(),
update: vi.fn(),
updateMany: vi.fn(),
delete: vi.fn(),
deleteMany: vi.fn(),
},
},
}));

vi.mock("@/lib/prisma", () => ({
prisma: prismaMock,
basePrisma: prismaMock,
adminPrisma: prismaMock,
}));

vi.mock("@/lib/logger", () => ({
errorLog: vi.fn(),
warnLog: vi.fn(),
infoLog: vi.fn(),
debugLog: vi.fn(),
}));

import { warnLog } from "@/lib/logger";
import {
checkLakeQuota,
evaluateLakeQuota,
setLakeQuotaProvider,
resetLakeQuotaProvider,
DefaultUnlimitedLakeQuotaProvider,
type LakeQuotaProvider,
} from "../lake-quota";

/** A provider that returns a fixed ceiling for every org. */
class FixedLakeQuota implements LakeQuotaProvider {
constructor(private readonly bytes: bigint | null) {}
getLakeQuotaBytes(): bigint | null {
return this.bytes;
}
}

beforeEach(() => {
vi.clearAllMocks();
});

afterEach(() => {
resetLakeQuotaProvider();
});

describe("checkLakeQuota (pure)", () => {
it("is under quota when current < ceiling", () => {
const r = checkLakeQuota("org-1", BigInt(50), BigInt(100));
expect(r.overQuota).toBe(false);
expect(r.usageRatio).toBeCloseTo(0.5);
expect(r.quotaBytes).toBe(BigInt(100));
expect(r.currentBytes).toBe(BigInt(50));
});

it("is over quota when current > ceiling", () => {
const r = checkLakeQuota("org-1", BigInt(150), BigInt(100));
expect(r.overQuota).toBe(true);
expect(r.usageRatio).toBeCloseTo(1.5);
});

it("is NOT over quota exactly at the ceiling (strict >)", () => {
const r = checkLakeQuota("org-1", BigInt(100), BigInt(100));
expect(r.overQuota).toBe(false);
expect(r.usageRatio).toBeCloseTo(1);
});

it("treats a null ceiling as unlimited (never over quota)", () => {
const r = checkLakeQuota("org-1", BigInt("999999999999"), null);
expect(r.overQuota).toBe(false);
expect(r.quotaBytes).toBeNull();
expect(r.usageRatio).toBeNull();
});

it("a zero ceiling is over quota once any byte is stored", () => {
const over = checkLakeQuota("org-1", BigInt(1), BigInt(0));
expect(over.overQuota).toBe(true);
expect(over.usageRatio).toBe(Number.POSITIVE_INFINITY);

const empty = checkLakeQuota("org-1", BigInt(0), BigInt(0));
expect(empty.overQuota).toBe(false);
expect(empty.usageRatio).toBe(0);
});
});

describe("evaluateLakeQuota", () => {
it("OSS default is unlimited and does not read the catalog", async () => {
// Default provider (no override) — must short-circuit before any DB read.
setLakeQuotaProvider(new DefaultUnlimitedLakeQuotaProvider());

const r = await evaluateLakeQuota("org-1");

expect(r.quotaBytes).toBeNull();
expect(r.overQuota).toBe(false);
expect(prismaMock.lakeDataset.aggregate).not.toHaveBeenCalled();
});

it("sums the catalog and reports under quota without signalling", async () => {
setLakeQuotaProvider(new FixedLakeQuota(BigInt(1000)));
prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: BigInt(500) } });

const r = await evaluateLakeQuota("org-1");

expect(prismaMock.lakeDataset.aggregate).toHaveBeenCalledWith({
where: { organizationId: "org-1" },
_sum: { byteCount: true },
});
expect(r.currentBytes).toBe(BigInt(500));
expect(r.overQuota).toBe(false);
expect(vi.mocked(warnLog)).not.toHaveBeenCalled();
});

it("fires a soft signal when over quota but NEVER drops or rewrites data", async () => {
setLakeQuotaProvider(new FixedLakeQuota(BigInt(100)));
prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: BigInt(250) } });

const r = await evaluateLakeQuota("org-1");

expect(r.overQuota).toBe(true);
expect(r.currentBytes).toBe(BigInt(250));
// Signal fired …
expect(vi.mocked(warnLog)).toHaveBeenCalledTimes(1);
expect(vi.mocked(warnLog).mock.calls[0][0]).toBe("lake-quota");
// … and absolutely no data mutation (soft enforcement: read-only).
expect(prismaMock.lakeDataset.update).not.toHaveBeenCalled();
expect(prismaMock.lakeDataset.updateMany).not.toHaveBeenCalled();
expect(prismaMock.lakeDataset.delete).not.toHaveBeenCalled();
expect(prismaMock.lakeDataset.deleteMany).not.toHaveBeenCalled();
});

it("treats an empty catalog (null sum) as zero bytes", async () => {
setLakeQuotaProvider(new FixedLakeQuota(BigInt(100)));
prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: null } });

const r = await evaluateLakeQuota("org-1");

expect(r.currentBytes).toBe(BigInt(0));
expect(r.overQuota).toBe(false);
expect(vi.mocked(warnLog)).not.toHaveBeenCalled();
});
});
193 changes: 193 additions & 0 deletions src/server/services/lake/__tests__/lake-retention.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import { describe, it, expect, vi, beforeEach } from "vitest";

const { mocks } = vi.hoisted(() => ({
mocks: {
isLakeEnabled: vi.fn(),
command: vi.fn(),
findMany: vi.fn(),
},
}));

vi.mock("@/server/services/lake/clickhouse", () => ({
isLakeEnabled: mocks.isLakeEnabled,
getLakeClient: () => ({ command: mocks.command }),
}));

vi.mock("@/lib/prisma", () => {
const client = { lakeDataset: { findMany: mocks.findMany } };
return { prisma: client, basePrisma: client, adminPrisma: client };
});

vi.mock("@/lib/logger", () => ({
errorLog: vi.fn(),
warnLog: vi.fn(),
infoLog: vi.fn(),
debugLog: vi.fn(),
}));

vi.mock("@/server/services/leader-election", () => ({ isLeader: vi.fn(() => true) }));

import {
effectiveRetention,
buildLakeTtlClause,
enforceDatasetRetention,
sweepLakeRetention,
LAKE_DEFAULT_HOT_DAYS,
LAKE_DEFAULT_COLD_DAYS,
} from "../lake-retention";

const MS_PER_DAY = 24 * 60 * 60 * 1000;

/** Extract the single object argument of the Nth `command()` call. */
function commandCall(n: number): {
query: string;
query_params: { orgId: string; pipelineId: string; cutoff: Date };
} {
return mocks.command.mock.calls[n][0] as {
query: string;
query_params: { orgId: string; pipelineId: string; cutoff: Date };
};
}

beforeEach(() => {
vi.clearAllMocks();
mocks.command.mockResolvedValue(undefined);
});

describe("effectiveRetention", () => {
it("falls back to the table defaults when there is no policy", () => {
expect(effectiveRetention(null)).toEqual({
hotDays: LAKE_DEFAULT_HOT_DAYS,
coldDays: LAKE_DEFAULT_COLD_DAYS,
});
expect(effectiveRetention(undefined)).toEqual({ hotDays: 7, coldDays: 90 });
});

it("honours a per-dataset policy", () => {
expect(effectiveRetention({ hotDays: 3, coldDays: 30 })).toEqual({ hotDays: 3, coldDays: 30 });
});

it("falls back on non-positive windows", () => {
expect(effectiveRetention({ hotDays: 0, coldDays: -5 })).toEqual({ hotDays: 7, coldDays: 90 });
});

it("clamps coldDays up to hotDays so the drop horizon never precedes the move", () => {
expect(effectiveRetention({ hotDays: 30, coldDays: 10 })).toEqual({ hotDays: 30, coldDays: 30 });
});
});

describe("buildLakeTtlClause", () => {
it("cold-tier disabled → DELETE-only at coldDays (plain MergeTree)", () => {
const clause = buildLakeTtlClause({ hotDays: 7, coldDays: 90 }, false);
expect(clause).toBe("TTL toDateTime(timestamp) + INTERVAL 90 DAY DELETE");
expect(clause).not.toContain("TO VOLUME 'cold'");
expect(clause).not.toContain("storage_policy");
});

it("cold-tier enabled → move-to-cold at hotDays + DELETE at coldDays + storage policy", () => {
const clause = buildLakeTtlClause({ hotDays: 7, coldDays: 90 }, true);
expect(clause).toContain("INTERVAL 7 DAY TO VOLUME 'cold'");
expect(clause).toContain("INTERVAL 90 DAY DELETE");
expect(clause).toContain("storage_policy = 'vf_hot_cold'");
});

it("reflects a per-dataset window", () => {
const clause = buildLakeTtlClause(effectiveRetention({ hotDays: 1, coldDays: 14 }), false);
expect(clause).toBe("TTL toDateTime(timestamp) + INTERVAL 14 DAY DELETE");
});
});

describe("enforceDatasetRetention", () => {
it("no-ops when the lake is disabled (never connects)", async () => {
mocks.isLakeEnabled.mockReturnValue(false);

const r = await enforceDatasetRetention({ orgId: "org-1", pipelineId: "pipe-1" });

expect(r).toBeNull();
expect(mocks.command).not.toHaveBeenCalled();
});

it("deletes events older than the policy's coldDays horizon, org+pipeline scoped", async () => {
mocks.isLakeEnabled.mockReturnValue(true);
const now = new Date("2026-06-08T00:00:00.000Z");

const r = await enforceDatasetRetention({
orgId: "org-1",
pipelineId: "pipe-1",
policy: { hotDays: 3, coldDays: 30 },
now,
});

expect(r).toEqual({
pipelineId: "pipe-1",
coldDays: 30,
cutoff: new Date(now.getTime() - 30 * MS_PER_DAY).toISOString(),
});
const call = commandCall(0);
expect(call.query).toContain("DELETE FROM lake_events");
expect(call.query).toContain("organizationId = {orgId:String}");
expect(call.query).toContain("pipelineId = {pipelineId:String}");
expect(call.query).toContain("timestamp < {cutoff:DateTime64(3)}");
expect(call.query_params.orgId).toBe("org-1");
expect(call.query_params.pipelineId).toBe("pipe-1");
expect(call.query_params.cutoff).toEqual(new Date(now.getTime() - 30 * MS_PER_DAY));
});

it("uses the default 90-day horizon when the dataset has no policy", async () => {
mocks.isLakeEnabled.mockReturnValue(true);
const now = new Date("2026-06-08T00:00:00.000Z");

const r = await enforceDatasetRetention({ orgId: "org-1", pipelineId: "pipe-1", now });

expect(r?.coldDays).toBe(90);
expect(commandCall(0).query_params.cutoff).toEqual(new Date(now.getTime() - 90 * MS_PER_DAY));
});
});

describe("sweepLakeRetention", () => {
it("no-ops when the lake is disabled", async () => {
mocks.isLakeEnabled.mockReturnValue(false);

const r = await sweepLakeRetention();

expect(r).toEqual({ skipped: true, swept: 0, errors: 0 });
expect(mocks.findMany).not.toHaveBeenCalled();
expect(mocks.command).not.toHaveBeenCalled();
});

it("enforces each dataset's own effective horizon", async () => {
mocks.isLakeEnabled.mockReturnValue(true);
const now = new Date("2026-06-08T00:00:00.000Z");
mocks.findMany.mockResolvedValue([
{ organizationId: "org-1", pipelineId: "pipe-a", retentionPolicy: { hotDays: 1, coldDays: 7 } },
{ organizationId: "org-2", pipelineId: "pipe-b", retentionPolicy: null },
]);

const r = await sweepLakeRetention(now);

expect(r).toEqual({ skipped: false, swept: 2, errors: 0 });
expect(mocks.command).toHaveBeenCalledTimes(2);
// Dataset A: 7-day horizon for org-1/pipe-a.
expect(commandCall(0).query_params).toMatchObject({ orgId: "org-1", pipelineId: "pipe-a" });
expect(commandCall(0).query_params.cutoff).toEqual(new Date(now.getTime() - 7 * MS_PER_DAY));
// Dataset B: default 90-day horizon for org-2/pipe-b.
expect(commandCall(1).query_params).toMatchObject({ orgId: "org-2", pipelineId: "pipe-b" });
expect(commandCall(1).query_params.cutoff).toEqual(new Date(now.getTime() - 90 * MS_PER_DAY));
});

it("logs + counts a per-dataset failure and continues the sweep", async () => {
mocks.isLakeEnabled.mockReturnValue(true);
mocks.findMany.mockResolvedValue([
{ organizationId: "org-1", pipelineId: "pipe-a", retentionPolicy: null },
{ organizationId: "org-2", pipelineId: "pipe-b", retentionPolicy: null },
]);
mocks.command.mockRejectedValueOnce(new Error("clickhouse down")).mockResolvedValue(undefined);

const r = await sweepLakeRetention();

expect(r.skipped).toBe(false);
expect(r.errors).toBe(1);
expect(r.swept).toBe(1);
expect(mocks.command).toHaveBeenCalledTimes(2);
});
});
Loading
Loading