Skip to content

POC/WIP: pipeline mode #3517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 150 additions & 5 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class Client extends EventEmitter {
this._connectionError = false
this._queryable = true
this._activeQuery = null
this._pipelining = false
this._pipelineQueue = []

this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
this.connection =
Expand Down Expand Up @@ -123,8 +125,15 @@ class Client extends EventEmitter {
this._activeQuery = null
}

// Error all queued queries
this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0

// Error all pipeline queries
if (this._pipelining) {
this._pipelineQueue.forEach(enqueueError)
this._pipelineQueue.length = 0
}
}

_connect(callback) {
Expand Down Expand Up @@ -354,12 +363,32 @@ class Client extends EventEmitter {
}
this.emit('connect')
}
const activeQuery = this._getActiveQuery()
this._activeQuery = null
this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)

if (this._pipelining && this._pipelineQueue.length > 0) {
// In pipeline mode, readyForQuery indicates completion of the current query
const completedQuery = this._pipelineQueue.shift()
if (completedQuery) {
completedQuery.handleReadyForQuery(this.connection)
}

// Set next query as active if available
if (this._pipelineQueue.length > 0) {
this._activeQuery = this._pipelineQueue[0]
} else {
// All pipeline queries completed
this._activeQuery = null
this.readyForQuery = true
this.emit('drain')
}
} else {
const activeQuery = this._getActiveQuery()
this._activeQuery = null
this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
}
}

this._pulseQueryQueue()
}

Expand Down Expand Up @@ -538,6 +567,10 @@ class Client extends EventEmitter {
}

_pulseQueryQueue() {
if (this._pipelining) {
return this._pipelinePulseQueryQueue()
}

if (this.readyForQuery === true) {
this._activeQuery = this._queryQueue.shift()
const activeQuery = this._getActiveQuery()
Expand Down Expand Up @@ -591,6 +624,16 @@ class Client extends EventEmitter {
}
}

// In pipeline mode, only extended query protocol is allowed
if (this._pipelining) {
if (typeof config === 'string') {
throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.')
}
if (query.text && query.text.split(';').filter((s) => s.trim()).length > 1) {
throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.')
}
}

if (readTimeout) {
queryCallback = query.callback

Expand Down Expand Up @@ -689,6 +732,108 @@ class Client extends EventEmitter {
queryQueueDeprecationNotice()
return this._queryQueue
}

// Pipeline mode support
get pipelining() {
return this._pipelining
}

set pipelining(value) {
if (typeof value !== 'boolean') {
throw new TypeError('pipelining must be a boolean')
}

if (value && !this._connected) {
throw new Error('Cannot enable pipelining before connection is established')
}

if (value && this._getActiveQuery()) {
throw new Error('Cannot enable pipelining while a query is active')
}

if (value && !this._pipelining) {
this._enterPipelineMode()
} else if (!value && this._pipelining) {
this._exitPipelineMode()
}
}

pipelineStatus() {
return this._pipelining ? 'PIPELINE_ON' : 'PIPELINE_OFF'
}

_enterPipelineMode() {
if (this._pipelining) {
return
}

if (!this._connected) {
throw new Error('Cannot enter pipeline mode before connection is established')
}

if (this._getActiveQuery()) {
throw new Error('Cannot enter pipeline mode while a query is active')
}

this._pipelining = true
this._pipelineQueue = []
this._pipelineSync = false
}

_exitPipelineMode() {
if (!this._pipelining) {
return
}

// Send sync to end pipeline if we have pending queries
if (this._pipelineQueue.length > 0 && !this._pipelineSync) {
this.connection.sync()
this._pipelineSync = true
}

this._pipelining = false
}

_pipelinePulseQueryQueue() {
if (!this._pipelining) {
return this._pulseQueryQueue()
}

// In pipeline mode, send all queued queries immediately without waiting for responses
while (this._queryQueue.length > 0) {
const query = this._queryQueue.shift()
this._pipelineQueue.push(query)

// Force extended query protocol for pipeline mode
if (!query.requiresPreparation()) {
query.queryMode = 'extended'
}

const queryError = query.submit(this.connection)
if (queryError) {
process.nextTick(() => {
query.handleError(queryError, this.connection)
})
// Remove failed query from pipeline queue
const index = this._pipelineQueue.indexOf(query)
if (index > -1) {
this._pipelineQueue.splice(index, 1)
}
continue
}
}

// Send sync message to end the pipeline batch
if (this._pipelineQueue.length > 0 && this._queryQueue.length === 0) {
this.connection.sync()
// Set active query to first in pipeline to start processing responses
if (!this._getActiveQuery() && this._pipelineQueue.length > 0) {
this._activeQuery = this._pipelineQueue[0]
}
}

this.readyForQuery = false // We're not ready for more queries until pipeline completes
}
}

// expose a Query constructor
Expand Down
Loading
Loading