Skip to content

chore(nodejs): refactor buffer handling #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ const UNSAFE_OFF = "unsafe_off";
* <li> max_name_len: <i>integer</i> - The maximum length of a table or column name, the Sender defaults this parameter to 127. <br>
* Recommended to use the same setting as the server, which also uses 127 by default.
* </li>
* <li> copy_buffer: <i>enum, accepted values: on, off</i> - By default, the Sender creates a new buffer for every flush() call,
* and the data to be sent to the server is copied into this new buffer.
* Setting the flag to <i>off</i> results in reusing the same buffer instance for each flush() call. <br>
* Use this flag only if calls to the client are serialised.
* </li>
* </ul>
*/
class SenderOptions {
Expand All @@ -129,9 +124,6 @@ class SenderOptions {
auto_flush_rows?: number;
auto_flush_interval?: number;

// replaces `copyBuffer` option
copy_buffer?: string | boolean | null;

request_min_throughput?: number;
request_timeout?: number;
retry_timeout?: number;
Expand Down Expand Up @@ -243,7 +235,6 @@ function parseConfigurationString(
parseTlsOptions(options);
parseRequestTimeoutOptions(options);
parseMaxNameLength(options);
parseCopyBuffer(options);
}

function parseSettings(
Expand Down Expand Up @@ -299,7 +290,6 @@ const ValidConfigKeys = [
"auto_flush",
"auto_flush_rows",
"auto_flush_interval",
"copy_buffer",
"request_min_throughput",
"request_timeout",
"retry_timeout",
Expand Down Expand Up @@ -432,10 +422,6 @@ function parseMaxNameLength(options: SenderOptions) {
parseInteger(options, "max_name_len", "max name length", 1);
}

function parseCopyBuffer(options: SenderOptions) {
parseBoolean(options, "copy_buffer", "copy buffer");
}

function parseBoolean(
options: SenderOptions,
property: string,
Expand Down
151 changes: 41 additions & 110 deletions src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import crypto from "node:crypto";
import { Agent, RetryAgent } from "undici";

import { log } from "./logging";
import { validateTableName, validateColumnName } from "./validation";
import { validateColumnName, validateTableName } from "./validation";
import { SenderOptions, HTTP, HTTPS, TCP, TCPS } from "./options";

const HTTP_NO_CONTENT = 204; // success
Expand Down Expand Up @@ -121,8 +121,6 @@ class Sender {
/** @private */ bufferSize;
/** @private */ maxBufferSize;
/** @private */ buffer;
/** @private */ toBuffer;
/** @private */ doResolve;
/** @private */ position;
/** @private */ endOfLastRow;

Expand All @@ -145,7 +143,6 @@ class Sender {
/** @private */ log;
/** @private */ agent;
/** @private */ jwk;
/** @private */ flushPromiseChain: Promise<boolean>;

/**
* Creates an instance of Sender.
Expand All @@ -157,10 +154,8 @@ class Sender {
if (!options || !options.protocol) {
throw new Error("The 'protocol' option is mandatory");
}
replaceDeprecatedOptions(options);

this.log = typeof options.log === "function" ? options.log : log;
this.flushPromiseChain = Promise.resolve(true as boolean);
replaceDeprecatedOptions(options, this.log);

switch (options.protocol) {
case HTTP:
Expand Down Expand Up @@ -248,8 +243,6 @@ class Sender {
? options.retry_timeout
: DEFAULT_RETRY_TIMEOUT;

const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
this.maxBufferSize = isInteger(options.max_buf_size, 1)
? options.max_buf_size
: DEFAULT_MAX_BUFFER_SIZE;
Expand Down Expand Up @@ -440,64 +433,17 @@ class Sender {
}

/**
* @ignore
* Compacts the buffer after data has been sent and resets pending row count.
* This method should only be called after a flush operation has successfully sent data.
* @param {number} bytesSent The number of bytes that were successfully sent and should be compacted.
*/
private _compactBufferAndResetState(bytesSent: number) {
if (bytesSent > 0 && bytesSent <= this.position) {
this.buffer.copy(this.buffer, 0, bytesSent, this.position);
this.position = this.position - bytesSent;
} else if (bytesSent > this.position) {
// This case should ideally not happen if logic is correct, means we tried to compact more than available
this.position = 0;
}
// If bytesSent is 0 or negative, or if no actual data was at the start of the buffer to be shifted,
// this.position effectively remains the same relative to the start of new data.

this.endOfLastRow = Math.max(0, this.endOfLastRow - bytesSent);
// Ensure endOfLastRow is also shifted if it was within the compacted area,
// or reset if it pointed to data that's now gone.
// If new rows were added while flushing, endOfLastRow would be > position post-compaction of old data.
// This needs careful handling if new data is added *during* an async flush.
// For now, we assume endOfLastRow is relative to the data just flushed.
// A simpler approach might be to always set this.endOfLastRow = 0 after a successful flush,
// as startNewRow() will set it correctly for the *next* new row.
// However, if a flush doesn't clear all pending complete rows, this needs to be accurate.
// The current `flush` logic sends up to `this.endOfLastRow`, so after sending `dataAmountToSend`
// (which was `this.endOfLastRow` at the time of prepping the flush), the new `this.endOfLastRow`
// should effectively be 0 relative to the start of the compacted buffer, until a new row is started.

this.lastFlushTime = Date.now();
this.pendingRowCount = 0; // Reset after successful flush
// If autoFlush was triggered by row count, this reset is crucial.
// If triggered by interval, this is also fine.
}

/**
* @ignore
* Executes the actual data sending logic (HTTP or TCP).
* This is called by the `flush` method, wrapped in the promise chain.
* @return {Promise<boolean>} Resolves to true if data was sent.
* Sends the buffer's content to the database and compacts the buffer.
* If the last row is not finished it stays in the sender's buffer.
*
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send, and it was sent successfully.
*/
private async _executeFlush(): Promise<boolean> {
const dataAmountToSend = this.endOfLastRow;
if (dataAmountToSend <= 0) {
async flush(): Promise<boolean> {
const dataToSend = this.toBufferNew(this.endOfLastRow);
if (!dataToSend) {
return false; // Nothing to send
}

// Use toBufferView to get a reference, actual data copy for sending happens based on protocol needs
const dataView = this.toBufferView(dataAmountToSend);
if (!dataView) {
return false; // Should not happen if dataAmountToSend > 0, but a safe check
}

// Create a copy for sending to avoid issues if the underlying buffer changes
// This is especially important for async operations.
const dataToSend = Buffer.allocUnsafe(dataView.length);
dataView.copy(dataToSend);

try {
if (this.http) {
const { timeout: calculatedTimeoutMillis } = createRequestOptions(this, dataToSend);
Expand Down Expand Up @@ -557,13 +503,11 @@ class Sender {
`Unexpected message from server: ${Buffer.from(responseBody).toString()}`,
);
}
this._compactBufferAndResetState(dataAmountToSend);
return true;
} else {
const error = new Error(
throw new Error(
`HTTP request failed, statusCode=${statusCode}, error=${Buffer.from(responseBody).toString()}`,
);
throw error;
}
} else { // TCP
if (!this.socket || this.socket.destroyed) {
Expand All @@ -574,7 +518,6 @@ class Sender {
if (err) {
reject(err);
} else {
this._compactBufferAndResetState(dataAmountToSend);
resolve(true);
}
});
Expand Down Expand Up @@ -609,51 +552,27 @@ class Sender {
}
}

/**
* Sends the buffer's content to the database and compacts the buffer.
* If the last row is not finished it stays in the sender's buffer.
* This operation is added to a queue and executed sequentially.
*
* @return {Promise<boolean>} Resolves to true when there was data in the buffer to send, and it was sent successfully.
*/
async flush(): Promise<boolean> {
// Add to the promise chain to ensure sequential execution
this.flushPromiseChain = this.flushPromiseChain
.then(async () => {
// Check if there's anything to flush just before execution
if (this.endOfLastRow <= 0) {
return false; // Nothing to flush
}
return this._executeFlush();
})
.catch((err: Error) => {
// Log or handle error. If _executeFlush throws, it will be caught here.
// The error should have already been logged by _executeFlush.
// We re-throw to ensure the promise chain reflects the failure.
this.log("error", `Flush operation failed in chain: ${err.message}`);
throw err; // Propagate error to the caller of this specific flush()
});
return this.flushPromiseChain;
}

/**
* @ignore
* @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* @return {Buffer} Returns a cropped buffer, or null if there is nothing to send.
* The returned buffer is backed by the sender's buffer.
* Used only in tests.
*/
toBufferView(pos = this.position): Buffer {
return pos > 0 ? this.buffer.subarray(0, pos) : null;
}

/**
* @ignore
* @return {Buffer|null} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
* @return {Buffer|null} Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
* The returned buffer is a copy of the sender's buffer.
* It also compacts the Sender's buffer.
*/
toBufferNew(pos = this.position): Buffer | null {
if (pos > 0) {
const data = Buffer.allocUnsafe(pos);
this.buffer.copy(data, 0, 0, pos);
compact(this);
return data;
}
return null;
Expand Down Expand Up @@ -928,7 +847,7 @@ function createRequestOptions(
): InternalHttpOptions {
const timeoutMillis =
(data.length / sender.requestMinThroughput) * 1000 + sender.requestTimeout;
const options: InternalHttpOptions = {
return {
hostname: sender.host,
port: sender.port,
agent: sender.agent,
Expand All @@ -937,8 +856,6 @@ function createRequestOptions(
method: "POST",
timeout: timeoutMillis,
};

return options;
}

async function autoFlush(sender: Sender) {
Expand All @@ -950,12 +867,7 @@ async function autoFlush(sender: Sender) {
(sender.autoFlushInterval > 0 &&
Date.now() - sender.lastFlushTime >= sender.autoFlushInterval))
) {
// await sender.flush(); // Old call
sender.flush().catch(err => {
// Auto-flush errors should be logged but not necessarily crash the application
// The error is already logged by the flush chain's catch block or _executeFlush
sender.log("error", `Auto-flush failed: ${err.message}`);
});
await sender.flush();
}
}

Expand All @@ -973,6 +885,17 @@ function checkCapacity(sender: Sender, data: string[], base = 0) {
}
}

function compact(sender: Sender) {
if (sender.endOfLastRow > 0) {
sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
sender.position = sender.position - sender.endOfLastRow;
sender.endOfLastRow = 0;

sender.lastFlushTime = Date.now();
sender.pendingRowCount = 0;
}
}

function writeColumn(
sender: Sender,
name: string,
Expand Down Expand Up @@ -1073,18 +996,26 @@ function timestampToNanos(timestamp: bigint, unit: "ns" | "us" | "ms") {
}

type DeprecatedOptions = {
/** @deprecated */
copy_buffer?: boolean;
/** @deprecated */
copyBuffer?: boolean;
/** @deprecated */
bufferSize?: number;
};
function replaceDeprecatedOptions(options: SenderOptions & DeprecatedOptions) {
function replaceDeprecatedOptions(
options: SenderOptions & DeprecatedOptions,
log: (level: "error" | "warn" | "info" | "debug", message: string) => void
) {
// deal with deprecated options
if (options.copyBuffer) {
options.copy_buffer = options.copyBuffer;
options.copyBuffer = undefined;
if (options.copy_buffer !== undefined) {
log("warn", `Option 'copy_buffer' is not supported anymore, please, remove it`);
}
if (options.copyBuffer !== undefined) {
log("warn", `Option 'copyBuffer' is not supported anymore, please, remove it`);
}
if (options.bufferSize) {
if (options.bufferSize !== undefined) {
log("warn", `Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'`);
options.init_buf_size = options.bufferSize;
options.bufferSize = undefined;
}
Expand Down
35 changes: 0 additions & 35 deletions test/options.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,41 +627,6 @@ describe("Configuration string parser suite", function () {
).toThrow("Invalid retry timeout option, not a number: '6w0'");
});

it("can parse copy_buffer config", function () {
let options = SenderOptions.fromConfig(
"http::addr=host:9000;copy_buffer=on;",
);
expect(options.protocol).toBe("http");
expect(options.host).toBe("host");
expect(options.port).toBe(9000);
expect(options.copy_buffer).toBe(true);

options = SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=off;");
expect(options.protocol).toBe("http");
expect(options.host).toBe("host");
expect(options.port).toBe(9000);
expect(options.copy_buffer).toBe(false);

expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=ON;"),
).toThrow("Invalid copy buffer option: 'ON'");
expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=On;"),
).toThrow("Invalid copy buffer option: 'On'");
expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=true;"),
).toThrow("Invalid copy buffer option: 'true'");
expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=OFF;"),
).toThrow("Invalid copy buffer option: 'OFF'");
expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=Off;"),
).toThrow("Invalid copy buffer option: 'Off'");
expect(() =>
SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=false;"),
).toThrow("Invalid copy buffer option: 'false'");
});

it("can parse max_name_len config", function () {
const options = SenderOptions.fromConfig(
"http::addr=host:9000;max_name_len=30",
Expand Down
Loading