Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/lib/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ const runtimeEnvSchema = z
VF_LAKE_CLICKHOUSE_PASSWORD: z.string().optional(),
// Default DB mirrors DEFAULT_LAKE_DATABASE in the lake wrapper.
VF_LAKE_CLICKHOUSE_DATABASE: z.string().default("vectorflow_lake"),
// Connection-pool bounds for the lake ClickHouse client. createClient()
// otherwise opens unbounded sockets under load (the @clickhouse/client pool
// is per-client and we keep a single cached client). Cap concurrent sockets
// and fail slow requests rather than hang. Conservative defaults — raise
// VF_LAKE_CH_POOL_MAX for higher lake query/insert parallelism.
VF_LAKE_CH_POOL_MAX: z.coerce.number().int().positive().default(10),
VF_LAKE_CH_REQUEST_TIMEOUT_MS: z.coerce
.number()
.int()
.positive()
.default(30000),
// Cold tier (S3-backed). When VF_LAKE_S3_BUCKET is set the lake migration
// runner (scripts/lake-migrate.ts) applies a TTL move-to-cold +
// `storage_policy='vf_hot_cold'`; otherwise lake_events is a plain MergeTree
Expand Down
79 changes: 79 additions & 0 deletions src/server/services/lake/__tests__/clickhouse-pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";

// Mock the ClickHouse driver — @clickhouse/client is not installed in OSS dev
// and we must never open a real connection. createClient returns a distinct
// truthy object so the globalThis-cached singleton (getLakeClient) has
// something to cache.
const { createClientMock } = vi.hoisted(() => ({
createClientMock: vi.fn((_opts?: unknown) => ({
query: vi.fn(),
insert: vi.fn(),
command: vi.fn(),
ping: vi.fn(),
close: vi.fn(),
})),
}));
vi.mock("@clickhouse/client", () => ({ createClient: createClientMock }));

// Mock the centralized env so each test drives the *parsed* pool settings
// directly. The real env singleton is frozen at import, so exercising both the
// default and an override in one file would otherwise require re-importing the
// module — which the no-dynamic-import rule forbids. Mutating this hoisted
// object is the static-import-friendly equivalent. Initial values mirror the
// defaults declared in src/lib/env.ts (VF_LAKE_CH_POOL_MAX=10,
// VF_LAKE_CH_REQUEST_TIMEOUT_MS=30000).
const { envMock } = vi.hoisted(() => ({
envMock: { VF_LAKE_CH_POOL_MAX: 10, VF_LAKE_CH_REQUEST_TIMEOUT_MS: 30000 },
}));
vi.mock("@/lib/env", () => ({ env: envMock }));

import { getLakeClient } from "../clickhouse";

interface CreateClientOptions {
max_open_connections?: number;
request_timeout?: number;
keep_alive?: { enabled: boolean };
}

function firstCallOptions(): CreateClientOptions {
return (createClientMock.mock.calls[0]?.[0] ?? {}) as CreateClientOptions;
}

describe("lake clickhouse connection pool", () => {
beforeEach(() => {
// Reset the globalThis-cached singleton + the driver mock + parsed env.
delete (globalThis as unknown as { __vfLakeClient?: unknown }).__vfLakeClient;
createClientMock.mockClear();
envMock.VF_LAKE_CH_POOL_MAX = 10;
envMock.VF_LAKE_CH_REQUEST_TIMEOUT_MS = 30000;
process.env.VF_LAKE_CLICKHOUSE_URL = "http://clickhouse:8123";
});

afterEach(() => {
delete process.env.VF_LAKE_CLICKHOUSE_URL;
});

it("bounds the pool with the default max_open_connections from env", () => {
getLakeClient();
expect(createClientMock).toHaveBeenCalledTimes(1);
const opts = firstCallOptions();
expect(opts.max_open_connections).toBe(10);
expect(opts.request_timeout).toBe(30000);
// keep_alive is set explicitly so sockets are reused across requests.
expect(opts.keep_alive).toEqual({ enabled: true });
});

it("uses an overridden VF_LAKE_CH_POOL_MAX", () => {
envMock.VF_LAKE_CH_POOL_MAX = 25;
getLakeClient();
expect(createClientMock).toHaveBeenCalledTimes(1);
expect(firstCallOptions().max_open_connections).toBe(25);
});

it("caches the client — createClient runs once across two getLakeClient calls", () => {
const first = getLakeClient();
const second = getLakeClient();
expect(first).toBe(second);
expect(createClientMock).toHaveBeenCalledTimes(1);
});
});
11 changes: 11 additions & 0 deletions src/server/services/lake/clickhouse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { createClient, type ClickHouseClient } from "@clickhouse/client";

import { env } from "@/lib/env";

/**
* VectorFlow Lake — thin ClickHouse connection layer (A1).
*
Expand Down Expand Up @@ -108,6 +110,15 @@ export function getLakeClient(): ClickHouseClient {
password: config.password,
database: config.database,
application: "vectorflow",
// Bound the per-client connection pool so the lake never opens unbounded
// sockets under load. We keep a single cached client (above), so this pool
// is the process-wide ceiling. Sourced from the centralized env module.
max_open_connections: env.VF_LAKE_CH_POOL_MAX,
// Fail slow lake requests instead of hanging a held connection.
request_timeout: env.VF_LAKE_CH_REQUEST_TIMEOUT_MS,
// Reuse sockets across requests (HTTP keep-alive) — default in
// @clickhouse/client@1.x; set explicitly to document the intent.
keep_alive: { enabled: true },
});
globalForLake.__vfLakeClient = client;
return client;
Expand Down
Loading