From deb7f7644fb28f0942195d86de2a78889662419d Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 15:58:14 +0200 Subject: [PATCH 01/11] feat: pipeline --- packages/pg/lib/client.js | 156 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 151 insertions(+), 5 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 903db6c66..97e052fa8 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -68,6 +68,9 @@ class Client extends EventEmitter { this._connectionError = false this._queryable = true this._activeQuery = null + this._pipelining = false + this._pipelineQueue = [] + this._pipelineSync = false this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered this.connection = @@ -123,8 +126,15 @@ class Client extends EventEmitter { this._activeQuery = null } + // Error all queued queries this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 + + // Error all pipeline queries + if (this._pipelining) { + this._pipelineQueue.forEach(enqueueError) + this._pipelineQueue.length = 0 + } } _connect(callback) { @@ -354,12 +364,36 @@ class Client extends EventEmitter { } this.emit('connect') } - const activeQuery = this._getActiveQuery() - this._activeQuery = null - this.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(this.connection) + + if (this._pipelining) { + // In pipeline mode, readyForQuery indicates end of current query in pipeline + const activeQuery = this._getActiveQuery() + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + // Remove completed query from pipeline queue + const index = this._pipelineQueue.indexOf(activeQuery) + if (index > -1) { + this._pipelineQueue.splice(index, 1) + } + } + + // Set next query as active if available + if (this._pipelineQueue.length > 0) { + this._activeQuery = this._pipelineQueue[0] + } else { + this._activeQuery = null + this.readyForQuery = true + this.emit('drain') + } + } else { + const activeQuery = this._getActiveQuery() + this._activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } } + this._pulseQueryQueue() } @@ -538,6 +572,10 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + if (this._pipelining) { + return this._pipelinePulseQueryQueue() + } + if (this.readyForQuery === true) { this._activeQuery = this._queryQueue.shift() const activeQuery = this._getActiveQuery() @@ -591,6 +629,16 @@ class Client extends EventEmitter { } } + // In pipeline mode, only extended query protocol is allowed + if (this._pipelining) { + if (typeof config === 'string') { + throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') + } + if (query.text && query.text.includes(';') && query.text.trim().split(';').filter(s => s.trim()).length > 1) { + throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.') + } + } + if (readTimeout) { queryCallback = query.callback @@ -689,6 +737,104 @@ class Client extends EventEmitter { queryQueueDeprecationNotice() return this._queryQueue } + + // Pipeline mode support + get pipelining() { + return this._pipelining + } + + set pipelining(value) { + if (typeof value !== 'boolean') { + throw new TypeError('pipelining must be a boolean') + } + + if (value && !this._connected) { + throw new Error('Cannot enable pipelining before connection is established') + } + + if (value && this._getActiveQuery()) { + throw new Error('Cannot enable pipelining while a query is active') + } + + if (value && !this._pipelining) { + this._enterPipelineMode() + } else if (!value && this._pipelining) { + this._exitPipelineMode() + } + } + + pipelineStatus() { + return this._pipelining ? 'PIPELINE_ON' : 'PIPELINE_OFF' + } + + _enterPipelineMode() { + if (this._pipelining) { + return + } + + if (!this._connected) { + throw new Error('Cannot enter pipeline mode before connection is established') + } + + if (this._getActiveQuery()) { + throw new Error('Cannot enter pipeline mode while a query is active') + } + + this._pipelining = true + this._pipelineQueue = [] + this._pipelineSync = false + } + + _exitPipelineMode() { + if (!this._pipelining) { + return + } + + // Send sync to end pipeline if we have pending queries + if (this._pipelineQueue.length > 0 && !this._pipelineSync) { + this.connection.sync() + this._pipelineSync = true + } + + this._pipelining = false + } + + _pipelinePulseQueryQueue() { + if (!this._pipelining) { + return this._pulseQueryQueue() + } + + // In pipeline mode, send all queued queries immediately without waiting for responses + while (this._queryQueue.length > 0) { + const query = this._queryQueue.shift() + this._pipelineQueue.push(query) + + // Force extended query protocol for pipeline mode + if (!query.requiresPreparation()) { + query.queryMode = 'extended' + } + + const queryError = query.submit(this.connection) + if (queryError) { + process.nextTick(() => { + query.handleError(queryError, this.connection) + }) + // Remove failed query from pipeline queue + const index = this._pipelineQueue.indexOf(query) + if (index > -1) { + this._pipelineQueue.splice(index, 1) + } + continue + } + } + + // Set active query to first in pipeline if we don't have one + if (!this._getActiveQuery() && this._pipelineQueue.length > 0) { + this._activeQuery = this._pipelineQueue[0] + } + + this.readyForQuery = true + } } // expose a Query constructor From 18d563ab19be1f852b8a934897ecfd90bb1cd0e9 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 15:58:43 +0200 Subject: [PATCH 02/11] Create pipeline-tests.js --- packages/pg/lib/pipeline-tests.js | 206 ++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 packages/pg/lib/pipeline-tests.js diff --git a/packages/pg/lib/pipeline-tests.js b/packages/pg/lib/pipeline-tests.js new file mode 100644 index 000000000..3137e7a62 --- /dev/null +++ b/packages/pg/lib/pipeline-tests.js @@ -0,0 +1,206 @@ +'use strict' + +const helper = require('../test-helper') +const pg = helper.pg +const assert = require('assert') + +const suite = new helper.Suite() + +suite.test('pipeline mode basic functionality', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + // Initially not in pipeline mode + assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') + assert.equal(client.pipelining, false) + + // Enable pipeline mode + client.pipelining = true + assert.equal(client.pipelineStatus(), 'PIPELINE_ON') + assert.equal(client.pipelining, true) + + // Disable pipeline mode + client.pipelining = false + assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') + assert.equal(client.pipelining, false) + + client.end(cb) + }) +}) + +suite.test('cannot enable pipeline before connection', (cb) => { + const client = new pg.Client(helper.config) + + try { + client.pipelining = true + cb(new Error('Should have thrown error')) + } catch (err) { + assert.equal(err.message, 'Cannot enable pipelining before connection is established') + cb() + } +}) + +suite.test('pipeline mode with multiple queries', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + let results = [] + let completed = 0 + + // Send multiple queries in pipeline mode + client.query('SELECT 1 as num', (err, res) => { + if (err) return cb(err) + results[0] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + client.query('SELECT 2 as num', (err, res) => { + if (err) return cb(err) + results[1] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + client.query('SELECT 3 as num', (err, res) => { + if (err) return cb(err) + results[2] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + function checkResults() { + assert.equal(results[0], 1) + assert.equal(results[1], 2) + assert.equal(results[2], 3) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode rejects simple query protocol', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + try { + client.query('SELECT 1', (err, res) => { + // This should not be called + cb(new Error('Simple query should have been rejected')) + }) + } catch (err) { + assert(err.message.includes('Simple query protocol is not allowed in pipeline mode')) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode rejects multiple SQL commands', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + try { + client.query({ text: 'SELECT 1; SELECT 2;' }, (err, res) => { + // This should not be called + cb(new Error('Multiple SQL commands should have been rejected')) + }) + } catch (err) { + assert(err.message.includes('Multiple SQL commands in a single query are not allowed in pipeline mode')) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode with parameterized queries', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + let results = [] + let completed = 0 + + // Send parameterized queries in pipeline mode + client.query({ text: 'SELECT $1::int as num', values: [10] }, (err, res) => { + if (err) return cb(err) + results[0] = res.rows[0].num + completed++ + if (completed === 2) checkResults() + }) + + client.query({ text: 'SELECT $1::text as str', values: ['hello'] }, (err, res) => { + if (err) return cb(err) + results[1] = res.rows[0].str + completed++ + if (completed === 2) checkResults() + }) + + function checkResults() { + assert.equal(results[0], 10) + assert.equal(results[1], 'hello') + client.end(cb) + } + }) +}) + +suite.test('pipeline mode performance benefit', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + const numQueries = 10 + + // Test without pipeline mode + const startNormal = Date.now() + let normalCompleted = 0 + + function runNormalQueries() { + for (let i = 0; i < numQueries; i++) { + client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { + if (err) return cb(err) + normalCompleted++ + if (normalCompleted === numQueries) { + const normalTime = Date.now() - startNormal + runPipelineQueries(normalTime) + } + }) + } + } + + function runPipelineQueries(normalTime) { + client.pipelining = true + const startPipeline = Date.now() + let pipelineCompleted = 0 + + for (let i = 0; i < numQueries; i++) { + client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { + if (err) return cb(err) + pipelineCompleted++ + if (pipelineCompleted === numQueries) { + const pipelineTime = Date.now() - startPipeline + + // Pipeline should be faster or at least not significantly slower + // In real network conditions with latency, pipeline would show more benefit + console.log(`Normal mode: ${normalTime}ms, Pipeline mode: ${pipelineTime}ms`) + + client.end(cb) + } + }) + } + } + + runNormalQueries() + }) +}) + +module.exports = suite From c808c23ff5693b1520785895f7f4c5eecf8e949b Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:09:08 +0200 Subject: [PATCH 03/11] fix: lint --- packages/pg/lib/client.js | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 97e052fa8..360f197d7 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -129,7 +129,7 @@ class Client extends EventEmitter { // Error all queued queries this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 - + // Error all pipeline queries if (this._pipelining) { this._pipelineQueue.forEach(enqueueError) @@ -364,7 +364,7 @@ class Client extends EventEmitter { } this.emit('connect') } - + if (this._pipelining) { // In pipeline mode, readyForQuery indicates end of current query in pipeline const activeQuery = this._getActiveQuery() @@ -376,7 +376,7 @@ class Client extends EventEmitter { this._pipelineQueue.splice(index, 1) } } - + // Set next query as active if available if (this._pipelineQueue.length > 0) { this._activeQuery = this._pipelineQueue[0] @@ -393,7 +393,7 @@ class Client extends EventEmitter { activeQuery.handleReadyForQuery(this.connection) } } - + this._pulseQueryQueue() } @@ -575,7 +575,7 @@ class Client extends EventEmitter { if (this._pipelining) { return this._pipelinePulseQueryQueue() } - + if (this.readyForQuery === true) { this._activeQuery = this._queryQueue.shift() const activeQuery = this._getActiveQuery() @@ -634,7 +634,7 @@ class Client extends EventEmitter { if (typeof config === 'string') { throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') } - if (query.text && query.text.includes(';') && query.text.trim().split(';').filter(s => s.trim()).length > 1) { + if (query.text && query.text.includes(';')) { throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.') } } @@ -747,15 +747,15 @@ class Client extends EventEmitter { if (typeof value !== 'boolean') { throw new TypeError('pipelining must be a boolean') } - + if (value && !this._connected) { throw new Error('Cannot enable pipelining before connection is established') } - + if (value && this._getActiveQuery()) { throw new Error('Cannot enable pipelining while a query is active') } - + if (value && !this._pipelining) { this._enterPipelineMode() } else if (!value && this._pipelining) { @@ -771,15 +771,15 @@ class Client extends EventEmitter { if (this._pipelining) { return } - + if (!this._connected) { throw new Error('Cannot enter pipeline mode before connection is established') } - + if (this._getActiveQuery()) { throw new Error('Cannot enter pipeline mode while a query is active') } - + this._pipelining = true this._pipelineQueue = [] this._pipelineSync = false @@ -789,13 +789,13 @@ class Client extends EventEmitter { if (!this._pipelining) { return } - + // Send sync to end pipeline if we have pending queries if (this._pipelineQueue.length > 0 && !this._pipelineSync) { this.connection.sync() this._pipelineSync = true } - + this._pipelining = false } @@ -803,17 +803,17 @@ class Client extends EventEmitter { if (!this._pipelining) { return this._pulseQueryQueue() } - + // In pipeline mode, send all queued queries immediately without waiting for responses while (this._queryQueue.length > 0) { const query = this._queryQueue.shift() this._pipelineQueue.push(query) - + // Force extended query protocol for pipeline mode if (!query.requiresPreparation()) { query.queryMode = 'extended' } - + const queryError = query.submit(this.connection) if (queryError) { process.nextTick(() => { @@ -827,12 +827,12 @@ class Client extends EventEmitter { continue } } - + // Set active query to first in pipeline if we don't have one if (!this._getActiveQuery() && this._pipelineQueue.length > 0) { this._activeQuery = this._pipelineQueue[0] } - + this.readyForQuery = true } } From b1976ae2285b995d3aa40e73e3e5b411c7643fdf Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:11:51 +0200 Subject: [PATCH 04/11] fix: lint --- packages/pg/lib/pipeline-tests.js | 54 +++++++++++++++---------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/packages/pg/lib/pipeline-tests.js b/packages/pg/lib/pipeline-tests.js index 3137e7a62..ce5620b19 100644 --- a/packages/pg/lib/pipeline-tests.js +++ b/packages/pg/lib/pipeline-tests.js @@ -10,28 +10,28 @@ suite.test('pipeline mode basic functionality', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + // Initially not in pipeline mode assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') assert.equal(client.pipelining, false) - + // Enable pipeline mode client.pipelining = true assert.equal(client.pipelineStatus(), 'PIPELINE_ON') assert.equal(client.pipelining, true) - + // Disable pipeline mode client.pipelining = false assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') assert.equal(client.pipelining, false) - + client.end(cb) }) }) suite.test('cannot enable pipeline before connection', (cb) => { const client = new pg.Client(helper.config) - + try { client.pipelining = true cb(new Error('Should have thrown error')) @@ -45,12 +45,12 @@ suite.test('pipeline mode with multiple queries', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + let results = [] let completed = 0 - + // Send multiple queries in pipeline mode client.query('SELECT 1 as num', (err, res) => { if (err) return cb(err) @@ -58,21 +58,21 @@ suite.test('pipeline mode with multiple queries', (cb) => { completed++ if (completed === 3) checkResults() }) - + client.query('SELECT 2 as num', (err, res) => { if (err) return cb(err) results[1] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - + client.query('SELECT 3 as num', (err, res) => { if (err) return cb(err) results[2] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - + function checkResults() { assert.equal(results[0], 1) assert.equal(results[1], 2) @@ -86,9 +86,9 @@ suite.test('pipeline mode rejects simple query protocol', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + try { client.query('SELECT 1', (err, res) => { // This should not be called @@ -105,9 +105,9 @@ suite.test('pipeline mode rejects multiple SQL commands', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + try { client.query({ text: 'SELECT 1; SELECT 2;' }, (err, res) => { // This should not be called @@ -124,12 +124,12 @@ suite.test('pipeline mode with parameterized queries', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + let results = [] let completed = 0 - + // Send parameterized queries in pipeline mode client.query({ text: 'SELECT $1::int as num', values: [10] }, (err, res) => { if (err) return cb(err) @@ -137,14 +137,14 @@ suite.test('pipeline mode with parameterized queries', (cb) => { completed++ if (completed === 2) checkResults() }) - + client.query({ text: 'SELECT $1::text as str', values: ['hello'] }, (err, res) => { if (err) return cb(err) results[1] = res.rows[0].str completed++ if (completed === 2) checkResults() }) - + function checkResults() { assert.equal(results[0], 10) assert.equal(results[1], 'hello') @@ -157,9 +157,9 @@ suite.test('pipeline mode performance benefit', (cb) => { const client = new pg.Client(helper.config) client.connect((err) => { if (err) return cb(err) - + const numQueries = 10 - + // Test without pipeline mode const startNormal = Date.now() let normalCompleted = 0 @@ -176,29 +176,29 @@ suite.test('pipeline mode performance benefit', (cb) => { }) } } - + function runPipelineQueries(normalTime) { client.pipelining = true const startPipeline = Date.now() let pipelineCompleted = 0 - + for (let i = 0; i < numQueries; i++) { client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { if (err) return cb(err) pipelineCompleted++ if (pipelineCompleted === numQueries) { const pipelineTime = Date.now() - startPipeline - + // Pipeline should be faster or at least not significantly slower // In real network conditions with latency, pipeline would show more benefit console.log(`Normal mode: ${normalTime}ms, Pipeline mode: ${pipelineTime}ms`) - + client.end(cb) } }) } } - + runNormalQueries() }) }) From aa6b68e17248783601d515c60441410504fdab9d Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:14:08 +0200 Subject: [PATCH 05/11] fix: lint (again) --- packages/pg/lib/pipeline-tests.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/pg/lib/pipeline-tests.js b/packages/pg/lib/pipeline-tests.js index ce5620b19..5138f9bf7 100644 --- a/packages/pg/lib/pipeline-tests.js +++ b/packages/pg/lib/pipeline-tests.js @@ -48,7 +48,7 @@ suite.test('pipeline mode with multiple queries', (cb) => { client.pipelining = true - let results = [] + const results = [] let completed = 0 // Send multiple queries in pipeline mode @@ -127,7 +127,7 @@ suite.test('pipeline mode with parameterized queries', (cb) => { client.pipelining = true - let results = [] + const results = [] let completed = 0 // Send parameterized queries in pipeline mode @@ -163,7 +163,7 @@ suite.test('pipeline mode performance benefit', (cb) => { // Test without pipeline mode const startNormal = Date.now() let normalCompleted = 0 - + function runNormalQueries() { for (let i = 0; i < numQueries; i++) { client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { From 0bdb19c4f50878e239d6a4060546ce56077139a1 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:22:33 +0200 Subject: [PATCH 06/11] fix: move test in the right folder --- packages/pg/{lib => test/unit/client}/pipeline-tests.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename packages/pg/{lib => test/unit/client}/pipeline-tests.js (99%) diff --git a/packages/pg/lib/pipeline-tests.js b/packages/pg/test/unit/client/pipeline-tests.js similarity index 99% rename from packages/pg/lib/pipeline-tests.js rename to packages/pg/test/unit/client/pipeline-tests.js index 5138f9bf7..dca25112c 100644 --- a/packages/pg/lib/pipeline-tests.js +++ b/packages/pg/test/unit/client/pipeline-tests.js @@ -1,6 +1,6 @@ 'use strict' -const helper = require('../test-helper') +const helper = require('./test-helper') const pg = helper.pg const assert = require('assert') From d8bb1a1aad5f626a56a03b3bc1bbfd7569c6a637 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:30:09 +0200 Subject: [PATCH 07/11] fix: test --- packages/pg/lib/client.js | 1 + packages/pg/test/unit/client/pipeline-tests.js | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 360f197d7..2bba7e197 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -631,6 +631,7 @@ class Client extends EventEmitter { // In pipeline mode, only extended query protocol is allowed if (this._pipelining) { + // TODO: better check! if (typeof config === 'string') { throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') } diff --git a/packages/pg/test/unit/client/pipeline-tests.js b/packages/pg/test/unit/client/pipeline-tests.js index dca25112c..e3cdad1e4 100644 --- a/packages/pg/test/unit/client/pipeline-tests.js +++ b/packages/pg/test/unit/client/pipeline-tests.js @@ -52,21 +52,21 @@ suite.test('pipeline mode with multiple queries', (cb) => { let completed = 0 // Send multiple queries in pipeline mode - client.query('SELECT 1 as num', (err, res) => { + client.query({text: 'SELECT 1 as num'}, (err, res) => { if (err) return cb(err) results[0] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - client.query('SELECT 2 as num', (err, res) => { + client.query({text: 'SELECT 2 as num'}, (err, res) => { if (err) return cb(err) results[1] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - client.query('SELECT 3 as num', (err, res) => { + client.query({text: 'SELECT 3 as num'}, (err, res) => { if (err) return cb(err) results[2] = res.rows[0].num completed++ @@ -192,6 +192,7 @@ suite.test('pipeline mode performance benefit', (cb) => { // Pipeline should be faster or at least not significantly slower // In real network conditions with latency, pipeline would show more benefit console.log(`Normal mode: ${normalTime}ms, Pipeline mode: ${pipelineTime}ms`) + assert(normalTime <= pipelineTime) client.end(cb) } From 059d6fe2ffaecb1fa086bf05bed4583f0261ee80 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:32:19 +0200 Subject: [PATCH 08/11] fix: lint --- packages/pg/test/unit/client/pipeline-tests.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/pg/test/unit/client/pipeline-tests.js b/packages/pg/test/unit/client/pipeline-tests.js index e3cdad1e4..20e19b08a 100644 --- a/packages/pg/test/unit/client/pipeline-tests.js +++ b/packages/pg/test/unit/client/pipeline-tests.js @@ -52,21 +52,21 @@ suite.test('pipeline mode with multiple queries', (cb) => { let completed = 0 // Send multiple queries in pipeline mode - client.query({text: 'SELECT 1 as num'}, (err, res) => { + client.query({ text: 'SELECT 1 as num' }, (err, res) => { if (err) return cb(err) results[0] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - client.query({text: 'SELECT 2 as num'}, (err, res) => { + client.query({ text: 'SELECT 2 as num' }, (err, res) => { if (err) return cb(err) results[1] = res.rows[0].num completed++ if (completed === 3) checkResults() }) - client.query({text: 'SELECT 3 as num'}, (err, res) => { + client.query({ text: 'SELECT 3 as num' }, (err, res) => { if (err) return cb(err) results[2] = res.rows[0].num completed++ From 007d616abb20c02c01242b83e57a1070dd7634e9 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 16:40:00 +0200 Subject: [PATCH 09/11] chore: see result --- packages/pg/test/unit/client/pipeline-tests.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/pg/test/unit/client/pipeline-tests.js b/packages/pg/test/unit/client/pipeline-tests.js index 20e19b08a..41735bef1 100644 --- a/packages/pg/test/unit/client/pipeline-tests.js +++ b/packages/pg/test/unit/client/pipeline-tests.js @@ -54,6 +54,7 @@ suite.test('pipeline mode with multiple queries', (cb) => { // Send multiple queries in pipeline mode client.query({ text: 'SELECT 1 as num' }, (err, res) => { if (err) return cb(err) + console.log(res) results[0] = res.rows[0].num completed++ if (completed === 3) checkResults() @@ -61,6 +62,7 @@ suite.test('pipeline mode with multiple queries', (cb) => { client.query({ text: 'SELECT 2 as num' }, (err, res) => { if (err) return cb(err) + console.log(res) results[1] = res.rows[0].num completed++ if (completed === 3) checkResults() @@ -68,6 +70,7 @@ suite.test('pipeline mode with multiple queries', (cb) => { client.query({ text: 'SELECT 3 as num' }, (err, res) => { if (err) return cb(err) + console.log(res) results[2] = res.rows[0].num completed++ if (completed === 3) checkResults() From d3b7156ef6932c77d5d5632472adfba7b81e3799 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 17:05:45 +0200 Subject: [PATCH 10/11] fix: race condition --- packages/pg/lib/client.js | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 2bba7e197..7cddb60a9 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -70,7 +70,6 @@ class Client extends EventEmitter { this._activeQuery = null this._pipelining = false this._pipelineQueue = [] - this._pipelineSync = false this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered this.connection = @@ -365,22 +364,18 @@ class Client extends EventEmitter { this.emit('connect') } - if (this._pipelining) { - // In pipeline mode, readyForQuery indicates end of current query in pipeline - const activeQuery = this._getActiveQuery() - if (activeQuery) { - activeQuery.handleReadyForQuery(this.connection) - // Remove completed query from pipeline queue - const index = this._pipelineQueue.indexOf(activeQuery) - if (index > -1) { - this._pipelineQueue.splice(index, 1) - } + if (this._pipelining && this._pipelineQueue.length > 0) { + // In pipeline mode, readyForQuery indicates completion of the current query + const completedQuery = this._pipelineQueue.shift() + if (completedQuery) { + completedQuery.handleReadyForQuery(this.connection) } - + // Set next query as active if available if (this._pipelineQueue.length > 0) { this._activeQuery = this._pipelineQueue[0] } else { + // All pipeline queries completed this._activeQuery = null this.readyForQuery = true this.emit('drain') @@ -631,11 +626,10 @@ class Client extends EventEmitter { // In pipeline mode, only extended query protocol is allowed if (this._pipelining) { - // TODO: better check! if (typeof config === 'string') { throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') } - if (query.text && query.text.includes(';')) { + if (query.text && query.text.split(';').filter(s => s.trim()).length > 1) { throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.') } } @@ -809,12 +803,12 @@ class Client extends EventEmitter { while (this._queryQueue.length > 0) { const query = this._queryQueue.shift() this._pipelineQueue.push(query) - + // Force extended query protocol for pipeline mode if (!query.requiresPreparation()) { query.queryMode = 'extended' } - + const queryError = query.submit(this.connection) if (queryError) { process.nextTick(() => { @@ -829,12 +823,16 @@ class Client extends EventEmitter { } } - // Set active query to first in pipeline if we don't have one - if (!this._getActiveQuery() && this._pipelineQueue.length > 0) { - this._activeQuery = this._pipelineQueue[0] + // Send sync message to end the pipeline batch + if (this._pipelineQueue.length > 0 && this._queryQueue.length === 0) { + this.connection.sync() + // Set active query to first in pipeline to start processing responses + if (!this._getActiveQuery() && this._pipelineQueue.length > 0) { + this._activeQuery = this._pipelineQueue[0] + } } - this.readyForQuery = true + this.readyForQuery = false // We're not ready for more queries until pipeline completes } } From 1cc090aa6c67fc119d56b635e0809fe326f37a64 Mon Sep 17 00:00:00 2001 From: francesco Date: Fri, 18 Jul 2025 17:08:15 +0200 Subject: [PATCH 11/11] fix: lint --- packages/pg/lib/client.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 7cddb60a9..7f2aee98f 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -370,7 +370,7 @@ class Client extends EventEmitter { if (completedQuery) { completedQuery.handleReadyForQuery(this.connection) } - + // Set next query as active if available if (this._pipelineQueue.length > 0) { this._activeQuery = this._pipelineQueue[0] @@ -629,7 +629,7 @@ class Client extends EventEmitter { if (typeof config === 'string') { throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') } - if (query.text && query.text.split(';').filter(s => s.trim()).length > 1) { + if (query.text && query.text.split(';').filter((s) => s.trim()).length > 1) { throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.') } } @@ -803,12 +803,12 @@ class Client extends EventEmitter { while (this._queryQueue.length > 0) { const query = this._queryQueue.shift() this._pipelineQueue.push(query) - + // Force extended query protocol for pipeline mode if (!query.requiresPreparation()) { query.queryMode = 'extended' } - + const queryError = query.submit(this.connection) if (queryError) { process.nextTick(() => {