diff --git a/src/lib/env.ts b/src/lib/env.ts index cbd2ebee..243d7354 100644 --- a/src/lib/env.ts +++ b/src/lib/env.ts @@ -100,6 +100,17 @@ const runtimeEnvSchema = z VF_LAKE_CLICKHOUSE_PASSWORD: z.string().optional(), // Default DB mirrors DEFAULT_LAKE_DATABASE in the lake wrapper. VF_LAKE_CLICKHOUSE_DATABASE: z.string().default("vectorflow_lake"), + // Connection-pool bounds for the lake ClickHouse client (the pool is + // per-client and we keep a single cached client). Cap concurrent sockets + // and fail slow requests rather than hang. Conservative defaults — raise + // VF_LAKE_CH_POOL_MAX for higher lake query/insert parallelism. NOTE(review): + // @clickhouse/client 1.x docs list the same 10 / 30_000 ms defaults — confirm. + VF_LAKE_CH_POOL_MAX: z.coerce.number().int().positive().default(10), + VF_LAKE_CH_REQUEST_TIMEOUT_MS: z.coerce + .number() + .int() + .positive() + .default(30000), // Cold tier (S3-backed). When VF_LAKE_S3_BUCKET is set the lake migration // runner (scripts/lake-migrate.ts) applies a TTL move-to-cold + // `storage_policy='vf_hot_cold'`; otherwise lake_events is a plain MergeTree diff --git a/src/server/services/lake/__tests__/clickhouse-pool.test.ts b/src/server/services/lake/__tests__/clickhouse-pool.test.ts new file mode 100644 index 00000000..5d439807 --- /dev/null +++ b/src/server/services/lake/__tests__/clickhouse-pool.test.ts @@ -0,0 +1,79 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +// Mock the ClickHouse driver — @clickhouse/client is not installed in OSS dev +// and we must never open a real connection. createClient returns a distinct +// truthy object so the globalThis-cached singleton (getLakeClient) has +// something to cache. 
+const { createClientMock } = vi.hoisted(() => ({ + createClientMock: vi.fn((_opts?: unknown) => ({ + query: vi.fn(), + insert: vi.fn(), + command: vi.fn(), + ping: vi.fn(), + close: vi.fn(), + })), +})); +vi.mock("@clickhouse/client", () => ({ createClient: createClientMock })); + +// Mock the centralized env so each test drives the *parsed* pool settings +// directly. The real env singleton is frozen at import, so exercising both the +// default and an override in one file would otherwise require re-importing the +// module — which the no-dynamic-import rule forbids. Mutating this hoisted +// object is the static-import-friendly equivalent. Initial values mirror the +// defaults declared in src/lib/env.ts (VF_LAKE_CH_POOL_MAX=10, +// VF_LAKE_CH_REQUEST_TIMEOUT_MS=30000). +const { envMock } = vi.hoisted(() => ({ + envMock: { VF_LAKE_CH_POOL_MAX: 10, VF_LAKE_CH_REQUEST_TIMEOUT_MS: 30000 }, +})); +vi.mock("@/lib/env", () => ({ env: envMock })); + +import { getLakeClient } from "../clickhouse"; + +interface CreateClientOptions { + max_open_connections?: number; + request_timeout?: number; + keep_alive?: { enabled: boolean }; +} + +function firstCallOptions(): CreateClientOptions { + return (createClientMock.mock.calls[0]?.[0] ?? {}) as CreateClientOptions; +} + +describe("lake clickhouse connection pool", () => { + beforeEach(() => { + // Reset the globalThis-cached singleton, clear the driver mock, and restore the parsed env defaults. 
+ delete (globalThis as unknown as { __vfLakeClient?: unknown }).__vfLakeClient; + createClientMock.mockClear(); + envMock.VF_LAKE_CH_POOL_MAX = 10; + envMock.VF_LAKE_CH_REQUEST_TIMEOUT_MS = 30000; + process.env.VF_LAKE_CLICKHOUSE_URL = "http://clickhouse:8123"; + }); + + afterEach(() => { + delete process.env.VF_LAKE_CLICKHOUSE_URL; + }); + + it("bounds the pool with the default max_open_connections from env", () => { + getLakeClient(); + expect(createClientMock).toHaveBeenCalledTimes(1); + const opts = firstCallOptions(); + expect(opts.max_open_connections).toBe(10); + expect(opts.request_timeout).toBe(30000); + // keep_alive is set explicitly so sockets are reused across requests. + expect(opts.keep_alive).toEqual({ enabled: true }); + }); + + it("uses an overridden VF_LAKE_CH_POOL_MAX", () => { + envMock.VF_LAKE_CH_POOL_MAX = 25; + getLakeClient(); + expect(createClientMock).toHaveBeenCalledTimes(1); + expect(firstCallOptions().max_open_connections).toBe(25); + }); + + it("caches the client — createClient runs once across two getLakeClient calls", () => { + const first = getLakeClient(); + const second = getLakeClient(); + expect(first).toBe(second); + expect(createClientMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/server/services/lake/clickhouse.ts b/src/server/services/lake/clickhouse.ts index 1c34ab94..f0e8f120 100644 --- a/src/server/services/lake/clickhouse.ts +++ b/src/server/services/lake/clickhouse.ts @@ -1,5 +1,7 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; +import { env } from "@/lib/env"; + /** * VectorFlow Lake — thin ClickHouse connection layer (A1). * @@ -108,6 +110,15 @@ export function getLakeClient(): ClickHouseClient { password: config.password, database: config.database, application: "vectorflow", + // Bound the per-client connection pool so the lake never opens unbounded + // sockets under load. We keep a single cached client (above), so this pool + // is the process-wide ceiling. 
Sourced from the centralized env module. + max_open_connections: env.VF_LAKE_CH_POOL_MAX, + // Fail slow lake requests (timeout in ms) instead of hanging a held connection. + request_timeout: env.VF_LAKE_CH_REQUEST_TIMEOUT_MS, + // Reuse sockets across requests (HTTP keep-alive) — default in + // @clickhouse/client@1.x; set explicitly to document the intent. + keep_alive: { enabled: true }, }); globalForLake.__vfLakeClient = client; return client;