Skip to content

Commit 6824eed

Browse files
authored
chore: use native addAbortListener (#5021)
Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com>
1 parent d64e7bb commit 6824eed

File tree

5 files changed

+85
-6
lines changed

5 files changed

+85
-6
lines changed

lib/api/readable.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const assert = require('node:assert')
4+
const { addAbortListener } = require('node:events')
45
const { Readable } = require('node:stream')
56
const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
67
const util = require('../core/util')
@@ -293,10 +294,10 @@ class BodyReadable extends Readable {
293294
const onAbort = () => {
294295
this.destroy(signal.reason ?? new AbortError())
295296
}
296-
signal.addEventListener('abort', onAbort)
297+
const abortListener = addAbortListener(signal, onAbort)
297298
this
298299
.on('close', function () {
299-
signal.removeEventListener('abort', onAbort)
300+
abortListener[Symbol.dispose]()
300301
if (signal.aborted) {
301302
reject(signal.reason ?? new AbortError())
302303
} else {

lib/core/util.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const { IncomingMessage } = require('node:http')
66
const stream = require('node:stream')
77
const net = require('node:net')
88
const { stringify } = require('node:querystring')
9-
const { EventEmitter: EE } = require('node:events')
9+
const { EventEmitter: EE, addAbortListener: addAbortListenerNative } = require('node:events')
1010
const timers = require('../util/timers')
1111
const { InvalidArgumentError, ConnectTimeoutError } = require('./errors')
1212
const { headerNameLowerCasedRecord } = require('./constants')
@@ -678,7 +678,12 @@ function isFormDataLike (object) {
678678
}
679679

680680
function addAbortListener (signal, listener) {
681-
if ('addEventListener' in signal) {
681+
if (signal instanceof AbortSignal) {
682+
const disposable = addAbortListenerNative(signal, listener)
683+
return () => disposable[Symbol.dispose]()
684+
}
685+
686+
if (typeof signal.addEventListener === 'function') {
682687
signal.addEventListener('abort', listener, { once: true })
683688
return () => signal.removeEventListener('abort', listener)
684689
}

lib/web/websocket/stream/websocketstream.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict'
22

3+
const { addAbortListener } = require('node:events')
34
const { environmentSettingsObject } = require('../../fetch/util')
45
const { states, opcodes, sentCloseFrameState } = require('../constants')
56
const { webidl } = require('../../webidl')
@@ -132,7 +133,7 @@ class WebSocketStream {
132133
}
133134

134135
// 8.3. Add the following abort steps to signal :
135-
signal.addEventListener('abort', () => {
136+
addAbortListener(signal, () => {
136137
// 8.3.1. If the WebSocket connection is not yet established : [WSP]
137138
if (!isEstablished(this.#handler.readyState)) {
138139
// 8.3.1.1. Fail the WebSocket connection .
@@ -148,7 +149,7 @@ class WebSocketStream {
148149
// Set this 's handshake aborted to true.
149150
this.#handshakeAborted = true
150151
}
151-
}, { once: true })
152+
})
152153
}
153154

154155
// 9. Let client be this 's relevant settings object .

test/node-test/util.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,35 @@ test('isStream', () => {
2020
assert.ok(util.isStream(ee) === false)
2121
})
2222

23+
test('addAbortListener supports AbortSignal', async () => {
24+
const ac = new AbortController()
25+
let calls = 0
26+
27+
util.addAbortListener(ac.signal, () => {
28+
calls++
29+
})
30+
31+
ac.abort()
32+
await new Promise((resolve) => setImmediate(resolve))
33+
34+
assert.equal(calls, 1)
35+
})
36+
37+
test('addAbortListener removes native AbortSignal listener', async () => {
38+
const ac = new AbortController()
39+
let calls = 0
40+
41+
const remove = util.addAbortListener(ac.signal, () => {
42+
calls++
43+
})
44+
45+
remove()
46+
ac.abort()
47+
await new Promise((resolve) => setImmediate(resolve))
48+
49+
assert.equal(calls, 0)
50+
})
51+
2352
test('getServerName', () => {
2453
assert.equal(util.getServerName('1.1.1.1'), '')
2554
assert.equal(util.getServerName('1.1.1.1:443'), '')
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
'use strict'
2+
3+
const { test } = require('node:test')
4+
const { createServer } = require('node:http')
5+
6+
const { WebSocketStream } = require('../../..')
7+
8+
test('WebSocketStream aborts before handshake completes', async (t) => {
9+
const sockets = new Set()
10+
const server = createServer()
11+
12+
server.on('upgrade', (req, socket) => {
13+
sockets.add(socket)
14+
socket.on('close', () => sockets.delete(socket))
15+
})
16+
17+
await new Promise((resolve) => server.listen(0, resolve))
18+
19+
t.after(async () => {
20+
for (const socket of sockets) {
21+
socket.destroy()
22+
}
23+
24+
await new Promise((resolve) => server.close(resolve))
25+
})
26+
27+
const ac = new AbortController()
28+
const wss = new WebSocketStream(`ws://localhost:${server.address().port}`, {
29+
signal: ac.signal
30+
})
31+
32+
ac.abort(new Error('abort before open'))
33+
34+
const [opened, closed] = await Promise.allSettled([wss.opened, wss.closed])
35+
36+
t.assert.strictEqual(opened.status, 'rejected')
37+
t.assert.strictEqual(opened.reason.name, 'WebSocketError')
38+
t.assert.strictEqual(opened.reason.message, 'Socket never opened')
39+
40+
t.assert.strictEqual(closed.status, 'rejected')
41+
t.assert.strictEqual(closed.reason.name, 'WebSocketError')
42+
t.assert.strictEqual(closed.reason.message, 'unclean close')
43+
})

0 commit comments

Comments
 (0)