Skip to content

Commit a1915db

Browse files
authored
chore(nodejs): refactor buffer handling (#47)
1 parent 7e63cf5 commit a1915db

File tree

4 files changed

+91
-215
lines changed

4 files changed

+91
-215
lines changed

src/options.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,6 @@ const UNSAFE_OFF = "unsafe_off";
105105
* <li> max_name_len: <i>integer</i> - The maximum length of a table or column name, the Sender defaults this parameter to 127. <br>
106106
* Recommended to use the same setting as the server, which also uses 127 by default.
107107
* </li>
108-
* <li> copy_buffer: <i>enum, accepted values: on, off</i> - By default, the Sender creates a new buffer for every flush() call,
109-
* and the data to be sent to the server is copied into this new buffer.
110-
* Setting the flag to <i>off</i> results in reusing the same buffer instance for each flush() call. <br>
111-
* Use this flag only if calls to the client are serialised.
112-
* </li>
113108
* </ul>
114109
*/
115110
class SenderOptions {
@@ -129,9 +124,6 @@ class SenderOptions {
129124
auto_flush_rows?: number;
130125
auto_flush_interval?: number;
131126

132-
// replaces `copyBuffer` option
133-
copy_buffer?: string | boolean | null;
134-
135127
request_min_throughput?: number;
136128
request_timeout?: number;
137129
retry_timeout?: number;
@@ -243,7 +235,6 @@ function parseConfigurationString(
243235
parseTlsOptions(options);
244236
parseRequestTimeoutOptions(options);
245237
parseMaxNameLength(options);
246-
parseCopyBuffer(options);
247238
}
248239

249240
function parseSettings(
@@ -299,7 +290,6 @@ const ValidConfigKeys = [
299290
"auto_flush",
300291
"auto_flush_rows",
301292
"auto_flush_interval",
302-
"copy_buffer",
303293
"request_min_throughput",
304294
"request_timeout",
305295
"retry_timeout",
@@ -432,10 +422,6 @@ function parseMaxNameLength(options: SenderOptions) {
432422
parseInteger(options, "max_name_len", "max name length", 1);
433423
}
434424

435-
function parseCopyBuffer(options: SenderOptions) {
436-
parseBoolean(options, "copy_buffer", "copy buffer");
437-
}
438-
439425
function parseBoolean(
440426
options: SenderOptions,
441427
property: string,

src/sender.ts

Lines changed: 41 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import crypto from "node:crypto";
77
import { Agent, RetryAgent } from "undici";
88

99
import { log } from "./logging";
10-
import { validateTableName, validateColumnName } from "./validation";
10+
import { validateColumnName, validateTableName } from "./validation";
1111
import { SenderOptions, HTTP, HTTPS, TCP, TCPS } from "./options";
1212

1313
const HTTP_NO_CONTENT = 204; // success
@@ -121,8 +121,6 @@ class Sender {
121121
/** @private */ bufferSize;
122122
/** @private */ maxBufferSize;
123123
/** @private */ buffer;
124-
/** @private */ toBuffer;
125-
/** @private */ doResolve;
126124
/** @private */ position;
127125
/** @private */ endOfLastRow;
128126

@@ -145,7 +143,6 @@ class Sender {
145143
/** @private */ log;
146144
/** @private */ agent;
147145
/** @private */ jwk;
148-
/** @private */ flushPromiseChain: Promise<boolean>;
149146

150147
/**
151148
* Creates an instance of Sender.
@@ -157,10 +154,8 @@ class Sender {
157154
if (!options || !options.protocol) {
158155
throw new Error("The 'protocol' option is mandatory");
159156
}
160-
replaceDeprecatedOptions(options);
161-
162157
this.log = typeof options.log === "function" ? options.log : log;
163-
this.flushPromiseChain = Promise.resolve(true as boolean);
158+
replaceDeprecatedOptions(options, this.log);
164159

165160
switch (options.protocol) {
166161
case HTTP:
@@ -248,8 +243,6 @@ class Sender {
248243
? options.retry_timeout
249244
: DEFAULT_RETRY_TIMEOUT;
250245

251-
const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
252-
this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
253246
this.maxBufferSize = isInteger(options.max_buf_size, 1)
254247
? options.max_buf_size
255248
: DEFAULT_MAX_BUFFER_SIZE;
@@ -440,64 +433,17 @@ class Sender {
440433
}
441434

442435
/**
443-
* @ignore
444-
* Compacts the buffer after data has been sent and resets pending row count.
445-
* This method should only be called after a flush operation has successfully sent data.
446-
* @param {number} bytesSent The number of bytes that were successfully sent and should be compacted.
447-
*/
448-
private _compactBufferAndResetState(bytesSent: number) {
449-
if (bytesSent > 0 && bytesSent <= this.position) {
450-
this.buffer.copy(this.buffer, 0, bytesSent, this.position);
451-
this.position = this.position - bytesSent;
452-
} else if (bytesSent > this.position) {
453-
// This case should ideally not happen if logic is correct, means we tried to compact more than available
454-
this.position = 0;
455-
}
456-
// If bytesSent is 0 or negative, or if no actual data was at the start of the buffer to be shifted,
457-
// this.position effectively remains the same relative to the start of new data.
458-
459-
this.endOfLastRow = Math.max(0, this.endOfLastRow - bytesSent);
460-
// Ensure endOfLastRow is also shifted if it was within the compacted area,
461-
// or reset if it pointed to data that's now gone.
462-
// If new rows were added while flushing, endOfLastRow would be > position post-compaction of old data.
463-
// This needs careful handling if new data is added *during* an async flush.
464-
// For now, we assume endOfLastRow is relative to the data just flushed.
465-
// A simpler approach might be to always set this.endOfLastRow = 0 after a successful flush,
466-
// as startNewRow() will set it correctly for the *next* new row.
467-
// However, if a flush doesn't clear all pending complete rows, this needs to be accurate.
468-
// The current `flush` logic sends up to `this.endOfLastRow`, so after sending `dataAmountToSend`
469-
// (which was `this.endOfLastRow` at the time of prepping the flush), the new `this.endOfLastRow`
470-
// should effectively be 0 relative to the start of the compacted buffer, until a new row is started.
471-
472-
this.lastFlushTime = Date.now();
473-
this.pendingRowCount = 0; // Reset after successful flush
474-
// If autoFlush was triggered by row count, this reset is crucial.
475-
// If triggered by interval, this is also fine.
476-
}
477-
478-
/**
479-
* @ignore
480-
* Executes the actual data sending logic (HTTP or TCP).
481-
* This is called by the `flush` method, wrapped in the promise chain.
482-
* @return {Promise<boolean>} Resolves to true if data was sent.
436+
* Sends the buffer's content to the database and compacts the buffer.
437+
* If the last row is not finished it stays in the sender's buffer.
438+
*
439+
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send, and it was sent successfully.
483440
*/
484-
private async _executeFlush(): Promise<boolean> {
485-
const dataAmountToSend = this.endOfLastRow;
486-
if (dataAmountToSend <= 0) {
441+
async flush(): Promise<boolean> {
442+
const dataToSend = this.toBufferNew(this.endOfLastRow);
443+
if (!dataToSend) {
487444
return false; // Nothing to send
488445
}
489446

490-
// Use toBufferView to get a reference, actual data copy for sending happens based on protocol needs
491-
const dataView = this.toBufferView(dataAmountToSend);
492-
if (!dataView) {
493-
return false; // Should not happen if dataAmountToSend > 0, but a safe check
494-
}
495-
496-
// Create a copy for sending to avoid issues if the underlying buffer changes
497-
// This is especially important for async operations.
498-
const dataToSend = Buffer.allocUnsafe(dataView.length);
499-
dataView.copy(dataToSend);
500-
501447
try {
502448
if (this.http) {
503449
const { timeout: calculatedTimeoutMillis } = createRequestOptions(this, dataToSend);
@@ -557,13 +503,11 @@ class Sender {
557503
`Unexpected message from server: ${Buffer.from(responseBody).toString()}`,
558504
);
559505
}
560-
this._compactBufferAndResetState(dataAmountToSend);
561506
return true;
562507
} else {
563-
const error = new Error(
508+
throw new Error(
564509
`HTTP request failed, statusCode=${statusCode}, error=${Buffer.from(responseBody).toString()}`,
565510
);
566-
throw error;
567511
}
568512
} else { // TCP
569513
if (!this.socket || this.socket.destroyed) {
@@ -574,7 +518,6 @@ class Sender {
574518
if (err) {
575519
reject(err);
576520
} else {
577-
this._compactBufferAndResetState(dataAmountToSend);
578521
resolve(true);
579522
}
580523
});
@@ -609,51 +552,27 @@ class Sender {
609552
}
610553
}
611554

612-
/**
613-
* Sends the buffer's content to the database and compacts the buffer.
614-
* If the last row is not finished it stays in the sender's buffer.
615-
* This operation is added to a queue and executed sequentially.
616-
*
617-
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send, and it was sent successfully.
618-
*/
619-
async flush(): Promise<boolean> {
620-
// Add to the promise chain to ensure sequential execution
621-
this.flushPromiseChain = this.flushPromiseChain
622-
.then(async () => {
623-
// Check if there's anything to flush just before execution
624-
if (this.endOfLastRow <= 0) {
625-
return false; // Nothing to flush
626-
}
627-
return this._executeFlush();
628-
})
629-
.catch((err: Error) => {
630-
// Log or handle error. If _executeFlush throws, it will be caught here.
631-
// The error should have already been logged by _executeFlush.
632-
// We re-throw to ensure the promise chain reflects the failure.
633-
this.log("error", `Flush operation failed in chain: ${err.message}`);
634-
throw err; // Propagate error to the caller of this specific flush()
635-
});
636-
return this.flushPromiseChain;
637-
}
638-
639555
/**
640556
* @ignore
641-
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
557+
* @return {Buffer} Returns a cropped buffer, or null if there is nothing to send.
642558
* The returned buffer is backed by the sender's buffer.
559+
* Used only in tests.
643560
*/
644561
toBufferView(pos = this.position): Buffer {
645562
return pos > 0 ? this.buffer.subarray(0, pos) : null;
646563
}
647564

648565
/**
649566
* @ignore
650-
* @return {Buffer|null} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
567+
* @return {Buffer|null} Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
651568
* The returned buffer is a copy of the sender's buffer.
569+
* It also compacts the Sender's buffer.
652570
*/
653571
toBufferNew(pos = this.position): Buffer | null {
654572
if (pos > 0) {
655573
const data = Buffer.allocUnsafe(pos);
656574
this.buffer.copy(data, 0, 0, pos);
575+
compact(this);
657576
return data;
658577
}
659578
return null;
@@ -928,7 +847,7 @@ function createRequestOptions(
928847
): InternalHttpOptions {
929848
const timeoutMillis =
930849
(data.length / sender.requestMinThroughput) * 1000 + sender.requestTimeout;
931-
const options: InternalHttpOptions = {
850+
return {
932851
hostname: sender.host,
933852
port: sender.port,
934853
agent: sender.agent,
@@ -937,8 +856,6 @@ function createRequestOptions(
937856
method: "POST",
938857
timeout: timeoutMillis,
939858
};
940-
941-
return options;
942859
}
943860

944861
async function autoFlush(sender: Sender) {
@@ -950,12 +867,7 @@ async function autoFlush(sender: Sender) {
950867
(sender.autoFlushInterval > 0 &&
951868
Date.now() - sender.lastFlushTime >= sender.autoFlushInterval))
952869
) {
953-
// await sender.flush(); // Old call
954-
sender.flush().catch(err => {
955-
// Auto-flush errors should be logged but not necessarily crash the application
956-
// The error is already logged by the flush chain's catch block or _executeFlush
957-
sender.log("error", `Auto-flush failed: ${err.message}`);
958-
});
870+
await sender.flush();
959871
}
960872
}
961873

@@ -973,6 +885,17 @@ function checkCapacity(sender: Sender, data: string[], base = 0) {
973885
}
974886
}
975887

888+
function compact(sender: Sender) {
889+
if (sender.endOfLastRow > 0) {
890+
sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
891+
sender.position = sender.position - sender.endOfLastRow;
892+
sender.endOfLastRow = 0;
893+
894+
sender.lastFlushTime = Date.now();
895+
sender.pendingRowCount = 0;
896+
}
897+
}
898+
976899
function writeColumn(
977900
sender: Sender,
978901
name: string,
@@ -1073,18 +996,26 @@ function timestampToNanos(timestamp: bigint, unit: "ns" | "us" | "ms") {
1073996
}
1074997

1075998
type DeprecatedOptions = {
999+
/** @deprecated */
1000+
copy_buffer?: boolean;
10761001
/** @deprecated */
10771002
copyBuffer?: boolean;
10781003
/** @deprecated */
10791004
bufferSize?: number;
10801005
};
1081-
function replaceDeprecatedOptions(options: SenderOptions & DeprecatedOptions) {
1006+
function replaceDeprecatedOptions(
1007+
options: SenderOptions & DeprecatedOptions,
1008+
log: (level: "error" | "warn" | "info" | "debug", message: string) => void
1009+
) {
10821010
// deal with deprecated options
1083-
if (options.copyBuffer) {
1084-
options.copy_buffer = options.copyBuffer;
1085-
options.copyBuffer = undefined;
1011+
if (options.copy_buffer !== undefined) {
1012+
log("warn", `Option 'copy_buffer' is not supported anymore, please, remove it`);
1013+
}
1014+
if (options.copyBuffer !== undefined) {
1015+
log("warn", `Option 'copyBuffer' is not supported anymore, please, remove it`);
10861016
}
1087-
if (options.bufferSize) {
1017+
if (options.bufferSize !== undefined) {
1018+
log("warn", `Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'`);
10881019
options.init_buf_size = options.bufferSize;
10891020
options.bufferSize = undefined;
10901021
}

test/options.test.ts

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -627,41 +627,6 @@ describe("Configuration string parser suite", function () {
627627
).toThrow("Invalid retry timeout option, not a number: '6w0'");
628628
});
629629

630-
it("can parse copy_buffer config", function () {
631-
let options = SenderOptions.fromConfig(
632-
"http::addr=host:9000;copy_buffer=on;",
633-
);
634-
expect(options.protocol).toBe("http");
635-
expect(options.host).toBe("host");
636-
expect(options.port).toBe(9000);
637-
expect(options.copy_buffer).toBe(true);
638-
639-
options = SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=off;");
640-
expect(options.protocol).toBe("http");
641-
expect(options.host).toBe("host");
642-
expect(options.port).toBe(9000);
643-
expect(options.copy_buffer).toBe(false);
644-
645-
expect(() =>
646-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=ON;"),
647-
).toThrow("Invalid copy buffer option: 'ON'");
648-
expect(() =>
649-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=On;"),
650-
).toThrow("Invalid copy buffer option: 'On'");
651-
expect(() =>
652-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=true;"),
653-
).toThrow("Invalid copy buffer option: 'true'");
654-
expect(() =>
655-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=OFF;"),
656-
).toThrow("Invalid copy buffer option: 'OFF'");
657-
expect(() =>
658-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=Off;"),
659-
).toThrow("Invalid copy buffer option: 'Off'");
660-
expect(() =>
661-
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=false;"),
662-
).toThrow("Invalid copy buffer option: 'false'");
663-
});
664-
665630
it("can parse max_name_len config", function () {
666631
const options = SenderOptions.fromConfig(
667632
"http::addr=host:9000;max_name_len=30",

0 commit comments

Comments
 (0)