From e5505d70f1556cafca20376f112487c4571a8162 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Mon, 8 Jun 2026 13:17:01 +0100 Subject: [PATCH] feat(transform-eval): bound vector subprocess concurrency per org Add src/lib/org-concurrency.ts: an in-process per-(org, key) async semaphore (withOrgConcurrencyLimit) that caps concurrent fn executions, queues the rest FIFO, and always releases the slot, whether fn succeeds or throws. An empty or whitespace-only orgId passed to the limiter falls back to a shared "global" bucket; idle buckets are pruned. Thread an optional orgId into evaluateVrl(): when a non-empty orgId is provided, the vector subprocess spawn runs under withOrgConcurrencyLimit(orgId, "vector-eval", 4) so one tenant's concurrent VRL evals (live-tap, cost what-if, unit-test "run all", AI propose auto-fix loop) can no longer spawn unbounded vector processes and starve the shared host. The no-op fast path never takes a slot. When orgId is omitted or empty, evaluateVrl() spawns unbounded exactly as before (backward-compatible); it does not route to the "global" bucket, which only applies to direct callers of the limiter. Pass ctx/input organizationId at the org-scoped call sites: vrl runUnitTests/runPipelineUnitTests/testAgainstCapture, tap-capture testTransform, cost-model simulateTransform, and proposed-change VRL validation. 
--- src/lib/__tests__/org-concurrency.test.ts | 161 ++++++++++++++++++ src/lib/org-concurrency.ts | 94 ++++++++++ .../routers/__tests__/tap-capture.test.ts | 2 +- .../__tests__/vrl-pipeline-unit-test.test.ts | 6 +- .../routers/__tests__/vrl-unit-test.test.ts | 2 +- src/server/routers/proposed-change.ts | 4 +- src/server/routers/tap-capture.ts | 2 +- src/server/routers/vrl.ts | 11 +- .../cost-recommendation-procedures.ts | 2 +- src/server/services/transform-eval.ts | 72 ++++++-- 10 files changed, 327 insertions(+), 29 deletions(-) create mode 100644 src/lib/__tests__/org-concurrency.test.ts create mode 100644 src/lib/org-concurrency.ts diff --git a/src/lib/__tests__/org-concurrency.test.ts b/src/lib/__tests__/org-concurrency.test.ts new file mode 100644 index 00000000..8e45afe2 --- /dev/null +++ b/src/lib/__tests__/org-concurrency.test.ts @@ -0,0 +1,161 @@ +import { describe, it, expect } from "vitest"; +import { withOrgConcurrencyLimit } from "../org-concurrency"; + +/** + * Drain the microtask queue so all pending acquire/handoff continuations settle + * before we assert. The limiter is microtask-only, so a single macrotask tick + * (setImmediate) runs strictly after every queued handoff. + */ +function flush(): Promise { + const { promise, resolve } = Promise.withResolvers(); + setImmediate(resolve); + return promise; +} + +describe("withOrgConcurrencyLimit", () => { + it("never runs more than `max` tasks at once for one org, draining FIFO", async () => { + const gates = Array.from({ length: 5 }, () => Promise.withResolvers()); + let running = 0; + let peak = 0; + const startOrder: number[] = []; + + const tasks = gates.map((gate, i) => + withOrgConcurrencyLimit("org-cap", "k", 2, async () => { + running++; + peak = Math.max(peak, running); + startOrder.push(i); + await gate.promise; + running--; + }), + ); + + // Only the first two acquire a slot; tasks 2..4 queue. 
+ await flush(); + expect(running).toBe(2); + expect(startOrder).toEqual([0, 1]); + + // Each released slot admits exactly the next FIFO waiter — never a 3rd at once. + gates[0].resolve(); + await flush(); + expect(running).toBe(2); + expect(startOrder).toEqual([0, 1, 2]); + + gates[1].resolve(); + await flush(); + expect(running).toBe(2); + expect(startOrder).toEqual([0, 1, 2, 3]); + + gates[2].resolve(); + await flush(); + expect(startOrder).toEqual([0, 1, 2, 3, 4]); + + gates[3].resolve(); + gates[4].resolve(); + await Promise.all(tasks); + + expect(peak).toBe(2); + }); + + it("isolates the limit per org — a saturated org never blocks another", async () => { + const aGate = Promise.withResolvers(); + let aFirstRunning = false; + let aSecondRan = false; + let bRan = false; + + // org-a (cap 1) is held open by its first task. + const aFirst = withOrgConcurrencyLimit("org-a", "k", 1, async () => { + aFirstRunning = true; + await aGate.promise; + }); + await flush(); + expect(aFirstRunning).toBe(true); + + // A second org-a task must queue (cap reached) ... + const aSecond = withOrgConcurrencyLimit("org-a", "k", 1, async () => { + aSecondRan = true; + }); + // ... while org-b is independent and runs immediately. 
+ const bTask = withOrgConcurrencyLimit("org-b", "k", 1, async () => { + bRan = true; + }); + + await flush(); + expect(bRan).toBe(true); // org-b unaffected by org-a's saturation + expect(aSecondRan).toBe(false); // org-a still at its cap + + aGate.resolve(); + await Promise.all([aFirst, aSecond, bTask]); + expect(aSecondRan).toBe(true); + }); + + it("isolates different keys within the same org", async () => { + const gate = Promise.withResolvers(); + let key2Ran = false; + + const key1 = withOrgConcurrencyLimit("org-keys", "key-1", 1, async () => { + await gate.promise; + }); + await flush(); + + const key2 = withOrgConcurrencyLimit("org-keys", "key-2", 1, async () => { + key2Ran = true; + }); + await flush(); + expect(key2Ran).toBe(true); // distinct key → its own slot, not blocked by key-1 + + gate.resolve(); + await Promise.all([key1, key2]); + }); + + it("releases the slot when fn throws, admitting the next waiter", async () => { + const gate = Promise.withResolvers(); + let secondRan = false; + + const first = withOrgConcurrencyLimit("org-throw", "k", 1, async () => { + await gate.promise; + throw new Error("boom"); + }); + first.catch(() => {}); // handled below via expect().rejects; avoid unhandled-rejection noise + + const second = withOrgConcurrencyLimit("org-throw", "k", 1, async () => { + secondRan = true; + }); + + await flush(); + expect(secondRan).toBe(false); // queued behind the still-running first + + gate.resolve(); + await expect(first).rejects.toThrow("boom"); + + await second; + expect(secondRan).toBe(true); // slot was released despite the throw + }); + + it("routes a missing/empty orgId to a shared 'global' bucket", async () => { + const gate = Promise.withResolvers(); + let firstRunning = false; + let secondRan = false; + + const first = withOrgConcurrencyLimit("", "global-key", 1, async () => { + firstRunning = true; + await gate.promise; + }); + // Whitespace-only orgId collapses to the same "global" bucket as "". 
+ const second = withOrgConcurrencyLimit(" ", "global-key", 1, async () => { + secondRan = true; + }); + + await flush(); + expect(firstRunning).toBe(true); + expect(secondRan).toBe(false); // shares first's bucket → must queue + + gate.resolve(); + await Promise.all([first, second]); + expect(secondRan).toBe(true); + }); + + it("returns fn's resolved value", async () => { + const value = await withOrgConcurrencyLimit("org-ret", "k", 2, async () => 42); + expect(value).toBe(42); + }); +}); diff --git a/src/lib/org-concurrency.ts b/src/lib/org-concurrency.ts new file mode 100644 index 00000000..105ee410 --- /dev/null +++ b/src/lib/org-concurrency.ts @@ -0,0 +1,94 @@ +/** + * In-process per-(org, key) async concurrency limiter. + * + * Some server operations shell out to an expensive shared resource — most + * notably `evaluateVrl` (transform-eval.ts), which spawns a `vector` subprocess. + * Without a per-tenant bound, one organization issuing many concurrent requests + * (live-tap iteration, cost what-if, unit-test "run all") can spawn unbounded + * subprocesses and starve the shared host for every other tenant. + * + * `withOrgConcurrencyLimit(orgId, key, max, fn)` caps how many `fn`s run at once + * for a given (orgId, key) pair, queueing the rest FIFO and admitting the next + * waiter as each slot frees. The slot is always released — on success or throw. + * This is a single-process primitive (one Node worker): fairness within a host, + * not a distributed limiter. + * + * A missing/empty `orgId` falls back to a shared `"global"` bucket so callers + * without org context still get a bound rather than none. + */ + +/** Per-(org, key) counting semaphore: `max` slots and a FIFO queue of waiters. */ +interface Semaphore { + max: number; + active: number; + /** FIFO resolvers; calling one admits that waiter into a freed slot. */ + queue: Array<() => void>; +} + +/** Live buckets keyed by `"\u0000"`. Idle buckets are deleted. 
*/ +const semaphores = new Map(); + +function getSemaphore(mapKey: string, max: number): Semaphore { + let sem = semaphores.get(mapKey); + if (!sem) { + // The bound is fixed when the bucket is created (and re-established if the + // bucket is recreated after going idle); callers MUST pass a consistent + // `max` per (orgId, key). Clamp to >= 1 so a stray 0 can't deadlock. + sem = { max: Math.max(1, Math.floor(max)), active: 0, queue: [] }; + semaphores.set(mapKey, sem); + } + return sem; +} + +/** Acquire a slot — resolves immediately if one is free, else queues FIFO. */ +function acquire(sem: Semaphore): Promise { + if (sem.active < sem.max) { + sem.active++; + return Promise.resolve(); + } + const { promise, resolve } = Promise.withResolvers(); + sem.queue.push(resolve); + return promise; +} + +/** + * Release a held slot. If a waiter is queued, hand the slot directly to it — + * `active` is unchanged, the slot transfers — which preserves FIFO and stops a + * newcomer from jumping the queue. Otherwise free the slot, deleting the bucket + * once it is fully idle so the Map stays proportional to live concurrency + * rather than the historical org count. + */ +function release(mapKey: string, sem: Semaphore): void { + const next = sem.queue.shift(); + if (next) { + next(); + return; + } + sem.active--; + if (sem.active <= 0 && sem.queue.length === 0) { + semaphores.delete(mapKey); + } +} + +/** + * Run `fn` under a per-(orgId, key) concurrency cap of `max`. At most `max` + * `fn`s run concurrently for the pair; the rest queue FIFO and drain as slots + * free. The slot is always released, on success or throw. + */ +export async function withOrgConcurrencyLimit( + orgId: string, + key: string, + max: number, + fn: () => Promise, +): Promise { + // NUL-join keeps the composite key unambiguous (NUL cannot appear in an id); + // an empty/whitespace orgId collapses to a shared "global" bucket. 
+ const mapKey = `${orgId.trim() || "global"}\u0000${key}`; + const sem = getSemaphore(mapKey, max); + await acquire(sem); + try { + return await fn(); + } finally { + release(mapKey, sem); + } +} diff --git a/src/server/routers/__tests__/tap-capture.test.ts b/src/server/routers/__tests__/tap-capture.test.ts index 408dc7d7..ff3e32c4 100644 --- a/src/server/routers/__tests__/tap-capture.test.ts +++ b/src/server/routers/__tests__/tap-capture.test.ts @@ -280,7 +280,7 @@ describe("tapCaptureRouter.testTransform", () => { source: "del(.a)", }); - expect(evaluateVrl).toHaveBeenCalledWith("del(.a)", [{ a: 1 }, { a: 2 }, { a: 3 }]); + expect(evaluateVrl).toHaveBeenCalledWith("del(.a)", [{ a: 1 }, { a: 2 }, { a: 3 }], { orgId: "org-1" }); expect(result.outputs).toHaveLength(2); expect(result.stats).toEqual({ inputCount: 3, diff --git a/src/server/routers/__tests__/vrl-pipeline-unit-test.test.ts b/src/server/routers/__tests__/vrl-pipeline-unit-test.test.ts index d31361b8..0cd2f083 100644 --- a/src/server/routers/__tests__/vrl-pipeline-unit-test.test.ts +++ b/src/server/routers/__tests__/vrl-pipeline-unit-test.test.ts @@ -115,8 +115,8 @@ describe("vrlRouter.runPipelineUnitTests", () => { const res = await caller.runPipelineUnitTests({ pipelineId: "pipe-1" }); // Each test ran against ITS component's persisted source, not a shared one. 
- expect(evaluateVrl).toHaveBeenCalledWith("SRC_ONE", [{ a: 1 }]); - expect(evaluateVrl).toHaveBeenCalledWith("SRC_TWO", [{ b: 2 }]); + expect(evaluateVrl).toHaveBeenCalledWith("SRC_ONE", [{ a: 1 }], { orgId: "org-1" }); + expect(evaluateVrl).toHaveBeenCalledWith("SRC_TWO", [{ b: 2 }], { orgId: "org-1" }); expect(res.results).toEqual([ { id: "t1", name: "one", componentKey: "remap_1", passed: true, actual: { a: 1 }, expected: { a: 1 } }, { id: "t2", name: "two", componentKey: "remap_2", passed: true, actual: { b: 2 }, expected: { b: 2 } }, @@ -165,7 +165,7 @@ describe("vrlRouter.runPipelineUnitTests", () => { expect(res.results.map((r: { id: string }) => r.id)).toEqual(["t1"]); expect(res.summary).toEqual({ total: 1, passed: 1, failed: 0 }); expect(evaluateVrl).toHaveBeenCalledTimes(1); - expect(evaluateVrl).toHaveBeenCalledWith("SRC", [{ a: 1 }]); + expect(evaluateVrl).toHaveBeenCalledWith("SRC", [{ a: 1 }], { orgId: "org-1" }); }); it("caps the number of tests run per component", async () => { diff --git a/src/server/routers/__tests__/vrl-unit-test.test.ts b/src/server/routers/__tests__/vrl-unit-test.test.ts index fc3889ed..a275af01 100644 --- a/src/server/routers/__tests__/vrl-unit-test.test.ts +++ b/src/server/routers/__tests__/vrl-unit-test.test.ts @@ -226,7 +226,7 @@ describe("vrlRouter.runUnitTests", () => { source: '.level = "info"', }); - expect(evaluateVrl).toHaveBeenCalledWith('.level = "info"', [{ level: "debug" }]); + expect(evaluateVrl).toHaveBeenCalledWith('.level = "info"', [{ level: "debug" }], { orgId: "org-1" }); expect(results).toEqual([ { id: "ut-1", name: "set level", passed: true, actual: { host: "h", level: "info" }, expected: { level: "info", host: "h" } }, ]); diff --git a/src/server/routers/proposed-change.ts b/src/server/routers/proposed-change.ts index cdf81e8b..9428353c 100644 --- a/src/server/routers/proposed-change.ts +++ b/src/server/routers/proposed-change.ts @@ -288,7 +288,7 @@ export const proposedChangeRouter = router({ }); } 
let source = input.vrlSource; - let result = await evaluateVrl(source, [VRL_SAMPLE_EVENT]); + let result = await evaluateVrl(source, [VRL_SAMPLE_EVENT], { orgId: ctx.organizationId }); let attempts = 0; while (result.error && attempts < MAX_AUTOFIX_ATTEMPTS) { attempts++; @@ -300,7 +300,7 @@ export const proposedChangeRouter = router({ }); if (!fixed) break; source = fixed; - result = await evaluateVrl(source, [VRL_SAMPLE_EVENT]); + result = await evaluateVrl(source, [VRL_SAMPLE_EVENT], { orgId: ctx.organizationId }); } const valid = !result.error; diff --git a/src/server/routers/tap-capture.ts b/src/server/routers/tap-capture.ts index 213967f1..e81e0aad 100644 --- a/src/server/routers/tap-capture.ts +++ b/src/server/routers/tap-capture.ts @@ -213,7 +213,7 @@ export const tapCaptureRouter = router({ const events = Array.isArray(capture.events) ? (capture.events as unknown[]) : []; - const result = await evaluateVrl(input.source, events); + const result = await evaluateVrl(input.source, events, { orgId: ctx.organizationId }); return { outputs: result.outputs, stats: { diff --git a/src/server/routers/vrl.ts b/src/server/routers/vrl.ts index c57db72a..d62a7caa 100644 --- a/src/server/routers/vrl.ts +++ b/src/server/routers/vrl.ts @@ -98,17 +98,20 @@ export interface PipelineVrlUnitTestRunResult extends VrlUnitTestRunResult { * `runPipelineUnitTests` (every component, each node's persisted source) so both * apply identical pass/fail semantics: a compile error, a dropped event, or a * mismatch all report `passed: false`. + * `orgId` threads through to `evaluateVrl` so the per-tenant `vector` + * subprocess bound applies across concurrent calls, not just within a batch. 
*/ async function runTestsAgainstSource( source: string, tests: ReadonlyArray<{ id: string; name: string; input: unknown; expected: unknown }>, + orgId: string, ): Promise { const results: VrlUnitTestRunResult[] = []; for (let i = 0; i < tests.length; i += VRL_TEST_RUN_CONCURRENCY) { const batch = tests.slice(i, i + VRL_TEST_RUN_CONCURRENCY); const batchResults = await Promise.all( batch.map(async (test) => { - const result = await evaluateVrl(source, [test.input]); + const result = await evaluateVrl(source, [test.input], { orgId }); const actual = result.outputs.length === 1 ? result.outputs[0] : null; const passed = !result.error && @@ -304,7 +307,7 @@ export const vrlRouter = router({ const events = Array.isArray(capture.events) ? (capture.events as unknown[]) : []; - return evaluateVrl(input.source, events); + return evaluateVrl(input.source, events, { orgId: ctx.organizationId }); }), /** @@ -480,7 +483,7 @@ export const vrlRouter = router({ take: MAX_VRL_UNIT_TESTS_PER_COMPONENT, }); - return runTestsAgainstSource(input.source, tests); + return runTestsAgainstSource(input.source, tests, ctx.organizationId); }), /** @@ -551,7 +554,7 @@ export const vrlRouter = router({ const results: PipelineVrlUnitTestRunResult[] = []; for (const [componentKey, componentTests] of testsByComponent) { const source = sourceByComponent.get(componentKey)!; - const componentResults = await runTestsAgainstSource(source, componentTests); + const componentResults = await runTestsAgainstSource(source, componentTests, ctx.organizationId); for (const r of componentResults) { results.push({ ...r, componentKey }); } diff --git a/src/server/services/cost-recommendation-procedures.ts b/src/server/services/cost-recommendation-procedures.ts index 58a4fd8b..9f14965a 100644 --- a/src/server/services/cost-recommendation-procedures.ts +++ b/src/server/services/cost-recommendation-procedures.ts @@ -459,7 +459,7 @@ export async function simulateTransform( }; } - const result = await 
evaluateVrl(source, events); + const result = await evaluateVrl(source, events, { orgId: input.organizationId }); const estimatedSavingsCents = await projectSimulatedSavings( pipelineId, input.organizationId, diff --git a/src/server/services/transform-eval.ts b/src/server/services/transform-eval.ts index d0eda371..e382b051 100644 --- a/src/server/services/transform-eval.ts +++ b/src/server/services/transform-eval.ts @@ -17,12 +17,20 @@ import { execFile } from "child_process"; import { promisify } from "util"; import { join } from "path"; import { tmpdir } from "os"; +import { withOrgConcurrencyLimit } from "@/lib/org-concurrency"; const execFileAsync = promisify(execFile); const DEFAULT_TIMEOUT_MS = 15_000; const MAX_BUFFER_BYTES = 16 * 1024 * 1024; +/** + * Max concurrent `vector` subprocesses per org (bucket key `"vector-eval"`). + * Conservative: enough for an interactive editor's parallel unit-test batch + * without letting one tenant monopolize the shared host. + */ +const VECTOR_EVAL_CONCURRENCY = 4; + export interface TransformEvalStats { /** Events fed in. */ inputCount: number; @@ -50,6 +58,11 @@ export interface TransformEvalResult extends TransformEvalStats { export interface EvaluateVrlOptions { timeoutMs?: number; + /** + * Organization to bound the `vector` subprocess spawn under (see + * `withOrgConcurrencyLimit`). Omit to run unbounded (backward-compatible). + */ + orgId?: string; } /** NDJSON encoding for `vector vrl --input` (one compact JSON object per line). */ @@ -120,24 +133,19 @@ function resultFromOutputs( } /** - * Run a VRL program over `events`, returning transformed outputs + reduction - * stats. An empty program or empty input is a no-op pass-through (0% reduction). - * Never throws — failures land in `result.error`. + * Spawn the `vector` subprocess for a non-trivial program/input pair: write the + * program + NDJSON to a temp dir, run `vector vrl`, parse the surviving events. 
+ * Never throws — compile errors, a missing binary, or a timeout land in + * `result.error`. Temp files are always cleaned up. */ -export async function evaluateVrl( +async function runVectorEval( source: string, - events: unknown[], - options: EvaluateVrlOptions = {}, + ndjson: string, + inputCount: number, + inputBytes: number, + start: number, + timeoutMs: number, ): Promise { - const start = performance.now(); - const inputCount = events.length; - const ndjson = buildNdjson(events); - const inputBytes = Buffer.byteLength(ndjson, "utf8"); - - if (!source.trim() || inputCount === 0) { - return resultFromOutputs(inputCount, inputBytes, [...events], 0); - } - let tmpDir: string; try { tmpDir = await mkdtemp(join(tmpdir(), "vf-transform-eval-")); @@ -156,7 +164,7 @@ export async function evaluateVrl( "vector", ["vrl", "--input", inputPath, "--program", programPath, "--print-object"], { - timeout: options.timeoutMs ?? DEFAULT_TIMEOUT_MS, + timeout: timeoutMs, maxBuffer: MAX_BUFFER_BYTES, env: { ...process.env, VECTOR_LOG: "error" }, }, @@ -177,3 +185,35 @@ export async function evaluateVrl( await unlink(inputPath).catch(() => {}); } } + +/** + * Run a VRL program over `events`, returning transformed outputs + reduction + * stats. An empty program or empty input is a no-op pass-through (0% reduction). + * Never throws — failures land in `result.error`. + * + * When `options.orgId` is set, the `vector` subprocess spawn is bounded per org + * (key `"vector-eval"`) so one tenant's concurrent evals (live-tap, cost + * what-if, unit-test "run all") cannot starve the shared host. The no-op fast + * path never takes a slot, and behavior is unchanged when `orgId` is omitted. 
+ */ +export async function evaluateVrl( + source: string, + events: unknown[], + options: EvaluateVrlOptions = {}, +): Promise { + const start = performance.now(); + const inputCount = events.length; + const ndjson = buildNdjson(events); + const inputBytes = Buffer.byteLength(ndjson, "utf8"); + + if (!source.trim() || inputCount === 0) { + return resultFromOutputs(inputCount, inputBytes, [...events], 0); + } + + const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const run = () => runVectorEval(source, ndjson, inputCount, inputBytes, start, timeoutMs); + + return options.orgId + ? withOrgConcurrencyLimit(options.orgId, "vector-eval", VECTOR_EVAL_CONCURRENCY, run) + : run(); +}