diff --git a/src/buffer/base.ts b/src/buffer/base.ts new file mode 100644 index 0000000..9030743 --- /dev/null +++ b/src/buffer/base.ts @@ -0,0 +1,447 @@ +// @ts-check +import { Buffer } from "node:buffer"; + +import { log, Logger } from "../logging"; +import { validateColumnName, validateTableName } from "../validation"; +import { SenderOptions } from "../options"; +import { + SenderBuffer, + DEFAULT_BUFFER_SIZE, + DEFAULT_MAX_BUFFER_SIZE, +} from "./index"; +import { + isInteger, + timestampToMicros, + timestampToNanos, + TimestampUnit, +} from "../utils"; + +// Default maximum length for table and column names. +const DEFAULT_MAX_NAME_LENGTH = 127; + +/** + * Abstract base class for SenderBuffer implementations.
+ * Provides common functionality for writing data into the buffer. + */ +abstract class SenderBufferBase implements SenderBuffer { + private bufferSize: number; + private readonly maxBufferSize: number; + private buffer: Buffer; + private position: number; + private endOfLastRow: number; + + private hasTable: boolean; + private hasSymbols: boolean; + private hasColumns: boolean; + + private readonly maxNameLength: number; + + protected readonly log: Logger; + + /** + * Creates an instance of SenderBufferBase. + * + * @param {SenderOptions} options - Sender configuration object.
+ * See SenderOptions documentation for detailed description of configuration options.
+ */ + protected constructor(options: SenderOptions) { + this.log = options && typeof options.log === "function" ? options.log : log; + SenderOptions.resolveDeprecated(options, this.log); + + this.maxNameLength = + options && isInteger(options.max_name_len, 1) + ? options.max_name_len + : DEFAULT_MAX_NAME_LENGTH; + + this.maxBufferSize = + options && isInteger(options.max_buf_size, 1) + ? options.max_buf_size + : DEFAULT_MAX_BUFFER_SIZE; + this.resize( + options && isInteger(options.init_buf_size, 1) + ? options.init_buf_size + : DEFAULT_BUFFER_SIZE, + ); + + this.reset(); + } + + /** + * Extends the size of the buffer.
+ * Can be used to increase the size of buffer if overflown. + * The buffer's content is copied into the new buffer. + * + * @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes. + */ + private resize(bufferSize: number) { + if (bufferSize > this.maxBufferSize) { + throw new Error( + `Max buffer size is ${this.maxBufferSize} bytes, requested buffer size: ${bufferSize}`, + ); + } + this.bufferSize = bufferSize; + // Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is + // longer than the size of the buffer. It simply just writes whatever it can, and returns. + // If we can write into the extra byte, that indicates buffer overflow. + // See the check in the write() function. + const newBuffer = Buffer.alloc(this.bufferSize + 1, 0); + if (this.buffer) { + this.buffer.copy(newBuffer); + } + this.buffer = newBuffer; + } + + /** + * Resets the buffer, data added to the buffer will be lost.
+ * In other words it clears the buffer and sets the writing position to the beginning of the buffer. + * + * @return {Sender} Returns with a reference to this sender. + */ + reset(): SenderBuffer { + this.position = 0; + this.startNewRow(); + return this; + } + + private startNewRow() { + this.endOfLastRow = this.position; + this.hasTable = false; + this.hasSymbols = false; + this.hasColumns = false; + } + + /** + * @return {Buffer} Returns a cropped buffer, or null if there is nothing to send.
+ * The returned buffer is backed by this buffer instance, meaning the view can change as the buffer is mutated. + * Used only in tests to assert the buffer's content. + */ + toBufferView(pos = this.endOfLastRow): Buffer { + return pos > 0 ? this.buffer.subarray(0, pos) : null; + } + + /** + * @return {Buffer} Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
+ * The returned buffer is a copy of this buffer. + * It also compacts the buffer. + */ + toBufferNew(pos = this.endOfLastRow): Buffer { + if (pos > 0) { + const data = Buffer.allocUnsafe(pos); + this.buffer.copy(data, 0, 0, pos); + this.compact(); + return data; + } + return null; + } + + /** + * Write the table name into the buffer. + * + * @param {string} table - Table name. + * @return {Sender} Returns with a reference to this sender. + */ + table(table: string): SenderBuffer { + if (typeof table !== "string") { + throw new Error(`Table name must be a string, received ${typeof table}`); + } + if (this.hasTable) { + throw new Error("Table name has already been set"); + } + validateTableName(table, this.maxNameLength); + this.checkCapacity([table], table.length); + this.writeEscaped(table); + this.hasTable = true; + return this; + } + + /** + * Write a symbol name and value into the buffer. + * + * @param {string} name - Symbol name. + * @param {unknown} value - Symbol value, toString() is called to extract the actual symbol value from the parameter. + * @return {Sender} Returns with a reference to this sender. + */ + symbol(name: string, value: unknown): SenderBuffer { + if (typeof name !== "string") { + throw new Error(`Symbol name must be a string, received ${typeof name}`); + } + if (!this.hasTable || this.hasColumns) { + throw new Error( + "Symbol can be added only after table name is set and before any column added", + ); + } + const valueStr = value.toString(); + this.checkCapacity([name, valueStr], 2 + name.length + valueStr.length); + this.write(","); + validateColumnName(name, this.maxNameLength); + this.writeEscaped(name); + this.write("="); + this.writeEscaped(valueStr); + this.hasSymbols = true; + return this; + } + + /** + * Write a string column with its value into the buffer. + * + * @param {string} name - Column name. + * @param {string} value - Column value, accepts only string values. + * @return {Sender} Returns with a reference to this sender. + */ + stringColumn(name: string, value: string): SenderBuffer { + this.writeColumn( + name, + value, + () => { + this.checkCapacity([value], 2 + value.length); + this.write('"'); + this.writeEscaped(value, true); + this.write('"'); + }, + "string", + ); + return this; + } + + /** + * Write a boolean column with its value into the buffer. + * + * @param {string} name - Column name. + * @param {boolean} value - Column value, accepts only boolean values. + * @return {Sender} Returns with a reference to this sender. + */ + booleanColumn(name: string, value: boolean): SenderBuffer { + this.writeColumn( + name, + value, + () => { + this.checkCapacity([], 1); + this.write(value ? "t" : "f"); + }, + "boolean", + ); + return this; + } + + /** + * Write a float column with its value into the buffer. + * + * @param {string} name - Column name. + * @param {number} value - Column value, accepts only number values. + * @return {Sender} Returns with a reference to this sender. + */ + abstract floatColumn(name: string, value: number): SenderBuffer; + + /** + * Write an integer column with its value into the buffer. + * + * @param {string} name - Column name. + * @param {number} value - Column value, accepts only number values. + * @return {Sender} Returns with a reference to this sender. + * @throws Error if the value is not an integer + */ + intColumn(name: string, value: number): SenderBuffer { + if (!Number.isInteger(value)) { + throw new Error(`Value must be an integer, received ${value}`); + } + this.writeColumn(name, value, () => { + const valueStr = value.toString(); + this.checkCapacity([valueStr], 1); + this.write(valueStr); + this.write("i"); + }); + return this; + } + + /** + * Write a timestamp column with its value into the buffer. + * + * @param {string} name - Column name. + * @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts. + * @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'. + * @return {Sender} Returns with a reference to this sender. + */ + timestampColumn( + name: string, + value: number | bigint, + unit: TimestampUnit = "us", + ): SenderBuffer { + if (typeof value !== "bigint" && !Number.isInteger(value)) { + throw new Error(`Value must be an integer or BigInt, received ${value}`); + } + this.writeColumn(name, value, () => { + const valueMicros = timestampToMicros(BigInt(value), unit); + const valueStr = valueMicros.toString(); + this.checkCapacity([valueStr], 1); + this.write(valueStr); + this.write("t"); + }); + return this; + } + + /** + * Closing the row after writing the designated timestamp into the buffer. + * + * @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts. + * @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'. + */ + at(timestamp: number | bigint, unit: TimestampUnit = "us") { + if (!this.hasSymbols && !this.hasColumns) { + throw new Error( + "The row must have a symbol or column set before it is closed", + ); + } + if (typeof timestamp !== "bigint" && !Number.isInteger(timestamp)) { + throw new Error( + `Designated timestamp must be an integer or BigInt, received ${timestamp}`, + ); + } + const timestampNanos = timestampToNanos(BigInt(timestamp), unit); + const timestampStr = timestampNanos.toString(); + this.checkCapacity([timestampStr], 2); + this.write(" "); + this.write(timestampStr); + this.write("\n"); + this.startNewRow(); + } + + /** + * Closing the row without writing designated timestamp into the buffer.
+ * Designated timestamp will be populated by the server on this record. + */ + atNow() { + if (!this.hasSymbols && !this.hasColumns) { + throw new Error( + "The row must have a symbol or column set before it is closed", + ); + } + this.checkCapacity([], 1); + this.write("\n"); + this.startNewRow(); + } + + /** + * Returns the current position of the buffer.
+ * New data will be written into the buffer starting from this position. + */ + currentPosition(): number { + return this.position; + } + + protected checkCapacity(data: string[], base = 0) { + let length = base; + for (const str of data) { + length += Buffer.byteLength(str, "utf8"); + } + if (this.position + length > this.bufferSize) { + let newSize = this.bufferSize; + do { + newSize += this.bufferSize; + } while (this.position + length > newSize); + this.resize(newSize); + } + } + + private compact() { + if (this.endOfLastRow > 0) { + this.buffer.copy(this.buffer, 0, this.endOfLastRow, this.position); + this.position = this.position - this.endOfLastRow; + this.endOfLastRow = 0; + } + } + + protected writeColumn( + name: string, + value: unknown, + writeValue: () => void, + valueType?: string, + ) { + if (typeof name !== "string") { + throw new Error(`Column name must be a string, received ${typeof name}`); + } + if (valueType && typeof value !== valueType) { + throw new Error( + `Column value must be of type ${valueType}, received ${typeof value}`, + ); + } + if (!this.hasTable) { + throw new Error("Column can be set only after table name is set"); + } + this.checkCapacity([name], 2 + name.length); + this.write(this.hasColumns ? "," : " "); + validateColumnName(name, this.maxNameLength); + this.writeEscaped(name); + this.write("="); + writeValue(); + this.hasColumns = true; + } + + protected write(data: string) { + this.position += this.buffer.write(data, this.position); + if (this.position > this.bufferSize) { + // should never happen, if checkCapacity() is correctly used + throw new Error( + `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, + ); + } + } + + protected writeByte(data: number) { + this.position = this.buffer.writeInt8(data, this.position); + if (this.position > this.bufferSize) { + // should never happen, if checkCapacity() is correctly used + throw new Error( + `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, + ); + } + } + + protected writeDouble(data: number) { + this.position = this.buffer.writeDoubleLE(data, this.position); + if (this.position > this.bufferSize) { + // should never happen, if checkCapacity() is correctly used + throw new Error( + `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, + ); + } + } + + private writeEscaped(data: string, quoted = false) { + for (const ch of data) { + if (ch > "\\") { + this.write(ch); + continue; + } + + switch (ch) { + case " ": + case ",": + case "=": + if (!quoted) { + this.write("\\"); + } + this.write(ch); + break; + case "\n": + case "\r": + this.write("\\"); + this.write(ch); + break; + case '"': + if (quoted) { + this.write("\\"); + } + this.write(ch); + break; + case "\\": + this.write("\\\\"); + break; + default: + this.write(ch); + break; + } + } + } +} + +export { SenderBufferBase }; diff --git a/src/buffer/bufferv1.ts b/src/buffer/bufferv1.ts new file mode 100644 index 0000000..ff36fb3 --- /dev/null +++ b/src/buffer/bufferv1.ts @@ -0,0 +1,37 @@ +// @ts-check +import { SenderOptions } from "../options"; +import { SenderBuffer } from "./index"; +import { SenderBufferBase } from "./base"; + +/** + * Buffer implementation for protocol version 1. + * Sends floating point numbers in their text form. + */ +class SenderBufferV1 extends SenderBufferBase { + constructor(options: SenderOptions) { + super(options); + } + + /** + * Write a float column with its value into the buffer using v1 serialization (text format). + * + * @param {string} name - Column name. + * @param {number} value - Column value, accepts only number values. + * @return {Sender} Returns with a reference to this sender. + */ + floatColumn(name: string, value: number): SenderBuffer { + this.writeColumn( + name, + value, + () => { + const valueStr = value.toString(); + this.checkCapacity([valueStr]); + this.write(valueStr); + }, + "number", + ); + return this; + } +} + +export { SenderBufferV1 }; diff --git a/src/buffer/bufferv2.ts b/src/buffer/bufferv2.ts new file mode 100644 index 0000000..3e812d8 --- /dev/null +++ b/src/buffer/bufferv2.ts @@ -0,0 +1,41 @@ +// @ts-check +import { SenderOptions } from "../options"; +import { SenderBuffer } from "./index"; +import { SenderBufferBase } from "./base"; + +const ENTITY_TYPE_DOUBLE: number = 16; +const EQUALS_SIGN: number = "=".charCodeAt(0); + +/** + * Buffer implementation for protocol version 2. + * Sends floating point numbers in binary form. + */ +class SenderBufferV2 extends SenderBufferBase { + constructor(options: SenderOptions) { + super(options); + } + + /** + * Write a float column with its value into the buffer using v2 serialization (binary format). + * + * @param {string} name - Column name. + * @param {number} value - Column value, accepts only number values. + * @return {Sender} Returns with a reference to this sender. + */ + floatColumn(name: string, value: number): SenderBuffer { + this.writeColumn( + name, + value, + () => { + this.checkCapacity([], 10); + this.writeByte(EQUALS_SIGN); + this.writeByte(ENTITY_TYPE_DOUBLE); + this.writeDouble(value); + }, + "number", + ); + return this; + } +} + +export { SenderBufferV2 }; diff --git a/src/buffer/index.ts b/src/buffer/index.ts index 425ac8a..cb72692 100644 --- a/src/buffer/index.ts +++ b/src/buffer/index.ts @@ -1,269 +1,121 @@ // @ts-check import { Buffer } from "node:buffer"; -import { log, Logger } from "../logging"; -import { validateColumnName, validateTableName } from "../validation"; -import { SenderOptions } from "../options"; import { - isInteger, - timestampToMicros, - timestampToNanos, - TimestampUnit, -} from "../utils"; - -const DEFAULT_MAX_NAME_LENGTH = 127; + SenderOptions, + PROTOCOL_VERSION_V1, + PROTOCOL_VERSION_V2, + PROTOCOL_VERSION_AUTO, +} from "../options"; +import { TimestampUnit } from "../utils"; +import { SenderBufferV1 } from "./bufferv1"; +import { SenderBufferV2 } from "./bufferv2"; const DEFAULT_BUFFER_SIZE = 65536; // 64 KB const DEFAULT_MAX_BUFFER_SIZE = 104857600; // 100 MB -/** - * Buffer used by the Sender. - */ -class SenderBuffer { - private bufferSize: number; - private readonly maxBufferSize: number; - private buffer: Buffer; - private position: number; - private endOfLastRow: number; - - private hasTable: boolean; - private hasSymbols: boolean; - private hasColumns: boolean; - - private readonly maxNameLength: number; - - private readonly log: Logger; - - /** - * Creates an instance of SenderBuffer. - * - * @param {SenderOptions} options - Sender configuration object.
- * See SenderOptions documentation for detailed description of configuration options.
- */ - constructor(options: SenderOptions) { - this.log = options && typeof options.log === "function" ? options.log : log; - SenderOptions.resolveDeprecated(options, this.log); - - this.maxNameLength = - options && isInteger(options.max_name_len, 1) - ? options.max_name_len - : DEFAULT_MAX_NAME_LENGTH; - - this.maxBufferSize = - options && isInteger(options.max_buf_size, 1) - ? options.max_buf_size - : DEFAULT_MAX_BUFFER_SIZE; - this.resize( - options && isInteger(options.init_buf_size, 1) - ? options.init_buf_size - : DEFAULT_BUFFER_SIZE, - ); - - this.reset(); - } - - /** - * Extends the size of the buffer.
- * Can be used to increase the size of buffer if overflown. - * The buffer's content is copied into the new buffer. - * - * @param {number} bufferSize - New size of the buffer used by the sender, provided in bytes. - */ - private resize(bufferSize: number) { - if (bufferSize > this.maxBufferSize) { +function createBuffer(options: SenderOptions): SenderBuffer { + switch (options.protocol_version) { + case PROTOCOL_VERSION_V2: + return new SenderBufferV2(options); + case PROTOCOL_VERSION_V1: + return new SenderBufferV1(options); + case PROTOCOL_VERSION_AUTO: + case undefined: + case null: + case "": + throw new Error( + "Provide the 'protocol_version' option, or call 'await SenderOptions.resolveAuto(options)' first", + ); + default: throw new Error( - `Max buffer size is ${this.maxBufferSize} bytes, requested buffer size: ${bufferSize}`, + "Unsupported protocol version: " + options.protocol_version, ); - } - this.bufferSize = bufferSize; - // Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is - // longer than the size of the buffer. It simply just writes whatever it can, and returns. - // If we can write into the extra byte, that indicates buffer overflow. - // See the check in the write() function. - const newBuffer = Buffer.alloc(this.bufferSize + 1, 0, "utf8"); - if (this.buffer) { - this.buffer.copy(newBuffer); - } - this.buffer = newBuffer; } +} +/** + * Buffer used by the Sender. + */ +interface SenderBuffer { /** * Resets the buffer, data added to the buffer will be lost.
* In other words it clears the buffer and sets the writing position to the beginning of the buffer. * * @return {Sender} Returns with a reference to this sender. */ - reset(): SenderBuffer { - this.position = 0; - this.startNewRow(); - return this; - } - - private startNewRow() { - this.endOfLastRow = this.position; - this.hasTable = false; - this.hasSymbols = false; - this.hasColumns = false; - } + reset(): SenderBuffer; /** - * @ignore - * @return {Buffer} Returns a cropped buffer, or null if there is nothing to send. - * The returned buffer is backed by the sender's buffer. - * Used only in tests. + * @return {Buffer} Returns a cropped buffer, or null if there is nothing to send.
+ * The returned buffer is backed by this buffer instance, meaning the view can change as the buffer is mutated. + * Used only in tests to assert the buffer's content. */ - toBufferView(pos = this.endOfLastRow): Buffer { - return pos > 0 ? this.buffer.subarray(0, pos) : null; - } + toBufferView(pos?: number): Buffer; /** - * @ignore - * @return {Buffer|null} Returns a cropped buffer ready to send to the server, or null if there is nothing to send. - * The returned buffer is a copy of the sender's buffer. - * It also compacts the Sender's buffer. + * @return {Buffer} Returns a cropped buffer ready to send to the server, or null if there is nothing to send.
+ * The returned buffer is a copy of this buffer. + * It also compacts the buffer. */ - toBufferNew(pos = this.endOfLastRow): Buffer | null { - if (pos > 0) { - const data = Buffer.allocUnsafe(pos); - this.buffer.copy(data, 0, 0, pos); - this.compact(); - return data; - } - return null; - } + toBufferNew(pos?: number): Buffer | null; /** - * Write the table name into the buffer of the sender. + * Write the table name into the buffer. * * @param {string} table - Table name. * @return {Sender} Returns with a reference to this sender. */ - table(table: string): SenderBuffer { - if (typeof table !== "string") { - throw new Error(`Table name must be a string, received ${typeof table}`); - } - if (this.hasTable) { - throw new Error("Table name has already been set"); - } - validateTableName(table, this.maxNameLength); - this.checkCapacity([table], table.length); - this.writeEscaped(table); - this.hasTable = true; - return this; - } + table(table: string): SenderBuffer; /** - * Write a symbol name and value into the buffer of the sender. + * Write a symbol name and value into the buffer. * * @param {string} name - Symbol name. * @param {unknown} value - Symbol value, toString() is called to extract the actual symbol value from the parameter. * @return {Sender} Returns with a reference to this sender. */ - symbol(name: string, value: unknown): SenderBuffer { - if (typeof name !== "string") { - throw new Error(`Symbol name must be a string, received ${typeof name}`); - } - if (!this.hasTable || this.hasColumns) { - throw new Error( - "Symbol can be added only after table name is set and before any column added", - ); - } - const valueStr = value.toString(); - this.checkCapacity([name, valueStr], 2 + name.length + valueStr.length); - this.write(","); - validateColumnName(name, this.maxNameLength); - this.writeEscaped(name); - this.write("="); - this.writeEscaped(valueStr); - this.hasSymbols = true; - return this; - } + symbol(name: string, value: unknown): SenderBuffer; /** - * Write a string column with its value into the buffer of the sender. + * Write a string column with its value into the buffer. * * @param {string} name - Column name. * @param {string} value - Column value, accepts only string values. * @return {Sender} Returns with a reference to this sender. */ - stringColumn(name: string, value: string): SenderBuffer { - this.writeColumn( - name, - value, - () => { - this.checkCapacity([value], 2 + value.length); - this.write('"'); - this.writeEscaped(value, true); - this.write('"'); - }, - "string", - ); - return this; - } + stringColumn(name: string, value: string): SenderBuffer; /** - * Write a boolean column with its value into the buffer of the sender. + * Write a boolean column with its value into the buffer. * * @param {string} name - Column name. * @param {boolean} value - Column value, accepts only boolean values. * @return {Sender} Returns with a reference to this sender. */ - booleanColumn(name: string, value: boolean): SenderBuffer { - this.writeColumn( - name, - value, - () => { - this.checkCapacity([], 1); - this.write(value ? "t" : "f"); - }, - "boolean", - ); - return this; - } + booleanColumn(name: string, value: boolean): SenderBuffer; /** - * Write a float column with its value into the buffer of the sender. + * Write a float column with its value into the buffer. * * @param {string} name - Column name. * @param {number} value - Column value, accepts only number values. * @return {Sender} Returns with a reference to this sender. */ - floatColumn(name: string, value: number): SenderBuffer { - this.writeColumn( - name, - value, - () => { - const valueStr = value.toString(); - this.checkCapacity([valueStr]); - this.write(valueStr); - }, - "number", - ); - return this; - } + floatColumn(name: string, value: number): SenderBuffer; /** - * Write an integer column with its value into the buffer of the sender. + * Write an integer column with its value into the buffer. * * @param {string} name - Column name. * @param {number} value - Column value, accepts only number values. * @return {Sender} Returns with a reference to this sender. + * @throws Error if the value is not an integer */ - intColumn(name: string, value: number): SenderBuffer { - if (!Number.isInteger(value)) { - throw new Error(`Value must be an integer, received ${value}`); - } - this.writeColumn(name, value, () => { - const valueStr = value.toString(); - this.checkCapacity([valueStr], 1); - this.write(valueStr); - this.write("i"); - }); - return this; - } + intColumn(name: string, value: number): SenderBuffer; /** - * Write a timestamp column with its value into the buffer of the sender. + * Write a timestamp column with its value into the buffer. * * @param {string} name - Column name. * @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts. @@ -273,164 +125,33 @@ class SenderBuffer { timestampColumn( name: string, value: number | bigint, - unit: TimestampUnit = "us", - ): SenderBuffer { - if (typeof value !== "bigint" && !Number.isInteger(value)) { - throw new Error(`Value must be an integer or BigInt, received ${value}`); - } - this.writeColumn(name, value, () => { - const valueMicros = timestampToMicros(BigInt(value), unit); - const valueStr = valueMicros.toString(); - this.checkCapacity([valueStr], 1); - this.write(valueStr); - this.write("t"); - }); - return this; - } + unit: TimestampUnit, + ): SenderBuffer; /** - * Closing the row after writing the designated timestamp into the buffer of the sender. + * Closing the row after writing the designated timestamp into the buffer. * * @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts. * @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'. */ - at(timestamp: number | bigint, unit: TimestampUnit = "us") { - if (!this.hasSymbols && !this.hasColumns) { - throw new Error( - "The row must have a symbol or column set before it is closed", - ); - } - if (typeof timestamp !== "bigint" && !Number.isInteger(timestamp)) { - throw new Error( - `Designated timestamp must be an integer or BigInt, received ${timestamp}`, - ); - } - const timestampNanos = timestampToNanos(BigInt(timestamp), unit); - const timestampStr = timestampNanos.toString(); - this.checkCapacity([timestampStr], 2); - this.write(" "); - this.write(timestampStr); - this.write("\n"); - this.startNewRow(); - } + at(timestamp: number | bigint, unit: TimestampUnit): void; /** - * Closing the row without writing designated timestamp into the buffer of the sender.
+ * Closing the row without writing designated timestamp into the buffer.
* Designated timestamp will be populated by the server on this record. */ - atNow() { - if (!this.hasSymbols && !this.hasColumns) { - throw new Error( - "The row must have a symbol or column set before it is closed", - ); - } - this.checkCapacity([], 1); - this.write("\n"); - this.startNewRow(); - } + atNow(): void; /** - * Returns the current position of the buffer. + * Returns the current position of the buffer.
* New data will be written into the buffer starting from this position. */ - currentPosition() { - return this.position; - } - - private checkCapacity(data: string[], base = 0) { - let length = base; - for (const str of data) { - length += Buffer.byteLength(str, "utf8"); - } - if (this.position + length > this.bufferSize) { - let newSize = this.bufferSize; - do { - newSize += this.bufferSize; - } while (this.position + length > newSize); - this.resize(newSize); - } - } - - private compact() { - if (this.endOfLastRow > 0) { - this.buffer.copy(this.buffer, 0, this.endOfLastRow, this.position); - this.position = this.position - this.endOfLastRow; - this.endOfLastRow = 0; - } - } - - private writeColumn( - name: string, - value: unknown, - writeValue: () => void, - valueType?: string, - ) { - if (typeof name !== "string") { - throw new Error(`Column name must be a string, received ${typeof name}`); - } - if (valueType && typeof value !== valueType) { - throw new Error( - `Column value must be of type ${valueType}, received ${typeof value}`, - ); - } - if (!this.hasTable) { - throw new Error("Column can be set only after table name is set"); - } - this.checkCapacity([name], 2 + name.length); - this.write(this.hasColumns ? "," : " "); - validateColumnName(name, this.maxNameLength); - this.writeEscaped(name); - this.write("="); - writeValue(); - this.hasColumns = true; - } - - private write(data: string) { - this.position += this.buffer.write(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); - } - } - - private writeEscaped(data: string, quoted = false) { - for (const ch of data) { - if (ch > "\\") { - this.write(ch); - continue; - } - - switch (ch) { - case " ": - case ",": - case "=": - if (!quoted) { - this.write("\\"); - } - this.write(ch); - break; - case "\n": - case "\r": - this.write("\\"); - this.write(ch); - break; - case '"': - if (quoted) { - this.write("\\"); - } - this.write(ch); - break; - case "\\": - this.write("\\\\"); - break; - default: - this.write(ch); - break; - } - } - } + currentPosition(): number; } -export { SenderBuffer, DEFAULT_BUFFER_SIZE, DEFAULT_MAX_BUFFER_SIZE }; +export { + SenderBuffer, + createBuffer, + DEFAULT_BUFFER_SIZE, + DEFAULT_MAX_BUFFER_SIZE, +}; diff --git a/src/options.ts b/src/options.ts index 35e21fa..59c4853 100644 --- a/src/options.ts +++ b/src/options.ts @@ -4,6 +4,7 @@ import http from "http"; import https from "https"; import { Logger } from "./logging"; +import { fetchJson } from "./utils"; const HTTP_PORT = 9000; const TCP_PORT = 9009; @@ -17,6 +18,12 @@ const ON = "on"; const OFF = "off"; const UNSAFE_OFF = "unsafe_off"; +const PROTOCOL_VERSION_AUTO = "auto"; +const PROTOCOL_VERSION_V1 = "1"; +const PROTOCOL_VERSION_V2 = "2"; + +const LINE_PROTO_SUPPORT_VERSION = "line.proto.support.versions"; + type ExtraOptions = { log?: Logger; agent?: Agent | http.Agent | https.Agent; @@ -44,6 +51,11 @@ type DeprecatedOptions = { *
  • protocol: enum, accepted values: http, https, tcp, tcps - The protocol used to communicate with the server.
    * When https or tcps used, the connection is secured with TLS encryption. *
  • + *
  • protocol_version: enum, accepted values: auto, 1, 2 - The protocol version used for data serialization.
    + * Version 1 uses text-based serialization for all data types. Version 2 uses binary encoding for doubles.
    + * When set to 'auto' (default for HTTP/HTTPS), the client automatically negotiates the highest supported version with the server.
    + * TCP/TCPS connections default to version 1. + *
  • *
  • addr: string - Hostname and port, separated by colon. This key is mandatory, but the port part is optional.
    * If no port is specified, a default will be used.
    * When the protocol is HTTP/HTTPS, the port defaults to 9000. When the protocol is TCP/TCPS, the port defaults to 9009.
    @@ -130,6 +142,8 @@ type DeprecatedOptions = { */ class SenderOptions { protocol: string; + protocol_version?: string; + addr?: string; host?: string; // derived from addr port?: number; // derived from addr @@ -205,6 +219,42 @@ class SenderOptions { } } + /** + * Resolves the protocol version, if it is set to 'auto'.
    + * If TCP transport is used, the protocol version will default to 1. + * In case of HTTP transport the /settings endpoint of the database is used to find the protocol versions + * supported by the server, and the highest will be selected. + * @param options SenderOptions instance needs resolving protocol version + */ + static async resolveAuto(options: SenderOptions) { + parseProtocolVersion(options); + if (options.protocol_version !== PROTOCOL_VERSION_AUTO) { + return options; + } + + const url = `${options.protocol}://${options.host}:${options.port}/settings`; + const settings: { + config: { LINE_PROTO_SUPPORT_VERSION: number[] }; + } = await fetchJson(url); + const supportedVersions: string[] = ( + settings.config[LINE_PROTO_SUPPORT_VERSION] ?? [] + ).map((version: unknown) => String(version)); + + if (supportedVersions.length === 0) { + options.protocol_version = PROTOCOL_VERSION_V1; + } else if (supportedVersions.includes(PROTOCOL_VERSION_V2)) { + options.protocol_version = PROTOCOL_VERSION_V2; + } else if (supportedVersions.includes(PROTOCOL_VERSION_V1)) { + options.protocol_version = PROTOCOL_VERSION_V1; + } else { + throw new Error( + "Unsupported protocol versions received from server: " + + supportedVersions, + ); + } + return options; + } + static resolveDeprecated( options: SenderOptions & DeprecatedOptions, log: Logger, @@ -250,11 +300,13 @@ class SenderOptions { * * @return {SenderOptions} A Sender configuration object initialized from the provided configuration string. */ - static fromConfig( + static async fromConfig( configurationString: string, extraOptions?: ExtraOptions, - ): SenderOptions { - return new SenderOptions(configurationString, extraOptions); + ): Promise { + const options = new SenderOptions(configurationString, extraOptions); + await SenderOptions.resolveAuto(options); + return options; } /** @@ -268,8 +320,11 @@ class SenderOptions { * * @return {SenderOptions} A Sender configuration object initialized from the QDB_CLIENT_CONF environment variable. */ - static fromEnv(extraOptions?: ExtraOptions): SenderOptions { - return SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions); + static async fromEnv(extraOptions?: ExtraOptions): Promise { + return await SenderOptions.fromConfig( + process.env.QDB_CLIENT_CONF, + extraOptions, + ); } } @@ -283,6 +338,7 @@ function parseConfigurationString( const position = parseProtocol(options, configString); parseSettings(options, configString, position); + parseProtocolVersion(options); parseAddress(options); parseBufferSizes(options); parseAutoFlushOptions(options); @@ -336,6 +392,7 @@ function parseSetting( } const ValidConfigKeys = [ + "protocol_version", "addr", "username", "password", @@ -401,6 +458,30 @@ function parseProtocol(options: SenderOptions, configString: string) { return index + 2; } +function parseProtocolVersion(options: SenderOptions) { + const protocol_version = options.protocol_version ?? PROTOCOL_VERSION_AUTO; + switch (protocol_version) { + case PROTOCOL_VERSION_AUTO: + switch (options.protocol) { + case HTTP: + case HTTPS: + options.protocol_version = PROTOCOL_VERSION_AUTO; + break; + default: + options.protocol_version = PROTOCOL_VERSION_V1; + } + break; + case PROTOCOL_VERSION_V1: + case PROTOCOL_VERSION_V2: + break; + default: + throw new Error( + `Invalid protocol version: '${protocol_version}', accepted values: 'auto', '1', '2'`, + ); + } + return; +} + function parseAddress(options: SenderOptions) { if (!options.addr) { throw new Error("Invalid configuration, 'addr' is required"); @@ -523,4 +604,14 @@ function parseInteger( } } -export { SenderOptions, ExtraOptions, HTTP, HTTPS, TCP, TCPS }; +export { + SenderOptions, + ExtraOptions, + HTTP, + HTTPS, + TCP, + TCPS, + PROTOCOL_VERSION_AUTO, + PROTOCOL_VERSION_V1, + PROTOCOL_VERSION_V2, +}; diff --git a/src/sender.ts b/src/sender.ts index 5c88de2..bd41d89 100644 --- a/src/sender.ts +++ b/src/sender.ts @@ -2,8 +2,8 @@ import { log, Logger } from "./logging"; import { SenderOptions, ExtraOptions } from "./options"; import { SenderTransport, createTransport } from "./transport"; +import { SenderBuffer, createBuffer } from "./buffer"; import { isBoolean, isInteger, TimestampUnit } from "./utils"; -import { SenderBuffer } from "./buffer"; const DEFAULT_AUTO_FLUSH_INTERVAL = 1000; // 1 sec @@ -72,7 +72,7 @@ class Sender { */ constructor(options: SenderOptions) { this.transport = createTransport(options); - this.buffer = new SenderBuffer(options); + this.buffer = createBuffer(options); this.log = typeof options.log === "function" ? options.log : log; @@ -99,12 +99,12 @@ class Sender { * * @return {Sender} A Sender object initialized from the provided configuration string. */ - static fromConfig( + static async fromConfig( configurationString: string, extraOptions?: ExtraOptions, - ): Sender { + ): Promise { return new Sender( - SenderOptions.fromConfig(configurationString, extraOptions), + await SenderOptions.fromConfig(configurationString, extraOptions), ); } @@ -119,9 +119,9 @@ class Sender { * * @return {Sender} A Sender object initialized from the QDB_CLIENT_CONF environment variable. */ - static fromEnv(extraOptions?: ExtraOptions): Sender { + static async fromEnv(extraOptions?: ExtraOptions): Promise { return new Sender( - SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions), + await SenderOptions.fromConfig(process.env.QDB_CLIENT_CONF, extraOptions), ); } diff --git a/src/utils.ts b/src/utils.ts index e4e67fa..0867b2c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -36,10 +36,34 @@ function timestampToNanos(timestamp: bigint, unit: TimestampUnit) { } } +/** + * Fetches JSON data from a URL. + * @template T - The expected type of the JSON response + * @param url - The URL to fetch from + * @returns Promise resolving to the parsed JSON data + * @throws Error if the request fails or returns a non-OK status + */ +async function fetchJson(url: string): Promise { + let response: globalThis.Response; + try { + response = await fetch(url); + } catch (error) { + throw new Error(`Failed to load ${url} [error=${error}]`); + } + + if (!response.ok) { + throw new Error( + `Failed to load ${url} [statusCode=${response.status} (${response.statusText})]`, + ); + } + return (await response.json()) as T; +} + export { isBoolean, isInteger, timestampToMicros, timestampToNanos, TimestampUnit, + fetchJson, }; diff --git a/test/options.test.ts b/test/options.test.ts index e8be4e5..19ce449 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -1,15 +1,42 @@ // @ts-check -import { describe, it, expect } from "vitest"; +import { describe, it, expect, beforeAll, afterAll } from "vitest"; import { Agent } from "undici"; import { SenderOptions } from "../src/options"; +import { MockHttp } from "./util/mockhttp"; +import { readFileSync } from "fs"; + +const MOCK_HTTP_PORT = 9097; +const MOCK_HTTPS_PORT = 9096; + +const proxyOptions = { + key: readFileSync("test/certs/server/server.key"), + cert: readFileSync("test/certs/server/server.crt"), + ca: readFileSync("test/certs/ca/ca.crt"), +}; + +process.env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"; describe("Configuration string parser suite", function () { - it("can parse a basic config string", function () { - const options = SenderOptions.fromConfig( - "https::addr=host;username=user1;password=pwd;", + const mockHttp = new MockHttp(); + const mockHttps = new MockHttp(); + + beforeAll(async function () { + await mockHttp.start(MOCK_HTTP_PORT); + await mockHttps.start(MOCK_HTTPS_PORT, true, proxyOptions); + }); + + afterAll(async function () { + await mockHttp.stop(); + await mockHttps.stop(); + }, 30000); + + it("can parse a basic config string", async function () { + const options = await SenderOptions.fromConfig( + "https::addr=host;username=user1;password=pwd;protocol_version=2", ); expect(options.protocol).toBe("https"); + expect(options.protocol_version).toBe("2"); expect(options.addr).toBe("host"); expect(options.username).toBe("user1"); expect(options.password).toBe("pwd"); @@ -17,47 +44,63 @@ describe("Configuration string parser suite", function () { it("can parse a config string from environment variable", async function () { process.env.QDB_CLIENT_CONF = "tcp::addr=host;"; - const options = SenderOptions.fromEnv(); + const options = await SenderOptions.fromEnv(); expect(options.protocol).toBe("tcp"); expect(options.addr).toBe("host"); }); - it("accepts only lowercase protocols", function () { - let options = SenderOptions.fromConfig("tcp::addr=host;"); + it("accepts only lowercase protocols", async function () { + let options = await SenderOptions.fromConfig("tcp::addr=host;"); expect(options.protocol).toBe("tcp"); - options = SenderOptions.fromConfig("tcps::addr=host;"); + options = await SenderOptions.fromConfig("tcps::addr=host;"); expect(options.protocol).toBe("tcps"); - options = SenderOptions.fromConfig("http::addr=host;"); + options = await SenderOptions.fromConfig( + "http::addr=host;protocol_version=2", + ); expect(options.protocol).toBe("http"); - options = SenderOptions.fromConfig("https::addr=host;"); + options = await SenderOptions.fromConfig( + "https::addr=host;protocol_version=2", + ); expect(options.protocol).toBe("https"); - expect(() => SenderOptions.fromConfig("HTTP::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("HTTP::"), + ).rejects.toThrow( "Invalid protocol: 'HTTP', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => SenderOptions.fromConfig("Http::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("Http::"), + ).rejects.toThrow( "Invalid protocol: 'Http', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => SenderOptions.fromConfig("HtTps::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("HtTps::"), + ).rejects.toThrow( "Invalid protocol: 'HtTps', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => SenderOptions.fromConfig("TCP::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("TCP::"), + ).rejects.toThrow( "Invalid protocol: 'TCP', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => SenderOptions.fromConfig("TcP::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("TcP::"), + ).rejects.toThrow( "Invalid protocol: 'TcP', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => SenderOptions.fromConfig("Tcps::")).toThrow( + await expect( + async () => await SenderOptions.fromConfig("Tcps::"), + ).rejects.toThrow( "Invalid protocol: 'Tcps', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); }); - it("considers that keys and values are case-sensitive", function () { - const options = SenderOptions.fromConfig( + it("considers that keys and values are case-sensitive", async function () { + const options = await SenderOptions.fromConfig( "tcps::addr=Host;username=useR1;token=TOKEN;", ); expect(options.protocol).toBe("tcps"); @@ -65,47 +108,63 @@ describe("Configuration string parser suite", function () { expect(options.username).toBe("useR1"); expect(options.token).toBe("TOKEN"); - expect(() => - SenderOptions.fromConfig("tcps::addr=Host;UserNAME=useR1;PaSswOrD=pWd;"), - ).toThrow("Unknown configuration key: 'UserNAME'"); - expect(() => - SenderOptions.fromConfig("tcps::addr=Host;PaSswOrD=pWd;"), - ).toThrow("Unknown configuration key: 'PaSswOrD'"); + await expect( + async () => + await SenderOptions.fromConfig( + "tcps::addr=Host;UserNAME=useR1;PaSswOrD=pWd;", + ), + ).rejects.toThrow("Unknown configuration key: 'UserNAME'"); + await expect( + async () => + await SenderOptions.fromConfig("tcps::addr=Host;PaSswOrD=pWd;"), + ).rejects.toThrow("Unknown configuration key: 'PaSswOrD'"); }); - it("can parse with or without the last semicolon", function () { - let options = SenderOptions.fromConfig("https::addr=host:9002"); + it("can parse with or without the last semicolon", async function () { + let options = await SenderOptions.fromConfig( + "https::addr=host:9002;protocol_version=2;", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); - options = SenderOptions.fromConfig("https::addr=host:9002;"); + options = await SenderOptions.fromConfig( + "https::addr=host:9002;protocol_version=2", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); - options = SenderOptions.fromConfig("https::addr=host:9002;token=abcde"); + options = await SenderOptions.fromConfig( + "https::addr=host:9002;token=abcde;protocol_version=2", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); expect(options.token).toBe("abcde"); - options = SenderOptions.fromConfig("https::addr=host:9002;token=abcde;"); + options = await SenderOptions.fromConfig( + "https::addr=host:9002;token=abcde;protocol_version=2;", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); expect(options.token).toBe("abcde"); - options = SenderOptions.fromConfig("https::addr=host:9002;token=abcde;;"); + options = await SenderOptions.fromConfig( + "https::addr=host:9002;protocol_version=2;token=abcde;;", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); expect(options.token).toBe("abcde;"); - options = SenderOptions.fromConfig("https::addr=host:9002;token=abcde;;;"); + options = await SenderOptions.fromConfig( + "https::addr=host:9002;protocol_version=2;token=abcde;;;", + ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); expect(options.token).toBe("abcde;"); }); - it("can parse escaped config string values", function () { - const options = SenderOptions.fromConfig( - "https::addr=host:9002;username=us;;;;;;er;;1;;;password=p;;wd;", + it("can parse escaped config string values", async function () { + const options = await SenderOptions.fromConfig( + "https::addr=host:9002;protocol_version=2;username=us;;;;;;er;;1;;;password=p;;wd;", ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("host:9002"); @@ -113,17 +172,18 @@ describe("Configuration string parser suite", function () { expect(options.password).toBe("p;wd"); }); - it("can parse the address", function () { - let options = SenderOptions.fromConfig( - "https::addr=host1:9002;token=resttoken123;", + it("can parse the address", async function () { + let options = await SenderOptions.fromConfig( + "https::addr=host1:9002;token=resttoken123;protocol_version=2;", ); expect(options.protocol).toBe("https"); + expect(options.protocol_version).toBe("2"); expect(options.addr).toBe("host1:9002"); expect(options.host).toBe("host1"); expect(options.port).toBe(9002); expect(options.token).toBe("resttoken123"); - options = SenderOptions.fromConfig( + options = await SenderOptions.fromConfig( "tcps::addr=host2:9005;username=user1;token=jwkprivkey123;", ); expect(options.protocol).toBe("tcps"); @@ -134,9 +194,9 @@ describe("Configuration string parser suite", function () { expect(options.token).toBe("jwkprivkey123"); }); - it("can default the port", function () { - let options = SenderOptions.fromConfig( - "https::addr=hostname;token=resttoken123;", + it("can default the port", async function () { + let options = await SenderOptions.fromConfig( + "https::addr=hostname;protocol_version=2;token=resttoken123;", ); expect(options.protocol).toBe("https"); expect(options.addr).toBe("hostname"); @@ -144,8 +204,8 @@ describe("Configuration string parser suite", function () { expect(options.port).toBe(9000); expect(options.token).toBe("resttoken123"); - options = SenderOptions.fromConfig( - "http::addr=hostname;token=resttoken123;", + options = await SenderOptions.fromConfig( + "http::addr=hostname;protocol_version=2;token=resttoken123;", ); expect(options.protocol).toBe("http"); expect(options.addr).toBe("hostname"); @@ -153,7 +213,7 @@ describe("Configuration string parser suite", function () { expect(options.port).toBe(9000); expect(options.token).toBe("resttoken123"); - options = SenderOptions.fromConfig( + options = await SenderOptions.fromConfig( "tcps::addr=hostname;username=user1;token=jwkprivkey123;", ); expect(options.protocol).toBe("tcps"); @@ -163,7 +223,7 @@ describe("Configuration string parser suite", function () { expect(options.username).toBe("user1"); expect(options.token).toBe("jwkprivkey123"); - options = SenderOptions.fromConfig( + options = await SenderOptions.fromConfig( "tcp::addr=hostname;username=user1;token=jwkprivkey123;", ); expect(options.protocol).toBe("tcp"); @@ -174,89 +234,284 @@ describe("Configuration string parser suite", function () { expect(options.token).toBe("jwkprivkey123"); }); - it("fails if port is not a positive integer", function () { - expect(() => SenderOptions.fromConfig("tcp::addr=host:;")).toThrow( - "Port is required", + it("can parse protocol version", async function () { + // invalid protocol version + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=hostname;protocol_version=3"), + ).rejects.toThrow( + "Invalid protocol version: '3', accepted values: 'auto', '1', '2'", ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:0")).toThrow( - "Invalid port: 0", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=hostname;protocol_version=0", + ), + ).rejects.toThrow( + "Invalid protocol version: '0', accepted values: 'auto', '1', '2'", ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:0.2")).toThrow( - "Invalid port: 0.2", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=hostname;protocol_version=-1", + ), + ).rejects.toThrow( + "Invalid protocol version: '-1', accepted values: 'auto', '1', '2'", ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:-2")).toThrow( - "Invalid port: -2", + await expect( + async () => + await SenderOptions.fromConfig( + "https::addr=hostname;protocol_version=automatic", + ), + ).rejects.toThrow( + "Invalid protocol version: 'automatic', accepted values: 'auto', '1', '2'", ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:!;")).toThrow( - "Invalid port: '!'", + + let options: SenderOptions; + + // defaults with supported versions: 1,2 + mockHttp.reset(); + mockHttps.reset(); + options = await SenderOptions.fromConfig("tcp::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig("tcps::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT}`, ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:9009x;")).toThrow( - "Invalid port: '9009x'", + expect(options.protocol_version).toBe("2"); + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT}`, ); - expect(() => SenderOptions.fromConfig("tcp::addr=host:900 9;")).toThrow( - "Invalid port: '900 9'", + expect(options.protocol_version).toBe("2"); + + // defaults with supported versions: 1 + const only1 = { + settings: { + config: { "line.proto.support.versions": [1] }, + }, + }; + mockHttp.reset(only1); + mockHttps.reset(only1); + options = await SenderOptions.fromConfig("tcp::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig("tcps::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT}`, ); - }); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT}`, + ); + expect(options.protocol_version).toBe("1"); + + // defaults with no supported versions + const noVersions = { + settings: { + config: {}, + }, + }; + mockHttp.reset(noVersions); + mockHttps.reset(noVersions); + options = await SenderOptions.fromConfig("tcp::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig("tcps::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT}`, + ); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT}`, + ); + expect(options.protocol_version).toBe("1"); + + // defaults with no match with supported versions + const no1and2 = { + settings: { + config: { "line.proto.support.versions": [3, 5] }, + }, + }; + mockHttp.reset(no1and2); + mockHttps.reset(no1and2); + options = await SenderOptions.fromConfig("tcp::addr=localhost"); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig("tcps::addr=localhost"); + expect(options.protocol_version).toBe("1"); + await expect( + async () => + await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT}`, + ), + ).rejects.toThrow( + "Unsupported protocol versions received from server: 3,5", + ); + await expect( + async () => + await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT}`, + ), + ).rejects.toThrow( + "Unsupported protocol versions received from server: 3,5", + ); + + // auto, 1, 2 with each protocol (tcp, tcps, http, https), supported versions: 1,2 + mockHttp.reset(); + mockHttps.reset(); + options = await SenderOptions.fromConfig( + "tcp::addr=localhost;protocol_version=1", + ); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + "tcp::addr=localhost;protocol_version=2", + ); + expect(options.protocol_version).toBe("2"); + options = await SenderOptions.fromConfig( + "tcp::addr=localhost;protocol_version=auto", + ); + expect(options.protocol_version).toBe("1"); + + options = await SenderOptions.fromConfig( + "tcps::addr=localhost;protocol_version=1", + ); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + "tcps::addr=localhost;protocol_version=2", + ); + expect(options.protocol_version).toBe("2"); + options = await SenderOptions.fromConfig( + "tcps::addr=localhost;protocol_version=auto", + ); + expect(options.protocol_version).toBe("1"); - it("fails if init_buf_size is not a positive integer", function () { - expect(() => - SenderOptions.fromConfig("tcp::addr=host;init_buf_size=;"), - ).toThrow("Invalid configuration, value is not set for 'init_buf_size'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;init_buf_size=1024a;"), - ).toThrow("Invalid initial buffer size option, not a number: '1024a'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;init_buf_size=102 4;"), - ).toThrow("Invalid initial buffer size option, not a number: '102 4'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;init_buf_size=0;"), - ).toThrow("Invalid initial buffer size option: 0"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT};protocol_version=1`, + ); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT};protocol_version=2`, + ); + expect(options.protocol_version).toBe("2"); + options = await SenderOptions.fromConfig( + `http::addr=localhost:${MOCK_HTTP_PORT};protocol_version=auto`, + ); + expect(options.protocol_version).toBe("2"); + + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT};protocol_version=1`, + ); + expect(options.protocol_version).toBe("1"); + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT};protocol_version=2`, + ); + expect(options.protocol_version).toBe("2"); + options = await SenderOptions.fromConfig( + `https::addr=localhost:${MOCK_HTTPS_PORT};protocol_version=auto`, + ); + expect(options.protocol_version).toBe("2"); }); - it("fails if max_buf_size is not a positive integer", function () { - expect(() => - SenderOptions.fromConfig("tcp::addr=host;max_buf_size=;"), - ).toThrow("Invalid configuration, value is not set for 'max_buf_size'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;max_buf_size=1024a;"), - ).toThrow("Invalid max buffer size option, not a number: '1024a'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;max_buf_size=102 4;"), - ).toThrow("Invalid max buffer size option, not a number: '102 4'"); - expect(() => - SenderOptions.fromConfig("tcp::addr=host;max_buf_size=0;"), - ).toThrow("Invalid max buffer size option: 0"); + it("fails if port is not a positive integer", async function () { + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:;"), + ).rejects.toThrow("Port is required"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:0"), + ).rejects.toThrow("Invalid port: 0"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:0.2"), + ).rejects.toThrow("Invalid port: 0.2"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:-2"), + ).rejects.toThrow("Invalid port: -2"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:!;"), + ).rejects.toThrow("Invalid port: '!'"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:9009x;"), + ).rejects.toThrow("Invalid port: '9009x'"); + await expect( + async () => await SenderOptions.fromConfig("tcp::addr=host:900 9;"), + ).rejects.toThrow("Invalid port: '900 9'"); }); - it("rejects missing or empty hostname", function () { - expect(() => SenderOptions.fromConfig("http::")).toThrow( - "Invalid configuration, 'addr' is required", + it("fails if init_buf_size is not a positive integer", async function () { + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;init_buf_size=;"), + ).rejects.toThrow( + "Invalid configuration, value is not set for 'init_buf_size'", ); - expect(() => SenderOptions.fromConfig("http::;")).toThrow( - "Missing '=' sign in ''", + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;init_buf_size=1024a;"), + ).rejects.toThrow( + "Invalid initial buffer size option, not a number: '1024a'", ); - expect(() => SenderOptions.fromConfig("http::addr=;")).toThrow( - "Invalid configuration, value is not set for 'addr'", + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;init_buf_size=102 4;"), + ).rejects.toThrow( + "Invalid initial buffer size option, not a number: '102 4'", ); - expect(() => - SenderOptions.fromConfig("http::addr=;username=user1;"), - ).toThrow("Invalid configuration, value is not set for 'addr'"); - expect(() => - SenderOptions.fromConfig("http::username=user1;addr=;"), - ).toThrow("Invalid configuration, value is not set for 'addr'"); - expect(() => SenderOptions.fromConfig("http::addr=:9000;")).toThrow( - "Host name is required", + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;init_buf_size=0;"), + ).rejects.toThrow("Invalid initial buffer size option: 0"); + }); + + it("fails if max_buf_size is not a positive integer", async function () { + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;max_buf_size=;"), + ).rejects.toThrow( + "Invalid configuration, value is not set for 'max_buf_size'", ); + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;max_buf_size=1024a;"), + ).rejects.toThrow("Invalid max buffer size option, not a number: '1024a'"); + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;max_buf_size=102 4;"), + ).rejects.toThrow("Invalid max buffer size option, not a number: '102 4'"); + await expect( + async () => + await SenderOptions.fromConfig("tcp::addr=host;max_buf_size=0;"), + ).rejects.toThrow("Invalid max buffer size option: 0"); + }); - const options = SenderOptions.fromConfig("http::addr=x;"); + it("rejects missing or empty hostname", async function () { + await expect( + async () => await SenderOptions.fromConfig("http::"), + ).rejects.toThrow("Invalid configuration, 'addr' is required"); + await expect( + async () => await SenderOptions.fromConfig("http::;"), + ).rejects.toThrow("Missing '=' sign in ''"); + await expect( + async () => await SenderOptions.fromConfig("http::addr=;"), + ).rejects.toThrow("Invalid configuration, value is not set for 'addr'"); + await expect( + async () => await SenderOptions.fromConfig("http::addr=;username=user1;"), + ).rejects.toThrow("Invalid configuration, value is not set for 'addr'"); + await expect( + async () => await SenderOptions.fromConfig("http::username=user1;addr=;"), + ).rejects.toThrow("Invalid configuration, value is not set for 'addr'"); + await expect( + async () => await SenderOptions.fromConfig("http::addr=:9000;"), + ).rejects.toThrow("Host name is required"); + + const options = await SenderOptions.fromConfig( + "http::addr=x;protocol_version=2", + ); expect(options.protocol).toBe("http"); expect(options.host).toBe("x"); - expect(options.host).toBe("x"); }); - it("does not default optional fields", function () { - const options = SenderOptions.fromConfig( - "https::addr=host:9000;token=abcdef123;", + it("does not default optional fields", async function () { + const options = await SenderOptions.fromConfig( + "https::addr=host:9000;token=abcdef123;protocol_version=2;", ); expect(options.protocol).toBe("https"); expect(options.token).toBe("abcdef123"); @@ -264,440 +519,657 @@ describe("Configuration string parser suite", function () { expect(options.password).toBe(undefined); }); - it("rejects invalid config value", function () { - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=;"), - ).toThrow("Invalid configuration, value is not set for 'username'"); + it("rejects invalid config value", async function () { + await expect( + async () => + await SenderOptions.fromConfig("http::addr=host:9000;username=;"), + ).rejects.toThrow("Invalid configuration, value is not set for 'username'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=user\t;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig("http::addr=host:9000;username=user\t;"), + ).rejects.toThrow( "Invalid configuration, control characters are not allowed: 'user\t'", ); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=user\n;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig("http::addr=host:9000;username=user\n;"), + ).rejects.toThrow( "Invalid configuration, control characters are not allowed: 'user\n'", ); - let options = SenderOptions.fromConfig( - "http::addr=host:9000;username=us\x7Eer;", + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;username=us\x7Eer;protocol_version=2;", ); expect(options.protocol).toBe("http"); expect(options.addr).toBe("host:9000"); expect(options.username).toBe("us\x7Eer"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=us\x7Fer;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;username=us\x7Fer;", + ), + ).rejects.toThrow( "Invalid configuration, control characters are not allowed: 'us\x7Fer'", ); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=us\x9Fer;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;username=us\x9Fer;", + ), + ).rejects.toThrow( "Invalid configuration, control characters are not allowed: 'us\x9Fer'", ); - options = SenderOptions.fromConfig( - "http::addr=host:9000;username=us\xA0er;", + options = await SenderOptions.fromConfig( + "http::addr=host:9000;username=us\xA0er;protocol_version=2;", ); expect(options.protocol).toBe("http"); expect(options.addr).toBe("host:9000"); expect(options.username).toBe("us\xA0er"); }); - it("reject invalid config keys", function () { - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username=user1;pass=pwd;"), - ).toThrow("Unknown configuration key: 'pass'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;user=user1;password=pwd;"), - ).toThrow("Unknown configuration key: 'user'"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;username =user1;password=pwd;", - ), - ).toThrow("Unknown configuration key: 'username '"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000; username=user1;password=pwd;", - ), - ).toThrow("Unknown configuration key: ' username'"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;user name=user1;password=pwd;", - ), - ).toThrow("Unknown configuration key: 'user name'"); - }); - - it("rejects keys without value", function () { - expect(() => SenderOptions.fromConfig("http::addr;username=user1")).toThrow( - "Missing '=' sign in 'addr'", - ); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;username;"), - ).toThrow("Missing '=' sign in 'username'"); - }); - - it("throws error if protocol is invalid", function () { - expect(() => - SenderOptions.fromConfig("::addr=host;username=user1;password=pwd;"), - ).toThrow( + it("reject invalid config keys", async function () { + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;username=user1;pass=pwd;", + ), + ).rejects.toThrow("Unknown configuration key: 'pass'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;user=user1;password=pwd;", + ), + ).rejects.toThrow("Unknown configuration key: 'user'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;username =user1;password=pwd;", + ), + ).rejects.toThrow("Unknown configuration key: 'username '"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000; username=user1;password=pwd;", + ), + ).rejects.toThrow("Unknown configuration key: ' username'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;user name=user1;password=pwd;", + ), + ).rejects.toThrow("Unknown configuration key: 'user name'"); + }); + + it("rejects keys without value", async function () { + await expect( + async () => await SenderOptions.fromConfig("http::addr;username=user1"), + ).rejects.toThrow("Missing '=' sign in 'addr'"); + await expect( + async () => + await SenderOptions.fromConfig("http::addr=host:9000;username;"), + ).rejects.toThrow("Missing '=' sign in 'username'"); + }); + + it("throws error if protocol is invalid", async function () { + await expect( + async () => + await SenderOptions.fromConfig( + "::addr=host;username=user1;password=pwd;", + ), + ).rejects.toThrow( "Invalid protocol: '', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); - expect(() => - SenderOptions.fromConfig("htt::addr=host;username=user1;password=pwd;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "htt::addr=host;username=user1;password=pwd;", + ), + ).rejects.toThrow( "Invalid protocol: 'htt', accepted protocols: 'http', 'https', 'tcp', 'tcps'", ); }); - it("throws error if protocol is missing", function () { - expect(() => - SenderOptions.fromConfig("addr=host;username=user1;password=pwd;"), - ).toThrow( + it("throws error if protocol is missing", async function () { + await expect( + async () => + await SenderOptions.fromConfig( + "addr=host;username=user1;password=pwd;", + ), + ).rejects.toThrow( "Missing protocol, configuration string format: 'protocol::key1=value1;key2=value2;key3=value3;'", ); - expect(() => - SenderOptions.fromConfig("https:addr=host;username=user1;password=pwd;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "https:addr=host;username=user1;password=pwd;", + ), + ).rejects.toThrow( "Missing protocol, configuration string format: 'protocol::key1=value1;key2=value2;key3=value3;'", ); - expect(() => - SenderOptions.fromConfig("https addr=host;username=user1;password=pwd;"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "https addr=host;username=user1;password=pwd;", + ), + ).rejects.toThrow( "Missing protocol, configuration string format: 'protocol::key1=value1;key2=value2;key3=value3;'", ); }); - it("throws error if configuration string is missing", function () { + it("throws error if configuration string is missing", async function () { // @ts-expect-error - Testing invalid input - expect(() => SenderOptions.fromConfig()).toThrow( - "Configuration string is missing", - ); - expect(() => SenderOptions.fromConfig("")).toThrow( - "Configuration string is missing", - ); - expect(() => SenderOptions.fromConfig(null)).toThrow( - "Configuration string is missing", - ); - expect(() => SenderOptions.fromConfig(undefined)).toThrow( + await expect(async () => await SenderOptions.fromConfig()).rejects.toThrow( "Configuration string is missing", ); + await expect( + async () => await SenderOptions.fromConfig(""), + ).rejects.toThrow("Configuration string is missing"); + await expect( + async () => await SenderOptions.fromConfig(null), + ).rejects.toThrow("Configuration string is missing"); + await expect( + async () => await SenderOptions.fromConfig(undefined), + ).rejects.toThrow("Configuration string is missing"); }); - it("can parse auto_flush config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;auto_flush=on;", + it("can parse auto_flush config", async function () { + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=on;", ); expect(options.protocol).toBe("http"); expect(options.host).toBe("host"); expect(options.port).toBe(9000); expect(options.auto_flush).toBe(true); - options = SenderOptions.fromConfig("http::addr=host:9000;auto_flush=off;"); + options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=off;", + ); expect(options.protocol).toBe("http"); expect(options.host).toBe("host"); expect(options.port).toBe(9000); expect(options.auto_flush).toBe(false); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=ON;"), - ).toThrow("Invalid auto flush option: 'ON'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=On;"), - ).toThrow("Invalid auto flush option: 'On'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=true;"), - ).toThrow("Invalid auto flush option: 'true'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=OFF;"), - ).toThrow("Invalid auto flush option: 'OFF'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=Off;"), - ).toThrow("Invalid auto flush option: 'Off'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush=false;"), - ).toThrow("Invalid auto flush option: 'false'"); - }); - - it("can parse auto_flush_rows config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;auto_flush_rows=123;", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=ON;", + ), + ).rejects.toThrow("Invalid auto flush option: 'ON'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=On;", + ), + ).rejects.toThrow("Invalid auto flush option: 'On'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=true;", + ), + ).rejects.toThrow("Invalid auto flush option: 'true'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=OFF;", + ), + ).rejects.toThrow("Invalid auto flush option: 'OFF'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=Off;", + ), + ).rejects.toThrow("Invalid auto flush option: 'Off'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush=false;", + ), + ).rejects.toThrow("Invalid auto flush option: 'false'"); + }); + + it("can parse auto_flush_rows config", async function () { + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=123;", ); expect(options.protocol).toBe("http"); expect(options.auto_flush_rows).toBe(123); - options = SenderOptions.fromConfig( - "http::addr=host:9000;auto_flush_rows=0;", + options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=0;", ); expect(options.protocol).toBe("http"); expect(options.auto_flush_rows).toBe(0); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_rows=-123;"), - ).toThrow("Invalid auto flush rows option: -123"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_rows=1.23;"), - ).toThrow("Invalid auto flush rows option: 1.23"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_rows=123x;"), - ).toThrow("Invalid auto flush rows option, not a number: '123x'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_rows=a123;"), - ).toThrow("Invalid auto flush rows option, not a number: 'a123'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_rows=1w23;"), - ).toThrow("Invalid auto flush rows option, not a number: '1w23'"); - }); - - it("can parse auto_flush_interval config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;auto_flush_interval=30", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=-123;", + ), + ).rejects.toThrow("Invalid auto flush rows option: -123"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=1.23;", + ), + ).rejects.toThrow("Invalid auto flush rows option: 1.23"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=123x;", + ), + ).rejects.toThrow("Invalid auto flush rows option, not a number: '123x'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=a123;", + ), + ).rejects.toThrow("Invalid auto flush rows option, not a number: 'a123'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_rows=1w23;", + ), + ).rejects.toThrow("Invalid auto flush rows option, not a number: '1w23'"); + }); + + it("can parse auto_flush_interval config", async function () { + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=30", ); expect(options.protocol).toBe("http"); expect(options.auto_flush_interval).toBe(30); - options = SenderOptions.fromConfig( - "http::addr=host:9000;auto_flush_interval=0", + options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=0", ); expect(options.protocol).toBe("http"); expect(options.auto_flush_interval).toBe(0); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_interval=-60"), - ).toThrow("Invalid auto flush interval option: -60"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_interval=-6.0"), - ).toThrow("Invalid auto flush interval option: -6"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_interval=60x"), - ).toThrow("Invalid auto flush interval option, not a number: '60x'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_interval=a60"), - ).toThrow("Invalid auto flush interval option, not a number: 'a60'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;auto_flush_interval=6w0"), - ).toThrow("Invalid auto flush interval option, not a number: '6w0'"); - }); - - it("can parse tls_verify config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;tls_verify=on", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=-60", + ), + ).rejects.toThrow("Invalid auto flush interval option: -60"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=-6.0", + ), + ).rejects.toThrow("Invalid auto flush interval option: -6"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=60x", + ), + ).rejects.toThrow( + "Invalid auto flush interval option, not a number: '60x'", + ); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=a60", + ), + ).rejects.toThrow( + "Invalid auto flush interval option, not a number: 'a60'", + ); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;auto_flush_interval=6w0", + ), + ).rejects.toThrow( + "Invalid auto flush interval option, not a number: '6w0'", + ); + }); + + it("can parse tls_verify config", async function () { + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=on", ); expect(options.protocol).toBe("http"); expect(options.host).toBe("host"); expect(options.port).toBe(9000); expect(options.tls_verify).toBe(true); - options = SenderOptions.fromConfig( - "http::addr=host:9000;tls_verify=unsafe_off", + options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=unsafe_off", ); expect(options.protocol).toBe("http"); expect(options.host).toBe("host"); expect(options.port).toBe(9000); expect(options.tls_verify).toBe(false); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=ON"), - ).toThrow("Invalid TLS verify option: 'ON'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=On"), - ).toThrow("Invalid TLS verify option: 'On'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=true"), - ).toThrow("Invalid TLS verify option: 'true'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=OFF"), - ).toThrow("Invalid TLS verify option: 'OFF'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=Off"), - ).toThrow("Invalid TLS verify option: 'Off'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=UNSAFE_OFF"), - ).toThrow("Invalid TLS verify option: 'UNSAFE_OFF'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=Unsafe_Off"), - ).toThrow("Invalid TLS verify option: 'Unsafe_Off'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_verify=false"), - ).toThrow("Invalid TLS verify option: 'false'"); - }); - - it("fails with tls_roots or tls_roots_password config", function () { - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_roots=/whatever/path"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=ON", + ), + ).rejects.toThrow("Invalid TLS verify option: 'ON'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=On", + ), + ).rejects.toThrow("Invalid TLS verify option: 'On'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=true", + ), + ).rejects.toThrow("Invalid TLS verify option: 'true'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=OFF", + ), + ).rejects.toThrow("Invalid TLS verify option: 'OFF'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=Off", + ), + ).rejects.toThrow("Invalid TLS verify option: 'Off'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=UNSAFE_OFF", + ), + ).rejects.toThrow("Invalid TLS verify option: 'UNSAFE_OFF'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=Unsafe_Off", + ), + ).rejects.toThrow("Invalid TLS verify option: 'Unsafe_Off'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_verify=false", + ), + ).rejects.toThrow("Invalid TLS verify option: 'false'"); + }); + + it("fails with tls_roots or tls_roots_password config", async function () { + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_roots=/whatever/path", + ), + ).rejects.toThrow( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", ); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;tls_roots_password=pwd"), - ).toThrow( + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;tls_roots_password=pwd", + ), + ).rejects.toThrow( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", ); }); - it("can parse request_min_throughput config", function () { - const options = SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=300", + it("can parse request_min_throughput config", async function () { + const options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=300", ); expect(options.protocol).toBe("http"); expect(options.request_min_throughput).toBe(300); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_min_throughput=0"), - ).toThrow("Invalid request min throughput option: 0"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=0.5", - ), - ).toThrow("Invalid request min throughput option: 0.5"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=-60", - ), - ).toThrow("Invalid request min throughput option: -60"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=60x", - ), - ).toThrow("Invalid request min throughput option, not a number: '60x'"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=a60", - ), - ).toThrow("Invalid request min throughput option, not a number: 'a60'"); - expect(() => - SenderOptions.fromConfig( - "http::addr=host:9000;request_min_throughput=6w0", - ), - ).toThrow("Invalid request min throughput option, not a number: '6w0'"); - }); - - it("can parse request_timeout config", function () { - const options = SenderOptions.fromConfig( - "http::addr=host:9000;request_timeout=30", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=0", + ), + ).rejects.toThrow("Invalid request min throughput option: 0"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=0.5", + ), + ).rejects.toThrow("Invalid request min throughput option: 0.5"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=-60", + ), + ).rejects.toThrow("Invalid request min throughput option: -60"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=60x", + ), + ).rejects.toThrow( + "Invalid request min throughput option, not a number: '60x'", + ); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=a60", + ), + ).rejects.toThrow( + "Invalid request min throughput option, not a number: 'a60'", + ); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_min_throughput=6w0", + ), + ).rejects.toThrow( + "Invalid request min throughput option, not a number: '6w0'", + ); + }); + + it("can parse request_timeout config", async function () { + const options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=30", ); expect(options.protocol).toBe("http"); expect(options.request_timeout).toBe(30); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=0"), - ).toThrow("Invalid request timeout option: 0"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=10.32"), - ).toThrow("Invalid request timeout option: 10.32"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=-60"), - ).toThrow("Invalid request timeout option: -60"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=60x"), - ).toThrow("Invalid request timeout option, not a number: '60x'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=a60"), - ).toThrow("Invalid request timeout option, not a number: 'a60'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;request_timeout=6w0"), - ).toThrow("Invalid request timeout option, not a number: '6w0'"); - }); - - it("can parse retry_timeout config", function () { - let options = SenderOptions.fromConfig( - "http::addr=host:9000;retry_timeout=60", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=0", + ), + ).rejects.toThrow("Invalid request timeout option: 0"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=10.32", + ), + ).rejects.toThrow("Invalid request timeout option: 10.32"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=-60", + ), + ).rejects.toThrow("Invalid request timeout option: -60"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=60x", + ), + ).rejects.toThrow("Invalid request timeout option, not a number: '60x'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=a60", + ), + ).rejects.toThrow("Invalid request timeout option, not a number: 'a60'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;request_timeout=6w0", + ), + ).rejects.toThrow("Invalid request timeout option, not a number: '6w0'"); + }); + + it("can parse retry_timeout config", async function () { + let options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=60", ); expect(options.protocol).toBe("http"); expect(options.retry_timeout).toBe(60); - options = SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=0"); + options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=0", + ); expect(options.protocol).toBe("http"); expect(options.retry_timeout).toBe(0); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=-60"), - ).toThrow("Invalid retry timeout option: -60"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=-60.444"), - ).toThrow("Invalid retry timeout option: -60.444"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=60x"), - ).toThrow("Invalid retry timeout option, not a number: '60x'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=a60"), - ).toThrow("Invalid retry timeout option, not a number: 'a60'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;retry_timeout=6w0"), - ).toThrow("Invalid retry timeout option, not a number: '6w0'"); - }); - - it("can parse max_name_len config", function () { - const options = SenderOptions.fromConfig( - "http::addr=host:9000;max_name_len=30", + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=-60", + ), + ).rejects.toThrow("Invalid retry timeout option: -60"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=-60.444", + ), + ).rejects.toThrow("Invalid retry timeout option: -60.444"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=60x", + ), + ).rejects.toThrow("Invalid retry timeout option, not a number: '60x'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=a60", + ), + ).rejects.toThrow("Invalid retry timeout option, not a number: 'a60'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;retry_timeout=6w0", + ), + ).rejects.toThrow("Invalid retry timeout option, not a number: '6w0'"); + }); + + it("can parse max_name_len config", async function () { + const options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=30", ); expect(options.protocol).toBe("http"); expect(options.max_name_len).toBe(30); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=0"), - ).toThrow("Invalid max name length option: 0"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=10.32"), - ).toThrow("Invalid max name length option: 10.32"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=-60"), - ).toThrow("Invalid max name length option: -60"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=60x"), - ).toThrow("Invalid max name length option, not a number: '60x'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=a60"), - ).toThrow("Invalid max name length option, not a number: 'a60'"); - expect(() => - SenderOptions.fromConfig("http::addr=host:9000;max_name_len=6w0"), - ).toThrow("Invalid max name length option, not a number: '6w0'"); - }); - - it("can take a custom logger", function () { - const options = SenderOptions.fromConfig("http::addr=host:9000", { - log: console.log, - }); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=0", + ), + ).rejects.toThrow("Invalid max name length option: 0"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=10.32", + ), + ).rejects.toThrow("Invalid max name length option: 10.32"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=-60", + ), + ).rejects.toThrow("Invalid max name length option: -60"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=60x", + ), + ).rejects.toThrow("Invalid max name length option, not a number: '60x'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=a60", + ), + ).rejects.toThrow("Invalid max name length option, not a number: 'a60'"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2;max_name_len=6w0", + ), + ).rejects.toThrow("Invalid max name length option, not a number: '6w0'"); + }); + + it("can take a custom logger", async function () { + const options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + { + log: console.log, + }, + ); expect(options.protocol).toBe("http"); expect(options.log).toBe(console.log); - expect(() => - // @ts-expect-error - Testing invalid input - SenderOptions.fromConfig("http::addr=host:9000", { log: 1234 }), - ).toThrow("Invalid logging function"); - expect(() => - // @ts-expect-error - Testing invalid input - SenderOptions.fromConfig("http::addr=host:9000", { log: "hoppa" }), - ).toThrow("Invalid logging function"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + // @ts-expect-error - Testing invalid input + { log: 1234 }, + ), + ).rejects.toThrow("Invalid logging function"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + { + // @ts-expect-error - Testing invalid input + log: "hoppa", + }, + ), + ).rejects.toThrow("Invalid logging function"); }); - it("can take a custom agent", function () { + it("can take a custom agent", async function () { const agent = new Agent({ connect: { keepAlive: true } }); - const options = SenderOptions.fromConfig("http::addr=host:9000", { - agent: agent, - }); + const options = await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + { + agent: agent, + }, + ); expect(options.protocol).toBe("http"); const symbols = Object.getOwnPropertySymbols(options.agent); expect(agent[symbols[6]]).toEqual({ connect: { keepAlive: true } }); - agent.destroy(); - - expect(() => - SenderOptions.fromConfig("http::addr=host:9000", { - // @ts-expect-error - Testing invalid input - agent: { keepAlive: true }, - }), - ).toThrow("Invalid HTTP agent"); - expect(() => - // @ts-expect-error - Testing invalid input - SenderOptions.fromConfig("http::addr=host:9000", { agent: 4567 }), - ).toThrow("Invalid HTTP agent"); - expect(() => - // @ts-expect-error - Testing invalid input - SenderOptions.fromConfig("http::addr=host:9000", { agent: "hopp" }), - ).toThrow("Invalid HTTP agent"); + await agent.destroy(); + + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + { + // @ts-expect-error - Testing invalid input + agent: { keepAlive: true }, + }, + ), + ).rejects.toThrow("Invalid HTTP agent"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + // @ts-expect-error - Testing invalid input + { agent: 4567 }, + ), + ).rejects.toThrow("Invalid HTTP agent"); + await expect( + async () => + await SenderOptions.fromConfig( + "http::addr=host:9000;protocol_version=2", + { + // @ts-expect-error - Testing invalid input + agent: "hopp", + }, + ), + ).rejects.toThrow("Invalid HTTP agent"); }); }); diff --git a/test/sender.buffer.test.ts b/test/sender.buffer.test.ts index 7f00f80..4f43623 100644 --- a/test/sender.buffer.test.ts +++ b/test/sender.buffer.test.ts @@ -3,6 +3,7 @@ import { describe, it, expect } from "vitest"; import { readFileSync } from "fs"; import { Sender } from "../src"; +import { SenderOptions } from "../src/options"; describe("Client interop test suite", function () { it("runs client tests as per json test config", async function () { @@ -15,12 +16,14 @@ describe("Client interop test suite", function () { for (const testCase of testCases) { console.info(`test name: ${testCase.testName}`); - const sender = new Sender({ - protocol: "tcp", - host: "host", - auto_flush: false, - init_buf_size: 1024, - }); + const sender = new Sender( + await SenderOptions.resolveAuto({ + protocol: "tcp", + host: "host", + auto_flush: false, + init_buf_size: 1024, + }), + ); let errorMessage: string; try { @@ -94,6 +97,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws on invalid timestamp unit", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", auto_flush: false, init_buf_size: 1024, @@ -143,6 +147,7 @@ describe("Sender message builder test suite (anything not covered in client inte const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 256, }); @@ -165,6 +170,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as number", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -182,6 +188,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as ns number", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -199,6 +206,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as us number", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -216,6 +224,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as ms number", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -233,6 +242,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as BigInt", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -250,6 +260,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as ns BigInt", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -267,6 +278,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as us BigInt", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -284,6 +296,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports timestamp field as ms BigInt", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -301,6 +314,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws on invalid designated timestamp unit", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -320,6 +334,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated us timestamp as number from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -337,6 +352,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated ms timestamp as number from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -354,6 +370,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated timestamp as BigInt from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -371,6 +388,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated ns timestamp as BigInt from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -388,6 +406,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated us timestamp as BigInt from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -405,6 +424,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("supports setting designated ms timestamp as BigInt from client", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -422,6 +442,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if table name is not a string", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -435,6 +456,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if table name is too long", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -450,6 +472,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if table name is set more times", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -462,6 +485,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if symbol name is not a string", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -475,6 +499,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if symbol name is empty string", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -487,6 +512,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if column name is not a string", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -500,6 +526,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if column name is empty string", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -512,6 +539,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if column name is too long", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -530,6 +558,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if column value is not the right type", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -543,6 +572,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if adding column without setting table name", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -555,6 +585,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if adding symbol without setting table name", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -567,6 +598,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if adding symbol after columns", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -584,6 +616,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("returns null if preparing an empty buffer for send", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -597,6 +630,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("leaves unfinished rows in the sender's buffer when preparing a copy of the buffer for send", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -620,6 +654,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if a float is passed as integer field", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -632,6 +667,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if a float is passed as timestamp field", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -644,6 +680,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if designated timestamp is not an integer or bigint", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -663,6 +700,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if designated timestamp is invalid", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -680,6 +718,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("throws exception if designated timestamp is set without any fields added", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -696,6 +735,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("extends the size of the buffer if data does not fit", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 8, }); @@ -728,7 +768,7 @@ describe("Sender message builder test suite (anything not covered in client inte }); it("throws exception if tries to extend the size of the buffer above max buffer size", async function () { - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( "tcp::addr=host;init_buf_size=8;max_buf_size=64;", ); expect(bufferSize(sender)).toBe(8); @@ -761,6 +801,7 @@ describe("Sender message builder test suite (anything not covered in client inte it("is possible to clear the buffer by calling reset()", async function () { const sender = new Sender({ protocol: "tcp", + protocol_version: "1", host: "host", init_buf_size: 1024, }); diff --git a/test/sender.config.test.ts b/test/sender.config.test.ts index 8b7d4d1..ac11c77 100644 --- a/test/sender.config.test.ts +++ b/test/sender.config.test.ts @@ -7,46 +7,51 @@ import { log } from "../src/logging"; describe("Sender configuration options suite", function () { it("creates a sender from a configuration string", async function () { - await Sender.fromConfig("tcps::addr=hostname;").close(); + const sender = await Sender.fromConfig("tcps::addr=hostname;"); + await sender.close(); }); it("creates a sender from a configuration string picked up from env", async function () { - process.env.QDB_CLIENT_CONF = "https::addr=hostname;"; - await Sender.fromEnv().close(); + process.env.QDB_CLIENT_CONF = "https::addr=hostname;protocol_version=1"; + await (await Sender.fromEnv()).close(); }); it("throws exception if the username or the token is missing when TCP transport is used", async function () { - await expect( - async () => - await Sender.fromConfig("tcp::addr=hostname;username=bobo;").close(), - ).rejects.toThrow( + await expect(async () => { + const sender = await Sender.fromConfig( + "tcp::addr=hostname;username=bobo;", + ); + await sender.close(); + }).rejects.toThrow( "TCP transport requires a username and a private key for authentication, please, specify the 'username' and 'token' config options", ); - await expect( - async () => - await Sender.fromConfig("tcp::addr=hostname;token=bobo_token;").close(), - ).rejects.toThrow( + await expect(async () => { + const sender = await Sender.fromConfig( + "tcp::addr=hostname;token=bobo_token;", + ); + await sender.close(); + }).rejects.toThrow( "TCP transport requires a username and a private key for authentication, please, specify the 'username' and 'token' config options", ); }); it("throws exception if tls_roots or tls_roots_password is used", async function () { - await expect( - async () => - await Sender.fromConfig( - "tcps::addr=hostname;username=bobo;tls_roots=bla;", - ).close(), - ).rejects.toThrow( + await expect(async () => { + const sender = await Sender.fromConfig( + "tcps::addr=hostname;username=bobo;tls_roots=bla;", + ); + await sender.close(); + }).rejects.toThrow( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", ); - await expect( - async () => - await Sender.fromConfig( - "tcps::addr=hostname;token=bobo_token;tls_roots_password=bla;", - ).close(), - ).rejects.toThrow( + await expect(async () => { + const sender = await Sender.fromConfig( + "tcps::addr=hostname;token=bobo_token;tls_roots_password=bla;", + ); + await sender.close(); + }).rejects.toThrow( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", ); }); @@ -54,7 +59,9 @@ describe("Sender configuration options suite", function () { it("throws exception if connect() is called when http transport is used", async function () { let sender: Sender; await expect(async () => { - sender = Sender.fromConfig("http::addr=hostname"); + sender = await Sender.fromConfig( + "http::addr=hostname;protocol_version=2", + ); await sender.connect(); }).rejects.toThrow("'connect()' is not required for HTTP transport"); await sender.close(); @@ -108,6 +115,7 @@ describe("Sender options test suite", function () { it("sets default buffer size if init_buf_size is not set", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", }); expect(bufferSize(sender)).toBe(DEFAULT_BUFFER_SIZE); @@ -117,6 +125,7 @@ describe("Sender options test suite", function () { it("sets the requested buffer size if init_buf_size is set", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", init_buf_size: 1024, }); @@ -127,6 +136,7 @@ describe("Sender options test suite", function () { it("sets default buffer size if init_buf_size is set to null", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", init_buf_size: null, }); @@ -137,6 +147,7 @@ describe("Sender options test suite", function () { it("sets default buffer size if init_buf_size is set to undefined", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", init_buf_size: undefined, }); @@ -147,6 +158,7 @@ describe("Sender options test suite", function () { it("sets default buffer size if init_buf_size is not a number", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", // @ts-expect-error - Testing invalid options init_buf_size: "1024", @@ -169,6 +181,7 @@ describe("Sender options test suite", function () { }; const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", // @ts-expect-error - Testing deprecated option bufferSize: 2048, @@ -192,6 +205,7 @@ describe("Sender options test suite", function () { }; const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", // @ts-expect-error - Testing deprecated option copy_buffer: false, @@ -214,6 +228,7 @@ describe("Sender options test suite", function () { }; const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", // @ts-expect-error - Testing deprecated option copyBuffer: false, @@ -223,7 +238,11 @@ describe("Sender options test suite", function () { }); it("sets default max buffer size if max_buf_size is not set", async function () { - const sender = new Sender({ protocol: "http", host: "host" }); + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + }); expect(maxBufferSize(sender)).toBe(DEFAULT_MAX_BUFFER_SIZE); await sender.close(); }); @@ -231,6 +250,7 @@ describe("Sender options test suite", function () { it("sets the requested max buffer size if max_buf_size is set", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", max_buf_size: 131072, }); @@ -243,6 +263,7 @@ describe("Sender options test suite", function () { async () => await new Sender({ protocol: "http", + protocol_version: "1", host: "host", max_buf_size: 8192, init_buf_size: 16384, @@ -255,6 +276,7 @@ describe("Sender options test suite", function () { it("sets default max buffer size if max_buf_size is set to null", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", max_buf_size: null, }); @@ -265,6 +287,7 @@ describe("Sender options test suite", function () { it("sets default max buffer size if max_buf_size is set to undefined", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", max_buf_size: undefined, }); @@ -275,6 +298,7 @@ describe("Sender options test suite", function () { it("sets default max buffer size if max_buf_size is not a number", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", // @ts-expect-error - Testing invalid value max_buf_size: "1024", @@ -284,7 +308,11 @@ describe("Sender options test suite", function () { }); it("uses default logger if log function is not set", async function () { - const sender = new Sender({ protocol: "http", host: "host" }); + const sender = new Sender({ + protocol: "http", + protocol_version: "1", + host: "host", + }); expect(logger(sender)).toBe(log); await sender.close(); }); @@ -293,6 +321,7 @@ describe("Sender options test suite", function () { const testFunc = () => {}; const sender = new Sender({ protocol: "http", + protocol_version: "1", host: "host", log: testFunc, }); @@ -301,7 +330,12 @@ describe("Sender options test suite", function () { }); it("uses default logger if log is set to null", async function () { - const sender = new Sender({ protocol: "http", host: "host", log: null }); + const sender = new Sender({ + protocol: "http", + protocol_version: "1", + host: "host", + log: null, + }); expect(logger(sender)).toBe(log); await sender.close(); }); @@ -309,6 +343,7 @@ describe("Sender options test suite", function () { it("uses default logger if log is set to undefined", async function () { const sender = new Sender({ protocol: "http", + protocol_version: "2", host: "host", log: undefined, }); @@ -317,8 +352,13 @@ describe("Sender options test suite", function () { }); it("uses default logger if log is not a function", async function () { - // @ts-expect-error - Testing invalid options - const sender = new Sender({ protocol: "http", host: "host", log: "" }); + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + // @ts-expect-error - Testing invalid options + log: "", + }); expect(logger(sender)).toBe(log); await sender.close(); }); @@ -330,6 +370,7 @@ describe("Sender auth config checks suite", function () { async () => await new Sender({ protocol: "tcp", + protocol_version: "2", host: "host", auth: { token: "privateKey", @@ -346,6 +387,7 @@ describe("Sender auth config checks suite", function () { async () => await new Sender({ protocol: "tcp", + protocol_version: "2", host: "host", auth: { keyId: "", diff --git a/test/sender.integration.test.ts b/test/sender.integration.test.ts index eff352e..fd5a5c7 100644 --- a/test/sender.integration.test.ts +++ b/test/sender.integration.test.ts @@ -4,6 +4,7 @@ import { GenericContainer, StartedTestContainer } from "testcontainers"; import http from "http"; import { Sender } from "../src"; +import { SenderOptions } from "../src/options"; const HTTP_OK = 200; @@ -101,11 +102,13 @@ describe("Sender tests with containerized QuestDB instance", () => { }); it("can ingest data via TCP and run queries", async () => { - const sender = new Sender({ - protocol: "tcp", - host: container.getHost(), - port: container.getMappedPort(QUESTDB_ILP_PORT), - }); + const sender = new Sender( + await SenderOptions.resolveAuto({ + protocol: "tcp", + host: container.getHost(), + port: container.getMappedPort(QUESTDB_ILP_PORT), + }), + ); await sender.connect(); const tableName = "test_tcp"; @@ -182,7 +185,7 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_HTTP_PORT)};auto_flush_interval=0;auto_flush_rows=1`, ); @@ -251,7 +254,7 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_HTTP_PORT)};auto_flush_interval=1;auto_flush_rows=0`, ); @@ -319,11 +322,13 @@ describe("Sender tests with containerized QuestDB instance", () => { }); it("does not duplicate rows if await is missing when calling flush", async () => { - const sender = new Sender({ - protocol: "tcp", - host: container.getHost(), - port: container.getMappedPort(QUESTDB_ILP_PORT), - }); + const sender = new Sender( + await SenderOptions.resolveAuto({ + protocol: "tcp", + host: container.getHost(), + port: container.getMappedPort(QUESTDB_ILP_PORT), + }), + ); await sender.connect(); const tableName = "test2"; @@ -367,7 +372,7 @@ describe("Sender tests with containerized QuestDB instance", () => { }); it("ingests all data without loss under high load with auto-flush", async () => { - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `tcp::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_ILP_PORT)};auto_flush_rows=5;auto_flush_interval=1`, ); await sender.connect(); diff --git a/test/sender.transport.test.ts b/test/sender.transport.test.ts index 7f3e20e..6bf27ea 100644 --- a/test/sender.transport.test.ts +++ b/test/sender.transport.test.ts @@ -54,12 +54,12 @@ describe("Sender HTTP suite", function () { afterAll(async function () { await mockHttp.stop(); await mockHttps.stop(); - }); + }, 30000); it("can ingest via HTTP", async function () { mockHttp.reset(); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, ); await sendData(sender); @@ -71,23 +71,23 @@ describe("Sender HTTP suite", function () { it("can ingest via HTTPS", async function () { mockHttps.reset(); - const senderCertCheckFail = Sender.fromConfig( - `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT}`, - ); - await expect(sendData(senderCertCheckFail)).rejects.toThrowError( - "self-signed certificate in certificate chain", + const senderCertCheckFail = await Sender.fromConfig( + `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT};protocol_version=2`, ); + await expect( + async () => await sendData(senderCertCheckFail), + ).rejects.toThrowError("self-signed certificate in certificate chain"); await senderCertCheckFail.close(); - const senderWithCA = Sender.fromConfig( - `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT};tls_ca=test/certs/ca/ca.crt`, + const senderWithCA = await Sender.fromConfig( + `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT};protocol_version=2;tls_ca=test/certs/ca/ca.crt`, ); await sendData(senderWithCA); expect(mockHttps.numOfRequests).toEqual(1); await senderWithCA.close(); - const senderVerifyOff = Sender.fromConfig( - `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT};tls_verify=unsafe_off`, + const senderVerifyOff = await Sender.fromConfig( + `https::addr=${PROXY_HOST}:${MOCK_HTTPS_PORT};protocol_version=2;tls_verify=unsafe_off`, ); await sendData(senderVerifyOff); expect(mockHttps.numOfRequests).toEqual(2); @@ -97,85 +97,85 @@ describe("Sender HTTP suite", function () { it("can ingest via HTTP with basic auth", async function () { mockHttp.reset({ username: "user1", password: "pwd" }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};username=user1;password=pwd`, ); await sendData(sender); expect(mockHttp.numOfRequests).toEqual(1); await sender.close(); - const senderFailPwd = Sender.fromConfig( + const senderFailPwd = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};username=user1;password=xyz`, ); - await expect(sendData(senderFailPwd)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailPwd), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailPwd.close(); - const senderFailMissingPwd = Sender.fromConfig( + const senderFailMissingPwd = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};username=user1z`, ); - await expect(sendData(senderFailMissingPwd)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailMissingPwd), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailMissingPwd.close(); - const senderFailUsername = Sender.fromConfig( + const senderFailUsername = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};username=xyz;password=pwd`, ); - await expect(sendData(senderFailUsername)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailUsername), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailUsername.close(); - const senderFailMissingUsername = Sender.fromConfig( + const senderFailMissingUsername = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};password=pwd`, ); - await expect(sendData(senderFailMissingUsername)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailMissingUsername), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailMissingUsername.close(); - const senderFailMissing = Sender.fromConfig( + const senderFailMissing = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, ); - await expect(sendData(senderFailMissing)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailMissing), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailMissing.close(); }); it("can ingest via HTTP with token auth", async function () { mockHttp.reset({ token: "abcdefghijkl123" }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};token=abcdefghijkl123`, ); await sendData(sender); expect(mockHttp.numOfRequests).toBe(1); await sender.close(); - const senderFailToken = Sender.fromConfig( + const senderFailToken = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};token=xyz`, ); - await expect(sendData(senderFailToken)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailToken), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailToken.close(); - const senderFailMissing = Sender.fromConfig( + const senderFailMissing = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, ); - await expect(sendData(senderFailMissing)).rejects.toThrowError( - "HTTP request failed, statusCode=401", - ); + await expect( + async () => await sendData(senderFailMissing), + ).rejects.toThrowError("HTTP request failed, statusCode=401"); await senderFailMissing.close(); }); it("can retry via HTTP", async function () { mockHttp.reset({ responseCodes: [204, 500, 523, 504, 500] }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, ); await sendData(sender); @@ -192,10 +192,10 @@ describe("Sender HTTP suite", function () { responseDelays: [1000, 1000, 1000], }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};retry_timeout=1000`, ); - await expect(sendData(sender)).rejects.toThrowError( + await expect(async () => await sendData(sender)).rejects.toThrowError( "HTTP request timeout, no response from server in time", ); await sender.close(); @@ -209,10 +209,10 @@ describe("Sender HTTP suite", function () { responseDelays: [1000], }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};retry_timeout=0;request_timeout=100`, ); - await expect(sendData(sender)).rejects.toThrowError( + await expect(async () => await sendData(sender)).rejects.toThrowError( "HTTP request timeout, no response from server in time", ); await sender.close(); @@ -224,7 +224,7 @@ describe("Sender HTTP suite", function () { responseDelays: [2000, 2000], }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};retry_timeout=600000;request_timeout=1000`, ); await sendData(sender); @@ -240,7 +240,7 @@ describe("Sender HTTP suite", function () { const senders: Sender[] = []; const promises: Promise[] = []; for (let i = 0; i < num; i++) { - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, { agent: agent }, ); @@ -261,7 +261,7 @@ describe("Sender HTTP suite", function () { mockHttp.reset(); const agent = new Agent({ pipelining: 3 }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT}`, { agent: agent }, ); @@ -282,7 +282,7 @@ describe("Sender HTTP suite", function () { mockHttp.reset(); const agent = new http.Agent({ maxSockets: 128 }); - const sender = Sender.fromConfig( + const sender = await Sender.fromConfig( `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};stdlib_http=on`, { agent: agent }, ); @@ -314,6 +314,7 @@ describe("Sender TCP suite", function () { async function createSender(auth: SenderOptions["auth"], secure = false) { const sender = new Sender({ protocol: secure ? "tcps" : "tcp", + protocol_version: "1", port: PROXY_PORT, host: PROXY_HOST, auth: auth, @@ -390,6 +391,7 @@ describe("Sender TCP suite", function () { const proxy = await createProxy(true); const sender = new Sender({ protocol: "tcp", + protocol_version: "1", port: PROXY_PORT, host: PROXY_HOST, jwk: JWK, @@ -470,7 +472,11 @@ describe("Sender TCP suite", function () { }); it("fails to send data if not connected", async function () { - const sender = new Sender({ protocol: "tcp", host: "localhost" }); + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "localhost", + }); await expect(async () => { await sender.table("test").symbol("location", "us").atNow(); await sender.flush(); @@ -492,6 +498,7 @@ describe("Sender TCP suite", function () { const proxy = await createProxy(true, proxyOptions); const sender = new Sender({ protocol: "tcps", + protocol_version: "1", port: PROXY_PORT, host: PROXY_HOST, auth: AUTH, @@ -506,22 +513,22 @@ describe("Sender TCP suite", function () { it("can disable the server certificate check", async function () { const proxy = await createProxy(true, proxyOptions); - const senderCertCheckFail = Sender.fromConfig( - `tcps::addr=${PROXY_HOST}:${PROXY_PORT}`, + const senderCertCheckFail = await Sender.fromConfig( + `tcps::addr=${PROXY_HOST}:${PROXY_PORT};protocol_version=1`, ); await expect( async () => await senderCertCheckFail.connect(), ).rejects.toThrow("self-signed certificate in certificate chain"); await senderCertCheckFail.close(); - const senderCertCheckOn = Sender.fromConfig( - `tcps::addr=${PROXY_HOST}:${PROXY_PORT};tls_ca=test/certs/ca/ca.crt`, + const senderCertCheckOn = await Sender.fromConfig( + `tcps::addr=${PROXY_HOST}:${PROXY_PORT};protocol_version=1;tls_ca=test/certs/ca/ca.crt`, ); await senderCertCheckOn.connect(); await senderCertCheckOn.close(); - const senderCertCheckOff = Sender.fromConfig( - `tcps::addr=${PROXY_HOST}:${PROXY_PORT};tls_verify=unsafe_off`, + const senderCertCheckOff = await Sender.fromConfig( + `tcps::addr=${PROXY_HOST}:${PROXY_PORT};protocol_version=1;tls_verify=unsafe_off`, ); await senderCertCheckOff.connect(); await senderCertCheckOff.close(); @@ -556,6 +563,7 @@ describe("Sender TCP suite", function () { const proxy = await createProxy(); const sender = new Sender({ protocol: "tcp", + protocol_version: "1", port: PROXY_PORT, host: PROXY_HOST, log: log, @@ -575,7 +583,7 @@ describe("Sender TCP suite", function () { // we expect a warning about non-flushed data at close() const expectedMessages = [ "Successfully connected to localhost:9088", - "Buffer contains data which has not been flushed before closing the sender, and it will be lost [position=54]", + `Buffer contains data which has not been flushed before closing the sender, and it will be lost [position=${"test,location=gb".length}]`, /^Connection to .*1:9088 is closed$/, ]; const log = ( @@ -589,6 +597,7 @@ describe("Sender TCP suite", function () { const proxy = await createProxy(); const sender = new Sender({ protocol: "tcp", + protocol_version: "1", port: PROXY_PORT, host: PROXY_HOST, log: log, @@ -596,12 +605,8 @@ describe("Sender TCP suite", function () { await sender.connect(); await sendData(sender); - // add another line to the buffer without calling flush() - await sender - .table("test") - .symbol("location", "gb") - .floatColumn("temperature", 16.4) - .at(1658484775000000000n, "ns"); + // write something into the buffer without calling flush() + sender.table("test").symbol("location", "gb"); // assert that only the first line was sent await assertSentData( diff --git a/test/util/mockhttp.ts b/test/util/mockhttp.ts index 45de985..35b72a0 100644 --- a/test/util/mockhttp.ts +++ b/test/util/mockhttp.ts @@ -7,6 +7,9 @@ type MockConfig = { username?: string; password?: string; token?: string; + settings?: { + config?: { "line.proto.support.versions"?: number[] }; + }; }; class MockHttp { @@ -18,7 +21,13 @@ class MockHttp { this.reset(); } - reset(mockConfig = {}) { + reset(mockConfig: MockConfig = {}) { + if (!mockConfig.settings) { + mockConfig.settings = { + config: { "line.proto.support.versions": [1, 2] }, + }; + } + this.mockConfig = mockConfig; this.numOfRequests = 0; } @@ -33,35 +42,48 @@ class MockHttp { this.server = serverCreator( options, (req: http.IncomingMessage, res: http.ServerResponse) => { - const authFailed = checkAuthHeader(this.mockConfig, req); - - const body: Uint8Array[] = []; - req.on("data", (chunk: Uint8Array) => { - body.push(chunk); - }); - - req.on("end", async () => { - console.info(`Received data: ${Buffer.concat(body)}`); - this.numOfRequests++; - - const delay = - this.mockConfig.responseDelays && - this.mockConfig.responseDelays.length > 0 - ? this.mockConfig.responseDelays.pop() - : undefined; - if (delay) { - await sleep(delay); - } - - const responseCode = authFailed - ? 401 - : this.mockConfig.responseCodes && - this.mockConfig.responseCodes.length > 0 - ? this.mockConfig.responseCodes.pop() - : 204; - res.writeHead(responseCode); - res.end(); - }); + const { url, method } = req; + if (url.startsWith("/write") && method === "POST") { + const authFailed = checkAuthHeader(this.mockConfig, req); + + const body: Uint8Array[] = []; + req.on("data", (chunk: Uint8Array) => { + body.push(chunk); + }); + + req.on("end", async () => { + console.info(`Received data: ${Buffer.concat(body)}`); + this.numOfRequests++; + + const delay = + this.mockConfig.responseDelays && + this.mockConfig.responseDelays.length > 0 + ? this.mockConfig.responseDelays.pop() + : undefined; + if (delay) { + await sleep(delay); + } + + const responseCode = authFailed + ? 401 + : this.mockConfig.responseCodes && + this.mockConfig.responseCodes.length > 0 + ? this.mockConfig.responseCodes.pop() + : 204; + res.writeHead(responseCode); + res.end(); + }); + } else if (url === "/settings" && method === "GET") { + const settingsStr = JSON.stringify(this.mockConfig.settings); + console.info(`Settings reply: ${settingsStr}`); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(settingsStr); + return; + } else { + console.info(`No handler for: ${method} ${url}`); + res.writeHead(404, { "Content-Type": "text/plain" }); + res.end("Not found"); + } }, );