Skip to content

Commit 9dcc82c

Browse files
committed
Add opt-in query pipelining support
Allow multiple queries to be sent on the wire before waiting for responses, reducing round-trip latency. Enabled via client.pipelining = true. Each query gets its own Sync boundary so errors are isolated. Tracks in-flight named statements (submittedNamedStatements) to prevent duplicate Parse messages when pipelining queries with the same prepared statement name. Handles error/disconnect cleanup for the sent queue.
1 parent c78b302 commit 9dcc82c

File tree

5 files changed

+202
-4
lines changed

5 files changed

+202
-4
lines changed

packages/pg/lib/client.js

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class Client extends EventEmitter {
8383
encoding: this.connectionParameters.client_encoding || 'utf8',
8484
})
8585
this._queryQueue = []
86+
this._sentQueryQueue = []
87+
this.pipelining = false
8688
this.binary = c.binary || defaults.binary
8789
this.processID = null
8890
this.secretKey = null
@@ -126,6 +128,9 @@ class Client extends EventEmitter {
126128
this._activeQuery = null
127129
}
128130

131+
this._sentQueryQueue.forEach(enqueueError)
132+
this._sentQueryQueue.length = 0
133+
129134
this._queryQueue.forEach(enqueueError)
130135
this._queryQueue.length = 0
131136
}
@@ -363,6 +368,10 @@ class Client extends EventEmitter {
363368
if (activeQuery) {
364369
activeQuery.handleReadyForQuery(this.connection)
365370
}
371+
if (this.pipelining && this._sentQueryQueue.length > 0) {
372+
this._activeQuery = this._sentQueryQueue.shift()
373+
this.readyForQuery = false
374+
}
366375
this._pulseQueryQueue()
367376
}
368377

@@ -476,6 +485,7 @@ class Client extends EventEmitter {
476485
// it again on the same client
477486
if (activeQuery.name) {
478487
this.connection.parsedStatements[activeQuery.name] = activeQuery.text
488+
delete this.connection.submittedNamedStatements[activeQuery.name]
479489
}
480490
}
481491

@@ -554,6 +564,8 @@ class Client extends EventEmitter {
554564
})
555565
} else if (client._queryQueue.indexOf(query) !== -1) {
556566
client._queryQueue.splice(client._queryQueue.indexOf(query), 1)
567+
} else if (client._sentQueryQueue.indexOf(query) !== -1) {
568+
client._sentQueryQueue.splice(client._sentQueryQueue.indexOf(query), 1)
557569
}
558570
}
559571

@@ -577,6 +589,10 @@ class Client extends EventEmitter {
577589
}
578590

