Skip to content

Commit a557dbc

Browse files
roblourensCopilot
andauthored
Await pending session DB writes on agent host shutdown (#311432)
* Await pending session DB writes on agent host shutdown The agent host server's SIGTERM/SIGINT handler called process.exit(0) synchronously, abandoning any fire-and-forget SQLite writes that were in flight (configValues, customTitle, isRead/isDone, diffs). Under CI load this caused the 'Session Config persistence across restarts' integration test to the most recent SessionConfigChanged writeflake could lose the race against shutdown, leaving the previous value persisted instead. Track in-flight writes inside SessionDatabase via a _pendingWrites set populated by every public mutating method (the outermost wrap is required so the await this._ensureDb() window is also covered). SessionDataService aggregates whenIdle() across all live per-session DBs. The server's shutdown handler now awaits this with a 3s raceTimeout before disposing. Removes the await timeout(500) hack the test previously needed. (Written by Copilot) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address PR review: close ws server first, simplify whenIdle loop - Close the WebSocket server before awaiting whenIdle() so no further actions can be dispatched during the flush window. - Simplify SessionDataService.whenIdle(): per-DB whenIdle() already drains writes against existing DBs, so the outer loop only needs to re-pass when a NEW DB was opened during the await. Comment now matches the code. (Written by Copilot) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix Windows: trigger graceful shutdown via stdin close in test On Windows, child.kill() (SIGTERM) terminates the process unconditionally without invoking the SIGTERM so the in-flight setMetadata writehandler never reaches SQLite and the second phase sees no persisted config at all. Closing the child's stdin fires process.stdin.on('end', shutdown) on every platform, exercising the same graceful flush path. (Written by Copilot) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 11d49ab commit a557dbc

8 files changed

Lines changed: 205 additions & 78 deletions

File tree

src/vs/platform/agentHost/common/sessionDataService.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ export interface ISessionDatabase extends IDisposable {
173173
*/
174174
remapTurnIds(mapping: ReadonlyMap<string, string>): Promise<void>;
175175

176+
/**
177+
* Resolves once all in-flight write operations on this database have
178+
* settled. Used by graceful shutdown to flush fire-and-forget writes
179+
* before the process exits.
180+
*/
181+
whenIdle(): Promise<void>;
182+
176183
/**
177184
* Close the database connection. After calling this method, the object is
178185
* considered disposed and all other methods will reject with an error.
@@ -235,4 +242,12 @@ export interface ISessionDataService {
235242
* Called at startup; safe to call multiple times.
236243
*/
237244
cleanupOrphanedData(knownSessionIds: Set<string>): Promise<void>;
245+
246+
/**
247+
* Resolves once all in-flight write operations across every currently
248+
* open per-session database have settled. Intended for graceful
249+
* shutdown — fire-and-forget writes (e.g. metadata persistence) would
250+
* otherwise be lost when the process exits.
251+
*/
252+
whenIdle(): Promise<void>;
238253
}

src/vs/platform/agentHost/node/agentHostServerMain.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ globalThis._VSCODE_FILE_ROOT = fileURLToPath(new URL('../../../..', import.meta.
1616
import * as fs from 'fs';
1717
import * as os from 'os';
1818
import { DisposableStore } from '../../../base/common/lifecycle.js';
19+
import { raceTimeout } from '../../../base/common/async.js';
1920
import { URI } from '../../../base/common/uri.js';
2021
import { generateUuid } from '../../../base/common/uuid.js';
2122
import { localize } from '../../../nls.js';
@@ -253,12 +254,31 @@ async function main(): Promise<void> {
253254

254255
// Keep alive until stdin closes or signal
255256
process.stdin.resume();
256-
process.stdin.on('end', shutdown);
257-
process.on('SIGTERM', shutdown);
258-
process.on('SIGINT', shutdown);
259-
260-
function shutdown(): void {
257+
process.stdin.on('end', () => { void shutdown(); });
258+
process.on('SIGTERM', () => { void shutdown(); });
259+
process.on('SIGINT', () => { void shutdown(); });
260+
261+
let shuttingDown = false;
262+
async function shutdown(): Promise<void> {
263+
if (shuttingDown) {
264+
return;
265+
}
266+
shuttingDown = true;
261267
logService.info('[AgentHostServer] Shutting down...');
268+
// Close the WebSocket server first so no further actions can be
269+
// dispatched while we wait for in-flight writes to flush — otherwise
270+
// a late-arriving action could keep queuing DB writes and either
271+
// undermine the flush or push us past the timeout.
272+
wsServer.dispose();
273+
// Wait for in-flight persistence writes to flush to the per-session
274+
// SQLite databases. Without this, a SIGTERM arriving while a
275+
// `setMetadata` write (configValues, customTitle, isRead, isDone,
276+
// diffs) is in flight can drop the latest value — see the
277+
// "Session Config persistence across restarts" integration test.
278+
// Capped so a stuck write cannot hang shutdown indefinitely.
279+
await raceTimeout(sessionDataService.whenIdle(), 3000, () => {
280+
logService.warn('[AgentHostServer] Timed out waiting for session database writes to flush; exiting anyway.');
281+
});
262282
disposables.dispose();
263283
loggerService?.dispose();
264284
process.exit(0);

src/vs/platform/agentHost/node/sessionDataService.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ import { ISessionDatabase, ISessionDataService, SESSION_DB_FILENAME } from '../c
1212
import { SessionDatabase } from './sessionDatabase.js';
1313

1414
class SessionDatabaseCollection extends ReferenceCollection<ISessionDatabase> {
15+
16+
/**
17+
* The set of currently-open databases. Mirrors what's held by the
18+
* underlying ref-counted map, but exposed so {@link SessionDataService.whenIdle}
19+
* can iterate without reaching into private state.
20+
*/
21+
readonly liveDatabases = new Set<ISessionDatabase>();
22+
1523
constructor(
1624
private readonly _getDbPath: (key: string) => string,
1725
private readonly _logService: ILogService,
@@ -22,10 +30,13 @@ class SessionDatabaseCollection extends ReferenceCollection<ISessionDatabase> {
2230
protected createReferencedObject(key: string): ISessionDatabase {
2331
const dbPath = this._getDbPath(key);
2432
this._logService.trace(`[SessionDataService] Opening database: ${dbPath}`);
25-
return new SessionDatabase(dbPath);
33+
const db = new SessionDatabase(dbPath);
34+
this.liveDatabases.add(db);
35+
return db;
2636
}
2737

2838
protected destroyReferencedObject(_key: string, object: ISessionDatabase): void {
39+
this.liveDatabases.delete(object);
2940
object.dispose();
3041
}
3142
}
@@ -124,4 +135,22 @@ export class SessionDataService implements ISessionDataService {
124135
this._logService.warn('[SessionDataService] Failed to run orphan cleanup', err);
125136
}
126137
}
138+
139+
async whenIdle(): Promise<void> {
140+
// Each `SessionDatabase.whenIdle()` already loops internally until
141+
// that DB is quiescent, so the outer loop only needs to handle the
142+
// case where a new DB was opened (and writes queued against it)
143+
// while we were awaiting an earlier pass.
144+
while (true) {
145+
const dbs = [...this._databases.liveDatabases];
146+
if (dbs.length === 0) {
147+
return;
148+
}
149+
await Promise.all(dbs.map(db => db.whenIdle()));
150+
const newOnes = [...this._databases.liveDatabases].filter(db => !dbs.includes(db));
151+
if (newOnes.length === 0) {
152+
return;
153+
}
154+
}
155+
}
127156
}

src/vs/platform/agentHost/node/sessionDatabase.ts

Lines changed: 120 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,17 @@ export class SessionDatabase implements ISessionDatabase {
195195
protected _closed: Promise<void> | true | undefined;
196196
private readonly _fileEditSequencer = new SequencerByKey<string>();
197197

198+
/**
199+
* In-flight write operations. Tracked so {@link whenIdle} can await them
200+
* before the process exits — without this, a `SIGTERM` arriving between
201+
* a fire-and-forget mutating call (e.g. `setMetadata`) being invoked and
202+
* its underlying SQLite query completing would silently drop the write.
203+
* Every public mutating method routes its returned promise through
204+
* {@link _track}; reads (`getMetadata`, `getFileEdits`, ...) skip
205+
* tracking since shutdown does not need to wait for them.
206+
*/
207+
private readonly _pendingWrites = new Set<Promise<unknown>>();
208+
198209
constructor(
199210
private readonly _path: string,
200211
private readonly _migrations: readonly ISessionDatabaseMigration[] = sessionDatabaseMigrations,
@@ -251,23 +262,29 @@ export class SessionDatabase implements ISessionDatabase {
251262

252263
// ---- Turns ----------------------------------------------------------
253264

254-
async createTurn(turnId: string): Promise<void> {
255-
const db = await this._ensureDb();
256-
await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
265+
createTurn(turnId: string): Promise<void> {
266+
return this._track(async () => {
267+
const db = await this._ensureDb();
268+
await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
269+
});
257270
}
258271

259-
async deleteTurn(turnId: string): Promise<void> {
260-
const db = await this._ensureDb();
261-
await dbRun(db, 'DELETE FROM turns WHERE id = ?', [turnId]);
272+
deleteTurn(turnId: string): Promise<void> {
273+
return this._track(async () => {
274+
const db = await this._ensureDb();
275+
await dbRun(db, 'DELETE FROM turns WHERE id = ?', [turnId]);
276+
});
262277
}
263278

264-
async setTurnEventId(turnId: string, eventId: string): Promise<void> {
265-
const db = await this._ensureDb();
266-
await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
267-
// Only set the event ID if not already set — steering messages
268-
// trigger additional user.message events within the same turn,
269-
// and we must preserve the first (boundary) event ID.
270-
await dbRun(db, 'UPDATE turns SET event_id = ? WHERE id = ? AND event_id IS NULL', [eventId, turnId]);
279+
setTurnEventId(turnId: string, eventId: string): Promise<void> {
280+
return this._track(async () => {
281+
const db = await this._ensureDb();
282+
await dbRun(db, 'INSERT OR IGNORE INTO turns (id) VALUES (?)', [turnId]);
283+
// Only set the event ID if not already set — steering messages
284+
// trigger additional user.message events within the same turn,
285+
// and we must preserve the first (boundary) event ID.
286+
await dbRun(db, 'UPDATE turns SET event_id = ? WHERE id = ? AND event_id IS NULL', [eventId, turnId]);
287+
});
271288
}
272289

273290
async getTurnEventId(turnId: string): Promise<string | undefined> {
@@ -292,36 +309,42 @@ export class SessionDatabase implements ISessionDatabase {
292309
return row?.event_id as string | undefined ?? undefined;
293310
}
294311

295-
async truncateFromTurn(turnId: string): Promise<void> {
296-
const db = await this._ensureDb();
297-
// Delete the target turn and all turns inserted after it (by rowid order).
298-
// File edits cascade-delete via the foreign key constraint.
299-
await dbRun(db,
300-
`DELETE FROM turns WHERE rowid >= (SELECT rowid FROM turns WHERE id = ?)`,
301-
[turnId],
302-
);
312+
truncateFromTurn(turnId: string): Promise<void> {
313+
return this._track(async () => {
314+
const db = await this._ensureDb();
315+
// Delete the target turn and all turns inserted after it (by rowid order).
316+
// File edits cascade-delete via the foreign key constraint.
317+
await dbRun(db,
318+
`DELETE FROM turns WHERE rowid >= (SELECT rowid FROM turns WHERE id = ?)`,
319+
[turnId],
320+
);
321+
});
303322
}
304323

305-
async deleteTurnsAfter(turnId: string): Promise<void> {
306-
const db = await this._ensureDb();
307-
// Delete all turns inserted after the given turn (by rowid order),
308-
// keeping the given turn itself.
309-
// File edits cascade-delete via the foreign key constraint.
310-
await dbRun(db,
311-
`DELETE FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?)`,
312-
[turnId],
313-
);
324+
deleteTurnsAfter(turnId: string): Promise<void> {
325+
return this._track(async () => {
326+
const db = await this._ensureDb();
327+
// Delete all turns inserted after the given turn (by rowid order),
328+
// keeping the given turn itself.
329+
// File edits cascade-delete via the foreign key constraint.
330+
await dbRun(db,
331+
`DELETE FROM turns WHERE rowid > (SELECT rowid FROM turns WHERE id = ?)`,
332+
[turnId],
333+
);
334+
});
314335
}
315336

316-
async deleteAllTurns(): Promise<void> {
317-
const db = await this._ensureDb();
318-
await dbExec(db, 'DELETE FROM turns');
337+
deleteAllTurns(): Promise<void> {
338+
return this._track(async () => {
339+
const db = await this._ensureDb();
340+
await dbExec(db, 'DELETE FROM turns');
341+
});
319342
}
320343

321344
// ---- File edits -----------------------------------------------------
322345

323-
async storeFileEdit(edit: IFileEditRecord & IFileEditContent): Promise<void> {
324-
return this._fileEditSequencer.queue(edit.filePath, async () => {
346+
storeFileEdit(edit: IFileEditRecord & IFileEditContent): Promise<void> {
347+
return this._track(() => this._fileEditSequencer.queue(edit.filePath, async () => {
325348
const db = await this._ensureDb();
326349
// Ensure the turn exists — lazily insert since the turn record
327350
// may not have been created by an explicit createTurn() call.
@@ -343,7 +366,7 @@ export class SessionDatabase implements ISessionDatabase {
343366
edit.removedLines ?? null,
344367
],
345368
);
346-
});
369+
}));
347370
}
348371

349372
async getFileEdits(toolCallIds: string[]): Promise<IFileEditRecord[]> {
@@ -459,42 +482,74 @@ export class SessionDatabase implements ISessionDatabase {
459482
return result;
460483
}
461484

462-
async setMetadata(key: string, value: string): Promise<void> {
463-
const db = await this._ensureDb();
464-
await dbRun(db, 'INSERT OR REPLACE INTO session_metadata (key, value) VALUES (?, ?)', [key, value]);
485+
setMetadata(key: string, value: string): Promise<void> {
486+
return this._track(async () => {
487+
const db = await this._ensureDb();
488+
await dbRun(db, 'INSERT OR REPLACE INTO session_metadata (key, value) VALUES (?, ?)', [key, value]);
489+
});
465490
}
466491

467-
async remapTurnIds(mapping: ReadonlyMap<string, string>): Promise<void> {
468-
const db = await this._ensureDb();
469-
// Defer FK checks to commit time so we can update turns.id and
470-
// file_edits.turn_id in any order without mid-statement violations.
471-
// This pragma auto-resets after the transaction ends.
472-
await dbExec(db, 'PRAGMA defer_foreign_keys = ON');
473-
await dbExec(db, 'BEGIN TRANSACTION');
474-
try {
475-
// Delete turns not present in the mapping (e.g. turns beyond
476-
// the fork point). File edits cascade-delete via FK.
477-
const oldIds = [...mapping.keys()];
478-
if (oldIds.length > 0) {
479-
const placeholders = oldIds.map(() => '?').join(',');
480-
await dbRun(db,
481-
`DELETE FROM turns WHERE id NOT IN (${placeholders})`,
482-
oldIds,
483-
);
484-
}
492+
remapTurnIds(mapping: ReadonlyMap<string, string>): Promise<void> {
493+
return this._track(async () => {
494+
const db = await this._ensureDb();
495+
// Defer FK checks to commit time so we can update turns.id and
496+
// file_edits.turn_id in any order without mid-statement violations.
497+
// This pragma auto-resets after the transaction ends.
498+
await dbExec(db, 'PRAGMA defer_foreign_keys = ON');
499+
await dbExec(db, 'BEGIN TRANSACTION');
500+
try {
501+
// Delete turns not present in the mapping (e.g. turns beyond
502+
// the fork point). File edits cascade-delete via FK.
503+
const oldIds = [...mapping.keys()];
504+
if (oldIds.length > 0) {
505+
const placeholders = oldIds.map(() => '?').join(',');
506+
await dbRun(db,
507+
`DELETE FROM turns WHERE id NOT IN (${placeholders})`,
508+
oldIds,
509+
);
510+
}
485511

486-
// Remap the remaining turn IDs to their new values
487-
for (const [oldId, newId] of mapping) {
488-
await dbRun(db, 'UPDATE turns SET id = ? WHERE id = ?', [newId, oldId]);
489-
await dbRun(db, 'UPDATE file_edits SET turn_id = ? WHERE turn_id = ?', [newId, oldId]);
512+
// Remap the remaining turn IDs to their new values
513+
for (const [oldId, newId] of mapping) {
514+
await dbRun(db, 'UPDATE turns SET id = ? WHERE id = ?', [newId, oldId]);
515+
await dbRun(db, 'UPDATE file_edits SET turn_id = ? WHERE turn_id = ?', [newId, oldId]);
516+
}
517+
await dbExec(db, 'COMMIT');
518+
} catch (err) {
519+
await dbExec(db, 'ROLLBACK');
520+
throw err;
490521
}
491-
await dbExec(db, 'COMMIT');
492-
} catch (err) {
493-
await dbExec(db, 'ROLLBACK');
494-
throw err;
522+
});
523+
}
524+
525+
/**
526+
* Resolves once all currently in-flight write operations have settled.
527+
* Used by graceful shutdown to flush pending fire-and-forget writes
528+
* before the process exits. Should be called from a path where no
529+
* further writes are expected; loops until idle to also drain any
530+
* writes that get queued while we're awaiting.
531+
*/
532+
async whenIdle(): Promise<void> {
533+
while (this._pendingWrites.size > 0) {
534+
await Promise.allSettled([...this._pendingWrites]);
495535
}
496536
}
497537

538+
/**
539+
* Wrap a mutating operation's promise so {@link whenIdle} can await it.
540+
* Invoke at the **outermost** layer of every public mutating method so
541+
* that any internal awaits (notably `_ensureDb()`) are covered too —
542+
* tracking only the leaf `dbRun`/`dbExec` would miss the window
543+
* between the method being called and the query actually being queued.
544+
*/
545+
private _track<T>(fn: () => Promise<T>): Promise<T> {
546+
const p = fn();
547+
this._pendingWrites.add(p);
548+
const untrack = () => { this._pendingWrites.delete(p); };
549+
p.then(untrack, untrack);
550+
return p;
551+
}
552+
498553
async close() {
499554
await (this._closed ??= this._dbPromise?.then(db => db.close()).catch(() => { }) || true);
500555
}

src/vs/platform/agentHost/test/common/sessionTestHelpers.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ export class TestSessionDatabase implements ISessionDatabase {
9090

9191
async remapTurnIds(_mapping: ReadonlyMap<string, string>): Promise<void> { }
9292

93+
async whenIdle(): Promise<void> { }
94+
9395
private _toEditRecords(edits: (IFileEditRecord & IFileEditContent)[]): IFileEditRecord[] {
9496
return edits.map(({ beforeContent: _, afterContent: _2, ...metadata }) => metadata);
9597
}
@@ -130,6 +132,7 @@ export function createSessionDataService(database: ISessionDatabase = new TestSe
130132
tryOpenDatabase: async () => createReference(database),
131133
deleteSessionData: async () => { },
132134
cleanupOrphanedData: async () => { },
135+
whenIdle: async () => { },
133136
};
134137
}
135138

@@ -142,6 +145,7 @@ export function createNullSessionDataService(): ISessionDataService {
142145
tryOpenDatabase: async () => undefined,
143146
deleteSessionData: async () => { },
144147
cleanupOrphanedData: async () => { },
148+
whenIdle: async () => { },
145149
};
146150
}
147151

0 commit comments

Comments
 (0)