Skip to content

Commit 2ee5bf4

Browse files
committed
Fix pipelining activation race and add edge-case tests
Gate _sentQueryQueue activation on readyForQuery=true inside _pulsePipelinedQueryQueue (and remove the redundant promotion block from _handleReadyForQuery) to eliminate the microtask/macrotask race where the next query could be activated as _activeQuery before the error's ReadyForQuery arrived, causing that RFQ to be handled by the wrong query. Also adds the error-listener fix for the query_timeout integration test so the expected stream-destroy doesn't leak as an unhandled 'error'.
1 parent 5631c02 commit 2ee5bf4

File tree

4 files changed

+148
-10
lines changed

4 files changed

+148
-10
lines changed

packages/pg/lib/client.js

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -368,10 +368,6 @@ class Client extends EventEmitter {
368368
if (activeQuery) {
369369
activeQuery.handleReadyForQuery(this.connection)
370370
}
371-
if (this.pipelining && this._sentQueryQueue.length > 0) {
372-
this._activeQuery = this._sentQueryQueue.shift()
373-
this.readyForQuery = false
374-
}
375371
this._pulseQueryQueue()
376372
}
377373

@@ -636,7 +632,7 @@ class Client extends EventEmitter {
636632
}
637633
this._sentQueryQueue.push(query)
638634
}
639-
if (!this._activeQuery && this._sentQueryQueue.length > 0) {
635+
if (this.readyForQuery && !this._activeQuery && this._sentQueryQueue.length > 0) {
640636
this._activeQuery = this._sentQueryQueue.shift()
641637
this.readyForQuery = false
642638
}
@@ -700,9 +696,12 @@ class Client extends EventEmitter {
700696
const index = this._queryQueue.indexOf(query)
701697
if (index > -1) {
702698
this._queryQueue.splice(index, 1)
699+
} else if (this.pipelining) {
700+
// Query already sent — the pipeline is blocked until it completes.
701+
// Destroy the connection to unblock all remaining pipelined queries.
702+
this.connection.stream.destroy()
703+
return
703704
}
704-
// If already sent on the wire in pipelining mode, we can't remove it
705-
// without corrupting the pipeline — the callback was already no-op'd above
706705

707706
this._pulseQueryQueue()
708707
}, readTimeout)
@@ -763,9 +762,15 @@ class Client extends EventEmitter {
763762
}
764763
}
765764

