diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 903db6c66..7f2aee98f 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -68,6 +68,8 @@ class Client extends EventEmitter { this._connectionError = false this._queryable = true this._activeQuery = null + this._pipelining = false + this._pipelineQueue = [] this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered this.connection = @@ -123,8 +125,15 @@ class Client extends EventEmitter { this._activeQuery = null } + // Error all queued queries this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 + + // Error all pipeline queries + if (this._pipelining) { + this._pipelineQueue.forEach(enqueueError) + this._pipelineQueue.length = 0 + } } _connect(callback) { @@ -354,12 +363,32 @@ class Client extends EventEmitter { } this.emit('connect') } - const activeQuery = this._getActiveQuery() - this._activeQuery = null - this.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(this.connection) + + if (this._pipelining && this._pipelineQueue.length > 0) { + // In pipeline mode, readyForQuery indicates completion of the current query + const completedQuery = this._pipelineQueue.shift() + if (completedQuery) { + completedQuery.handleReadyForQuery(this.connection) + } + + // Set next query as active if available + if (this._pipelineQueue.length > 0) { + this._activeQuery = this._pipelineQueue[0] + } else { + // All pipeline queries completed + this._activeQuery = null + this.readyForQuery = true + this.emit('drain') + } + } else { + const activeQuery = this._getActiveQuery() + this._activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } } + this._pulseQueryQueue() } @@ -538,6 +567,10 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + if (this._pipelining) { + return this._pipelinePulseQueryQueue() + } + if (this.readyForQuery === true) { this._activeQuery = this._queryQueue.shift() const activeQuery = this._getActiveQuery() @@ -591,6 +624,16 @@ class Client extends EventEmitter { } } + // In pipeline mode, only extended query protocol is allowed + if (this._pipelining) { + if (typeof config === 'string') { + throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.') + } + if (query.text && query.text.split(';').filter((s) => s.trim()).length > 1) { + throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.') + } + } + if (readTimeout) { queryCallback = query.callback @@ -689,6 +732,108 @@ class Client extends EventEmitter { queryQueueDeprecationNotice() return this._queryQueue } + + // Pipeline mode support + get pipelining() { + return this._pipelining + } + + set pipelining(value) { + if (typeof value !== 'boolean') { + throw new TypeError('pipelining must be a boolean') + } + + if (value && !this._connected) { + throw new Error('Cannot enable pipelining before connection is established') + } + + if (value && this._getActiveQuery()) { + throw new Error('Cannot enable pipelining while a query is active') + } + + if (value && !this._pipelining) { + this._enterPipelineMode() + } else if (!value && this._pipelining) { + this._exitPipelineMode() + } + } + + pipelineStatus() { + return this._pipelining ? 'PIPELINE_ON' : 'PIPELINE_OFF' + } + + _enterPipelineMode() { + if (this._pipelining) { + return + } + + if (!this._connected) { + throw new Error('Cannot enter pipeline mode before connection is established') + } + + if (this._getActiveQuery()) { + throw new Error('Cannot enter pipeline mode while a query is active') + } + + this._pipelining = true + this._pipelineQueue = [] + this._pipelineSync = false + } + + _exitPipelineMode() { + if (!this._pipelining) { + return + } + + // Send sync to end pipeline if we have pending queries + if (this._pipelineQueue.length > 0 && !this._pipelineSync) { + this.connection.sync() + this._pipelineSync = true + } + + this._pipelining = false + } + + _pipelinePulseQueryQueue() { + if (!this._pipelining) { + return this._pulseQueryQueue() + } + + // In pipeline mode, send all queued queries immediately without waiting for responses + while (this._queryQueue.length > 0) { + const query = this._queryQueue.shift() + this._pipelineQueue.push(query) + + // Force extended query protocol for pipeline mode + if (!query.requiresPreparation()) { + query.queryMode = 'extended' + } + + const queryError = query.submit(this.connection) + if (queryError) { + process.nextTick(() => { + query.handleError(queryError, this.connection) + }) + // Remove failed query from pipeline queue + const index = this._pipelineQueue.indexOf(query) + if (index > -1) { + this._pipelineQueue.splice(index, 1) + } + continue + } + } + + // Send sync message to end the pipeline batch + if (this._pipelineQueue.length > 0 && this._queryQueue.length === 0) { + this.connection.sync() + // Set active query to first in pipeline to start processing responses + if (!this._getActiveQuery() && this._pipelineQueue.length > 0) { + this._activeQuery = this._pipelineQueue[0] + } + } + + this.readyForQuery = false // We're not ready for more queries until pipeline completes + } } // expose a Query constructor diff --git a/packages/pg/test/unit/client/pipeline-tests.js b/packages/pg/test/unit/client/pipeline-tests.js new file mode 100644 index 000000000..41735bef1 --- /dev/null +++ b/packages/pg/test/unit/client/pipeline-tests.js @@ -0,0 +1,210 @@ +'use strict' + +const helper = require('./test-helper') +const pg = helper.pg +const assert = require('assert') + +const suite = new helper.Suite() + +suite.test('pipeline mode basic functionality', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + // Initially not in pipeline mode + assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') + assert.equal(client.pipelining, false) + + // Enable pipeline mode + client.pipelining = true + assert.equal(client.pipelineStatus(), 'PIPELINE_ON') + assert.equal(client.pipelining, true) + + // Disable pipeline mode + client.pipelining = false + assert.equal(client.pipelineStatus(), 'PIPELINE_OFF') + assert.equal(client.pipelining, false) + + client.end(cb) + }) +}) + +suite.test('cannot enable pipeline before connection', (cb) => { + const client = new pg.Client(helper.config) + + try { + client.pipelining = true + cb(new Error('Should have thrown error')) + } catch (err) { + assert.equal(err.message, 'Cannot enable pipelining before connection is established') + cb() + } +}) + +suite.test('pipeline mode with multiple queries', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + const results = [] + let completed = 0 + + // Send multiple queries in pipeline mode + client.query({ text: 'SELECT 1 as num' }, (err, res) => { + if (err) return cb(err) + console.log(res) + results[0] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + client.query({ text: 'SELECT 2 as num' }, (err, res) => { + if (err) return cb(err) + console.log(res) + results[1] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + client.query({ text: 'SELECT 3 as num' }, (err, res) => { + if (err) return cb(err) + console.log(res) + results[2] = res.rows[0].num + completed++ + if (completed === 3) checkResults() + }) + + function checkResults() { + assert.equal(results[0], 1) + assert.equal(results[1], 2) + assert.equal(results[2], 3) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode rejects simple query protocol', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + try { + client.query('SELECT 1', (err, res) => { + // This should not be called + cb(new Error('Simple query should have been rejected')) + }) + } catch (err) { + assert(err.message.includes('Simple query protocol is not allowed in pipeline mode')) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode rejects multiple SQL commands', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + try { + client.query({ text: 'SELECT 1; SELECT 2;' }, (err, res) => { + // This should not be called + cb(new Error('Multiple SQL commands should have been rejected')) + }) + } catch (err) { + assert(err.message.includes('Multiple SQL commands in a single query are not allowed in pipeline mode')) + client.end(cb) + } + }) +}) + +suite.test('pipeline mode with parameterized queries', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + const results = [] + let completed = 0 + + // Send parameterized queries in pipeline mode + client.query({ text: 'SELECT $1::int as num', values: [10] }, (err, res) => { + if (err) return cb(err) + results[0] = res.rows[0].num + completed++ + if (completed === 2) checkResults() + }) + + client.query({ text: 'SELECT $1::text as str', values: ['hello'] }, (err, res) => { + if (err) return cb(err) + results[1] = res.rows[0].str + completed++ + if (completed === 2) checkResults() + }) + + function checkResults() { + assert.equal(results[0], 10) + assert.equal(results[1], 'hello') + client.end(cb) + } + }) +}) + +suite.test('pipeline mode performance benefit', (cb) => { + const client = new pg.Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + + const numQueries = 10 + + // Test without pipeline mode + const startNormal = Date.now() + let normalCompleted = 0 + + function runNormalQueries() { + for (let i = 0; i < numQueries; i++) { + client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { + if (err) return cb(err) + normalCompleted++ + if (normalCompleted === numQueries) { + const normalTime = Date.now() - startNormal + runPipelineQueries(normalTime) + } + }) + } + } + + function runPipelineQueries(normalTime) { + client.pipelining = true + const startPipeline = Date.now() + let pipelineCompleted = 0 + + for (let i = 0; i < numQueries; i++) { + client.query({ text: 'SELECT $1::int as num', values: [i] }, (err, res) => { + if (err) return cb(err) + pipelineCompleted++ + if (pipelineCompleted === numQueries) { + const pipelineTime = Date.now() - startPipeline + + // Pipeline should be faster or at least not significantly slower + // In real network conditions with latency, pipeline would show more benefit + console.log(`Normal mode: ${normalTime}ms, Pipeline mode: ${pipelineTime}ms`) + assert(normalTime <= pipelineTime) + + client.end(cb) + } + }) + } + } + + runNormalQueries() + }) +}) + +module.exports = suite