Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions src/lib/__tests__/org-concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { describe, it, expect } from "vitest";
import { withOrgConcurrencyLimit } from "../org-concurrency";

/**
* Drain the microtask queue so all pending acquire/handoff continuations settle
* before we assert. The limiter is microtask-only, so a single macrotask tick
* (setImmediate) runs strictly after every queued handoff.
*/
function flush(): Promise<void> {
const { promise, resolve } = Promise.withResolvers<void>();
setImmediate(resolve);
return promise;
}

describe("withOrgConcurrencyLimit", () => {
it("never runs more than `max` tasks at once for one org, draining FIFO", async () => {
const gates = Array.from({ length: 5 }, () => Promise.withResolvers<void>());
let running = 0;
let peak = 0;
const startOrder: number[] = [];

const tasks = gates.map((gate, i) =>
withOrgConcurrencyLimit("org-cap", "k", 2, async () => {
running++;
peak = Math.max(peak, running);
startOrder.push(i);
await gate.promise;
running--;
}),
);

// Only the first two acquire a slot; tasks 2..4 queue.
await flush();
expect(running).toBe(2);
expect(startOrder).toEqual([0, 1]);

// Each released slot admits exactly the next FIFO waiter — never a 3rd at once.
gates[0].resolve();
await flush();
expect(running).toBe(2);
expect(startOrder).toEqual([0, 1, 2]);

gates[1].resolve();
await flush();
expect(running).toBe(2);
expect(startOrder).toEqual([0, 1, 2, 3]);

gates[2].resolve();
await flush();
expect(startOrder).toEqual([0, 1, 2, 3, 4]);

gates[3].resolve();
gates[4].resolve();
await Promise.all(tasks);

expect(peak).toBe(2);
});

it("isolates the limit per org — a saturated org never blocks another", async () => {
const aGate = Promise.withResolvers<void>();
let aFirstRunning = false;
let aSecondRan = false;
let bRan = false;

// org-a (cap 1) is held open by its first task.
const aFirst = withOrgConcurrencyLimit("org-a", "k", 1, async () => {
aFirstRunning = true;
await aGate.promise;
});
await flush();
expect(aFirstRunning).toBe(true);

// A second org-a task must queue (cap reached) ...
const aSecond = withOrgConcurrencyLimit("org-a", "k", 1, async () => {
aSecondRan = true;
});
// ... while org-b is independent and runs immediately.
const bTask = withOrgConcurrencyLimit("org-b", "k", 1, async () => {
bRan = true;
});

await flush();
expect(bRan).toBe(true); // org-b unaffected by org-a's saturation
expect(aSecondRan).toBe(false); // org-a still at its cap

aGate.resolve();
await Promise.all([aFirst, aSecond, bTask]);
expect(aSecondRan).toBe(true);
});

it("isolates different keys within the same org", async () => {
const gate = Promise.withResolvers<void>();
let key2Ran = false;

const key1 = withOrgConcurrencyLimit("org-keys", "key-1", 1, async () => {
await gate.promise;
});
await flush();

const key2 = withOrgConcurrencyLimit("org-keys", "key-2", 1, async () => {
key2Ran = true;
});
await flush();
expect(key2Ran).toBe(true); // distinct key → its own slot, not blocked by key-1

gate.resolve();
await Promise.all([key1, key2]);
});

it("releases the slot when fn throws, admitting the next waiter", async () => {
const gate = Promise.withResolvers<void>();
let secondRan = false;

const first = withOrgConcurrencyLimit("org-throw", "k", 1, async () => {
await gate.promise;
throw new Error("boom");
});
first.catch(() => {}); // handled below via expect().rejects; avoid unhandled-rejection noise

const second = withOrgConcurrencyLimit("org-throw", "k", 1, async () => {
secondRan = true;
});

await flush();
expect(secondRan).toBe(false); // queued behind the still-running first

gate.resolve();
await expect(first).rejects.toThrow("boom");

await second;
expect(secondRan).toBe(true); // slot was released despite the throw
});

it("routes a missing/empty orgId to a shared 'global' bucket", async () => {
const gate = Promise.withResolvers<void>();
let firstRunning = false;
let secondRan = false;

const first = withOrgConcurrencyLimit("", "global-key", 1, async () => {
firstRunning = true;
await gate.promise;
});
// Whitespace-only orgId collapses to the same "global" bucket as "".
const second = withOrgConcurrencyLimit(" ", "global-key", 1, async () => {
secondRan = true;
});

await flush();
expect(firstRunning).toBe(true);
expect(secondRan).toBe(false); // shares first's bucket → must queue

gate.resolve();
await Promise.all([first, second]);
expect(secondRan).toBe(true);
});

it("returns fn's resolved value", async () => {
const value = await withOrgConcurrencyLimit("org-ret", "k", 2, async () => 42);
expect(value).toBe(42);
});
});
94 changes: 94 additions & 0 deletions src/lib/org-concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* In-process per-(org, key) async concurrency limiter.
*
* Some server operations shell out to an expensive shared resource — most
* notably `evaluateVrl` (transform-eval.ts), which spawns a `vector` subprocess.
* Without a per-tenant bound, one organization issuing many concurrent requests
* (live-tap iteration, cost what-if, unit-test "run all") can spawn unbounded
* subprocesses and starve the shared host for every other tenant.
*
* `withOrgConcurrencyLimit(orgId, key, max, fn)` caps how many `fn`s run at once
* for a given (orgId, key) pair, queueing the rest FIFO and admitting the next
* waiter as each slot frees. The slot is always released — on success or throw.
* This is a single-process primitive (one Node worker): fairness within a host,
* not a distributed limiter.
*
* A missing/empty `orgId` falls back to a shared `"global"` bucket so callers
* without org context still get a bound rather than none.
*/

