diff --git a/src/options.ts b/src/options.ts
index b0d6d5d..9b32645 100644
--- a/src/options.ts
+++ b/src/options.ts
@@ -105,11 +105,6 @@ const UNSAFE_OFF = "unsafe_off";
*
max_name_len: integer - The maximum length of a table or column name, the Sender defaults this parameter to 127.
* Recommended to use the same setting as the server, which also uses 127 by default.
*
- * copy_buffer: enum, accepted values: on, off - By default, the Sender creates a new buffer for every flush() call,
- * and the data to be sent to the server is copied into this new buffer.
- * Setting the flag to off results in reusing the same buffer instance for each flush() call.
- * Use this flag only if calls to the client are serialised.
- *
*
*/
class SenderOptions {
@@ -129,9 +124,6 @@ class SenderOptions {
auto_flush_rows?: number;
auto_flush_interval?: number;
- // replaces `copyBuffer` option
- copy_buffer?: string | boolean | null;
-
request_min_throughput?: number;
request_timeout?: number;
retry_timeout?: number;
@@ -243,7 +235,6 @@ function parseConfigurationString(
parseTlsOptions(options);
parseRequestTimeoutOptions(options);
parseMaxNameLength(options);
- parseCopyBuffer(options);
}
function parseSettings(
@@ -299,7 +290,6 @@ const ValidConfigKeys = [
"auto_flush",
"auto_flush_rows",
"auto_flush_interval",
- "copy_buffer",
"request_min_throughput",
"request_timeout",
"retry_timeout",
@@ -432,10 +422,6 @@ function parseMaxNameLength(options: SenderOptions) {
parseInteger(options, "max_name_len", "max name length", 1);
}
-function parseCopyBuffer(options: SenderOptions) {
- parseBoolean(options, "copy_buffer", "copy buffer");
-}
-
function parseBoolean(
options: SenderOptions,
property: string,
diff --git a/src/sender.ts b/src/sender.ts
index cdcab05..4cc6cb9 100644
--- a/src/sender.ts
+++ b/src/sender.ts
@@ -7,7 +7,7 @@ import crypto from "node:crypto";
import { Agent, RetryAgent } from "undici";
import { log } from "./logging";
-import { validateTableName, validateColumnName } from "./validation";
+import { validateColumnName, validateTableName } from "./validation";
import { SenderOptions, HTTP, HTTPS, TCP, TCPS } from "./options";
const HTTP_NO_CONTENT = 204; // success
@@ -121,8 +121,6 @@ class Sender {
/** @private */ bufferSize;
/** @private */ maxBufferSize;
/** @private */ buffer;
- /** @private */ toBuffer;
- /** @private */ doResolve;
/** @private */ position;
/** @private */ endOfLastRow;
@@ -145,7 +143,6 @@ class Sender {
/** @private */ log;
/** @private */ agent;
/** @private */ jwk;
- /** @private */ flushPromiseChain: Promise;
/**
* Creates an instance of Sender.
@@ -157,10 +154,8 @@ class Sender {
if (!options || !options.protocol) {
throw new Error("The 'protocol' option is mandatory");
}
- replaceDeprecatedOptions(options);
-
this.log = typeof options.log === "function" ? options.log : log;
- this.flushPromiseChain = Promise.resolve(true as boolean);
+ replaceDeprecatedOptions(options, this.log);
switch (options.protocol) {
case HTTP:
@@ -248,8 +243,6 @@ class Sender {
? options.retry_timeout
: DEFAULT_RETRY_TIMEOUT;
- const noCopy = isBoolean(options.copy_buffer) && !options.copy_buffer;
- this.toBuffer = noCopy ? this.toBufferView : this.toBufferNew;
this.maxBufferSize = isInteger(options.max_buf_size, 1)
? options.max_buf_size
: DEFAULT_MAX_BUFFER_SIZE;
@@ -440,64 +433,17 @@ class Sender {
}
/**
- * @ignore
- * Compacts the buffer after data has been sent and resets pending row count.
- * This method should only be called after a flush operation has successfully sent data.
- * @param {number} bytesSent The number of bytes that were successfully sent and should be compacted.
- */
- private _compactBufferAndResetState(bytesSent: number) {
- if (bytesSent > 0 && bytesSent <= this.position) {
- this.buffer.copy(this.buffer, 0, bytesSent, this.position);
- this.position = this.position - bytesSent;
- } else if (bytesSent > this.position) {
- // This case should ideally not happen if logic is correct, means we tried to compact more than available
- this.position = 0;
- }
- // If bytesSent is 0 or negative, or if no actual data was at the start of the buffer to be shifted,
- // this.position effectively remains the same relative to the start of new data.
-
- this.endOfLastRow = Math.max(0, this.endOfLastRow - bytesSent);
- // Ensure endOfLastRow is also shifted if it was within the compacted area,
- // or reset if it pointed to data that's now gone.
- // If new rows were added while flushing, endOfLastRow would be > position post-compaction of old data.
- // This needs careful handling if new data is added *during* an async flush.
- // For now, we assume endOfLastRow is relative to the data just flushed.
- // A simpler approach might be to always set this.endOfLastRow = 0 after a successful flush,
- // as startNewRow() will set it correctly for the *next* new row.
- // However, if a flush doesn't clear all pending complete rows, this needs to be accurate.
- // The current `flush` logic sends up to `this.endOfLastRow`, so after sending `dataAmountToSend`
- // (which was `this.endOfLastRow` at the time of prepping the flush), the new `this.endOfLastRow`
- // should effectively be 0 relative to the start of the compacted buffer, until a new row is started.
-
- this.lastFlushTime = Date.now();
- this.pendingRowCount = 0; // Reset after successful flush
- // If autoFlush was triggered by row count, this reset is crucial.
- // If triggered by interval, this is also fine.
- }
-
- /**
- * @ignore
- * Executes the actual data sending logic (HTTP or TCP).
- * This is called by the `flush` method, wrapped in the promise chain.
- * @return {Promise} Resolves to true if data was sent.
+ * Sends the buffer's content to the database and compacts the buffer.
+ * If the last row is not finished it stays in the sender's buffer.
+ *
+ * @return {Promise} Resolves to true when there was data in the buffer to send, and it was sent successfully.
*/
- private async _executeFlush(): Promise {
- const dataAmountToSend = this.endOfLastRow;
- if (dataAmountToSend <= 0) {
+ async flush(): Promise {
+ const dataToSend = this.toBufferNew(this.endOfLastRow);
+ if (!dataToSend) {
return false; // Nothing to send
}
- // Use toBufferView to get a reference, actual data copy for sending happens based on protocol needs
- const dataView = this.toBufferView(dataAmountToSend);
- if (!dataView) {
- return false; // Should not happen if dataAmountToSend > 0, but a safe check
- }
-
- // Create a copy for sending to avoid issues if the underlying buffer changes
- // This is especially important for async operations.
- const dataToSend = Buffer.allocUnsafe(dataView.length);
- dataView.copy(dataToSend);
-
try {
if (this.http) {
const { timeout: calculatedTimeoutMillis } = createRequestOptions(this, dataToSend);
@@ -557,13 +503,11 @@ class Sender {
`Unexpected message from server: ${Buffer.from(responseBody).toString()}`,
);
}
- this._compactBufferAndResetState(dataAmountToSend);
return true;
} else {
- const error = new Error(
+ throw new Error(
`HTTP request failed, statusCode=${statusCode}, error=${Buffer.from(responseBody).toString()}`,
);
- throw error;
}
} else { // TCP
if (!this.socket || this.socket.destroyed) {
@@ -574,7 +518,6 @@ class Sender {
if (err) {
reject(err);
} else {
- this._compactBufferAndResetState(dataAmountToSend);
resolve(true);
}
});
@@ -609,37 +552,11 @@ class Sender {
}
}
- /**
- * Sends the buffer's content to the database and compacts the buffer.
- * If the last row is not finished it stays in the sender's buffer.
- * This operation is added to a queue and executed sequentially.
- *
- * @return {Promise} Resolves to true when there was data in the buffer to send, and it was sent successfully.
- */
- async flush(): Promise {
- // Add to the promise chain to ensure sequential execution
- this.flushPromiseChain = this.flushPromiseChain
- .then(async () => {
- // Check if there's anything to flush just before execution
- if (this.endOfLastRow <= 0) {
- return false; // Nothing to flush
- }
- return this._executeFlush();
- })
- .catch((err: Error) => {
- // Log or handle error. If _executeFlush throws, it will be caught here.
- // The error should have already been logged by _executeFlush.
- // We re-throw to ensure the promise chain reflects the failure.
- this.log("error", `Flush operation failed in chain: ${err.message}`);
- throw err; // Propagate error to the caller of this specific flush()
- });
- return this.flushPromiseChain;
- }
-
/**
* @ignore
- * @return {Buffer} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
+ * @return {Buffer} Returns a cropped buffer, or null if there is nothing to send.
* The returned buffer is backed by the sender's buffer.
+ * Used only in tests.
*/
toBufferView(pos = this.position): Buffer {
return pos > 0 ? this.buffer.subarray(0, pos) : null;
@@ -647,13 +564,15 @@ class Sender {
/**
* @ignore
- * @return {Buffer|null} Returns a cropped buffer ready to send to the server or null if there is nothing to send.
+ * @return {Buffer|null} Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
* The returned buffer is a copy of the sender's buffer.
+ * It also compacts the Sender's buffer.
*/
toBufferNew(pos = this.position): Buffer | null {
if (pos > 0) {
const data = Buffer.allocUnsafe(pos);
this.buffer.copy(data, 0, 0, pos);
+ compact(this);
return data;
}
return null;
@@ -928,7 +847,7 @@ function createRequestOptions(
): InternalHttpOptions {
const timeoutMillis =
(data.length / sender.requestMinThroughput) * 1000 + sender.requestTimeout;
- const options: InternalHttpOptions = {
+ return {
hostname: sender.host,
port: sender.port,
agent: sender.agent,
@@ -937,8 +856,6 @@ function createRequestOptions(
method: "POST",
timeout: timeoutMillis,
};
-
- return options;
}
async function autoFlush(sender: Sender) {
@@ -950,12 +867,7 @@ async function autoFlush(sender: Sender) {
(sender.autoFlushInterval > 0 &&
Date.now() - sender.lastFlushTime >= sender.autoFlushInterval))
) {
- // await sender.flush(); // Old call
- sender.flush().catch(err => {
- // Auto-flush errors should be logged but not necessarily crash the application
- // The error is already logged by the flush chain's catch block or _executeFlush
- sender.log("error", `Auto-flush failed: ${err.message}`);
- });
+ await sender.flush();
}
}
@@ -973,6 +885,17 @@ function checkCapacity(sender: Sender, data: string[], base = 0) {
}
}
+function compact(sender: Sender) {
+ if (sender.endOfLastRow > 0) {
+ sender.buffer.copy(sender.buffer, 0, sender.endOfLastRow, sender.position);
+ sender.position = sender.position - sender.endOfLastRow;
+ sender.endOfLastRow = 0;
+
+ sender.lastFlushTime = Date.now();
+ sender.pendingRowCount = 0;
+ }
+}
+
function writeColumn(
sender: Sender,
name: string,
@@ -1073,18 +996,26 @@ function timestampToNanos(timestamp: bigint, unit: "ns" | "us" | "ms") {
}
type DeprecatedOptions = {
+ /** @deprecated */
+ copy_buffer?: boolean;
/** @deprecated */
copyBuffer?: boolean;
/** @deprecated */
bufferSize?: number;
};
-function replaceDeprecatedOptions(options: SenderOptions & DeprecatedOptions) {
+function replaceDeprecatedOptions(
+ options: SenderOptions & DeprecatedOptions,
+ log: (level: "error" | "warn" | "info" | "debug", message: string) => void
+) {
// deal with deprecated options
- if (options.copyBuffer) {
- options.copy_buffer = options.copyBuffer;
- options.copyBuffer = undefined;
+ if (options.copy_buffer !== undefined) {
+ log("warn", `Option 'copy_buffer' is not supported anymore, please, remove it`);
+ }
+ if (options.copyBuffer !== undefined) {
+ log("warn", `Option 'copyBuffer' is not supported anymore, please, remove it`);
}
- if (options.bufferSize) {
+ if (options.bufferSize !== undefined) {
+ log("warn", `Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'`);
options.init_buf_size = options.bufferSize;
options.bufferSize = undefined;
}
diff --git a/test/options.test.ts b/test/options.test.ts
index 12b867a..576670c 100644
--- a/test/options.test.ts
+++ b/test/options.test.ts
@@ -627,41 +627,6 @@ describe("Configuration string parser suite", function () {
).toThrow("Invalid retry timeout option, not a number: '6w0'");
});
- it("can parse copy_buffer config", function () {
- let options = SenderOptions.fromConfig(
- "http::addr=host:9000;copy_buffer=on;",
- );
- expect(options.protocol).toBe("http");
- expect(options.host).toBe("host");
- expect(options.port).toBe(9000);
- expect(options.copy_buffer).toBe(true);
-
- options = SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=off;");
- expect(options.protocol).toBe("http");
- expect(options.host).toBe("host");
- expect(options.port).toBe(9000);
- expect(options.copy_buffer).toBe(false);
-
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=ON;"),
- ).toThrow("Invalid copy buffer option: 'ON'");
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=On;"),
- ).toThrow("Invalid copy buffer option: 'On'");
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=true;"),
- ).toThrow("Invalid copy buffer option: 'true'");
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=OFF;"),
- ).toThrow("Invalid copy buffer option: 'OFF'");
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=Off;"),
- ).toThrow("Invalid copy buffer option: 'Off'");
- expect(() =>
- SenderOptions.fromConfig("http::addr=host:9000;copy_buffer=false;"),
- ).toThrow("Invalid copy buffer option: 'false'");
- });
-
it("can parse max_name_len config", function () {
const options = SenderOptions.fromConfig(
"http::addr=host:9000;max_name_len=30",
diff --git a/test/sender.test.ts b/test/sender.test.ts
index c925d7c..57a60b5 100644
--- a/test/sender.test.ts
+++ b/test/sender.test.ts
@@ -164,110 +164,99 @@ describe("Sender options test suite", function () {
}
});
- it("does copy the buffer during flush() if copyBuffer is not set", async function () {
- const sender = new Sender({ protocol: "http", host: "host" });
- expect(sender.toBuffer).toBe(sender.toBufferNew);
- await sender.close();
- });
-
- it("does copy the buffer during flush() if copyBuffer is set to true", async function () {
+ it("sets default buffer size if init_buf_size is not set", async function () {
const sender = new Sender({
protocol: "http",
host: "host",
- copy_buffer: true,
});
- expect(sender.toBuffer).toBe(sender.toBufferNew);
- await sender.close();
- });
-
- it("does copy the buffer during flush() if copyBuffer is not a boolean", async function () {
- const sender = new Sender({
- protocol: "http",
- host: "host",
- copy_buffer: "",
- });
- expect(sender.toBuffer).toBe(sender.toBufferNew);
- await sender.close();
- });
-
- it("does not copy the buffer during flush() if copyBuffer is set to false", async function () {
- const sender = new Sender({
- protocol: "http",
- host: "host",
- copy_buffer: false,
- });
- expect(sender.toBuffer).toBe(sender.toBufferView);
+ expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
- it("does not copy the buffer during flush() if copyBuffer is set to null", async function () {
+ it("sets the requested buffer size if init_buf_size is set", async function () {
const sender = new Sender({
protocol: "http",
host: "host",
- copy_buffer: null,
+ init_buf_size: 1024,
});
- expect(sender.toBuffer).toBe(sender.toBufferNew);
+ expect(sender.bufferSize).toBe(1024);
await sender.close();
});
- it("does not copy the buffer during flush() if copyBuffer is undefined", async function () {
+ it("sets default buffer size if init_buf_size is set to null", async function () {
const sender = new Sender({
protocol: "http",
host: "host",
- copy_buffer: undefined,
+ init_buf_size: null,
});
- expect(sender.toBuffer).toBe(sender.toBufferNew);
+ expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
- it("sets default buffer size if bufferSize is not set", async function () {
+ it("sets default buffer size if init_buf_size is set to undefined", async function () {
const sender = new Sender({
protocol: "http",
host: "host",
- copy_buffer: true,
+ init_buf_size: undefined,
});
expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
- it("sets the requested buffer size if bufferSize is set", async function () {
+ it("sets default buffer size if init_buf_size is not a number", async function () {
const sender = new Sender({
protocol: "http",
host: "host",
- init_buf_size: 1024,
+ // @ts-expect-error - Testing invalid options
+ init_buf_size: "1024",
});
- expect(sender.bufferSize).toBe(1024);
+ expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
- it("sets default buffer size if bufferSize is set to null", async function () {
+ it("sets the requested buffer size if 'bufferSize' is set, but warns that it is deprecated", async function () {
+ const log = (level: "error" | "warn" | "info" | "debug", message: string) => {
+ expect(level).toBe("warn");
+ expect(message).toMatch("Option 'bufferSize' is not supported anymore, please, replace it with 'init_buf_size'");
+ };
const sender = new Sender({
protocol: "http",
host: "host",
- init_buf_size: null,
+ // @ts-expect-error - Testing deprecated option
+ bufferSize: 2048,
+ log: log,
});
- expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
+ expect(sender.bufferSize).toBe(2048);
await sender.close();
});
- it("sets default buffer size if bufferSize is set to undefined", async function () {
+ it("warns about deprecated option 'copy_buffer'", async function () {
+ const log = (level: "error" | "warn" | "info" | "debug", message: string) => {
+ expect(level).toBe("warn");
+ expect(message).toMatch("Option 'copy_buffer' is not supported anymore, please, remove it");
+ };
const sender = new Sender({
protocol: "http",
host: "host",
- init_buf_size: undefined,
+ // @ts-expect-error - Testing deprecated option
+ copy_buffer: false,
+ log: log,
});
- expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
- it("sets default buffer size if bufferSize is not a number", async function () {
+ it("warns about deprecated option 'copyBuffer'", async function () {
+ const log = (level: "error" | "warn" | "info" | "debug", message: string) => {
+ expect(level).toBe("warn");
+ expect(message).toMatch("Option 'copyBuffer' is not supported anymore, please, remove it");
+ };
const sender = new Sender({
protocol: "http",
host: "host",
- // @ts-expect-error - Testing invalid options
- init_buf_size: "1024",
+ // @ts-expect-error - Testing deprecated option
+ copyBuffer: false,
+ log: log,
});
- expect(sender.bufferSize).toBe(DEFAULT_BUFFER_SIZE);
await sender.close();
});
@@ -1151,8 +1140,7 @@ describe("Sender message builder test suite (anything not covered in client inte
const sender = new Sender({
protocol: "tcp",
host: "host",
- // @ts-expect-error - Technically it's a private field, but I'm not sure
- bufferSize: 256,
+ init_buf_size: 256,
});
for (const p of pages) {
await sender
@@ -1596,10 +1584,11 @@ describe("Sender message builder test suite (anything not covered in client inte
init_buf_size: 1024,
});
expect(sender.toBufferView()).toBe(null);
+ expect(sender.toBufferNew()).toBe(null);
await sender.close();
});
- it("ignores unfinished rows when preparing a buffer for send", async function () {
+ it("leaves unfinished rows in the sender's buffer when preparing a copy of the buffer for send", async function () {
const sender = new Sender({
protocol: "tcp",
host: "host",
@@ -1608,9 +1597,15 @@ describe("Sender message builder test suite (anything not covered in client inte
sender.table("tableName").symbol("name", "value");
await sender.at(1234567890n, "ns");
sender.table("tableName").symbol("name", "value2");
- expect(sender.toBufferView(sender.endOfLastRow).toString()).toBe(
+
+ // copy of the sender's buffer contains the finished row
+ expect(sender.toBufferNew(sender.endOfLastRow).toString()).toBe(
"tableName,name=value 1234567890\n",
);
+ // the sender's buffer is compacted, and contains only the unfinished row
+ expect(sender.toBufferView().toString()).toBe(
+ "tableName,name=value2",
+ );
await sender.close();
});
@@ -2096,7 +2091,6 @@ describe("Sender tests with containerized QuestDB instance", () => {
protocol: "tcp",
host: container.getHost(),
port: container.getMappedPort(QUESTDB_ILP_PORT),
- copy_buffer: true,
});
await sender.connect();
@@ -2140,7 +2134,7 @@ describe("Sender tests with containerized QuestDB instance", () => {
it("ingests all data without loss under high load with auto-flush", async () => {
const sender = Sender.fromConfig(
- `tcp::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_ILP_PORT)};auto_flush_rows=5;auto_flush_interval=1;copy_buffer=on`,
+ `tcp::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_ILP_PORT)};auto_flush_rows=5;auto_flush_interval=1`,
);
await sender.connect();