Skip to content

Commit 141d3bc

Browse files
committed
Fix pipeline mode to use extended query protocol and add JS vs native benchmarks
Pipeline mode requires sendQueryParams (extended query protocol), not sendQuery (simple query protocol). PostgreSQL rejects PQsendQuery in pipeline mode. Benchmark script now tests all four combinations: JS serial, JS pipelined, native serial, and native pipelined.
1 parent 8cb7e38 commit 141d3bc

File tree

2 files changed

+151
-37
lines changed

2 files changed

+151
-37
lines changed

packages/pg-native/index.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,10 @@ Client.prototype.pipeline = function (queries, cb) {
341341
sent = pq.sendQueryPrepared(q.name, q.values || [])
342342
}
343343
}
344-
} else if (q.values) {
345-
sent = pq.sendQueryParams(q.text, q.values)
346344
} else {
347-
sent = pq.sendQuery(q.text)
345+
// In pipeline mode, simple query protocol (sendQuery) is not allowed.
346+
// Always use extended query protocol (sendQueryParams).
347+
sent = pq.sendQueryParams(q.text, q.values || [])
348348
}
349349

350350
if (!sent) {

packages/pg/bench-pipelining.js

Lines changed: 148 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
'use strict'
22
const pg = require('./lib')
33

4-
// Queries to benchmark
5-
const SIMPLE = { text: 'SELECT 1' }
6-
const PARAM = {
7-
text: 'SELECT $1::int AS n',
8-
values: [42],
9-
}
10-
const NAMED = {
11-
name: 'bench-named',
12-
text: 'SELECT $1::int AS n',
13-
values: [42],
4+
let Native
5+
try {
6+
Native = require('pg-native')
7+
} catch (e) {
8+
// pg-native not available — skip native benchmarks
149
}
1510

11+
const SECONDS = 5
12+
const BATCH = 10
13+
1614
async function bench(label, fn, seconds) {
1715
// warmup
1816
for (let i = 0; i < 100; i++) await fn()
@@ -28,7 +26,17 @@ async function bench(label, fn, seconds) {
2826
return count / seconds
2927
}
3028

31-
async function benchPipelined(label, makeQueries, batchSize, seconds) {
29+
// --- JS client helpers ---
30+
31+
async function jsSerial(label, query, seconds) {
32+
const client = new pg.Client()
33+
await client.connect()
34+
const qps = await bench(label, () => client.query(query), seconds)
35+
await client.end()
36+
return qps
37+
}
38+
39+
async function jsPipelined(label, makeQueries, batchSize, seconds) {
3240
const client = new pg.Client()
3341
await client.connect()
3442
client.pipelining = true
@@ -41,60 +49,166 @@ async function benchPipelined(label, makeQueries, batchSize, seconds) {
4149
const deadline = Date.now() + seconds * 1000
4250
let count = 0
4351
while (Date.now() < deadline) {
44-
const queries = makeQueries(batchSize)
45-
await Promise.all(queries.map((q) => client.query(q)))
52+
await Promise.all(makeQueries(batchSize).map((q) => client.query(q)))
4653
count += batchSize
4754
}
4855
const qps = (count / seconds).toFixed(0)
4956
console.log(` ${label} (batch=${batchSize}): ${qps} qps`)
50-
5157
await client.end()
5258
return count / seconds
5359
}
5460

55-
async function runSerial(label, query, seconds) {
56-
const client = new pg.Client()
57-
await client.connect()
61+
// --- Native client helpers ---
5862

59-
const qps = await bench(label, () => client.query(query), seconds)
60-
await client.end()
61-
return qps
63+
function nativeConnect() {
64+
return new Promise((resolve, reject) => {
65+
const client = new Native()
66+
client.connect((err) => {
67+
if (err) return reject(err)
68+
resolve(client)
69+
})
70+
})
71+
}
72+
73+
function nativeQuery(client, text, values) {
74+
return new Promise((resolve, reject) => {
75+
client.query(text, values, (err, rows) => {
76+
if (err) return reject(err)
77+
resolve(rows)
78+
})
79+
})
80+
}
81+
82+
function nativeEnd(client) {
83+
return new Promise((resolve) => {
84+
client.end(() => resolve())
85+
})
86+
}
87+
88+
function nativePipeline(client, queries) {
89+
return new Promise((resolve, reject) => {
90+
client.pipeline(queries, (err, results) => {
91+
if (err) return reject(err)
92+
resolve(results)
93+
})
94+
})
95+
}
96+
97+
async function nativeSerial(label, text, values, seconds) {
98+
const client = await nativeConnect()
99+
100+
// warmup
101+
for (let i = 0; i < 100; i++) await nativeQuery(client, text, values)
102+
103+
const deadline = Date.now() + seconds * 1000
104+
let count = 0
105+
while (Date.now() < deadline) {
106+
await nativeQuery(client, text, values)
107+
count++
108+
}
109+
const qps = (count / seconds).toFixed(0)
110+
console.log(` ${label}: ${qps} qps (${count} queries in ${seconds}s)`)
111+
await nativeEnd(client)
112+
return count / seconds
62113
}
63114

115+
async function nativePipelined(label, makeQueries, batchSize, seconds) {
116+
const client = await nativeConnect()
117+
118+
// warmup
119+
for (let i = 0; i < 10; i++) {
120+
await nativePipeline(client, makeQueries(batchSize))
121+
}
122+
123+
const deadline = Date.now() + seconds * 1000
124+
let count = 0
125+
while (Date.now() < deadline) {
126+
await nativePipeline(client, makeQueries(batchSize))
127+
count += batchSize
128+
}
129+
const qps = (count / seconds).toFixed(0)
130+
console.log(` ${label} (batch=${batchSize}): ${qps} qps`)
131+
await nativeEnd(client)
132+
return count / seconds
133+
}
134+
135+
// --- Main ---
136+
64137
async function run() {
65-
const SECONDS = 5
66-
const BATCH = 10
138+
const results = {}
67139

68-
console.log('\n=== Serial (no pipelining) ===')
69-
const serialSimple = await runSerial('simple SELECT 1', SIMPLE, SECONDS)
70-
const serialParam = await runSerial('parameterized', PARAM, SECONDS)
71-
const serialNamed = await runSerial('named prepared', NAMED, SECONDS)
140+
console.log('\n=== JS Client — Serial ===')
141+
results.jsSerialSimple = await jsSerial('simple SELECT 1', { text: 'SELECT 1' }, SECONDS)
142+
results.jsSerialParam = await jsSerial('parameterized', { text: 'SELECT $1::int AS n', values: [42] }, SECONDS)
143+
results.jsSerialNamed = await jsSerial(
144+
'named prepared',
145+
{ name: 'bench-named', text: 'SELECT $1::int AS n', values: [42] },
146+
SECONDS
147+
)
72148

73-
console.log('\n=== Pipelined ===')
74-
const pipedSimple = await benchPipelined(
149+
console.log('\n=== JS Client — Pipelined ===')
150+
results.jsPipedSimple = await jsPipelined(
75151
'simple SELECT 1',
76152
(n) => Array.from({ length: n }, () => ({ text: 'SELECT 1' })),
77153
BATCH,
78154
SECONDS
79155
)
80-
const pipedParam = await benchPipelined(
156+
results.jsPipedParam = await jsPipelined(
81157
'parameterized',
82158
(n) => Array.from({ length: n }, () => ({ text: 'SELECT $1::int AS n', values: [42] })),
83159
BATCH,
84160
SECONDS
85161
)
86-
const pipedNamed = await benchPipelined(
162+
results.jsPipedNamed = await jsPipelined(
87163
'named prepared',
88164
(n) =>
89165
Array.from({ length: n }, (_, i) => ({ name: `bench-named-${i}`, text: 'SELECT $1::int AS n', values: [42] })),
90166
BATCH,
91167
SECONDS
92168
)
93169

94-
console.log('\n=== Speedup ===')
95-
console.log(` simple: ${(pipedSimple / serialSimple).toFixed(2)}x`)
96-
console.log(` parameterized: ${(pipedParam / serialParam).toFixed(2)}x`)
97-
console.log(` named: ${(pipedNamed / serialNamed).toFixed(2)}x`)
170+
if (Native) {
171+
console.log('\n=== Native Client — Serial ===')
172+
results.nativeSerialSimple = await nativeSerial('simple SELECT 1', 'SELECT 1', undefined, SECONDS)
173+
results.nativeSerialParam = await nativeSerial('parameterized', 'SELECT $1::int AS n', [42], SECONDS)
174+
175+
console.log('\n=== Native Client — Pipelined ===')
176+
results.nativePipedSimple = await nativePipelined(
177+
'simple SELECT 1',
178+
(n) => Array.from({ length: n }, () => ({ text: 'SELECT 1' })),
179+
BATCH,
180+
SECONDS
181+
)
182+
results.nativePipedParam = await nativePipelined(
183+
'parameterized',
184+
(n) => Array.from({ length: n }, () => ({ text: 'SELECT $1::int AS n', values: [42] })),
185+
BATCH,
186+
SECONDS
187+
)
188+
} else {
189+
console.log('\n(pg-native not available — skipping native benchmarks)')
190+
}
191+
192+
// --- Summary ---
193+
console.log('\n=== Speedup Summary ===')
194+
console.log('JS pipelining vs serial:')
195+
console.log(` simple: ${(results.jsPipedSimple / results.jsSerialSimple).toFixed(2)}x`)
196+
console.log(` parameterized: ${(results.jsPipedParam / results.jsSerialParam).toFixed(2)}x`)
197+
console.log(` named: ${(results.jsPipedNamed / results.jsSerialNamed).toFixed(2)}x`)
198+
199+
if (Native) {
200+
console.log('Native pipelining vs serial:')
201+
console.log(` simple: ${(results.nativePipedSimple / results.nativeSerialSimple).toFixed(2)}x`)
202+
console.log(` parameterized: ${(results.nativePipedParam / results.nativeSerialParam).toFixed(2)}x`)
203+
204+
console.log('Native serial vs JS serial:')
205+
console.log(` simple: ${(results.nativeSerialSimple / results.jsSerialSimple).toFixed(2)}x`)
206+
console.log(` parameterized: ${(results.nativeSerialParam / results.jsSerialParam).toFixed(2)}x`)
207+
208+
console.log('Native pipelined vs JS pipelined:')
209+
console.log(` simple: ${(results.nativePipedSimple / results.jsPipedSimple).toFixed(2)}x`)
210+
console.log(` parameterized: ${(results.nativePipedParam / results.jsPipedParam).toFixed(2)}x`)
211+
}
98212
}
99213

100214
run().catch((e) => {

0 commit comments

Comments
 (0)