766-
if (this._getActiveQuery() || this._sentQueryQueue.length > 0 || !this._queryable) {
767-
// if we have an active query we need to force a disconnect
768-
// on the socket - otherwise a hung query could block end forever
765+
if (!this._queryable) {
766+
// socket is dead — force close
767+
this.connection.stream.destroy()
768+
} else if (this.pipelining && (this._getActiveQuery() || this._sentQueryQueue.length > 0 || this._queryQueue.length > 0)) {
769+
// pipelined queries are already on the wire (or queued to send) and will
770+
// complete normally; wait for drain then do a graceful goodbye
771+
this.once('drain', () => this.connection.end())
772+
} else if (this._getActiveQuery()) {
773+
// non-pipelining: a hung query could block end forever — force disconnect
769774
this.connection.stream.destroy()
770775
} else {
771776
this.connection.end()

packages/pg/test/integration/client/pipelining-tests.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,67 @@ suite.test('pipelining drain event', async function () {
8686
await drainPromise
8787
await client.end()
8888
})
89+
90+
// #12: end() during active pipelining — should drain gracefully, not destroy
91+
suite.test('end() waits for in-flight pipelined queries to complete', async function () {
92+
const client = helper.client()
93+
client.pipelining = true
94+
95+
// Fire queries then call end() immediately without awaiting them
96+
const p1 = client.query('SELECT 1 AS num')
97+
const p2 = client.query('SELECT 2 AS num')
98+
const endPromise = client.end()
99+
100+
// All queries should resolve (not error) because end() drains gracefully
101+
const [r1, r2] = await Promise.all([p1, p2])
102+
assert.equal(r1.rows[0].num, 1)
103+
assert.equal(r2.rows[0].num, 2)
104+
await endPromise
105+
})
106+
107+
// #13: named statement error cleanup — submittedNamedStatements not left stale
108+
suite.test('named statement parse error cleans up and allows re-preparation', async function () {
109+
const client = helper.client()
110+
client.pipelining = true
111+
112+
// Use an invalid type to force a server-side parse error
113+
const err = await client
114+
.query({ name: 'bad-stmt', text: 'SELECT $1::nonexistent_type_xyz', values: [1] })
115+
.then(() => null)
116+
.catch((e) => e)
117+
118+
assert.ok(err, 'expected parse to fail')
119+
120+
// The stale submittedNamedStatements entry should be gone.
121+
// Re-using the same name with valid SQL should work.
122+
const result = await client.query({ name: 'bad-stmt', text: 'SELECT $1::int AS n', values: [42] })
123+
assert.equal(result.rows[0].n, 42)
124+
125+
await client.end()
126+
})
127+
128+
// #14: query_timeout with pipelining
129+
// When an already-sent pipelined query times out, the connection is destroyed
130+
// to unblock the pipeline — subsequent queries error rather than hanging.
131+
suite.test('query_timeout on sent pipelined query destroys connection to unblock', async function () {
132+
const client = helper.client()
133+
client.pipelining = true
134+
client.on('error', () => {}) // absorb the 'error' event emitted when stream is destroyed
135+
136+
const results = await Promise.allSettled([
137+
client.query('SELECT 1 AS num'),
138+
client.query({ text: 'SELECT pg_sleep(30)', query_timeout: 100 }),
139+
client.query('SELECT 3 AS num'),
140+
])
141+
142+
// Query 1 completes before the slow query enters the pipeline
143+
assert.equal(results[0].status, 'fulfilled')
144+
assert.equal(results[0].value.rows[0].num, 1)
145+
146+
// Query 2 times out
147+
assert.equal(results[1].status, 'rejected')
148+
assert.ok(results[1].reason.message.includes('timeout'), `unexpected error: ${results[1].reason.message}`)
149+
150+
// Query 3 errors because the connection was destroyed to unblock the pipeline
151+
assert.equal(results[2].status, 'rejected')
152+
})

packages/pg/test/unit/client/simple-query-tests.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,60 @@ test('executing query', function () {
172172
con.emit('readyForQuery')
173173
con.emit('readyForQuery')
174174
})
175+
176+
test('extended protocol: sends parse/bind/sync for each pipelined parameterized query', function () {
177+
const client = helper.client()
178+
client.pipelining = true
179+
const con = client.connection
180+
con.emit('readyForQuery')
181+
182+
client.query({ text: 'SELECT $1::int', values: [1] })
183+
client.query({ text: 'SELECT $1::int', values: [2] })
184+
185+
// both parse messages should have been sent immediately
186+
assert.lengthIs(con.parseMessages, 2)
187+
assert.equal(con.parseMessages[0].text, 'SELECT $1::int')
188+
assert.equal(con.parseMessages[1].text, 'SELECT $1::int')
189+
// both bind messages too
190+
assert.lengthIs(con.bindMessages, 2)
191+
// each query sends its own sync
192+
assert.equal(con.syncCount, 2)
193+
})
194+
195+
test('named statement: parse sent only once when pipelining the same name', function () {
196+
const client = helper.client()
197+
client.pipelining = true
198+
const con = client.connection
199+
con.emit('readyForQuery')
200+
201+
client.query({ name: 'my-stmt', text: 'SELECT $1::int', values: [1] })
202+
client.query({ name: 'my-stmt', text: 'SELECT $1::int', values: [2] })
203+
204+
// parse sent only once — second query reuses the submitted statement
205+
assert.lengthIs(con.parseMessages, 1)
206+
// both bind messages sent
207+
assert.lengthIs(con.bindMessages, 2)
208+
})
209+
210+
test('enabling pipelining while no queries are in flight', function () {
211+
const client = helper.client()
212+
const con = client.connection
213+
con.emit('readyForQuery')
214+
215+
// start non-pipelining
216+
assert.equal(client.pipelining, false)
217+
client.query('before')
218+
assert.lengthIs(con.queries, 1)
219+
220+
// simulate server responds
221+
con.emit('readyForQuery')
222+
223+
// now enable pipelining — should work for subsequent queries
224+
client.pipelining = true
225+
client.query('after-one')
226+
client.query('after-two')
227+
assert.lengthIs(con.queries, 3)
228+
})
175229
})
176230

177231
test('handles errors', function () {

packages/pg/test/unit/client/test-helper.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,22 @@ const makeClient = function (config) {
1010
connection.query = function (text) {
1111
this.queries.push(text)
1212
}
13+
connection.parse = function (msg) {
14+
this.parseMessages.push(msg)
15+
}
16+
connection.bind = function (msg) {
17+
this.bindMessages.push(msg)
18+
}
19+
connection.describe = function (msg) {}
20+
connection.execute = function (msg) {}
21+
connection.sync = function () {
22+
this.syncCount++
23+
}
24+
connection.flush = function () {}
1325
connection.queries = []
26+
connection.parseMessages = []
27+
connection.bindMessages = []
28+
connection.syncCount = 0
1429
const client = new Client({ connection: connection, ...config })
1530
client.connect()
1631
client.connection.emit('connect')

0 commit comments

Comments
 (0)