-
-
Notifications
You must be signed in to change notification settings - Fork 756
fix: avoid premature cleanup of dispatcher in Agent #5034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
e4dd6c8
759edcf
3f057c7
f0927a8
6e2a7da
5d75f78
d897e6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| 'use strict' | ||
|
|
||
| const { InvalidArgumentError, MaxOriginsReachedError } = require('../core/errors') | ||
| const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols') | ||
| const { kBusy, kClients, kConnected, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols') | ||
| const DispatcherBase = require('./dispatcher-base') | ||
| const Pool = require('./pool') | ||
| const Client = require('./client') | ||
|
|
@@ -65,7 +65,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| get [kRunning] () { | ||
| let ret = 0 | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| ret += dispatcher[kRunning] | ||
| } | ||
| return ret | ||
|
|
@@ -86,54 +86,52 @@ class Agent extends DispatcherBase { | |
| throw new MaxOriginsReachedError() | ||
| } | ||
|
|
||
| const result = this[kClients].get(key) | ||
| let dispatcher = result && result.dispatcher | ||
| let dispatcher = this[kClients].get(key) | ||
| if (!dispatcher) { | ||
| const closeClientIfUnused = (connected) => { | ||
| const result = this[kClients].get(key) | ||
| if (result) { | ||
| if (connected) result.count -= 1 | ||
| if (result.count <= 0) { | ||
| this[kClients].delete(key) | ||
| if (!result.dispatcher.destroyed) { | ||
| result.dispatcher.close() | ||
| } | ||
| } | ||
| dispatcher = this[kFactory](opts.origin, allowH2 === false | ||
| ? { ...this[kOptions], allowH2: false } | ||
| : this[kOptions]) | ||
|
|
||
| let hasOrigin = false | ||
| for (const entry of this[kClients].values()) { | ||
| if (entry.origin === origin) { | ||
| hasOrigin = true | ||
| break | ||
| } | ||
| } | ||
| const closeClientIfUnused = () => { | ||
| if (this[kClients].get(key) !== dispatcher) { | ||
| return | ||
| } | ||
|
|
||
| if (dispatcher[kConnected] > 0 || dispatcher[kBusy]) { | ||
| return | ||
| } | ||
|
|
||
| this[kClients].delete(key) | ||
| if (!dispatcher.destroyed) { | ||
| dispatcher.close() | ||
| } | ||
|
|
||
| if (!hasOrigin) { | ||
| this[kOrigins].delete(origin) | ||
| let hasOrigin = false | ||
| for (const client of this[kClients].values()) { | ||
| if (client[kUrl].origin === dispatcher[kUrl].origin) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. slightly updated logic here, again to reduce bookkeeping -- access origin from |
||
| hasOrigin = true | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if (!hasOrigin) { | ||
| this[kOrigins].delete(dispatcher[kUrl].origin) | ||
| } | ||
| } | ||
| dispatcher = this[kFactory](opts.origin, allowH2 === false | ||
| ? { ...this[kOptions], allowH2: false } | ||
| : this[kOptions]) | ||
|
|
||
| dispatcher | ||
| .on('drain', this[kOnDrain]) | ||
| .on('connect', (origin, targets) => { | ||
| const result = this[kClients].get(key) | ||
| if (result) { | ||
| result.count += 1 | ||
| } | ||
| this[kOnConnect](origin, targets) | ||
| }) | ||
| .on('connect', this[kOnConnect]) | ||
| .on('disconnect', (origin, targets, err) => { | ||
| closeClientIfUnused(true) | ||
| closeClientIfUnused() | ||
| this[kOnDisconnect](origin, targets, err) | ||
| }) | ||
| .on('connectionError', (origin, targets, err) => { | ||
| closeClientIfUnused(false) | ||
| closeClientIfUnused() | ||
| this[kOnConnectionError](origin, targets, err) | ||
| }) | ||
|
|
||
| this[kClients].set(key, { count: 0, dispatcher, origin }) | ||
| this[kClients].set(key, dispatcher) | ||
| this[kOrigins].add(origin) | ||
| } | ||
|
|
||
|
|
@@ -142,7 +140,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| [kClose] () { | ||
| const closePromises = [] | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| closePromises.push(dispatcher.close()) | ||
| } | ||
| this[kClients].clear() | ||
|
|
@@ -152,7 +150,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| [kDestroy] (err) { | ||
| const destroyPromises = [] | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| destroyPromises.push(dispatcher.destroy(err)) | ||
| } | ||
| this[kClients].clear() | ||
|
|
@@ -162,7 +160,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| get stats () { | ||
| const allClientStats = {} | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| if (dispatcher.stats) { | ||
| allClientStats[dispatcher[kUrl].origin] = dispatcher.stats | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1106,7 +1106,7 @@ function writeH1 (client, request) { | |
| socket[kReset] = reset | ||
| } | ||
|
|
||
| if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) { | ||
| if (client[kMaxRequests] && ++socket[kCounter] >= client[kMaxRequests]) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That change is covered by your tests?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is covered by the assertion here - https://github.com/nodejs/undici/pull/5034/changes#diff-aa5d6e637dbf3a57f70e146170bcc8917253e98f0885ff7d1d1875936a60255eR116-R117 but I fixed some assertions in |
||
| socket[kReset] = true | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| const { test, describe } = require('node:test') | ||
| const assert = require('node:assert') | ||
| const { createServer } = require('node:http') | ||
| const { request, Agent, Pool } = require('..') | ||
|
|
||
| // https://github.com/nodejs/undici/issues/4424 | ||
| describe('Agent should close inactive clients', () => { | ||
| test('without active connections', async (t) => { | ||
| const server = createServer({ keepAliveTimeout: 0 }, async (_req, res) => { | ||
| res.setHeader('connection', 'close') | ||
| res.writeHead(200) | ||
| res.end('ok') | ||
| }).listen(0) | ||
|
|
||
| t.after(() => { | ||
| server.closeAllConnections?.() | ||
| server.close() | ||
| }) | ||
|
|
||
| let p | ||
| const agent = new Agent({ | ||
| factory: (origin, opts) => { | ||
| const pool = new Pool(origin, opts) | ||
| let _resolve, _reject | ||
| p = new Promise((resolve, reject) => { | ||
| _resolve = resolve | ||
| _reject = reject | ||
| }) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These were existing tests from |
||
| pool.on('disconnect', () => { | ||
| setImmediate(() => pool.destroyed ? _resolve() : _reject(new Error('client not destroyed'))) | ||
| }) | ||
| return pool | ||
| } | ||
| }) | ||
| const { statusCode } = await request(`http://localhost:${server.address().port}`, { dispatcher: agent }) | ||
| assert.equal(statusCode, 200) | ||
|
|
||
| await p | ||
| }) | ||
|
|
||
| test('in case of connection error', async (t) => { | ||
| let p | ||
| const agent = new Agent({ | ||
| factory: (origin, opts) => { | ||
| const pool = new Pool(origin, opts) | ||
| let _resolve, _reject | ||
| p = new Promise((resolve, reject) => { | ||
| _resolve = resolve | ||
| _reject = reject | ||
| }) | ||
| pool.on('connectionError', () => { | ||
| setImmediate(() => pool.destroyed ? _resolve() : _reject(new Error('client not destroyed'))) | ||
| }) | ||
| return pool | ||
| } | ||
| }) | ||
| try { | ||
| await request('http://localhost:0', { dispatcher: agent }) | ||
| } catch (_) { | ||
| // ignore | ||
| } | ||
|
|
||
| await p | ||
| }) | ||
| }) | ||
|
|
||
| // https://github.com/nodejs/undici/issues/5022 | ||
| describe('Agent should not close active clients', () => { | ||
| test('should reuse replacement keep-alive connection after server closes the previous one', async (t) => { | ||
| let nextSocketId = 0 | ||
| const socketIds = new Map() | ||
| const requestsPerSocket = new Map() | ||
|
|
||
| const server = createServer((req, res) => { | ||
| const socket = req.socket | ||
| if (!socketIds.has(socket)) { | ||
| socketIds.set(socket, ++nextSocketId) | ||
| } | ||
|
|
||
| const count = (requestsPerSocket.get(socket) || 0) + 1 | ||
| requestsPerSocket.set(socket, count) | ||
|
|
||
| const remaining = 3 - count | ||
| res.setHeader('x-socket-id', String(socketIds.get(socket))) | ||
|
|
||
| if (remaining > 0) { | ||
| res.setHeader('connection', 'Keep-Alive') | ||
| res.setHeader('keep-alive', `timeout=30, max=${remaining}`) | ||
| } else { | ||
| res.setHeader('connection', 'close') | ||
| } | ||
|
|
||
| res.writeHead(200) | ||
| res.end('ok') | ||
| }).listen(0) | ||
|
|
||
| t.after(() => { | ||
| server.closeAllConnections?.() | ||
| server.close() | ||
| }) | ||
|
|
||
| const agent = new Agent({ connections: 1 }) | ||
| t.after(() => agent.close()) | ||
|
|
||
| const socketSequence = [] | ||
| for (let i = 0; i < 5; i++) { | ||
| const { statusCode, headers, body } = await request(`http://localhost:${server.address().port}`, { | ||
| dispatcher: agent | ||
| }) | ||
|
|
||
| assert.equal(statusCode, 200) | ||
| await body.dump() | ||
| socketSequence.push(headers['x-socket-id']) | ||
| } | ||
|
|
||
| assert.deepEqual(socketSequence.slice(0, 3), ['1', '1', '1']) | ||
| assert.deepEqual(socketSequence.slice(3), ['2', '2']) | ||
| }) | ||
|
|
||
| test('should reuse replacement connection after keep-alive max closes the previous one', async (t) => { | ||
| let nextSocketId = 0 | ||
| const socketIds = new Map() | ||
|
|
||
| const server = createServer((req, res) => { | ||
| const socket = req.socket | ||
| if (!socketIds.has(socket)) { | ||
| socketIds.set(socket, ++nextSocketId) | ||
| } | ||
|
|
||
| res.setHeader('x-socket-id', String(socketIds.get(socket))) | ||
| res.setHeader('connection', 'Keep-Alive') | ||
| res.setHeader('keep-alive', 'timeout=30') | ||
|
|
||
| res.writeHead(200) | ||
| res.end('ok') | ||
| }).listen(0) | ||
|
|
||
| t.after(() => { | ||
| server.closeAllConnections?.() | ||
| server.close() | ||
| }) | ||
|
|
||
| const agent = new Agent({ connections: 1, maxRequestsPerClient: 3 }) | ||
| t.after(() => agent.close()) | ||
|
|
||
| const socketSequence = [] | ||
| for (let i = 0; i < 5; i++) { | ||
| const { statusCode, headers, body } = await request(`http://localhost:${server.address().port}`, { | ||
| dispatcher: agent | ||
| }) | ||
|
|
||
| assert.equal(statusCode, 200) | ||
| await body.dump() | ||
| socketSequence.push(headers['x-socket-id']) | ||
| } | ||
|
|
||
| assert.deepEqual(socketSequence.slice(0, 3), ['1', '1', '1']) | ||
| assert.deepEqual(socketSequence.slice(3), ['2', '2']) | ||
| }) | ||
| }) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The key difference is here - we check active connections as well as
kBusy:undici/lib/dispatcher/client.js
Lines 326 to 332 in bc0a19c
If we have either active connections or the dispatcher is busy, we don't close it.
This lets us clean up the connection-tracking bookkeeping and just use information already in the dispatcher.