From 035d45bcbad6a5ed3c35f3f94265c2edd7ed4593 Mon Sep 17 00:00:00 2001 From: glasstiger Date: Wed, 9 Jul 2025 02:42:31 +0100 Subject: [PATCH] chore(nodejs): refactor buffer handling --- src/options.ts | 14 ---- src/sender.ts | 151 ++++++++++++------------------------------- test/options.test.ts | 35 ---------- test/sender.test.ts | 106 ++++++++++++++---------------- 4 files changed, 91 insertions(+), 215 deletions(-) diff --git a/src/options.ts b/src/options.ts index b0d6d5d..9b32645 100644 --- a/src/options.ts +++ b/src/options.ts @@ -105,11 +105,6 @@ const UNSAFE_OFF = "unsafe_off"; *
  • max_name_len: integer - The maximum length of a table or column name, the Sender defaults this parameter to 127.
    * Recommended to use the same setting as the server, which also uses 127 by default. *
  • - *
  • copy_buffer: enum, accepted values: on, off - By default, the Sender creates a new buffer for every flush() call, - * and the data to be sent to the server is copied into this new buffer. - * Setting the flag to off results in reusing the same buffer instance for each flush() call.
    - * Use this flag only if calls to the client are serialised. - *
  • * */ class SenderOptions { @@ -129,9 +124,6 @@ class SenderOptions { auto_flush_rows?: number; auto_flush_interval?: number; - // replaces `copyBuffer` option - copy_buffer?: string | boolean | null; - request_min_throughput?: number; request_timeout?: number; retry_timeout?: number; @@ -243,7 +235,6 @@ function parseConfigurationString( parseTlsOptions(options); parseRequestTimeoutOptions(options); parseMaxNameLength(options); - parseCopyBuffer(options); } function parseSettings( @@ -299,7 +290,6 @@ const ValidConfigKeys = [ "auto_flush", "auto_flush_rows", "auto_flush_interval", - "copy_buffer", "request_min_throughput", "request_timeout", "retry_timeout", @@ -432,10 +422,6 @@ function parseMaxNameLength(options: SenderOptions) { parseInteger(options, "max_name_len", "max name length", 1); } -function parseCopyBuffer(options: SenderOptions) { - parseBoolean(options, "copy_buffer", "copy buffer"); -} - function parseBoolean( options: SenderOptions, property: string, diff --git a/src/sender.ts b/src/sender.ts index cdcab05..4cc6cb9 100644 --- a/src/sender.ts +++ b/src/sender.ts @@ -7,7 +7,7 @@ import crypto from "node:crypto"; import { Agent, RetryAgent } from "undici"; import { log } from "./logging"; -import { validateTableName, validateColumnName } from "./validation"; +import { validateColumnName, validateTableName } from "./validation"; import { SenderOptions, HTTP, HTTPS, TCP, TCPS } from "./options"; const HTTP_NO_CONTENT = 204; // success @@ -121,8 +121,6 @@ class Sender { /** @private */ bufferSize; /** @private */ maxBufferSize; /** @private */ buffer; - /** @private */ toBuffer; - /** @private */ doResolve; /** @private */ position; /** @private */ endOfLastRow; @@ -145,7 +143,6 @@ class Sender { /** @private */ log; /** @private */ agent; /** @private */ jwk; - /** @private */ flushPromiseChain: Promise; /** * Creates an instance of Sender. @@ -157,10 +154,8 @@ class Sender { if (!options || !options.protocol) { throw new Error("The 'protocol' option is mandatory"); } - replaceDeprecatedOptions(options); - this.log = typeof options.log === "function" ? options.log : log; - this.flushPromiseChain = Promise.resolve(true as boolean); + replaceDeprecatedOptions(options, this.log); switch (options.protocol) { case HTTP: @@ -248,8 +243,6 @@ class Sender { ? options.retry_timeout : DEFAULT_RETRY_TIMEOUT; - const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer; - this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew; this.maxBufferSize = isInteger(options.max_buf_size, 1) ? options.max_buf_size : DEFAULT_MAX_BUFFER_SIZE; @@ -440,64 +433,17 @@ class Sender { } /** - * @ignore - * Compacts the buffer after data has been sent and resets pending row count. - * This method should only be called after a flush operation has successfully sent data. - * @param {number} bytesSent The number of bytes that were successfully sent and should be compacted. - */ - private _compactBufferAndResetState(bytesSent: number) { - if (bytesSent > 0 && bytesSent <= this.position) { - this.buffer.copy(this.buffer, 0, bytesSent, this.position); - this.position = this.position - bytesSent; - } else if (bytesSent > this.position) { - // This case should ideally not happen if logic is correct, means we tried to compact more than available - this.position = 0; - } - // If bytesSent is 0 or negative, or if no actual data was at the start of the buffer to be shifted, - // this.position effectively remains the same relative to the start of new data. - - this.endOfLastRow = Math.max(0, this.endOfLastRow - bytesSent); - // Ensure endOfLastRow is also shifted if it was within the compacted area, - // or reset if it pointed to data that's now gone. - // If new rows were added while flushing, endOfLastRow would be > position post-compaction of old data. - // This needs careful handling if new data is added *during* an async flush. - // For now, we assume endOfLastRow is relative to the data just flushed. - // A simpler approach might be to always set this.endOfLastRow = 0 after a successful flush, - // as startNewRow() will set it correctly for the *next* new row. - // However, if a flush doesn't clear all pending complete rows, this needs to be accurate. - // The current `flush` logic sends up to `this.endOfLastRow`, so after sending `dataAmountToSend` - // (which was `this.endOfLastRow` at the time of prepping the flush), the new `this.endOfLastRow` - // should effectively be 0 relative to the start of the compacted buffer, until a new row is started. - - this.lastFlushTime = Date.now(); - this.pendingRowCount = 0; // Reset after successful flush - // If autoFlush was triggered by row count, this reset is crucial. - // If triggered by interval, this is also fine. - } - - /** - * @ignore - * Executes the actual data sending logic (HTTP or TCP). - * This is called by the `flush` method, wrapped in the promise chain. - * @return {Promise} Resolves to true if data was sent. + * Sends the buffer's content to the database and compacts the buffer. + * If the last row is not finished it stays in the sender's buffer. + * + * @return {Promise} Resolves to true when there was data in the buffer to send, and it was sent successfully. */ - private async _executeFlush(): Promise { - const dataAmountToSend = this.endOfLastRow; - if (dataAmountToSend <= 0) { + async flush(): Promise { + const dataToSend = this.toBufferNew(this.endOfLastRow); + if (!dataToSend) { return false; // Nothing to send } - // Use toBufferView to get a reference, actual data copy for sending happens based on protocol needs - const dataView = this.toBufferView(dataAmountToSend); - if (!dataView) { - return false; // Should not happen if dataAmountToSend > 0, but a safe check - } - - // Create a copy for sending to avoid issues if the underlying buffer changes - // This is especially important for async operations. - const dataToSend = Buffer.allocUnsafe(dataView.length); - dataView.copy(dataToSend); - try { if (this.http) { const { timeout: calculatedTimeoutMillis } = createRequestOptions(this, dataToSend); @@ -557,13 +503,11 @@ class Sender { `Unexpected message from server: ${Buffer.from(responseBody).toString()}`, ); } - this._compactBufferAndResetState(dataAmountToSend); return true; } else { - const error = new Error( + throw new Error( `HTTP request failed, statusCode=${statusCode}, error=${Buffer.from(responseBody).toString()}`, ); - throw error; } } else { // TCP if (!this.socket || this.socket.destroyed) { @@ -574,7 +518,6 @@ class Sender { if (err) { reject(err); } else { - this._compactBufferAndResetState(dataAmountToSend); resolve(true); } }); @@ -609,37 +552,11 @@ class Sender { } } - /** - * Sends the buffer's content to the database and compacts the buffer. - * If the last row is not finished it stays in the sender's buffer. - * This operation is added to a queue and executed sequentially. - * - * @return {Promise} Resolves to true when there was data in the buffer to send, and it was sent successfully. - */ - async flush(): Promise { - // Add to the promise chain to ensure sequential execution - this.flushPromiseChain = this.flushPromiseChain - .then(async () => { - // Check if there's anything to flush just before execution - if (this.endOfLastRow <= 0) { - return false; // Nothing to flush - } - return this._executeFlush(); - }) - .catch((err: Error) => { - // Log or handle error. If _executeFlush throws, it will be caught here. - // The error should have already been logged by _executeFlush. - // We re-throw to ensure the promise chain reflects the failure. - this.log("error", `Flush operation failed in chain: ${err.message}`); - throw err; // Propagate error to the caller of this specific flush() - }); - return this.flushPromiseChain; - } - /** * @ignore - * @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send. + * @return {Buffer} Returns a cropped buffer, or null if there is nothing to send. * The returned buffer is backed by the sender's buffer. + * Used only in tests. */ toBufferView(pos = this.position): Buffer { return pos > 0 ? this.buffer.subarray(0, pos) : null; @@ -647,13 +564,15 @@ class Sender { /** * @ignore - * @return {Buffer|null} Returns a cropped buffer ready to send to the server or null if there is nothing to send. + * @return {Buffer|null} Returns a cropped buffer ready to send to the server, or null if there is nothing to send. * The returned buffer is a copy of the sender's buffer. + * It also compacts the Sender's buffer. */ toBufferNew(pos = this.position): Buffer | null { if (pos > 0) { const data = Buffer.allocUnsafe(pos); this.buffer.copy(data, 0, 0, pos); + compact(this); return data; } return null; @@ -928,7 +847,7 @@ function createRequestOptions( ): InternalHttpOptions { const timeoutMillis = (data.length / sender.requestMinThroughput) * 1000 + sender.requestTimeout; - const options: InternalHttpOptions = { + return { hostname: sender.host, port: sender.port, agent: sender.agent, @@ -937,8 +856,6 @@ function createRequestOptions( method: "POST", timeout: timeoutMillis, }; - - return options; } async function autoFlush(sender: Sender) { @@ -950,12 +867,7 @@ async function autoFlush(sender: Sender) { (sender.autoFlushInterval > 0 && Date.now() - sender.lastFlushTime >= sender.autoFlushInterval)) ) { - // await sender.flush(); // Old call - sender.flush().catch(err => { - // Auto-flush errors should be logged but not necessarily crash the application - // The error is already logged by the flush chain's catch block or _executeFlush - sender.log("error", `Auto-flush failed: ${err.message}`); - }); + await sender.flush(); } } @@ -973,6 +885,17 @@ function checkCapacity(sender: Sender, data: string[], base = 0) { } } +function compact(sender: Sender) { + if (sender.endOfLastRow > 0) { + sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position); + sender.position = sender.position - sender.endOfLastRow; + sender.endOfLastRow = 0; + + sender.lastFlushTime = Date.now(); + sender.pendingRowCount = 0; + } +} + function writeColumn( sender: Sender, name: string, @@ -1073,18 +996,26 @@ function timestampToNanos(timestamp: bigint, unit: "ns" | "us" | "ms") { } type DeprecatedOptions = { + /** @deprecated */ + copy_buffer?: boolean; /** @deprecated */ copyBuffer?: boolean; /** @deprecated */ bufferSize?: number; }; -function replaceDeprecatedOptions(options: SenderOptions & DeprecatedOptions) { +function replaceDeprecatedOptions( + options: SenderOptions & DeprecatedOptions, + log: (level: "error" | "warn" | "info" | "debug", message: string) => void +) { // deal with deprecated options - if (options.copyBuffer) { - options.copy_buffer = options.copyBuffer; - options.copyBuffer = undefined; + if (options.copy_buffer !== undefined) { + log("warn", `Option 'copy_buffer' is not supported anymore, please, remove it`); + } + if (options.copyBuffer !== undefined) { + log("warn", `Option 'copyBuffer' is not supported anymore, please, remove it`); } - if (options.bufferSize) { + if (options.bufferSize !== undefined) { + log("warn", `Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'`); options.init_buf_size = options.bufferSize; options.bufferSize = undefined; } diff --git a/test/options.test.ts b/test/options.test.ts index 12b867a..576670c 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -627,41 +627,6 @@ describe("Configuration string parser suite", function () { ).toThrow("Invalid retry timeout option, not a number: '6w0'"); }); - it("can parse copy_buffer config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;copy_buffer=on;", - ); - expect(options.protocol).toBe("http"); - expect(options.host).toBe("host"); - expect(options.port).toBe(9000); - expect(options.copy_buffer).toBe(true); - - options = SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=off;"); - expect(options.protocol).toBe("http"); - expect(options.host).toBe("host"); - expect(options.port).toBe(9000); - expect(options.copy_buffer).toBe(false); - - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=ON;"), - ).toThrow("Invalid copy buffer option: 'ON'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=On;"), - ).toThrow("Invalid copy buffer option: 'On'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=true;"), - ).toThrow("Invalid copy buffer option: 'true'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=OFF;"), - ).toThrow("Invalid copy buffer option: 'OFF'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=Off;"), - ).toThrow("Invalid copy buffer option: 'Off'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=false;"), - ).toThrow("Invalid copy buffer option: 'false'"); - }); - it("can parse max_name_len config", function () { const options = SenderOptions.fromConfig( "http::addr=host:9000;max_name_len=30", diff --git a/test/sender.test.ts b/test/sender.test.ts index c925d7c..57a60b5 100644 --- a/test/sender.test.ts +++ b/test/sender.test.ts @@ -164,110 +164,99 @@ describe("Sender options test suite", function () { } }); - it("does copy the buffer during flush() if copyBuffer is not set", async function () { - const sender = new Sender({ protocol: "http", host: "host" }); - expect(sender.toBuffer).toBe(sender.toBufferNew); - await sender.close(); - }); - - it("does copy the buffer during flush() if copyBuffer is set to true", async function () { + it("sets default buffer size if init_buf_size is not set", async function () { const sender = new Sender({ protocol: "http", host: "host", - copy_buffer: true, }); - expect(sender.toBuffer).toBe(sender.toBufferNew); - await sender.close(); - }); - - it("does copy the buffer during flush() if copyBuffer is not a boolean", async function () { - const sender = new Sender({ - protocol: "http", - host: "host", - copy_buffer: "", - }); - expect(sender.toBuffer).toBe(sender.toBufferNew); - await sender.close(); - }); - - it("does not copy the buffer during flush() if copyBuffer is set to false", async function () { - const sender = new Sender({ - protocol: "http", - host: "host", - copy_buffer: false, - }); - expect(sender.toBuffer).toBe(sender.toBufferView); + expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); - it("does not copy the buffer during flush() if copyBuffer is set to null", async function () { + it("sets the requested buffer size if init_buf_size is set", async function () { const sender = new Sender({ protocol: "http", host: "host", - copy_buffer: null, + init_buf_size: 1024, }); - expect(sender.toBuffer).toBe(sender.toBufferNew); + expect(sender.bufferSize).toBe(1024); await sender.close(); }); - it("does not copy the buffer during flush() if copyBuffer is undefined", async function () { + it("sets default buffer size if init_buf_size is set to null", async function () { const sender = new Sender({ protocol: "http", host: "host", - copy_buffer: undefined, + init_buf_size: null, }); - expect(sender.toBuffer).toBe(sender.toBufferNew); + expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); - it("sets default buffer size if bufferSize is not set", async function () { + it("sets default buffer size if init_buf_size is set to undefined", async function () { const sender = new Sender({ protocol: "http", host: "host", - copy_buffer: true, + init_buf_size: undefined, }); expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); - it("sets the requested buffer size if bufferSize is set", async function () { + it("sets default buffer size if init_buf_size is not a number", async function () { const sender = new Sender({ protocol: "http", host: "host", - init_buf_size: 1024, + // @ts-expect-error - Testing invalid options + init_buf_size: "1024", }); - expect(sender.bufferSize).toBe(1024); + expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); - it("sets default buffer size if bufferSize is set to null", async function () { + it("sets the requested buffer size if 'bufferSize' is set, but warns that it is deprecated", async function () { + const log = (level: "error" | "warn" | "info" | "debug", message: string) => { + expect(level).toBe("warn"); + expect(message).toMatch("Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'"); + }; const sender = new Sender({ protocol: "http", host: "host", - init_buf_size: null, + // @ts-expect-error - Testing deprecated option + bufferSize: 2048, + log: log, }); - expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); + expect(sender.bufferSize).toBe(2048); await sender.close(); }); - it("sets default buffer size if bufferSize is set to undefined", async function () { + it("warns about deprecated option 'copy_buffer'", async function () { + const log = (level: "error" | "warn" | "info" | "debug", message: string) => { + expect(level).toBe("warn"); + expect(message).toMatch("Option 'copy_buffer' is not supported anymore, please, remove it"); + }; const sender = new Sender({ protocol: "http", host: "host", - init_buf_size: undefined, + // @ts-expect-error - Testing deprecated option + copy_buffer: false, + log: log, }); - expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); - it("sets default buffer size if bufferSize is not a number", async function () { + it("warns about deprecated option 'copyBuffer'", async function () { + const log = (level: "error" | "warn" | "info" | "debug", message: string) => { + expect(level).toBe("warn"); + expect(message).toMatch("Option 'copyBuffer' is not supported anymore, please, remove it"); + }; const sender = new Sender({ protocol: "http", host: "host", - // @ts-expect-error - Testing invalid options - init_buf_size: "1024", + // @ts-expect-error - Testing deprecated option + copyBuffer: false, + log: log, }); - expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE); await sender.close(); }); @@ -1151,8 +1140,7 @@ describe("Sender message builder test suite (anything not covered in client inte const sender = new Sender({ protocol: "tcp", host: "host", - // @ts-expect-error - Technically it's a private field, but I'm not sure - bufferSize: 256, + init_buf_size: 256, }); for (const p of pages) { await sender @@ -1596,10 +1584,11 @@ describe("Sender message builder test suite (anything not covered in client inte init_buf_size: 1024, }); expect(sender.toBufferView()).toBe(null); + expect(sender.toBufferNew()).toBe(null); await sender.close(); }); - it("ignores unfinished rows when preparing a buffer for send", async function () { + it("leaves unfinished rows in the sender's buffer when preparing a copy of the buffer for send", async function () { const sender = new Sender({ protocol: "tcp", host: "host", @@ -1608,9 +1597,15 @@ describe("Sender message builder test suite (anything not covered in client inte sender.table("tableName").symbol("name", "value"); await sender.at(1234567890n, "ns"); sender.table("tableName").symbol("name", "value2"); - expect(sender.toBufferView(sender.endOfLastRow).toString()).toBe( + + // copy of the sender's buffer contains the finished row + expect(sender.toBufferNew(sender.endOfLastRow).toString()).toBe( "tableName,name=value 1234567890\n", ); + // the sender's buffer is compacted, and contains only the unfinished row + expect(sender.toBufferView().toString()).toBe( + "tableName,name=value2", + ); await sender.close(); }); @@ -2096,7 +2091,6 @@ describe("Sender tests with containerized QuestDB instance", () => { protocol: "tcp", host: container.getHost(), port: container.getMappedPort(QUESTDB_ILP_PORT), - copy_buffer: true, }); await sender.connect(); @@ -2140,7 +2134,7 @@ describe("Sender tests with containerized QuestDB instance", () => { it("ingests all data without loss under high load with auto-flush", async () => { const sender = Sender.fromConfig( - `tcp::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_ILP_PORT)};auto_flush_rows=5;auto_flush_interval=1;copy_buffer=on`, + `tcp::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_ILP_PORT)};auto_flush_rows=5;auto_flush_interval=1`, ); await sender.connect();