diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3dead93..7c87581 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,6 +5,8 @@ on: branches: - main pull_request: + schedule: + - cron: '15 2,10,18 * * *' jobs: test: diff --git a/src/buffer/base.ts b/src/buffer/base.ts index 9030743..d6dfa33 100644 --- a/src/buffer/base.ts +++ b/src/buffer/base.ts @@ -10,6 +10,7 @@ import { DEFAULT_MAX_BUFFER_SIZE, } from "./index"; import { + ArrayPrimitive, isInteger, timestampToMicros, timestampToNanos, @@ -232,6 +233,8 @@ abstract class SenderBufferBase implements SenderBuffer { */ abstract floatColumn(name: string, value: number): SenderBuffer; + abstract arrayColumn(name: string, value: unknown[]): SenderBuffer; + /** * Write an integer column with its value into the buffer. * @@ -373,36 +376,60 @@ abstract class SenderBufferBase implements SenderBuffer { this.writeEscaped(name); this.write("="); writeValue(); + this.assertBufferOverflow(); this.hasColumns = true; } protected write(data: string) { this.position += this.buffer.write(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); - } } protected writeByte(data: number) { this.position = this.buffer.writeInt8(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); - } + } + + protected writeInt(data: number) { + this.position = this.buffer.writeInt32LE(data, this.position); } protected writeDouble(data: number) { this.position = this.buffer.writeDoubleLE(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); + } + + protected writeArray( + arr: unknown[], + dimensions: number[], + type: ArrayPrimitive, + ) { + this.checkCapacity([], 1 + dimensions.length * 4); + this.writeByte(dimensions.length); + for (let i = 0; i < dimensions.length; i++) { + this.writeInt(dimensions[i]); + } + + this.checkCapacity([], SenderBufferBase.arraySize(dimensions, type)); + this.writeArrayValues(arr, dimensions); + } + + private writeArrayValues(arr: unknown[], dimensions: number[]) { + if (Array.isArray(arr[0])) { + for (let i = 0; i < arr.length; i++) { + this.writeArrayValues(arr[i] as unknown[], dimensions); + } + } else { + const type = typeof arr[0]; + switch (type) { + case "number": + for (let i = 0; i < arr.length; i++) { + this.position = this.buffer.writeDoubleLE( + arr[i] as number, + this.position, + ); + } + break; + default: + throw new Error(`Unsupported array type [type=${type}]`); + } } } @@ -442,6 +469,34 @@ abstract class SenderBufferBase implements SenderBuffer { } } } + + private static arraySize(dimensions: number[], type: ArrayPrimitive): number { + let numOfElements = 1; + for (let i = 0; i < dimensions.length; i++) { + numOfElements *= dimensions[i]; + } + + switch (type) { + case "number": + return numOfElements * 8; + case "boolean": + return numOfElements; + case "string": + // in case of string[] capacity check is done separately for each array element + return 0; + default: + throw new Error(`Unsupported array type [type=${type}]`); + } + } + + private assertBufferOverflow() { + if (this.position > this.bufferSize) { + // should never happen, if checkCapacity() is correctly used + throw new Error( + `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, + ); + } + } } export { SenderBufferBase }; diff --git a/src/buffer/bufferv1.ts b/src/buffer/bufferv1.ts index ff36fb3..2297c44 100644 --- a/src/buffer/bufferv1.ts +++ b/src/buffer/bufferv1.ts @@ -32,6 +32,10 @@ class SenderBufferV1 extends SenderBufferBase { ); return this; } + + arrayColumn(): SenderBuffer { + throw new Error("Arrays are not supported in protocol v1"); + } } export { SenderBufferV1 }; diff --git a/src/buffer/bufferv2.ts b/src/buffer/bufferv2.ts index 3e812d8..4aebdb5 100644 --- a/src/buffer/bufferv2.ts +++ b/src/buffer/bufferv2.ts @@ -2,8 +2,14 @@ import { SenderOptions } from "../options"; import { SenderBuffer } from "./index"; import { SenderBufferBase } from "./base"; +import { getDimensions, validateArray } from "../utils"; +const COLUMN_TYPE_DOUBLE: number = 10; +const COLUMN_TYPE_NULL: number = 33; + +const ENTITY_TYPE_ARRAY: number = 14; const ENTITY_TYPE_DOUBLE: number = 16; + const EQUALS_SIGN: number = "=".charCodeAt(0); /** @@ -36,6 +42,29 @@ class SenderBufferV2 extends SenderBufferBase { ); return this; } + + arrayColumn(name: string, value: unknown[]): SenderBuffer { + const dimensions = getDimensions(value); + const type = validateArray(value, dimensions); + // only number arrays and NULL supported for now + if (type !== "number" && type !== null) { + throw new Error(`Unsupported array type [type=${type}]`); + } + + this.writeColumn(name, value, () => { + this.checkCapacity([], 3); + this.writeByte(EQUALS_SIGN); + this.writeByte(ENTITY_TYPE_ARRAY); + + if (!value) { + this.writeByte(COLUMN_TYPE_NULL); + } else { + this.writeByte(COLUMN_TYPE_DOUBLE); + this.writeArray(value, dimensions, type); + } + }); + return this; + } } export { SenderBufferV2 }; diff --git a/src/buffer/index.ts b/src/buffer/index.ts index cb72692..42aa36f 100644 --- a/src/buffer/index.ts +++ b/src/buffer/index.ts @@ -104,6 +104,8 @@ interface SenderBuffer { */ floatColumn(name: string, value: number): SenderBuffer; + arrayColumn(name: string, value: unknown[]): SenderBuffer; + /** * Write an integer column with its value into the buffer. * diff --git a/src/sender.ts b/src/sender.ts index bd41d89..149ff92 100644 --- a/src/sender.ts +++ b/src/sender.ts @@ -241,6 +241,11 @@ class Sender { return this; } + arrayColumn(name: string, value: unknown[]): Sender { + this.buffer.arrayColumn(name, value); + return this; + } + /** * Write an integer column with its value into the buffer of the sender. * diff --git a/src/utils.ts b/src/utils.ts index 0867b2c..69ec7f6 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,3 +1,5 @@ +type ArrayPrimitive = "number" | "boolean" | "string" | null; + type TimestampUnit = "ns" | "us" | "ms"; function isBoolean(value: unknown): value is boolean { @@ -36,6 +38,73 @@ function timestampToNanos(timestamp: bigint, unit: TimestampUnit) { } } +function getDimensions(data: unknown) { + const dimensions: number[] = []; + while (Array.isArray(data)) { + if (data.length === 0) { + throw new Error("Zero length array not supported"); + } + dimensions.push(data.length); + data = data[0]; + } + return dimensions; +} + +function validateArray(data: unknown[], dimensions: number[]): ArrayPrimitive { + if (data === null || data === undefined) { + return null; + } + if (!Array.isArray(data)) { + throw new Error( + `The value must be an array [value=${JSON.stringify(data)}, type=${typeof data}]`, + ); + } + + let expectedType: ArrayPrimitive = null; + + function checkArray( + array: unknown[], + depth: number = 0, + path: string = "", + ): void { + const expectedLength = dimensions[depth]; + if (array.length !== expectedLength) { + throw new Error( + `Length of arrays do not match [expected=${expectedLength}, actual=${array.length}, dimensions=[${dimensions}], path=${path}]`, + ); + } + + if (depth < dimensions.length - 1) { + // intermediate level, expecting arrays + for (let i = 0; i < array.length; i++) { + if (!Array.isArray(array[i])) { + throw new Error( + `Mixed types found [expected=array, current=${typeof array[i]}, path=${path}[${i}]]`, + ); + } + checkArray(array[i] as unknown[], depth + 1, `${path}[${i}]`); + } + } else { + // leaf level, expecting primitives + if (expectedType === null) { + expectedType = typeof array[0] as ArrayPrimitive; + } + + for (let i = 0; i < array.length; i++) { + const currentType = typeof array[i] as ArrayPrimitive; + if (currentType !== expectedType) { + throw new Error( + `Mixed types found [expected=${expectedType}, current=${currentType}, path=${path}[${i}]]`, + ); + } + } + } + } + + checkArray(data); + return expectedType; +} + /** * Fetches JSON data from a URL. * @template T - The expected type of the JSON response @@ -66,4 +135,7 @@ export { timestampToNanos, TimestampUnit, fetchJson, + getDimensions, + validateArray, + ArrayPrimitive, }; diff --git a/test/sender.buffer.test.ts b/test/sender.buffer.test.ts index 4f43623..89dfeac 100644 --- a/test/sender.buffer.test.ts +++ b/test/sender.buffer.test.ts @@ -167,6 +167,245 @@ describe("Sender message builder test suite (anything not covered in client inte await sender.close(); }); + it("does not support arrays with protocol v1", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "1", + host: "host", + init_buf_size: 1024, + }); + expect(() => + sender.table("tableName").arrayColumn("arrayCol", [12.3, 23.4]), + ).toThrow("Arrays are not supported in protocol v1"); + await sender.close(); + }); + + it("supports arrays with protocol v2", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender + .table("tableName") + .arrayColumn("arrayCol", [12.3, 23.4]) + .atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 0a 01 02 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " + + toHex("\n"), + ); + await sender.close(); + }); + + it("supports multidimensional arrays with protocol v2", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender + .table("tableName") + .arrayColumn("arrayCol", [[12.3], [23.4]]) + .atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 0a 02 02 00 00 00 01 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " + + toHex("\n"), + ); + await sender.close(); + }); + + it("does not accept empty array", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + expect(() => sender.arrayColumn("arrayCol", [])).toThrow( + "Zero length array not supported", + ); + expect(() => sender.arrayColumn("arrayCol", [[], []])).toThrow( + "Zero length array not supported", + ); + await sender.close(); + }); + + it("does not accept irregularly sized array", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + expect(() => + sender.table("tableName").arrayColumn("arrayCol", [ + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [[1.1, 2.2], [3.3], [5.5, 6.6]], + ]), + ).toThrow( + "Length of arrays do not match [expected=2, actual=1, dimensions=[4,3,2], path=[3][1]]", + ); + await sender.close(); + }); + + it("does not accept non-homogenous array", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + expect(() => + sender.arrayColumn("arrayCol", [ + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, "4.4"], + [5.5, 6.6], + ], + ]), + ).toThrow( + "Mixed types found [expected=number, current=string, path=[3][1][1]]", + ); + expect(() => + sender.arrayColumn("arrayCol", [ + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [ + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ], + [[1.1, 2.2], 3.3, [5.5, 6.6]], + ]), + ).toThrow( + "Mixed types found [expected=array, current=number, path=[3][1]]", + ); + await sender.close(); + }); + + it("does not accept unsupported types", async function () { + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + expect(() => sender.arrayColumn("col", ["str"])).toThrow( + "Unsupported array type [type=string]", + ); + expect(() => sender.arrayColumn("col", [true])).toThrow( + "Unsupported array type [type=boolean]", + ); + expect(() => sender.arrayColumn("col", [{}])).toThrow( + "Unsupported array type [type=object]", + ); + expect(() => sender.arrayColumn("col", [null])).toThrow( + "Unsupported array type [type=object]", + ); + expect(() => sender.arrayColumn("col", [undefined])).toThrow( + "Unsupported array type [type=undefined]", + ); + await sender.close(); + }); + + it("does not accept non-array types", async function () { + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", 12.345)).toThrow( + "The value must be an array [value=12.345, type=number]", + ); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", 42)).toThrow( + "The value must be an array [value=42, type=number]", + ); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", "str")).toThrow( + 'The value must be an array [value="str", type=string]', + ); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", "")).toThrow( + 'The value must be an array [value="", type=string]', + ); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", true)).toThrow( + "The value must be an array [value=true, type=boolean]", + ); + // @ts-expect-error - Testing invalid input + expect(() => sender.arrayColumn("col", {})).toThrow( + "The value must be an array [value={}, type=object]", + ); + await sender.close(); + }); + + it("supports arrays with NULL value", async function () { + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender.table("tableName").arrayColumn("arrayCol", undefined).atNow(); + await sender.table("tableName").arrayColumn("arrayCol", null).atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 21 " + + toHex("\ntableName arrayCol==") + + " 0e 21 " + + toHex("\n"), + ); + await sender.close(); + }); + it("supports timestamp field as number", async function () { const sender = new Sender({ protocol: "tcp", @@ -838,6 +1077,21 @@ function bufferContent(sender: Sender) { return sender.buffer.toBufferView().toString(); } +function bufferContentHex(sender: Sender) { + // @ts-expect-error - Accessing private field + return toHexString(sender.buffer.toBufferView()); +} + +function toHex(str: string) { + return toHexString(Buffer.from(str)); +} + +function toHexString(buffer: Buffer) { + return Array.from(buffer) + .map((b) => b.toString(16).padStart(2, "0")) + .join(" "); +} + function bufferSize(sender: Sender) { // @ts-expect-error - Accessing private field return sender.buffer.bufferSize;