Skip to content

Commit 8cb7e38

Browse files
committed
Add pipelining support to native client via libpq pipeline mode
- pg-native: handle PGRES_PIPELINE_SYNC/PGRES_PIPELINE_ABORTED in _emitResult; add pipeline() batch method using libpq 14+ pipeline mode (enterPipelineMode, pipelineSync, exitPipelineMode) - pg-native: bump libpq dependency to ^1.9.0 (has pipeline bindings) - native client: add _pulsePipelinedQueryQueue that batches all queued queries through pg-native pipeline(), delivering results per-query - native client: suppress queue length deprecation when pipelining
1 parent 5cacf64 commit 8cb7e38

File tree

3 files changed

+240
-2
lines changed

3 files changed

+240
-2
lines changed

packages/pg-native/index.js

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ Client.prototype._emitResult = function (pq) {
191191
break
192192
}
193193

194+
case 'PGRES_PIPELINE_SYNC':
195+
case 'PGRES_PIPELINE_ABORTED':
196+
break
197+
194198
default:
195199
this._readError('unrecognized command status: ' + status)
196200
break
@@ -306,6 +310,158 @@ Client.prototype._onResult = function (result) {
306310
this._resultCount++
307311
}
308312

313+
// Send a batch of queries in pipeline mode and collect results in order.
314+
// Each entry in `queries` is {text, values?, name?}.
315+
// `cb(err, results)` where results is an array, one per query,
316+
// of {err, rows, result} objects.
317+
Client.prototype.pipeline = function (queries, cb) {
318+
const pq = this.pq
319+
320+
if (!pq.pipelineModeSupported || !pq.pipelineModeSupported()) {
321+
return cb(new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'))
322+
}
323+
324+
if (!pq.enterPipelineMode()) {
325+
return cb(new Error(pq.errorMessage() || 'Failed to enter pipeline mode'))
326+
}
327+
328+
pq.setNonBlocking(true)
329+
330+
// Send all queries, each followed by a sync
331+
for (let i = 0; i < queries.length; i++) {
332+
const q = queries[i]
333+
let sent
334+
if (q.name) {
335+
if (q._alreadyPrepared) {
336+
sent = pq.sendQueryPrepared(q.name, q.values || [])
337+
} else {
338+
// send prepare then execute in same pipeline batch
339+
sent = pq.sendPrepare(q.name, q.text, (q.values || []).length)
340+
if (sent) {
341+
sent = pq.sendQueryPrepared(q.name, q.values || [])
342+
}
343+
}
344+
} else if (q.values) {
345+
sent = pq.sendQueryParams(q.text, q.values)
346+
} else {
347+
sent = pq.sendQuery(q.text)
348+
}
349+
350+
if (!sent) {
351+
const err = new Error(pq.errorMessage() || 'Failed to send pipelined query')
352+
pq.exitPipelineMode()
353+
return cb(err)
354+
}
355+
356+
pq.pipelineSync()
357+
}
358+
359+
// Flush all queued data to the socket
360+
this._waitForDrain(pq, (err) => {
361+
if (err) {
362+
pq.exitPipelineMode()
363+
return cb(err)
364+
}
365+
this._readPipelineResults(queries, cb)
366+
})
367+
}
368+
369+
// Read pipeline results for `queries.length` sync points.
370+
// Calls cb(null, results) when all syncs have been received.
371+
Client.prototype._readPipelineResults = function (queries, cb) {
372+
const pq = this.pq
373+
const self = this
374+
const results = []
375+
let queryIndex = 0
376+
let currentResult = null
377+
let currentError = null
378+
379+
const processResults = function () {
380+
if (!pq.consumeInput()) {
381+
pq.exitPipelineMode()
382+
return cb(new Error(pq.errorMessage() || 'Failed to consume input'))
383+
}
384+
385+
while (!pq.isBusy()) {
386+
if (!pq.getResult()) {
387+
// null between result groups in pipeline — try again
388+
if (pq.isBusy()) return // more data needed
389+
if (!pq.getResult()) {
390+
// truly no more results — should not happen before all syncs
391+
break
392+
}
393+
}
394+
395+
const status = pq.resultStatus()
396+
397+
if (status === 'PGRES_PIPELINE_SYNC') {
398+
// End of one query's results + sync
399+
if (currentError) {
400+
results.push({ err: currentError, rows: null, result: null })
401+
} else if (currentResult) {
402+
results.push({ err: null, rows: currentResult.rows, result: currentResult })
403+
} else {
404+
results.push({ err: null, rows: [], result: null })
405+
}
406+
currentResult = null
407+
currentError = null
408+
queryIndex++
409+
410+
if (queryIndex >= queries.length) {
411+
// All queries processed
412+
pq.exitPipelineMode()
413+
return cb(null, results)
414+
}
415+
continue
416+
}
417+
418+
if (status === 'PGRES_FATAL_ERROR') {
419+
currentError = new Error(pq.resultErrorMessage())
420+
// Extract error fields
421+
const fields = pq.resultErrorFields()
422+
if (fields) {
423+
for (const key in fields) {
424+
currentError[key] = fields[key]
425+
}
426+
}
427+
continue
428+
}
429+
430+
if (status === 'PGRES_PIPELINE_ABORTED') {
431+
// Query skipped due to previous error in same sync group
432+
continue
433+
}
434+
435+
if (status === 'PGRES_TUPLES_OK' || status === 'PGRES_COMMAND_OK' || status === 'PGRES_EMPTY_QUERY') {
436+
currentResult = self._consumeQueryResults(pq)
437+
continue
438+
}
439+
}
440+
441+
// Still waiting for more data — will be called again when readable
442+
}
443+
444+
// Use the libuv readable watcher
445+
this._stopReading()
446+
let done = false
447+
const origCb = cb
448+
cb = function (err, results) {
449+
if (done) return
450+
done = true
451+
pq.removeListener('readable', onReadable)
452+
self._stopReading()
453+
origCb(err, results)
454+
}
455+
const onReadable = function () {
456+
processResults()
457+
}
458+
pq.on('readable', onReadable)
459+
pq.startReader()
460+
461+
// Try an initial read in case data is already available
462+
processResults()
463+
}
464+
309465
Client.prototype._onReadyForQuery = function () {
310466
// remove instance callback
311467
const cb = this._queryCallback

packages/pg-native/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"homepage": "https://github.com/brianc/node-postgres/tree/master/packages/pg-native",
3636
"dependencies": {
37-
"libpq": "^1.8.15",
37+
"libpq": "^1.9.0",
3838
"pg-types": "2.2.0"
3939
},
4040
"devDependencies": {

packages/pg/lib/native/client.js

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const Client = (module.exports = function (config) {
3636
this._connecting = false
3737
this._connected = false
3838
this._queryable = true
39+
this.pipelining = false
40+
this._pipeliningInFlight = false
3941

4042
// keep these on the object for legacy reasons
4143
// for the time being. TODO: deprecate all this jazz
@@ -234,7 +236,7 @@ Client.prototype.query = function (config, values, callback) {
234236
return result
235237
}
236238

237-
if (this._queryQueue.length > 0) {
239+
if (this._queryQueue.length > 0 && !this.pipelining) {
238240
queryQueueLengthDeprecationNotice()
239241
}
240242

@@ -280,6 +282,9 @@ Client.prototype._pulseQueryQueue = function (initialConnection) {
280282
if (!this._connected) {
281283
return
282284
}
285+
if (this.pipelining && !initialConnection) {
286+
return this._pulsePipelinedQueryQueue()
287+
}
283288
if (this._hasActiveQuery()) {
284289
return
285290
}
@@ -298,6 +303,83 @@ Client.prototype._pulseQueryQueue = function (initialConnection) {
298303
})
299304
}
300305

306+
Client.prototype._pulsePipelinedQueryQueue = function () {
307+
if (!this._connected || this._pipeliningInFlight) {
308+
return
309+
}
310+
if (this._queryQueue.length === 0) {
311+
if (this.hasExecuted) {
312+
this.emit('drain')
313+
}
314+
return
315+
}
316+
317+
this._pipeliningInFlight = true
318+
const self = this
319+
const queries = []
320+
const nativeQueries = []
321+
const utils = require('../utils')
322+
323+
while (this._queryQueue.length > 0) {
324+
const query = this._queryQueue.shift()
325+
this.hasExecuted = true
326+
nativeQueries.push(query)
327+
328+
const values = query.values ? query.values.map(utils.prepareValue) : null
329+
const pipelineEntry = { text: query.text, name: query.name }
330+
if (values) {
331+
pipelineEntry.values = values
332+
}
333+
if (query.name && this.namedQueries[query.name]) {
334+
pipelineEntry._alreadyPrepared = true
335+
}
336+
queries.push(pipelineEntry)
337+
}
338+
339+
this.native.pipeline(queries, function (err, results) {
340+
self._pipeliningInFlight = false
341+
342+
if (err) {
343+
// Total pipeline failure — error all queries
344+
for (let i = 0; i < nativeQueries.length; i++) {
345+
const q = nativeQueries[i]
346+
q.native = self.native
347+
q.handleError(err)
348+
}
349+
self._pulsePipelinedQueryQueue()
350+
return
351+
}
352+
353+
// Deliver results to each query
354+
for (let i = 0; i < nativeQueries.length; i++) {
355+
const q = nativeQueries[i]
356+
const r = results[i]
357+
q.native = self.native
358+
359+
if (r.err) {
360+
q.handleError(r.err)
361+
} else {
362+
// Track named queries on success
363+
if (q.name) {
364+
self.namedQueries[q.name] = q.text
365+
}
366+
q.state = 'end'
367+
q.emit('end', r.result)
368+
if (q.callback) {
369+
q.callback(null, r.result)
370+
}
371+
}
372+
373+
setImmediate(function () {
374+
q.emit('_done')
375+
})
376+
}
377+
378+
// Process any queries that arrived while we were reading
379+
self._pulsePipelinedQueryQueue()
380+
})
381+
}
382+
301383
// attempt to cancel an in-progress query
302384
Client.prototype.cancel = function (query) {
303385
if (this._activeQuery === query) {

0 commit comments

Comments
 (0)