diff --git a/lib/base/connection.js b/lib/base/connection.js index ea91f1813a..b8bc90af8a 100644 --- a/lib/base/connection.js +++ b/lib/base/connection.js @@ -392,7 +392,7 @@ class BaseConnection extends EventEmitter { secureSocket.on('data', (data) => { this.packetParser.execute(data); }); - this.write = (buffer) => secureSocket.write(buffer); + this.stream = secureSocket; } protocolError(message, code) { diff --git a/test/integration/connection/test-backpressure-load-data-infile.test.cjs b/test/integration/connection/test-backpressure-load-data-infile.test.cjs new file mode 100644 index 0000000000..f052541c6f --- /dev/null +++ b/test/integration/connection/test-backpressure-load-data-infile.test.cjs @@ -0,0 +1,104 @@ +'use strict'; + +const { assert, test, log } = require('poku'); +const common = require('../../common.test.cjs'); +const { Readable, Duplex } = require('stream'); +const Net = require('node:net'); +const driver = require('../../../index.js'); +const { setTimeout } = require('node:timers/promises'); + +class BigInput extends Readable { + count = 0; + MAX_EXPECTED_ROWS = 100_000; + onStart = null; + + _read() { + if (this.onStart) { + this.onStart(); + this.onStart = null; + } + + if (this.count < this.MAX_EXPECTED_ROWS) { + this.count++; + const row = `${this.count}-${Math.random()}\n`; + this.push(row); + } else { + this.push(null); + } + } +} + +test('load data infile backpressure on local stream', async () => { + const config = common.config; + const netStream = Net.connect(config.port, config.host); + netStream.setNoDelay(true); + await new Promise((resolve, reject) => + netStream.once('connect', resolve).once('error', reject) + ); + + class NetworkInterceptor extends Duplex { + simulateWriteBackpressure = false; + + constructor() { + super({ writableHighWaterMark: 65536 }); + netStream.on('data', (data) => { + const continueReading = this.push(data); + if (!continueReading) { + netStream.pause(); + } + }); + netStream.on('error', (err) => this.destroy(err)); + } + + _read() { + netStream.resume(); + } + + _write(chunk, encoding, callback) { + netStream.write(chunk, encoding, (err) => { + if (err) { + callback(err); + } else if (!this.simulateWriteBackpressure) { + callback(); + } + }); + } + } + + const interceptor = new NetworkInterceptor(); + const connection = driver.createConnection({ + ...config, + multipleStatements: true, + stream: interceptor, + }); + + try { + const bigInput = new BigInput(); + bigInput.onStart = () => (interceptor.simulateWriteBackpressure = true); + + connection.query( + { + sql: ` + set global local_infile = 1; + create temporary table test_load_data_backpressure (id varchar(100)); + load data local infile "_" replace into table test_load_data_backpressure; + `, + infileStreamFactory: () => bigInput, + }, + (err, result) => { + if (err) throw err; + log('Load complete', result); + } + ); + + await setTimeout(100); // allow time for backpressure to take effect + + assert.ok( + bigInput.count < bigInput.MAX_EXPECTED_ROWS, + `expected backpressure to stop infile stream at less than ${bigInput.MAX_EXPECTED_ROWS} rows (read ${bigInput.count} rows)` + ); + } finally { + connection.close(); + netStream.destroy(); + } +}); diff --git a/test/integration/connection/test-backpressure-result-streaming.test.cjs b/test/integration/connection/test-backpressure-result-streaming.test.cjs new file mode 100644 index 0000000000..792c22cd34 --- /dev/null +++ b/test/integration/connection/test-backpressure-result-streaming.test.cjs @@ -0,0 +1,56 @@ +'use strict'; + +const { assert, test } = require('poku'); +const common = require('../../common.test.cjs'); +const timers = require('node:timers'); + +test('result event backpressure with pause/resume', async () => { + const connection = common.createConnection({ + multipleStatements: true, + }); + try { + // in case wrapping with TLS, get the underlying socket first so we can see actual number of bytes read + const originalSocket = connection.stream; + + // the full result set will be over 6 MB + const largeQuery = ` + SET SESSION cte_max_recursion_depth = 100000; + WITH RECURSIVE cte (n, s) AS ( + SELECT 1, 'this is just to cause more bytes transferred for each row' + UNION ALL + SELECT n + 1, s + FROM cte + WHERE n < 100000 + ) + SELECT * FROM cte; + `; + + let resultRowsCount = 0; + await new Promise((resolve, reject) => + connection + .query(largeQuery) + .on('result', (row) => { + resultRowsCount++; + if (row.n === 1) { + connection.pause(); + resolve(); + } + }) + .on('error', reject) + ); + + // if backpressure is not working, the bytes received will grow during this time, even though connection is paused + await timers.promises.setTimeout(500); + + assert.equal(resultRowsCount, 2, 'stop receiving result rows when paused'); + + // if backpressure is working, there should be less than 1 MB received; + // experimentally it appears to be around 100 KB but may vary if buffer sizes change + assert.ok( + originalSocket.bytesRead < 1000000, + `Received ${originalSocket.bytesRead} bytes on paused connection` + ); + } finally { + connection.close(); + } +});