/** Per-(org, key) counting semaphore: `max` slots and a FIFO queue of waiters. */
interface Semaphore {
max: number;
active: number;
/** FIFO resolvers; calling one admits that waiter into a freed slot. */
queue: Array<() => void>;
}

/** Live buckets keyed by `"<org>\u0000<key>"`. Idle buckets are deleted. */
const semaphores = new Map<string, Semaphore>();

function getSemaphore(mapKey: string, max: number): Semaphore {
let sem = semaphores.get(mapKey);
if (!sem) {
// The bound is fixed when the bucket is created (and re-established if the
// bucket is recreated after going idle); callers MUST pass a consistent
// `max` per (orgId, key). Clamp to >= 1 so a stray 0 can't deadlock.
sem = { max: Math.max(1, Math.floor(max)), active: 0, queue: [] };
semaphores.set(mapKey, sem);
}
return sem;
}

/** Acquire a slot — resolves immediately if one is free, else queues FIFO. */
function acquire(sem: Semaphore): Promise<void> {
if (sem.active < sem.max) {
sem.active++;
return Promise.resolve();
}
const { promise, resolve } = Promise.withResolvers<void>();
sem.queue.push(resolve);
return promise;
}

/**
* Release a held slot. If a waiter is queued, hand the slot directly to it —
* `active` is unchanged, the slot transfers — which preserves FIFO and stops a
* newcomer from jumping the queue. Otherwise free the slot, deleting the bucket
* once it is fully idle so the Map stays proportional to live concurrency
* rather than the historical org count.
*/
function release(mapKey: string, sem: Semaphore): void {
const next = sem.queue.shift();
if (next) {
next();
return;
}
sem.active--;
if (sem.active <= 0 && sem.queue.length === 0) {
semaphores.delete(mapKey);
}
}

/**
* Run `fn` under a per-(orgId, key) concurrency cap of `max`. At most `max`
* `fn`s run concurrently for the pair; the rest queue FIFO and drain as slots
* free. The slot is always released, on success or throw.
*/
export async function withOrgConcurrencyLimit<T>(
orgId: string,
key: string,
max: number,
fn: () => Promise<T>,
): Promise<T> {
// NUL-join keeps the composite key unambiguous (NUL cannot appear in an id);
// an empty/whitespace orgId collapses to a shared "global" bucket.
const mapKey = `${orgId.trim() || "global"}\u0000${key}`;
const sem = getSemaphore(mapKey, max);
await acquire(sem);
try {
return await fn();
} finally {
release(mapKey, sem);
}
}
2 changes: 1 addition & 1 deletion src/server/routers/__tests__/tap-capture.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ describe("tapCaptureRouter.testTransform", () => {
source: "del(.a)",
});

