
Fix backpressure when using TLS #1752

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion lib/base/connection.js
@@ -392,7 +392,7 @@ class BaseConnection extends EventEmitter {
     secureSocket.on('data', (data) => {
       this.packetParser.execute(data);
     });
-    this.write = (buffer) => secureSocket.write(buffer);
+    this.stream = secureSocket;
   }

   protocolError(message, code) {
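For context, the pattern this one-line change enables is the standard Node.js write-backpressure loop, which only works when the stream that is written to, paused, and watched for 'drain' is the TLS socket itself. Below is a minimal sketch, assuming the rest of the connection code goes through `this.stream`; the `source` and `connection` names are illustrative and not part of the diff. With only `write` redirected to the TLS socket (the removed line), `this.stream` presumably still pointed at the plain TCP socket, so pause/resume and drain tracking acted on the wrong stream.

'use strict';

// Sketch of write backpressure against the exposed stream (not code from this PR).
function pumpWithBackpressure(source, connection) {
  source.on('data', (chunk) => {
    // write() returns false once the writable's buffer passes its high-water mark
    if (!connection.stream.write(chunk)) {
      source.pause();
      connection.stream.once('drain', () => source.resume());
    }
  });
}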
@@ -0,0 +1,104 @@
'use strict';

const { assert, test, log } = require('poku');
const common = require('../../common.test.cjs');
const { Readable, Duplex } = require('stream');
const Net = require('node:net');
const driver = require('../../../index.js');
const { setTimeout } = require('node:timers/promises');

class BigInput extends Readable {
  count = 0;
  MAX_EXPECTED_ROWS = 100_000;
  onStart = null;

  _read() {
    if (this.onStart) {
      this.onStart();
      this.onStart = null;
    }

    if (this.count < this.MAX_EXPECTED_ROWS) {
      this.count++;
      const row = `${this.count}-${Math.random()}\n`;
      this.push(row);
    } else {
      this.push(null);
    }
  }
}

test('load data infile backpressure on local stream', async () => {
  const config = common.config;
  const netStream = Net.connect(config.port, config.host);
  netStream.setNoDelay(true);
  await new Promise((resolve, reject) =>
    netStream.once('connect', resolve).once('error', reject)
  );

  class NetworkInterceptor extends Duplex {
    simulateWriteBackpressure = false;

    constructor() {
      super({ writableHighWaterMark: 65536 });
      netStream.on('data', (data) => {
        const continueReading = this.push(data);
        if (!continueReading) {
          netStream.pause();
        }
      });
      netStream.on('error', (err) => this.destroy(err));
    }

    _read() {
      netStream.resume();
    }

    _write(chunk, encoding, callback) {
      netStream.write(chunk, encoding, (err) => {
        if (err) {
          callback(err);
        } else if (!this.simulateWriteBackpressure) {
          callback();
        }
      });
    }
  }

  const interceptor = new NetworkInterceptor();
  const connection = driver.createConnection({
    ...config,
    multipleStatements: true,
    stream: interceptor,
  });

  try {
    const bigInput = new BigInput();
    bigInput.onStart = () => (interceptor.simulateWriteBackpressure = true);

    connection.query(
      {
        sql: `
          set global local_infile = 1;
          create temporary table test_load_data_backpressure (id varchar(100));
          load data local infile "_" replace into table test_load_data_backpressure;
        `,
        infileStreamFactory: () => bigInput,
      },
      (err, result) => {
        if (err) throw err;
        log('Load complete', result);
      }
    );

    await setTimeout(100); // allow time for backpressure to take effect

    assert.ok(
      bigInput.count < bigInput.MAX_EXPECTED_ROWS,
      `expected backpressure to stop infile stream at less than ${bigInput.MAX_EXPECTED_ROWS} rows (read ${bigInput.count} rows)`
    );
  } finally {
    connection.close();
    netStream.destroy();
  }
});
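The expectation this test encodes is that the driver only pulls from the LOCAL INFILE readable as fast as the (intercepted) socket accepts writes. A minimal sketch of that behaviour with plain Node streams follows, under the assumption that the driver forwards the local stream with pipe-style backpressure; the `localFile`, `socket`, and `done` names are illustrative, not the driver's internals.

'use strict';

// Sketch: forward a local readable to the server socket while respecting backpressure.
// pipe() pauses the source whenever socket.write() returns false and resumes it on
// 'drain', which is why the intercepted write above (whose callback is never invoked
// once simulateWriteBackpressure is set) keeps BigInput from being read to the end.
function forwardLocalInfile(localFile, socket, done) {
  localFile.pipe(socket, { end: false }); // keep the socket open for the rest of the protocol
  localFile.once('end', () => done(null));
  localFile.once('error', done);
}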
@@ -0,0 +1,56 @@
'use strict';

const { assert, test } = require('poku');
const common = require('../../common.test.cjs');
const timers = require('node:timers');

test('result event backpressure with pause/resume', async () => {
  const connection = common.createConnection({
    multipleStatements: true,
  });
  try {
    // in case the connection is wrapped with TLS, grab the underlying socket first
    // so we can see the actual number of bytes read from the network
    const originalSocket = connection.stream;

    // the full result set will be over 6 MB
    const largeQuery = `
      SET SESSION cte_max_recursion_depth = 100000;
      WITH RECURSIVE cte (n, s) AS (
        SELECT 1, 'this is just to cause more bytes transferred for each row'
        UNION ALL
        SELECT n + 1, s
        FROM cte
        WHERE n < 100000
      )
      SELECT * FROM cte;
    `;

    let resultRowsCount = 0;
    await new Promise((resolve, reject) =>
      connection
        .query(largeQuery)
        .on('result', (row) => {
          resultRowsCount++;
          if (row.n === 1) {
            connection.pause();
            resolve();
          }
        })
        .on('error', reject)
    );

    // if backpressure is not working, the bytes received will keep growing during
    // this time, even though the connection is paused
    await timers.promises.setTimeout(500);

    assert.equal(resultRowsCount, 2, 'stop receiving result rows when paused');

    // if backpressure is working, there should be less than 1 MB received;
    // experimentally it appears to be around 100 KB but may vary if buffer sizes change
    assert.ok(
      originalSocket.bytesRead < 1000000,
      `Received ${originalSocket.bytesRead} bytes on paused connection`
    );
  } finally {
    connection.close();
  }
});
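For reference, the pause/resume pattern this test verifies is the usual way to apply backpressure while doing slow, per-row work; the property under test is that pause() stops bytes being read off the (possibly TLS-wrapped) socket, not merely that 'result' events stop firing. A small usage sketch, where `saveRow` is an illustrative async consumer and not part of the test:

'use strict';

// Sketch of per-row processing with connection-level backpressure.
function streamLargeResult(connection, sql, saveRow, done) {
  connection
    .query(sql)
    .on('result', (row) => {
      connection.pause(); // stop reading from the socket while this row is handled
      saveRow(row, (err) => {
        if (err) return done(err);
        connection.resume(); // allow more packets to flow
      });
    })
    .on('error', done)
    .on('end', () => done(null));
}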