-
-
Notifications
You must be signed in to change notification settings - Fork 757
fix: avoid premature cleanup of dispatcher in Agent #5034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
e4dd6c8
759edcf
3f057c7
f0927a8
6e2a7da
5d75f78
d897e6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| 'use strict' | ||
|
|
||
| const { InvalidArgumentError, MaxOriginsReachedError } = require('../core/errors') | ||
| const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols') | ||
| const { kBusy, kClients, kConnected, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols') | ||
| const DispatcherBase = require('./dispatcher-base') | ||
| const Pool = require('./pool') | ||
| const Client = require('./client') | ||
|
|
@@ -65,7 +65,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| get [kRunning] () { | ||
| let ret = 0 | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| ret += dispatcher[kRunning] | ||
| } | ||
| return ret | ||
|
|
@@ -86,54 +86,52 @@ class Agent extends DispatcherBase { | |
| throw new MaxOriginsReachedError() | ||
| } | ||
|
|
||
| const result = this[kClients].get(key) | ||
| let dispatcher = result && result.dispatcher | ||
| let dispatcher = this[kClients].get(key) | ||
| if (!dispatcher) { | ||
| const closeClientIfUnused = (connected) => { | ||
| const result = this[kClients].get(key) | ||
| if (result) { | ||
| if (connected) result.count -= 1 | ||
| if (result.count <= 0) { | ||
| this[kClients].delete(key) | ||
| if (!result.dispatcher.destroyed) { | ||
| result.dispatcher.close() | ||
| } | ||
| } | ||
| dispatcher = this[kFactory](opts.origin, allowH2 === false | ||
| ? { ...this[kOptions], allowH2: false } | ||
| : this[kOptions]) | ||
|
|
||
| let hasOrigin = false | ||
| for (const entry of this[kClients].values()) { | ||
| if (entry.origin === origin) { | ||
| hasOrigin = true | ||
| break | ||
| } | ||
| } | ||
| const closeClientIfUnused = () => { | ||
| if (this[kClients].get(key) !== dispatcher) { | ||
| return | ||
| } | ||
|
|
||
| if (dispatcher[kConnected] > 0 || dispatcher[kBusy]) { | ||
| return | ||
| } | ||
|
|
||
| this[kClients].delete(key) | ||
| if (!dispatcher.destroyed) { | ||
| dispatcher.close() | ||
| } | ||
|
|
||
| if (!hasOrigin) { | ||
| this[kOrigins].delete(origin) | ||
| let hasOrigin = false | ||
| for (const client of this[kClients].values()) { | ||
| if (client[kUrl].origin === dispatcher[kUrl].origin) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. slightly updated logic here, again to reduce bookkeeping -- access origin from |
||
| hasOrigin = true | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if (!hasOrigin) { | ||
| this[kOrigins].delete(dispatcher[kUrl].origin) | ||
| } | ||
| } | ||
| dispatcher = this[kFactory](opts.origin, allowH2 === false | ||
| ? { ...this[kOptions], allowH2: false } | ||
| : this[kOptions]) | ||
|
|
||
| dispatcher | ||
| .on('drain', this[kOnDrain]) | ||
| .on('connect', (origin, targets) => { | ||
| const result = this[kClients].get(key) | ||
| if (result) { | ||
| result.count += 1 | ||
| } | ||
| this[kOnConnect](origin, targets) | ||
| }) | ||
| .on('connect', this[kOnConnect]) | ||
| .on('disconnect', (origin, targets, err) => { | ||
| closeClientIfUnused(true) | ||
| closeClientIfUnused() | ||
| this[kOnDisconnect](origin, targets, err) | ||
| }) | ||
| .on('connectionError', (origin, targets, err) => { | ||
| closeClientIfUnused(false) | ||
| closeClientIfUnused() | ||
| this[kOnConnectionError](origin, targets, err) | ||
| }) | ||
|
|
||
| this[kClients].set(key, { count: 0, dispatcher, origin }) | ||
| this[kClients].set(key, dispatcher) | ||
| this[kOrigins].add(origin) | ||
| } | ||
|
|
||
|
|
@@ -142,7 +140,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| [kClose] () { | ||
| const closePromises = [] | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| closePromises.push(dispatcher.close()) | ||
| } | ||
| this[kClients].clear() | ||
|
|
@@ -152,7 +150,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| [kDestroy] (err) { | ||
| const destroyPromises = [] | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| destroyPromises.push(dispatcher.destroy(err)) | ||
| } | ||
| this[kClients].clear() | ||
|
|
@@ -162,7 +160,7 @@ class Agent extends DispatcherBase { | |
|
|
||
| get stats () { | ||
| const allClientStats = {} | ||
| for (const { dispatcher } of this[kClients].values()) { | ||
| for (const dispatcher of this[kClients].values()) { | ||
| if (dispatcher.stats) { | ||
| allClientStats[dispatcher[kUrl].origin] = dispatcher.stats | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1106,7 +1106,7 @@ function writeH1 (client, request) { | |
| socket[kReset] = reset | ||
| } | ||
|
|
||
| if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) { | ||
| if (client[kMaxRequests] && ++socket[kCounter] >= client[kMaxRequests]) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That change is covered by your tests?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is covered by the assertion here - https://github.com/nodejs/undici/pull/5034/changes#diff-aa5d6e637dbf3a57f70e146170bcc8917253e98f0885ff7d1d1875936a60255eR116-R117 but I fixed some assertions in |
||
| socket[kReset] = true | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The key difference is here - we check active connections as well as
kBusy:undici/lib/dispatcher/client.js
Lines 326 to 332 in bc0a19c
If we have either active connections or the dispatcher is busy, we don't close it.
This lets us clean up the connection-tracking bookkeeping and just use information already in the dispatcher.