expect(evaluateVrl).toHaveBeenCalledWith("del(.a)", [{ a: 1 }, { a: 2 }, { a: 3 }]);
expect(evaluateVrl).toHaveBeenCalledWith("del(.a)", [{ a: 1 }, { a: 2 }, { a: 3 }], { orgId: "org-1" });
expect(result.outputs).toHaveLength(2);
expect(result.stats).toEqual({
inputCount: 3,
Expand Down
6 changes: 3 additions & 3 deletions src/server/routers/__tests__/vrl-pipeline-unit-test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ describe("vrlRouter.runPipelineUnitTests", () => {
const res = await caller.runPipelineUnitTests({ pipelineId: "pipe-1" });

// Each test ran against ITS component's persisted source, not a shared one.
expect(evaluateVrl).toHaveBeenCalledWith("SRC_ONE", [{ a: 1 }]);
expect(evaluateVrl).toHaveBeenCalledWith("SRC_TWO", [{ b: 2 }]);
expect(evaluateVrl).toHaveBeenCalledWith("SRC_ONE", [{ a: 1 }], { orgId: "org-1" });
expect(evaluateVrl).toHaveBeenCalledWith("SRC_TWO", [{ b: 2 }], { orgId: "org-1" });
expect(res.results).toEqual([
{ id: "t1", name: "one", componentKey: "remap_1", passed: true, actual: { a: 1 }, expected: { a: 1 } },
{ id: "t2", name: "two", componentKey: "remap_2", passed: true, actual: { b: 2 }, expected: { b: 2 } },
Expand Down Expand Up @@ -165,7 +165,7 @@ describe("vrlRouter.runPipelineUnitTests", () => {
expect(res.results.map((r: { id: string }) => r.id)).toEqual(["t1"]);
expect(res.summary).toEqual({ total: 1, passed: 1, failed: 0 });
expect(evaluateVrl).toHaveBeenCalledTimes(1);
expect(evaluateVrl).toHaveBeenCalledWith("SRC", [{ a: 1 }]);
expect(evaluateVrl).toHaveBeenCalledWith("SRC", [{ a: 1 }], { orgId: "org-1" });
});

it("caps the number of tests run per component", async () => {
Expand Down
2 changes: 1 addition & 1 deletion src/server/routers/__tests__/vrl-unit-test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ describe("vrlRouter.runUnitTests", () => {
source: '.level = "info"',
});

expect(evaluateVrl).toHaveBeenCalledWith('.level = "info"', [{ level: "debug" }]);
expect(evaluateVrl).toHaveBeenCalledWith('.level = "info"', [{ level: "debug" }], { orgId: "org-1" });
expect(results).toEqual([
{ id: "ut-1", name: "set level", passed: true, actual: { host: "h", level: "info" }, expected: { level: "info", host: "h" } },
]);
Expand Down
4 changes: 2 additions & 2 deletions src/server/routers/proposed-change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ export const proposedChangeRouter = router({
});
}
let source = input.vrlSource;
let result = await evaluateVrl(source, [VRL_SAMPLE_EVENT]);
let result = await evaluateVrl(source, [VRL_SAMPLE_EVENT], { orgId: ctx.organizationId });
let attempts = 0;
while (result.error && attempts < MAX_AUTOFIX_ATTEMPTS) {
attempts++;
Expand All @@ -300,7 +300,7 @@ export const proposedChangeRouter = router({
});
if (!fixed) break;
source = fixed;
result = await evaluateVrl(source, [VRL_SAMPLE_EVENT]);
result = await evaluateVrl(source, [VRL_SAMPLE_EVENT], { orgId: ctx.organizationId });
}
const valid = !result.error;

Expand Down
2 changes: 1 addition & 1 deletion src/server/routers/tap-capture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ export const tapCaptureRouter = router({
const events = Array.isArray(capture.events)
? (capture.events as unknown[])
: [];
const result = await evaluateVrl(input.source, events);
const result = await evaluateVrl(input.source, events, { orgId: ctx.organizationId });
return {
outputs: result.outputs,
stats: {
Expand Down
11 changes: 7 additions & 4 deletions src/server/routers/vrl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,20 @@ export interface PipelineVrlUnitTestRunResult extends VrlUnitTestRunResult {
* `runPipelineUnitTests` (every component, each node's persisted source) so both
* apply identical pass/fail semantics: a compile error, a dropped event, or a
* mismatch all report `passed: false`.
* `orgId` threads through to `evaluateVrl` so the per-tenant `vector`
* subprocess bound applies across concurrent calls, not just within a batch.
*/
async function runTestsAgainstSource(
source: string,
tests: ReadonlyArray<{ id: string; name: string; input: unknown; expected: unknown }>,
orgId: string,
): Promise<VrlUnitTestRunResult[]> {
const results: VrlUnitTestRunResult[] = [];
for (let i = 0; i < tests.length; i += VRL_TEST_RUN_CONCURRENCY) {
const batch = tests.slice(i, i + VRL_TEST_RUN_CONCURRENCY);
const batchResults = await Promise.all(
batch.map(async (test) => {
const result = await evaluateVrl(source, [test.input]);
const result = await evaluateVrl(source, [test.input], { orgId });
const actual = result.outputs.length === 1 ? result.outputs[0] : null;
const passed =
!result.error &&
Expand Down Expand Up @@ -304,7 +307,7 @@ export const vrlRouter = router({
const events = Array.isArray(capture.events)
? (capture.events as unknown[])
: [];
return evaluateVrl(input.source, events);
return evaluateVrl(input.source, events, { orgId: ctx.organizationId });
}),

/**
Expand Down Expand Up @@ -480,7 +483,7 @@ export const vrlRouter = router({
take: MAX_VRL_UNIT_TESTS_PER_COMPONENT,
});

return runTestsAgainstSource(input.source, tests);
return runTestsAgainstSource(input.source, tests, ctx.organizationId);
}),

/**
Expand Down Expand Up @@ -551,7 +554,7 @@ export const vrlRouter = router({
const results: PipelineVrlUnitTestRunResult[] = [];
for (const [componentKey, componentTests] of testsByComponent) {
const source = sourceByComponent.get(componentKey)!;
const componentResults = await runTestsAgainstSource(source, componentTests);
const componentResults = await runTestsAgainstSource(source, componentTests, ctx.organizationId);
for (const r of componentResults) {
results.push({ ...r, componentKey });
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/services/cost-recommendation-procedures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ export async function simulateTransform(
};
}

const result = await evaluateVrl(source, events);
const result = await evaluateVrl(source, events, { orgId: input.organizationId });
const estimatedSavingsCents = await projectSimulatedSavings(
pipelineId,
input.organizationId,
Expand Down
Loading
Loading