Skip to content

Commit 2dbf69f

Browse files
zgliczvdiez
andauthored
JS-730 Websockets implementation (#5398)
Co-authored-by: Victor Diez <victor.diez@sonarsource.com>
1 parent 6223e3f commit 2dbf69f

51 files changed

Lines changed: 1301 additions & 441 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

its/plugin/tests/src/test/java/com/sonar/javascript/it/plugin/SonarJsIntegrationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class Bridge {
166166
private Process process;
167167

168168
Bridge() {
169-
this.client = HttpClient.newHttpClient();
169+
this.client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
170170
}
171171

172172
void start(Path dest) throws IOException {
@@ -184,6 +184,7 @@ void start(Path dest) throws IOException {
184184

185185
String request(String json, String endpoint) throws IOException, InterruptedException {
186186
var request = HttpRequest.newBuilder(url(endpoint))
187+
.version(HttpClient.Version.HTTP_1_1)
187188
.header("Content-Type", "application/json")
188189
.POST(HttpRequest.BodyPublishers.ofString(json))
189190
.build();

package-lock.json

Lines changed: 108 additions & 75 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
"@types/node": "22.15.24",
7474
"@types/semver": "7.7.0",
7575
"@types/tmp": "0.2.6",
76+
"@types/ws": "8.18.1",
7677
"cpy-cli": "5.0.0",
7778
"cross-env": "7.0.3",
7879
"dir-compare": "5.0.0",
@@ -121,7 +122,7 @@
121122
"functional-red-black-tree": "1.0.1",
122123
"globals": "16.2.0",
123124
"htmlparser2": "10.0.0",
124-
"http-status-codes": "^2.3.0",
125+
"http-status-codes": "2.3.0",
125126
"jsx-ast-utils": "3.3.5",
126127
"lodash.merge": "4.6.2",
127128
"minimatch": "9.0.5",
@@ -141,6 +142,7 @@
141142
"tmp": "0.2.3",
142143
"typescript": "5.8.3",
143144
"vue-eslint-parser": "10.1.3",
145+
"ws": "8.18.2",
144146
"yaml": "2.8.0"
145147
},
146148
"prettier": {

packages/bridge/src/delegate.ts

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@
1616
*/
1717
import express from 'express';
1818
import { Worker } from 'node:worker_threads';
19-
import { JsTsAnalysisOutputWithAst } from '../../jsts/src/analysis/analysis.js';
2019
import { handleRequest } from './handle-request.js';
21-
import { AnalysisOutput } from '../../shared/src/types/analysis.js';
22-
import { RequestResult, RequestType } from './request.js';
20+
import { RequestResult, RequestType, WsIncrementalResult } from './request.js';
2321
import { WorkerData } from '../../shared/src/helpers/worker.js';
22+
import { info, debug, error } from '../../shared/src/helpers/logging.js';
23+
import type { RawData, WebSocket } from 'ws';
24+
import { WorkerMessageListeners } from './router.js';
2425

2526
/**
2627
* Returns a delegate function to handle an HTTP request
2728
*/
28-
export function createDelegator(worker: Worker | undefined, workerData: WorkerData) {
29-
return function (type: RequestType) {
30-
return worker ? createWorkerHandler(worker, type) : createHandler(type, workerData);
31-
};
29+
export function createDelegator(
30+
worker: Worker | undefined,
31+
workerData: WorkerData,
32+
listeners: WorkerMessageListeners,
33+
) {
34+
return worker
35+
? (type: RequestType) => createWorkerHandler(worker, type, listeners)
36+
: (type: RequestType) => createHandler(type, workerData);
3237
}
3338

3439
/**
@@ -46,14 +51,16 @@ function createHandler(type: RequestType, workerData: WorkerData) {
4651
};
4752
}
4853

49-
function createWorkerHandler(worker: Worker, type: RequestType) {
54+
function createWorkerHandler(worker: Worker, type: RequestType, listeners: WorkerMessageListeners) {
5055
return async (
5156
request: express.Request,
5257
response: express.Response,
5358
next: express.NextFunction,
5459
) => {
55-
worker.once('message', message => {
56-
handleResult(message, response, next);
60+
listeners.oneTimers.push(message => {
61+
if (!message.ws) {
62+
handleResult(message, response, next);
63+
}
5764
});
5865
worker.postMessage({ type, data: request.body });
5966
};
@@ -75,6 +82,53 @@ function handleResult(
7582
}
7683
}
7784

78-
export function outputContainsAst(result: AnalysisOutput): result is JsTsAnalysisOutputWithAst {
79-
return 'astFilePath' in result;
85+
/**
86+
* Returns a delegate function to handle a web socket message
87+
*/
88+
export function createWsDelegator(
89+
worker: Worker | undefined,
90+
workerData: WorkerData,
91+
listeners: WorkerMessageListeners,
92+
) {
93+
return (ws: WebSocket) => {
94+
info('WebSocket client connected on /ws');
95+
if (worker) {
96+
listeners.permanent.push(message => {
97+
if (message.ws) {
98+
handleWsResult(ws, message.results);
99+
}
100+
});
101+
}
102+
103+
ws.on('message', async message => {
104+
const data = { type: 'on-analyze-project' as const, data: decodeMessage(message), ws: true };
105+
if (worker) {
106+
worker.postMessage(data);
107+
} else {
108+
await handleRequest(data, workerData, message => handleWsResult(ws, message));
109+
}
110+
});
111+
112+
ws.on('close', (code, reason) => {
113+
debug(`WebSocket client disconnected: ${reason} with code ${code}`);
114+
});
115+
116+
ws.on('error', err => {
117+
error(`WebSocket client error: ${err}`);
118+
});
119+
};
120+
}
121+
122+
function handleWsResult(ws: WebSocket, message: WsIncrementalResult) {
123+
ws.send(JSON.stringify(message));
124+
}
125+
126+
function decodeMessage(message: RawData) {
127+
let jsonString = '';
128+
if (Buffer.isBuffer(message)) {
129+
jsonString = message.toString('utf8');
130+
} else if (Array.isArray(message)) {
131+
jsonString = Buffer.concat(message).toString('utf8');
132+
}
133+
return JSON.parse(jsonString);
80134
}

packages/bridge/src/handle-request.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import {
2828
} from '../../jsts/src/program/program.js';
2929
import { Linter } from '../../jsts/src/linter/linter.js';
3030
import { clearTypeScriptESLintParserCaches } from '../../jsts/src/parsers/eslint.js';
31-
import { BridgeRequest, RequestResult, serializeError } from './request.js';
31+
import { BridgeRequest, RequestResult, serializeError, WsIncrementalResult } from './request.js';
3232
import { WorkerData } from '../../shared/src/helpers/worker.js';
3333

3434
export async function handleRequest(
3535
request: BridgeRequest,
3636
workerData: WorkerData,
37+
incrementalResultsChannel?: (result: WsIncrementalResult) => void,
3738
): Promise<RequestResult> {
3839
try {
3940
switch (request.type) {
@@ -101,7 +102,7 @@ export async function handleRequest(
101102
}
102103
case 'on-analyze-project': {
103104
logHeapStatistics(workerData?.debugMemory);
104-
const output = await analyzeProject(request.data);
105+
const output = await analyzeProject(request.data, incrementalResultsChannel);
105106
logHeapStatistics(workerData?.debugMemory);
106107
return { type: 'success', result: output };
107108
}

packages/bridge/src/request.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
* along with this program; if not, see https://sonarsource.com/license/ssal/
1616
*/
1717
import { AnalysisOutput } from '../../shared/src/types/analysis.js';
18-
import { ProjectAnalysisInput } from '../../jsts/src/analysis/projectAnalysis/projectAnalysis.js';
18+
import {
19+
type FileResult,
20+
ProjectAnalysisInput,
21+
ProjectAnalysisMeta,
22+
} from '../../jsts/src/analysis/projectAnalysis/projectAnalysis.js';
1923
import { TsConfigJson } from 'type-fest';
2024
import { RuleConfig } from '../../jsts/src/linter/config/rule-config.js';
2125
import { APIError, ErrorCode } from '../../shared/src/errors/error.js';
@@ -34,6 +38,10 @@ export type RequestResult =
3438
error: ReturnType<typeof serializeError>;
3539
};
3640

41+
export type WsMetaResult = { messageType: 'meta' } & ProjectAnalysisMeta;
42+
export type WsFileResult = { filename: string; messageType: 'fileResult' } & FileResult;
43+
export type WsIncrementalResult = WsFileResult | WsMetaResult;
44+
3745
export type Telemetry = {
3846
dependencies: NamedDependency[];
3947
};

packages/bridge/src/router.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,33 @@
1616
*/
1717
import * as express from 'express';
1818
import { Worker } from 'worker_threads';
19-
import { createDelegator } from './delegate.js';
19+
import { createDelegator, createWsDelegator } from './delegate.js';
2020
import { WorkerData } from '../../shared/src/helpers/worker.js';
2121
import { StatusCodes } from 'http-status-codes';
22+
import { WebSocketServer } from 'ws';
2223

23-
export default function (worker: Worker | undefined, workerData: WorkerData): express.Router {
24-
const router = express.Router();
25-
const delegate = createDelegator(worker, workerData);
24+
export type WorkerMessageListeners = {
25+
permanent: ((message: any) => void)[];
26+
oneTimers: ((message: any) => void)[];
27+
};
28+
29+
export default function (
30+
worker: Worker | undefined,
31+
workerData: WorkerData,
32+
wss: WebSocketServer,
33+
): express.Router {
34+
const workerMessageListeners: WorkerMessageListeners = { permanent: [], oneTimers: [] };
35+
if (worker) {
36+
worker.on('message', message => {
37+
workerMessageListeners.permanent.forEach(listener => listener(message));
38+
workerMessageListeners.oneTimers.forEach(listener => listener(message));
39+
workerMessageListeners.oneTimers = [];
40+
});
41+
}
2642

43+
const router = express.Router();
44+
const delegate = createDelegator(worker, workerData, workerMessageListeners);
45+
const wsDelegate = createWsDelegator(worker, workerData, workerMessageListeners);
2746
/** Endpoints running on the worker thread */
2847
router.post('/analyze-project', delegate('on-analyze-project'));
2948
router.post('/analyze-css', delegate('on-analyze-css'));
@@ -38,6 +57,8 @@ export default function (worker: Worker | undefined, workerData: WorkerData): ex
3857
router.post('/tsconfig-files', delegate('on-tsconfig-files'));
3958
router.get('/get-telemetry', delegate('on-get-telemetry'));
4059

60+
wss.on('connection', wsDelegate);
61+
4162
/** Endpoints running on the main thread */
4263
router.get('/status', (_, response) => {
4364
response.sendStatus(StatusCodes.OK);

packages/bridge/src/server.ts

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
logMemoryConfiguration,
3232
logMemoryError,
3333
} from './memory.js';
34+
import { WebSocketServer } from 'ws';
3435

3536
/**
3637
* The maximum request body size
@@ -99,6 +100,18 @@ export function start(
99100

100101
const app = express();
101102
const server = http.createServer(app);
103+
const wss = new WebSocketServer({ noServer: true });
104+
105+
server.on('upgrade', (request, socket, head) => {
106+
// Only handle upgrade requests for /ws
107+
if (request.headers.upgrade?.toLowerCase() === 'websocket' && request.url === '/ws') {
108+
wss.handleUpgrade(request, socket, head, ws => {
109+
wss.emit('connection', ws, request);
110+
});
111+
} else {
112+
socket.destroy();
113+
}
114+
});
102115

103116
/**
104117
* Builds a timeout middleware to shut down the server
@@ -112,7 +125,7 @@ export function start(
112125
*/
113126
app.use(express.json({ limit: MAX_REQUEST_SIZE }));
114127
app.use(orphanTimeout.middleware);
115-
app.use(router(worker, { debugMemory }));
128+
app.use(router(worker, { debugMemory }, wss));
116129
app.use(errorMiddleware);
117130

118131
app.post('/close', (_: express.Request, response: express.Response) => {
@@ -157,20 +170,27 @@ export function start(
157170
* Shutdown the server and the worker thread
158171
*/
159172
function closeServer() {
160-
unregisterGarbageCollectionObserver();
161-
if (server.listening) {
162-
while (pendingCloseRequests.length) {
163-
pendingCloseRequests.pop()?.end();
173+
debug('Closing server');
174+
wss.clients.forEach(client => {
175+
client.terminate(); // Immediately destroys the connection
176+
});
177+
wss.close(() => {
178+
debug('Closed WebSocket connection');
179+
unregisterGarbageCollectionObserver();
180+
if (server.listening) {
181+
while (pendingCloseRequests.length) {
182+
pendingCloseRequests.pop()?.end();
183+
}
184+
/**
185+
* At this point, the worker thread can no longer respond to any request from the plugin.
186+
* If we reached this due to worker failure, existing requests are stalled until they time out.
187+
* Since the bridge server is about to be shut down in an unexpected manner anyway, we can
188+
* close all connections and avoid waiting unnecessarily for them to eventually close.
189+
*/
190+
server.closeAllConnections();
191+
server.close();
164192
}
165-
/**
166-
* At this point, the worker thread can no longer respond to any request from the plugin.
167-
* If we reached this due to worker failure, existing requests are stalled until they time out.
168-
* Since the bridge server is about to be shut down in an unexpected manner anyway, we can
169-
* close all connections and avoid waiting unnecessarily for them to eventually close.
170-
*/
171-
server.closeAllConnections();
172-
server.close();
173-
}
193+
});
174194
}
175195
});
176196
}

packages/bridge/src/worker.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,26 @@
1616
*/
1717
import { parentPort, workerData } from 'worker_threads';
1818
import { handleRequest } from './handle-request.js';
19-
import { BridgeRequest } from './request.js';
19+
import { BridgeRequest, WsIncrementalResult } from './request.js';
2020

2121
/**
2222
* Code executed by the worker thread
2323
*/
2424
if (parentPort) {
2525
const parentThread = parentPort;
26-
parentThread.on('message', async (message: BridgeRequest | { type: 'close' }) => {
27-
const { type } = message;
28-
if (type === 'close') {
29-
parentThread.close();
30-
} else {
31-
parentThread.postMessage(await handleRequest(message, workerData));
32-
}
33-
});
26+
parentThread.on(
27+
'message',
28+
async (message: (BridgeRequest | { type: 'close' }) & { ws?: boolean }) => {
29+
const { type, ws } = message;
30+
if (type === 'close') {
31+
parentThread.close();
32+
} else if (ws) {
33+
await handleRequest(message, workerData, (results: WsIncrementalResult) =>
34+
parentThread.postMessage({ ws: true, results }),
35+
);
36+
} else {
37+
parentThread.postMessage(await handleRequest(message, workerData));
38+
}
39+
},
40+
);
3441
}

packages/bridge/tests/router.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ describe('router', () => {
4141
let server: http.Server;
4242

4343
before(async () => {
44-
const worker = createWorker(workerPath);
44+
const worker = await createWorker(workerPath);
4545
const { server: serverInstance, serverClosed } = await start(port, '127.0.0.1', worker);
4646
server = serverInstance;
4747
closePromise = serverClosed;

0 commit comments

Comments
 (0)