diff --git a/src/instrumentation.node.ts b/src/instrumentation.node.ts index 00837a47..bc0cd9bc 100644 --- a/src/instrumentation.node.ts +++ b/src/instrumentation.node.ts @@ -212,6 +212,19 @@ export async function registerNodeInstrumentation() { errorLog("instrumentation", "Failed to initialize lake alert scheduler", error); } + try { + // VectorFlow Lake: retention sweeper. Enforces each dataset's effective + // coldDays drop horizon (per-policy, not just the table default TTL). + // Leader-gated and a no-op when the lake is disabled (same contract as the + // alert scheduler). Best-effort — a scheduler init hiccup never blocks boot. + const { initLakeRetentionScheduler } = await import( + "@/server/services/lake/lake-retention" + ); + initLakeRetentionScheduler(); + } catch (error) { + errorLog("instrumentation", "Failed to initialize lake retention scheduler", error); + } + try { const { importLegacyBackups } = await import("@/server/services/backup"); const result = await importLegacyBackups(); diff --git a/src/server/routers/lake.ts b/src/server/routers/lake.ts index 20cc35ff..3b6953dd 100644 --- a/src/server/routers/lake.ts +++ b/src/server/routers/lake.ts @@ -5,6 +5,7 @@ import { withAudit } from "@/server/middleware/audit"; import { prisma } from "@/lib/prisma"; import type { Prisma } from "@/generated/prisma"; import { isLakeEnabled } from "@/server/services/lake/clickhouse"; +import { evaluateLakeQuota } from "@/server/services/lake/lake-quota"; import { LAKE_ALERT_COMPARATORS, testFireLakeAlertRule, @@ -213,6 +214,14 @@ export const lakeRouter = router({ * no team gate. */ status: protectedProcedure.query(() => ({ enabled: isLakeEnabled() })), + /** Per-org Lake byte-quota status — a read-only badge surface (mirrors the + * agents/pipelines quota badges). Unlimited in OSS (the default provider); + * a commercial tier provider returns a finite ceiling + over-quota flag. + * Org is resolved from the caller's session (`ctx.organizationId`), never + * from input, so it carries no tenant-id input and reads only the caller's + * own org. */ + quotaStatus: protectedProcedure.query(({ ctx }) => evaluateLakeQuota(ctx.organizationId)), + /** Scheduled threshold alerts over lake datasets. */ alert: lakeAlertRouter, diff --git a/src/server/services/lake/__tests__/lake-catalog.test.ts b/src/server/services/lake/__tests__/lake-catalog.test.ts index 68963c5b..28bb8585 100644 --- a/src/server/services/lake/__tests__/lake-catalog.test.ts +++ b/src/server/services/lake/__tests__/lake-catalog.test.ts @@ -1,6 +1,11 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; import type { MetricsDataPoint, PreviousSnapshot } from "@/server/services/metrics-ingest"; +// @clickhouse/client is an optional native dep absent from some dev installs; +// stub it so the real clickhouse.ts loads (isLakeEnabled reads env only — this +// suite toggles VF_LAKE_CLICKHOUSE_URL rather than mocking the module). +vi.mock("@clickhouse/client", () => ({ createClient: vi.fn() })); + // Shared prisma mock, hoisted so the vi.mock factories can reference it. const { prismaMock } = vi.hoisted(() => ({ prismaMock: { diff --git a/src/server/services/lake/__tests__/lake-quota.test.ts b/src/server/services/lake/__tests__/lake-quota.test.ts new file mode 100644 index 00000000..aa291b57 --- /dev/null +++ b/src/server/services/lake/__tests__/lake-quota.test.ts @@ -0,0 +1,150 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +// Hoisted prisma mock. The lakeDataset surface includes mutation methods so we +// can assert the quota path NEVER drops/rewrites data (it is a soft signal). +const { prismaMock } = vi.hoisted(() => ({ + prismaMock: { + lakeDataset: { + aggregate: vi.fn(), + update: vi.fn(), + updateMany: vi.fn(), + delete: vi.fn(), + deleteMany: vi.fn(), + }, + }, +})); + +vi.mock("@/lib/prisma", () => ({ + prisma: prismaMock, + basePrisma: prismaMock, + adminPrisma: prismaMock, +})); + +vi.mock("@/lib/logger", () => ({ + errorLog: vi.fn(), + warnLog: vi.fn(), + infoLog: vi.fn(), + debugLog: vi.fn(), +})); + +import { warnLog } from "@/lib/logger"; +import { + checkLakeQuota, + evaluateLakeQuota, + setLakeQuotaProvider, + resetLakeQuotaProvider, + DefaultUnlimitedLakeQuotaProvider, + type LakeQuotaProvider, +} from "../lake-quota"; + +/** A provider that returns a fixed ceiling for every org. */ +class FixedLakeQuota implements LakeQuotaProvider { + constructor(private readonly bytes: bigint | null) {} + getLakeQuotaBytes(): bigint | null { + return this.bytes; + } +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +afterEach(() => { + resetLakeQuotaProvider(); +}); + +describe("checkLakeQuota (pure)", () => { + it("is under quota when current < ceiling", () => { + const r = checkLakeQuota("org-1", BigInt(50), BigInt(100)); + expect(r.overQuota).toBe(false); + expect(r.usageRatio).toBeCloseTo(0.5); + expect(r.quotaBytes).toBe(BigInt(100)); + expect(r.currentBytes).toBe(BigInt(50)); + }); + + it("is over quota when current > ceiling", () => { + const r = checkLakeQuota("org-1", BigInt(150), BigInt(100)); + expect(r.overQuota).toBe(true); + expect(r.usageRatio).toBeCloseTo(1.5); + }); + + it("is NOT over quota exactly at the ceiling (strict >)", () => { + const r = checkLakeQuota("org-1", BigInt(100), BigInt(100)); + expect(r.overQuota).toBe(false); + expect(r.usageRatio).toBeCloseTo(1); + }); + + it("treats a null ceiling as unlimited (never over quota)", () => { + const r = checkLakeQuota("org-1", BigInt("999999999999"), null); + expect(r.overQuota).toBe(false); + expect(r.quotaBytes).toBeNull(); + expect(r.usageRatio).toBeNull(); + }); + + it("a zero ceiling is over quota once any byte is stored", () => { + const over = checkLakeQuota("org-1", BigInt(1), BigInt(0)); + expect(over.overQuota).toBe(true); + expect(over.usageRatio).toBe(Number.POSITIVE_INFINITY); + + const empty = checkLakeQuota("org-1", BigInt(0), BigInt(0)); + expect(empty.overQuota).toBe(false); + expect(empty.usageRatio).toBe(0); + }); +}); + +describe("evaluateLakeQuota", () => { + it("OSS default is unlimited and does not read the catalog", async () => { + // Default provider (no override) — must short-circuit before any DB read. + setLakeQuotaProvider(new DefaultUnlimitedLakeQuotaProvider()); + + const r = await evaluateLakeQuota("org-1"); + + expect(r.quotaBytes).toBeNull(); + expect(r.overQuota).toBe(false); + expect(prismaMock.lakeDataset.aggregate).not.toHaveBeenCalled(); + }); + + it("sums the catalog and reports under quota without signalling", async () => { + setLakeQuotaProvider(new FixedLakeQuota(BigInt(1000))); + prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: BigInt(500) } }); + + const r = await evaluateLakeQuota("org-1"); + + expect(prismaMock.lakeDataset.aggregate).toHaveBeenCalledWith({ + where: { organizationId: "org-1" }, + _sum: { byteCount: true }, + }); + expect(r.currentBytes).toBe(BigInt(500)); + expect(r.overQuota).toBe(false); + expect(vi.mocked(warnLog)).not.toHaveBeenCalled(); + }); + + it("fires a soft signal when over quota but NEVER drops or rewrites data", async () => { + setLakeQuotaProvider(new FixedLakeQuota(BigInt(100))); + prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: BigInt(250) } }); + + const r = await evaluateLakeQuota("org-1"); + + expect(r.overQuota).toBe(true); + expect(r.currentBytes).toBe(BigInt(250)); + // Signal fired … + expect(vi.mocked(warnLog)).toHaveBeenCalledTimes(1); + expect(vi.mocked(warnLog).mock.calls[0][0]).toBe("lake-quota"); + // … and absolutely no data mutation (soft enforcement: read-only). + expect(prismaMock.lakeDataset.update).not.toHaveBeenCalled(); + expect(prismaMock.lakeDataset.updateMany).not.toHaveBeenCalled(); + expect(prismaMock.lakeDataset.delete).not.toHaveBeenCalled(); + expect(prismaMock.lakeDataset.deleteMany).not.toHaveBeenCalled(); + }); + + it("treats an empty catalog (null sum) as zero bytes", async () => { + setLakeQuotaProvider(new FixedLakeQuota(BigInt(100))); + prismaMock.lakeDataset.aggregate.mockResolvedValue({ _sum: { byteCount: null } }); + + const r = await evaluateLakeQuota("org-1"); + + expect(r.currentBytes).toBe(BigInt(0)); + expect(r.overQuota).toBe(false); + expect(vi.mocked(warnLog)).not.toHaveBeenCalled(); + }); +}); diff --git a/src/server/services/lake/__tests__/lake-retention.test.ts b/src/server/services/lake/__tests__/lake-retention.test.ts new file mode 100644 index 00000000..05db04cd --- /dev/null +++ b/src/server/services/lake/__tests__/lake-retention.test.ts @@ -0,0 +1,193 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const { mocks } = vi.hoisted(() => ({ + mocks: { + isLakeEnabled: vi.fn(), + command: vi.fn(), + findMany: vi.fn(), + }, +})); + +vi.mock("@/server/services/lake/clickhouse", () => ({ + isLakeEnabled: mocks.isLakeEnabled, + getLakeClient: () => ({ command: mocks.command }), +})); + +vi.mock("@/lib/prisma", () => { + const client = { lakeDataset: { findMany: mocks.findMany } }; + return { prisma: client, basePrisma: client, adminPrisma: client }; +}); + +vi.mock("@/lib/logger", () => ({ + errorLog: vi.fn(), + warnLog: vi.fn(), + infoLog: vi.fn(), + debugLog: vi.fn(), +})); + +vi.mock("@/server/services/leader-election", () => ({ isLeader: vi.fn(() => true) })); + +import { + effectiveRetention, + buildLakeTtlClause, + enforceDatasetRetention, + sweepLakeRetention, + LAKE_DEFAULT_HOT_DAYS, + LAKE_DEFAULT_COLD_DAYS, +} from "../lake-retention"; + +const MS_PER_DAY = 24 * 60 * 60 * 1000; + +/** Extract the single object argument of the Nth `command()` call. */ +function commandCall(n: number): { + query: string; + query_params: { orgId: string; pipelineId: string; cutoff: Date }; +} { + return mocks.command.mock.calls[n][0] as { + query: string; + query_params: { orgId: string; pipelineId: string; cutoff: Date }; + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + mocks.command.mockResolvedValue(undefined); +}); + +describe("effectiveRetention", () => { + it("falls back to the table defaults when there is no policy", () => { + expect(effectiveRetention(null)).toEqual({ + hotDays: LAKE_DEFAULT_HOT_DAYS, + coldDays: LAKE_DEFAULT_COLD_DAYS, + }); + expect(effectiveRetention(undefined)).toEqual({ hotDays: 7, coldDays: 90 }); + }); + + it("honours a per-dataset policy", () => { + expect(effectiveRetention({ hotDays: 3, coldDays: 30 })).toEqual({ hotDays: 3, coldDays: 30 }); + }); + + it("falls back on non-positive windows", () => { + expect(effectiveRetention({ hotDays: 0, coldDays: -5 })).toEqual({ hotDays: 7, coldDays: 90 }); + }); + + it("clamps coldDays up to hotDays so the drop horizon never precedes the move", () => { + expect(effectiveRetention({ hotDays: 30, coldDays: 10 })).toEqual({ hotDays: 30, coldDays: 30 }); + }); +}); + +describe("buildLakeTtlClause", () => { + it("cold-tier disabled → DELETE-only at coldDays (plain MergeTree)", () => { + const clause = buildLakeTtlClause({ hotDays: 7, coldDays: 90 }, false); + expect(clause).toBe("TTL toDateTime(timestamp) + INTERVAL 90 DAY DELETE"); + expect(clause).not.toContain("TO VOLUME 'cold'"); + expect(clause).not.toContain("storage_policy"); + }); + + it("cold-tier enabled → move-to-cold at hotDays + DELETE at coldDays + storage policy", () => { + const clause = buildLakeTtlClause({ hotDays: 7, coldDays: 90 }, true); + expect(clause).toContain("INTERVAL 7 DAY TO VOLUME 'cold'"); + expect(clause).toContain("INTERVAL 90 DAY DELETE"); + expect(clause).toContain("storage_policy = 'vf_hot_cold'"); + }); + + it("reflects a per-dataset window", () => { + const clause = buildLakeTtlClause(effectiveRetention({ hotDays: 1, coldDays: 14 }), false); + expect(clause).toBe("TTL toDateTime(timestamp) + INTERVAL 14 DAY DELETE"); + }); +}); + +describe("enforceDatasetRetention", () => { + it("no-ops when the lake is disabled (never connects)", async () => { + mocks.isLakeEnabled.mockReturnValue(false); + + const r = await enforceDatasetRetention({ orgId: "org-1", pipelineId: "pipe-1" }); + + expect(r).toBeNull(); + expect(mocks.command).not.toHaveBeenCalled(); + }); + + it("deletes events older than the policy's coldDays horizon, org+pipeline scoped", async () => { + mocks.isLakeEnabled.mockReturnValue(true); + const now = new Date("2026-06-08T00:00:00.000Z"); + + const r = await enforceDatasetRetention({ + orgId: "org-1", + pipelineId: "pipe-1", + policy: { hotDays: 3, coldDays: 30 }, + now, + }); + + expect(r).toEqual({ + pipelineId: "pipe-1", + coldDays: 30, + cutoff: new Date(now.getTime() - 30 * MS_PER_DAY).toISOString(), + }); + const call = commandCall(0); + expect(call.query).toContain("DELETE FROM lake_events"); + expect(call.query).toContain("organizationId = {orgId:String}"); + expect(call.query).toContain("pipelineId = {pipelineId:String}"); + expect(call.query).toContain("timestamp < {cutoff:DateTime64(3)}"); + expect(call.query_params.orgId).toBe("org-1"); + expect(call.query_params.pipelineId).toBe("pipe-1"); + expect(call.query_params.cutoff).toEqual(new Date(now.getTime() - 30 * MS_PER_DAY)); + }); + + it("uses the default 90-day horizon when the dataset has no policy", async () => { + mocks.isLakeEnabled.mockReturnValue(true); + const now = new Date("2026-06-08T00:00:00.000Z"); + + const r = await enforceDatasetRetention({ orgId: "org-1", pipelineId: "pipe-1", now }); + + expect(r?.coldDays).toBe(90); + expect(commandCall(0).query_params.cutoff).toEqual(new Date(now.getTime() - 90 * MS_PER_DAY)); + }); +}); + +describe("sweepLakeRetention", () => { + it("no-ops when the lake is disabled", async () => { + mocks.isLakeEnabled.mockReturnValue(false); + + const r = await sweepLakeRetention(); + + expect(r).toEqual({ skipped: true, swept: 0, errors: 0 }); + expect(mocks.findMany).not.toHaveBeenCalled(); + expect(mocks.command).not.toHaveBeenCalled(); + }); + + it("enforces each dataset's own effective horizon", async () => { + mocks.isLakeEnabled.mockReturnValue(true); + const now = new Date("2026-06-08T00:00:00.000Z"); + mocks.findMany.mockResolvedValue([ + { organizationId: "org-1", pipelineId: "pipe-a", retentionPolicy: { hotDays: 1, coldDays: 7 } }, + { organizationId: "org-2", pipelineId: "pipe-b", retentionPolicy: null }, + ]); + + const r = await sweepLakeRetention(now); + + expect(r).toEqual({ skipped: false, swept: 2, errors: 0 }); + expect(mocks.command).toHaveBeenCalledTimes(2); + // Dataset A: 7-day horizon for org-1/pipe-a. + expect(commandCall(0).query_params).toMatchObject({ orgId: "org-1", pipelineId: "pipe-a" }); + expect(commandCall(0).query_params.cutoff).toEqual(new Date(now.getTime() - 7 * MS_PER_DAY)); + // Dataset B: default 90-day horizon for org-2/pipe-b. + expect(commandCall(1).query_params).toMatchObject({ orgId: "org-2", pipelineId: "pipe-b" }); + expect(commandCall(1).query_params.cutoff).toEqual(new Date(now.getTime() - 90 * MS_PER_DAY)); + }); + + it("logs + counts a per-dataset failure and continues the sweep", async () => { + mocks.isLakeEnabled.mockReturnValue(true); + mocks.findMany.mockResolvedValue([ + { organizationId: "org-1", pipelineId: "pipe-a", retentionPolicy: null }, + { organizationId: "org-2", pipelineId: "pipe-b", retentionPolicy: null }, + ]); + mocks.command.mockRejectedValueOnce(new Error("clickhouse down")).mockResolvedValue(undefined); + + const r = await sweepLakeRetention(); + + expect(r.skipped).toBe(false); + expect(r.errors).toBe(1); + expect(r.swept).toBe(1); + expect(mocks.command).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/server/services/lake/lake-catalog.ts b/src/server/services/lake/lake-catalog.ts index ade342b0..75ff6d4c 100644 --- a/src/server/services/lake/lake-catalog.ts +++ b/src/server/services/lake/lake-catalog.ts @@ -4,6 +4,7 @@ import { withOrgTx } from "@/lib/with-org-tx"; import { errorLog } from "@/lib/logger"; import { configHasLakeSink, LAKE_SINK_TYPE } from "@/lib/vector/lake-sink"; import { isLakeEnabled } from "./clickhouse"; +import { evaluateLakeQuota } from "./lake-quota"; import { clamp, splitLakeOutput, @@ -246,6 +247,18 @@ export async function updateLakeCatalogFromHeartbeat( errorLog("lake-catalog", `Failed to update lake catalog for pipeline ${pipeline.id}`, err); } } + + // Soft per-org Lake byte-quota signal: after folding this heartbeat's writes + // into the catalog, re-evaluate cumulative usage against the org's byte + // ceiling. Best-effort — a quota-eval failure must never block the catalog + // update — and it never drops data (over-quota only logs + surfaces). Free in + // OSS: the default provider returns "unlimited" and short-circuits before any + // extra query. + try { + await evaluateLakeQuota(orgId); + } catch (err) { + errorLog("lake-catalog", `Failed to evaluate lake quota for org ${orgId}`, err); + } } function configYamlHasLakeSink(configYaml: string): boolean { diff --git a/src/server/services/lake/lake-quota.ts b/src/server/services/lake/lake-quota.ts new file mode 100644 index 00000000..447b2179 --- /dev/null +++ b/src/server/services/lake/lake-quota.ts @@ -0,0 +1,143 @@ +import { prisma } from "@/lib/prisma"; +import { warnLog } from "@/lib/logger"; + +/** + * Per-organization VectorFlow Lake BYTE quota — a SOFT signal, never a hard drop. + * + * # Why soft + * + * The lake stores customer observability data in ClickHouse; silently dropping + * events because an org crossed a byte ceiling is data-loss-unsafe. Enforcement + * here is therefore advisory: when an org is over quota we emit a structured + * warning (ops / Sentry surface) and expose the over-quota state for a read-only + * UI badge (`lake.quotaStatus`). Retention (TTL — see `lake-retention.ts`) is the + * mechanism that actually bounds storage; the quota only signals. + * + * # Provider seam (mirrors `src/server/services/quotas.ts`) + * + * The byte ceiling is supplied by an injectable `LakeQuotaProvider`. OSS ships + * `DefaultUnlimitedLakeQuotaProvider` (every org → unlimited), so a self-hosted + * deployment is unmetered by default — exactly like `DefaultUnboundedQuotaPolicy` + * for agents/pipelines/environments. A commercial deployment registers its own + * provider at startup via `setLakeQuotaProvider(...)` to map an org's tier to a + * finite byte cap. The provider is keyed by `orgId` (the cloud resolves the org's + * tier → bytes); implementations MUST be synchronous and cheap (a constant or a + * cached tier lookup, no I/O) because `evaluateLakeQuota` runs on the ingest path. + */ + +/** Provider interface — a deployment may register a commercial tier overlay. */ +export interface LakeQuotaProvider { + /** + * Per-org Lake byte ceiling. `null` = unlimited (no quota enforcement, the OSS + * default). MUST be synchronous and free of I/O — called on the ingest path. + */ + getLakeQuotaBytes(orgId: string): bigint | null; +} + +/** + * Default OSS provider: every org is unlimited. Self-hosted deployments are + * unmetered by default. To enforce a Lake byte ceiling, register a custom + * provider at startup via `setLakeQuotaProvider(...)`. + */ +export class DefaultUnlimitedLakeQuotaProvider implements LakeQuotaProvider { + getLakeQuotaBytes(_orgId: string): bigint | null { + return null; + } +} + +let activeProvider: LakeQuotaProvider = new DefaultUnlimitedLakeQuotaProvider(); + +/** + * Replace the active Lake quota provider. Intended to be called once at startup + * by the deployment bootstrap. Returns the previous provider so a test can + * restore it in `afterEach`. + */ +export function setLakeQuotaProvider(provider: LakeQuotaProvider): LakeQuotaProvider { + const prev = activeProvider; + activeProvider = provider; + return prev; +} + +/** Inspect the currently-registered provider. Mostly for tests. */ +export function getLakeQuotaProvider(): LakeQuotaProvider { + return activeProvider; +} + +/** Reset to the OSS default. Provided for `afterEach` test cleanup. */ +export function resetLakeQuotaProvider(): void { + activeProvider = new DefaultUnlimitedLakeQuotaProvider(); +} + +export interface LakeQuotaResult { + organizationId: string; + /** Current cumulative lake bytes for the org (sum of the catalog's byteCount). */ + currentBytes: bigint; + /** Byte ceiling; `null` = unlimited. */ + quotaBytes: bigint | null; + /** True iff `currentBytes > quotaBytes` (always false when unlimited). */ + overQuota: boolean; + /** Fraction of the quota used (0..1+); `null` when unlimited. Display-only. */ + usageRatio: number | null; +} + +/** + * PURE quota check — no I/O. `quotaBytes === null` means unlimited, so the org is + * never over quota. `usageRatio` is computed via `bigint → number` and is + * approximate above 2^53 bytes (~9 PB); it is a display value, not the gate. + */ +export function checkLakeQuota( + orgId: string, + currentBytes: bigint, + quotaBytes: bigint | null, +): LakeQuotaResult { + if (quotaBytes === null) { + return { + organizationId: orgId, + currentBytes, + quotaBytes: null, + overQuota: false, + usageRatio: null, + }; + } + const overQuota = currentBytes > quotaBytes; + let usageRatio: number; + if (quotaBytes > BigInt(0)) { + usageRatio = Number(currentBytes) / Number(quotaBytes); + } else { + // A zero ceiling: any stored bytes is infinitely over; zero bytes is exactly at. + usageRatio = currentBytes > BigInt(0) ? Number.POSITIVE_INFINITY : 0; + } + return { organizationId: orgId, currentBytes, quotaBytes, overQuota, usageRatio }; +} + +/** + * Resolve the org's Lake byte ceiling from the active provider, sum the org's + * lake bytes from the Postgres catalog (no ClickHouse read) and evaluate. + * + * SOFT: emits a warning when over quota — it never drops, rejects, or rolls back + * data. Returns the result for read-only surfacing (UI badge / ops). When the + * provider returns `null` (the OSS default) it short-circuits to an unlimited + * result WITHOUT touching the database, so the unmetered path is free. + */ +export async function evaluateLakeQuota(orgId: string): Promise { + const quotaBytes = activeProvider.getLakeQuotaBytes(orgId); + if (quotaBytes === null) { + return checkLakeQuota(orgId, BigInt(0), null); + } + + const agg = await prisma.lakeDataset.aggregate({ + where: { organizationId: orgId }, + _sum: { byteCount: true }, + }); + const currentBytes = agg._sum.byteCount ?? BigInt(0); + + const result = checkLakeQuota(orgId, currentBytes, quotaBytes); + if (result.overQuota) { + warnLog( + "lake-quota", + `Lake byte quota exceeded for org ${orgId}: ${currentBytes}/${quotaBytes} bytes ` + + `(soft signal — data retained; reduce retention or raise the quota)`, + ); + } + return result; +} diff --git a/src/server/services/lake/lake-retention.ts b/src/server/services/lake/lake-retention.ts new file mode 100644 index 00000000..2ad3655e --- /dev/null +++ b/src/server/services/lake/lake-retention.ts @@ -0,0 +1,250 @@ +import { adminPrisma } from "@/lib/prisma"; +import { debugLog, infoLog, errorLog } from "@/lib/logger"; +import { isLeader } from "@/server/services/leader-election"; +import type { LakeRetentionPolicy } from "@/generated/prisma"; +import { getLakeClient, isLakeEnabled } from "@/server/services/lake/clickhouse"; + +/** + * VectorFlow Lake — retention enforcement (CL-9). + * + * `LakeRetentionPolicy(hotDays, coldDays)` was attached to datasets but never + * enforced beyond the table-level default TTL baked into the base DDL. This + * module turns the policy into the dataset's *effective* retention window and + * enforces it two ways: + * + * 1. `effectiveRetention` + `buildLakeTtlClause` compute the TTL clause from a + * policy (falling back to the table defaults). The migration runner uses + * these for the base `lake_events` TTL, so the table default now flows + * through the SAME code path a per-dataset window would. + * 2. `enforceDatasetRetention` applies a dataset's `coldDays` as a hard DROP + * horizon in ClickHouse — a bounded, org+pipeline-scoped DELETE of events + * older than `now - coldDays`. This catches datasets whose policy is SHORTER + * than the table TTL (which would otherwise keep their rows until the global + * 90-day delete). `sweepLakeRetention` runs it across every catalog dataset + * on a coarse leader-gated cadence. + * + * Retention deletion is the intended lifecycle of stored data (unlike the Lake + * BYTE quota in `lake-quota.ts`, which is a soft signal and never drops). All + * ClickHouse access goes through the shared lake client so it is mockable. + */ + +/** ClickHouse events table — unqualified so it resolves against the lake + * connection's default database (VF_LAKE_CLICKHOUSE_DATABASE). */ +const LAKE_EVENTS_TABLE = "lake_events"; + +/** Table-default retention windows. Mirror the `LakeRetentionPolicy` schema + * defaults; used when a dataset has no attached policy. */ +export const LAKE_DEFAULT_HOT_DAYS = 7; +export const LAKE_DEFAULT_COLD_DAYS = 90; + +const MS_PER_DAY = 24 * 60 * 60 * 1000; + +/** Coarse sweep cadence — retention is a slow lifecycle, so daily is plenty and + * keeps ClickHouse mutation churn low. */ +const RETENTION_SWEEP_INTERVAL_MS = 24 * 60 * 60 * 1000; + +export interface EffectiveRetention { + /** Days kept in the hot MergeTree tier before TTL-move to the cold disk. */ + hotDays: number; + /** Total retention before TTL-delete (the drop horizon). Always >= hotDays. */ + coldDays: number; +} + +/** A retention policy is only the two windows we act on (accepts the full model). */ +export type RetentionPolicyWindows = Pick; + +/** + * Compute the effective retention window for a dataset. Falls back to the table + * defaults (7/90) when the dataset has no attached policy. Defends against a + * malformed policy: non-positive windows fall back, and `coldDays` is clamped up + * to `hotDays` so the drop horizon can never precede the hot→cold move. + */ +export function effectiveRetention( + policy?: RetentionPolicyWindows | null, +): EffectiveRetention { + const hotDays = policy && policy.hotDays > 0 ? policy.hotDays : LAKE_DEFAULT_HOT_DAYS; + const coldDaysRaw = + policy && policy.coldDays > 0 ? policy.coldDays : LAKE_DEFAULT_COLD_DAYS; + const coldDays = Math.max(coldDaysRaw, hotDays); + return { hotDays, coldDays }; +} + +/** + * Build the ClickHouse `TTL` clause for `lake_events` from an effective window. + * Cold-tier enabled → move-to-cold at `hotDays` + DELETE at `coldDays` + the + * hot/cold storage policy; cold-tier disabled → DELETE at `coldDays` only (plain + * MergeTree). Shared by the base migration and any per-dataset tiering path so a + * single function owns the TTL shape. + */ +export function buildLakeTtlClause( + retention: EffectiveRetention, + coldTierEnabled: boolean, +): string { + return coldTierEnabled + ? `TTL toDateTime(timestamp) + INTERVAL ${retention.hotDays} DAY TO VOLUME 'cold', ` + + `toDateTime(timestamp) + INTERVAL ${retention.coldDays} DAY DELETE\n` + + `SETTINGS storage_policy = 'vf_hot_cold'` + : `TTL toDateTime(timestamp) + INTERVAL ${retention.coldDays} DAY DELETE`; +} + +export interface LakeRetentionSweepItem { + pipelineId: string; + coldDays: number; + /** ISO cutoff: events strictly older than this were targeted for deletion. */ + cutoff: string; +} + +export interface DatasetRetentionTarget { + orgId: string; + pipelineId: string; + policy?: RetentionPolicyWindows | null; + /** Override "now" (tests). Defaults to the wall clock. */ + now?: Date; +} + +/** + * Enforce a single dataset's effective `coldDays` as a DROP horizon: issue a + * bounded, org+pipeline-scoped ClickHouse DELETE for events older than + * `now - coldDays`. Returns the computed window + cutoff for logging. No-op + * (returns `null`) when the lake is disabled. The org/pipeline scope and cutoff + * are bound parameters, never interpolated. + */ +export async function enforceDatasetRetention( + target: DatasetRetentionTarget, +): Promise { + if (!isLakeEnabled()) return null; + + const { coldDays } = effectiveRetention(target.policy); + const now = target.now ?? new Date(); + const cutoff = new Date(now.getTime() - coldDays * MS_PER_DAY); + + await getLakeClient().command({ + query: + `DELETE FROM ${LAKE_EVENTS_TABLE} ` + + `WHERE organizationId = {orgId:String} ` + + `AND pipelineId = {pipelineId:String} ` + + `AND timestamp < {cutoff:DateTime64(3)}`, + query_params: { + orgId: target.orgId, + pipelineId: target.pipelineId, + cutoff, + }, + }); + + return { pipelineId: target.pipelineId, coldDays, cutoff: cutoff.toISOString() }; +} + +export interface LakeRetentionSweepResult { + /** True when the lake is disabled and nothing was swept. */ + skipped: boolean; + /** Number of datasets whose drop horizon was enforced. */ + swept: number; + /** Number of datasets that errored (logged + skipped, sweep continues). */ + errors: number; +} + +/** + * Enforce retention across every catalog dataset, applying each dataset's + * effective `coldDays` drop horizon. Cross-tenant read via `adminPrisma` (the + * ClickHouse DELETE is org+pipeline scoped by bound params); a per-dataset error + * is logged and skipped so one bad dataset never stalls the sweep. No-op when the + * lake is disabled. + */ +export async function sweepLakeRetention( + now: Date = new Date(), +): Promise { + const result: LakeRetentionSweepResult = { skipped: true, swept: 0, errors: 0 }; + if (!isLakeEnabled()) return result; + result.skipped = false; + + let datasets: Array<{ + organizationId: string; + pipelineId: string; + retentionPolicy: RetentionPolicyWindows | null; + }>; + try { + datasets = await adminPrisma.lakeDataset.findMany({ + select: { + organizationId: true, + pipelineId: true, + retentionPolicy: { select: { hotDays: true, coldDays: true } }, + }, + }); + } catch (err) { + errorLog("lake-retention", "Failed to list lake datasets (skipping sweep)", err); + return result; + } + + for (const ds of datasets) { + try { + await enforceDatasetRetention({ + orgId: ds.organizationId, + pipelineId: ds.pipelineId, + policy: ds.retentionPolicy, + now, + }); + result.swept += 1; + } catch (err) { + result.errors += 1; + errorLog( + "lake-retention", + `Retention sweep failed for org ${ds.organizationId} pipeline ${ds.pipelineId} (continuing)`, + err, + ); + } + } + return result; +} + +// ── Scheduler ──────────────────────────────────────────────────────────────── +let timer: NodeJS.Timeout | null = null; +let tickInFlight = false; + +async function tick(): Promise { + // Re-check leadership each tick (mirrors lake-alerts): a demoted leader's + // setInterval keeps firing for up to one lease TTL; guarding here stops the + // old + new leader both sweeping (the DELETEs are idempotent, but this avoids + // doubling ClickHouse mutation load). + if (!isLeader()) { + debugLog("lake-retention", "Skipping sweep — instance is no longer leader"); + return; + } + if (tickInFlight) return; // setInterval does not skip overlapping callbacks + tickInFlight = true; + try { + const r = await sweepLakeRetention(); + if (!r.skipped && (r.swept > 0 || r.errors > 0)) { + infoLog("lake-retention", `sweep: enforced=${r.swept} errors=${r.errors}`); + } + } catch (err) { + errorLog("lake-retention", "sweep failed", err); + } finally { + tickInFlight = false; + } +} + +/** + * Start the leader-gated lake retention sweeper. No-op when the lake is disabled, + * so enabling the lake is env-only (matches `runLakeMigrations` / + * `initLakeAlertScheduler`). Idempotent. + */ +export function initLakeRetentionScheduler(): void { + if (!isLakeEnabled()) { + infoLog("lake-retention", "Lake disabled — retention sweeper not started"); + return; + } + if (timer) return; + timer = setInterval(() => void tick(), RETENTION_SWEEP_INTERVAL_MS); + timer.unref(); + infoLog( + "lake-retention", + `Retention sweeper started (every ${RETENTION_SWEEP_INTERVAL_MS / (60 * 60 * 1000)}h)`, + ); +} + +export function stopLakeRetentionScheduler(): void { + if (timer) { + clearInterval(timer); + timer = null; + } +} diff --git a/src/server/services/lake/migrate.ts b/src/server/services/lake/migrate.ts index a8696b7d..573159bf 100644 --- a/src/server/services/lake/migrate.ts +++ b/src/server/services/lake/migrate.ts @@ -33,12 +33,7 @@ import { getLakeClient, getLakeConfig, } from "@/server/services/lake/clickhouse"; - -// Default hot/cold retention windows for the base DDL. Mirror the -// LakeRetentionPolicy defaults (hotDays=7, coldDays=90); per-dataset retention -// is governed by that catalog model at a higher layer. -const HOT_DAYS = 7; -const COLD_DAYS = 90; +import { effectiveRetention, buildLakeTtlClause } from "@/server/services/lake/lake-retention"; export interface LakeMigrationResult { /** True when the lake is disabled and nothing was applied. */ @@ -63,11 +58,10 @@ export async function runLakeMigrations(): Promise { } const config = getLakeConfig(); - const ttlClause = isLakeColdTierEnabled() - ? `TTL toDateTime(timestamp) + INTERVAL ${HOT_DAYS} DAY TO VOLUME 'cold', ` + - `toDateTime(timestamp) + INTERVAL ${COLD_DAYS} DAY DELETE\n` + - `SETTINGS storage_policy = 'vf_hot_cold'` - : `TTL toDateTime(timestamp) + INTERVAL ${COLD_DAYS} DAY DELETE`; + // The base table TTL flows through the same per-dataset retention helpers, so + // the table default (LAKE_DEFAULT_HOT/COLD_DAYS = 7/90) and any per-dataset + // window share one TTL-clause shape. No policy here → the defaults. + const ttlClause = buildLakeTtlClause(effectiveRetention(null), isLakeColdTierEnabled()); const client = getLakeClient();