Skip to content

Commit 5631c02

Browse files
committed
Fix pipelining edge cases and add benchmark
- Clean up submittedNamedStatements on error in _handleErrorMessage, to prevent stale entries from blocking future re-preparation of the same named statement after a parse failure
- Guard _pulsePipelinedQueryQueue against non-queryable connections
- Fix cancel() and readTimeout for sent queries: removing an already-sent query from _sentQueryQueue corrupts the pipeline response mapping, since the server will still respond to it; no-op the callback instead
- Add bench-pipelining.js comparing serial vs pipelined throughput
1 parent 9dcc82c commit 5631c02

File tree

2 files changed

+112
-8
lines changed

2 files changed

+112
-8
lines changed

packages/pg/bench-pipelining.js

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
'use strict'
2+
const pg = require('./lib')
3+
4+
// Queries to benchmark
5+
const SIMPLE = { text: 'SELECT 1' }
6+
const PARAM = {
7+
text: 'SELECT $1::int AS n',
8+
values: [42],
9+
}
10+
const NAMED = {
11+
name: 'bench-named',
12+
text: 'SELECT $1::int AS n',
13+
values: [42],
14+
}
15+
16+
async function bench(label, fn, seconds) {
17+
// warmup
18+
for (let i = 0; i < 100; i++) await fn()
19+
20+
const deadline = Date.now() + seconds * 1000
21+
let count = 0
22+
while (Date.now() < deadline) {
23+
await fn()
24+
count++
25+
}
26+
const qps = (count / seconds).toFixed(0)
27+
console.log(` ${label}: ${qps} qps (${count} queries in ${seconds}s)`)
28+
return count / seconds
29+
}
30+
31+
async function benchPipelined(label, makeQueries, batchSize, seconds) {
32+
const client = new pg.Client()
33+
await client.connect()
34+
client.pipelining = true
35+
36+
// warmup
37+
for (let i = 0; i < 10; i++) {
38+
await Promise.all(makeQueries(batchSize).map((q) => client.query(q)))
39+
}
40+
41+
const deadline = Date.now() + seconds * 1000
42+
let count = 0
43+
while (Date.now() < deadline) {
44+
const queries = makeQueries(batchSize)
45+
await Promise.all(queries.map((q) => client.query(q)))
46+
count += batchSize
47+
}
48+
const qps = (count / seconds).toFixed(0)
49+
console.log(` ${label} (batch=${batchSize}): ${qps} qps`)
50+
51+
await client.end()
52+
return count / seconds
53+
}
54+
55+
async function runSerial(label, query, seconds) {
56+
const client = new pg.Client()
57+
await client.connect()
58+
59+
const qps = await bench(label, () => client.query(query), seconds)
60+
await client.end()
61+
return qps
62+
}
63+
64+
async function run() {
65+
const SECONDS = 5
66+
const BATCH = 10
67+
68+
console.log('\n=== Serial (no pipelining) ===')
69+
const serialSimple = await runSerial('simple SELECT 1', SIMPLE, SECONDS)
70+
const serialParam = await runSerial('parameterized', PARAM, SECONDS)
71+
const serialNamed = await runSerial('named prepared', NAMED, SECONDS)
72+
73+
console.log('\n=== Pipelined ===')
74+
const pipedSimple = await benchPipelined(
75+
'simple SELECT 1',
76+
(n) => Array.from({ length: n }, () => ({ text: 'SELECT 1' })),
77+
BATCH,
78+
SECONDS
79+
)
80+
const pipedParam = await benchPipelined(
81+
'parameterized',
82+
(n) => Array.from({ length: n }, () => ({ text: 'SELECT $1::int AS n', values: [42] })),
83+
BATCH,
84+
SECONDS
85+
)
86+
const pipedNamed = await benchPipelined(
87+
'named prepared',
88+
(n) => Array.from({ length: n }, (_, i) => ({ name: `bench-named-${i}`, text: 'SELECT $1::int AS n', values: [42] })),
89+
BATCH,
90+
SECONDS
91+
)
92+
93+
console.log('\n=== Speedup ===')
94+
console.log(` simple: ${(pipedSimple / serialSimple).toFixed(2)}x`)
95+
console.log(` parameterized: ${(pipedParam / serialParam).toFixed(2)}x`)
96+
console.log(` named: ${(pipedNamed / serialNamed).toFixed(2)}x`)
97+
}
98+
99+
run().catch((e) => {
100+
console.error(e)
101+
process.exit(1)
102+
})

packages/pg/lib/client.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,9 @@ class Client extends EventEmitter {
415415
}
416416

417417
this._activeQuery = null
418+
if (activeQuery.name) {
419+
delete this.connection.submittedNamedStatements[activeQuery.name]
420+
}
418421
activeQuery.handleError(msg, this.connection)
419422
}
420423

@@ -565,7 +568,9 @@ class Client extends EventEmitter {
565568
} else if (client._queryQueue.indexOf(query) !== -1) {
566569
client._queryQueue.splice(client._queryQueue.indexOf(query), 1)
567570
} else if (client._sentQueryQueue.indexOf(query) !== -1) {
568-
client._sentQueryQueue.splice(client._sentQueryQueue.indexOf(query), 1)
571+
// Query already sent on wire — can't remove it without corrupting the
572+
// pipeline. No-op the callback so the result is silently discarded.
573+
query.callback = () => {}
569574
}
570575
}
571576

@@ -616,7 +621,7 @@ class Client extends EventEmitter {
616621
}
617622

618623
_pulsePipelinedQueryQueue() {
619-
if (!this._connected) {
624+
if (!this._connected || !this._queryable) {
620625
return
621626
}
622627
while (this._queryQueue.length > 0) {
@@ -691,16 +696,13 @@ class Client extends EventEmitter {
691696
// just do nothing if query completes
692697
query.callback = () => {}
693698

694-
// Remove from queue
699+
// Remove from queue (only safe if not yet sent)
695700
const index = this._queryQueue.indexOf(query)
696701
if (index > -1) {
697702
this._queryQueue.splice(index, 1)
698-
} else if (this.pipelining) {
699-
const sentIndex = this._sentQueryQueue.indexOf(query)
700-
if (sentIndex > -1) {
701-
this._sentQueryQueue.splice(sentIndex, 1)
702-
}
703703
}
704+
// If already sent on the wire in pipelining mode, we can't remove it
705+
// without corrupting the pipeline — the callback was already no-op'd above
704706

705707
this._pulseQueryQueue()
706708
}, readTimeout)

0 commit comments

Comments
 (0)