579591
_pulseQueryQueue() {
592+
if (this.pipelining) {
593+
this._pulsePipelinedQueryQueue()
594+
return
595+
}
580596
if (this.readyForQuery === true) {
581597
this._activeQuery = this._queryQueue.shift()
582598
const activeQuery = this._getActiveQuery()
@@ -599,6 +615,31 @@ class Client extends EventEmitter {
599615
}
600616
}
601617

618+
_pulsePipelinedQueryQueue() {
619+
if (!this._connected) {
620+
return
621+
}
622+
while (this._queryQueue.length > 0) {
623+
const query = this._queryQueue.shift()
624+
this.hasExecuted = true
625+
const queryError = query.submit(this.connection)
626+
if (queryError) {
627+
process.nextTick(() => {
628+
query.handleError(queryError, this.connection)
629+
})
630+
continue
631+
}
632+
this._sentQueryQueue.push(query)
633+
}
634+
if (!this._activeQuery && this._sentQueryQueue.length > 0) {
635+
this._activeQuery = this._sentQueryQueue.shift()
636+
this.readyForQuery = false
637+
}
638+
if (!this._activeQuery && this._sentQueryQueue.length === 0 && this._queryQueue.length === 0 && this.hasExecuted) {
639+
this.emit('drain')
640+
}
641+
}
642+
602643
query(config, values, callback) {
603644
// can take in strings, config object or query object
604645
let query
@@ -654,6 +695,11 @@ class Client extends EventEmitter {
654695
const index = this._queryQueue.indexOf(query)
655696
if (index > -1) {
656697
this._queryQueue.splice(index, 1)
698+
} else if (this.pipelining) {
699+
const sentIndex = this._sentQueryQueue.indexOf(query)
700+
if (sentIndex > -1) {
701+
this._sentQueryQueue.splice(sentIndex, 1)
702+
}
657703
}
658704

659705
this._pulseQueryQueue()
@@ -687,7 +733,7 @@ class Client extends EventEmitter {
687733
return result
688734
}
689735

690-
if (this._queryQueue.length > 0) {
736+
if (this._queryQueue.length > 0 && !this.pipelining) {
691737
queryQueueLengthDeprecationNotice()
692738
}
693739
this._queryQueue.push(query)
@@ -715,7 +761,7 @@ class Client extends EventEmitter {
715761
}
716762
}
717763

718-
if (this._getActiveQuery() || !this._queryable) {
764+
if (this._getActiveQuery() || this._sentQueryQueue.length > 0 || !this._queryable) {
719765
// if we have an active query we need to force a disconnect
720766
// on the socket - otherwise a hung query could block end forever
721767
this.connection.stream.destroy()

packages/pg/lib/connection.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Connection extends EventEmitter {
2323
this._keepAlive = config.keepAlive
2424
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
2525
this.parsedStatements = {}
26+
this.submittedNamedStatements = {}
2627
this.ssl = config.ssl || false
2728
this._ending = false
2829
this._emitMessage = false

packages/pg/lib/query.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class Query extends EventEmitter {
153153
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
154154
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
155155
}
156-
const previous = connection.parsedStatements[this.name]
156+
const previous = connection.parsedStatements[this.name] || connection.submittedNamedStatements[this.name]
157157
if (this.text && previous && this.text !== previous) {
158158
return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
159159
}
@@ -183,7 +183,7 @@ class Query extends EventEmitter {
183183
}
184184

185185
hasBeenParsed(connection) {
186-
return this.name && connection.parsedStatements[this.name]
186+
return this.name && (connection.parsedStatements[this.name] || connection.submittedNamedStatements[this.name])
187187
}
188188

189189
handlePortalSuspended(connection) {
@@ -214,6 +214,9 @@ class Query extends EventEmitter {
214214
name: this.name,
215215
types: this.types,
216216
})
217+
if (this.name) {
218+
connection.submittedNamedStatements[this.name] = this.text
219+
}
217220
}
218221

219222
// because we're mapping user supplied values to
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
'use strict'
2+
const helper = require('./test-helper')
3+
const assert = require('assert')
4+
const suite = new helper.Suite()
5+
6+
suite.test('basic pipelining with simple queries', async function () {
7+
const client = helper.client()
8+
client.pipelining = true
9+
10+
const [r1, r2, r3] = await Promise.all([
11+
client.query('SELECT 1 AS num'),
12+
client.query('SELECT 2 AS num'),
13+
client.query('SELECT 3 AS num'),
14+
])
15+
16+
assert.equal(r1.rows[0].num, 1)
17+
assert.equal(r2.rows[0].num, 2)
18+
assert.equal(r3.rows[0].num, 3)
19+
20+
await client.end()
21+
})
22+
23+
suite.test('pipelining with parameterized queries', async function () {
24+
const client = helper.client()
25+
client.pipelining = true
26+
27+
const [r1, r2, r3] = await Promise.all([
28+
client.query('SELECT $1::int AS num', [10]),
29+
client.query('SELECT $1::text AS name', ['hello']),
30+
client.query('SELECT $1::int + $2::int AS sum', [3, 4]),
31+
])
32+
33+
assert.equal(r1.rows[0].num, 10)
34+
assert.equal(r2.rows[0].name, 'hello')
35+
assert.equal(r3.rows[0].sum, 7)
36+
37+
await client.end()
38+
})
39+
40+
suite.test('pipelining with named prepared statements', async function () {
41+
const client = helper.client()
42+
client.pipelining = true
43+
44+
const [r1, r2] = await Promise.all([
45+
client.query({ name: 'fetch-num', text: 'SELECT $1::int AS num', values: [42] }),
46+
client.query({ name: 'fetch-num', text: 'SELECT $1::int AS num', values: [99] }),
47+
])
48+
49+
assert.equal(r1.rows[0].num, 42)
50+
assert.equal(r2.rows[0].num, 99)
51+
52+
await client.end()
53+
})
54+
55+
suite.test('pipelining error isolation', async function () {
56+
const client = helper.client()
57+
client.pipelining = true
58+
59+
const results = await Promise.allSettled([
60+
client.query('SELECT 1 AS num'),
61+
client.query('SELECT INVALID SYNTAX'),
62+
client.query('SELECT 3 AS num'),
63+
])
64+
65+
assert.equal(results[0].status, 'fulfilled')
66+
assert.equal(results[0].value.rows[0].num, 1)
67+
assert.equal(results[1].status, 'rejected')
68+
assert.equal(results[2].status, 'fulfilled')
69+
assert.equal(results[2].value.rows[0].num, 3)
70+
71+
await client.end()
72+
})
73+
74+
suite.test('pipelining drain event', async function () {
75+
const client = helper.client()
76+
client.pipelining = true
77+
78+
const drainPromise = new Promise((resolve) => {
79+
client.on('drain', resolve)
80+
})
81+
82+
client.query('SELECT 1')
83+
client.query('SELECT 2')
84+
client.query('SELECT 3')
85+
86+
await drainPromise
87+
await client.end()
88+
})

packages/pg/test/unit/client/simple-query-tests.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,66 @@ test('executing query', function () {
114114
})
115115
})
116116

117+
test('pipelining', function () {
118+
test('sends all queries immediately after readyForQuery', function () {
119+
const client = helper.client()
120+
client.pipelining = true
121+
client.connection.emit('readyForQuery')
122+
client.query('one')
123+
client.query('two')
124+
client.query('three')
125+
assert.lengthIs(client.connection.queries, 3)
126+
assert.equal(client.connection.queries[0], 'one')
127+
assert.equal(client.connection.queries[1], 'two')
128+
assert.equal(client.connection.queries[2], 'three')
129+
})
130+
131+
test('completes queries in order', function (done) {
132+
const client = helper.client()
133+
client.pipelining = true
134+
const con = client.connection
135+
con.emit('readyForQuery')
136+
137+
const results = []
138+
client.query('one', (err, res) => {
139+
results.push('one')
140+
})
141+
client.query('two', (err, res) => {
142+
results.push('two')
143+
})
144+
client.query('three', (err, res) => {
145+
results.push('three')
146+
})
147+
148+
// simulate server responding to each query in order
149+
con.emit('readyForQuery')
150+
con.emit('readyForQuery')
151+
con.emit('readyForQuery')
152+
153+
process.nextTick(() => {
154+
assert.deepStrictEqual(results, ['one', 'two', 'three'])
155+
done()
156+
})
157+
})
158+
159+
test('emits drain after all queries complete', function (done) {
160+
const client = helper.client()
161+
client.pipelining = true
162+
const con = client.connection
163+
con.emit('readyForQuery')
164+
165+
client.query('one')
166+
client.query('two')
167+
168+
client.on('drain', () => {
169+
done()
170+
})
171+
172+
con.emit('readyForQuery')
173+
con.emit('readyForQuery')
174+
})
175+
})
176+
117177
test('handles errors', function () {
118178
const client = helper.client()
119179

0 commit comments

Comments
 (0)