Skip to content

Commit 7ce8469

Browse files
committed
Merge branch 'binary_protocol_arrays' into arrays_support
2 parents 505a9bb + 7c79739 commit 7ce8469

File tree

10 files changed

+118
-57
lines changed

10 files changed

+118
-57
lines changed

src/buffer/base.ts

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,7 @@ abstract class SenderBufferBase implements SenderBuffer {
8181
);
8282
}
8383
this.bufferSize = bufferSize;
84-
// Allocating an extra byte because Buffer.write() does not fail if the length of the data to be written is
85-
// longer than the size of the buffer. It simply just writes whatever it can, and returns.
86-
// If we can write into the extra byte, that indicates buffer overflow.
87-
// See the check in the write() function.
88-
const newBuffer = Buffer.alloc(this.bufferSize + 1, 0);
84+
const newBuffer = Buffer.alloc(this.bufferSize, 0);
8985
if (this.buffer) {
9086
this.buffer.copy(newBuffer);
9187
}
@@ -136,7 +132,7 @@ abstract class SenderBufferBase implements SenderBuffer {
136132
}
137133

138134
/**
139-
* Write the table name into the buffer.
135+
* Writes the table name into the buffer.
140136
*
141137
* @param {string} table - Table name.
142138
* @return {Sender} Returns with a reference to this sender.
@@ -156,7 +152,7 @@ abstract class SenderBufferBase implements SenderBuffer {
156152
}
157153

158154
/**
159-
* Write a symbol name and value into the buffer.
155+
* Writes a symbol name and value into the buffer.
160156
*
161157
* @param {string} name - Symbol name.
162158
* @param {unknown} value - Symbol value, toString() is called to extract the actual symbol value from the parameter.
@@ -183,7 +179,7 @@ abstract class SenderBufferBase implements SenderBuffer {
183179
}
184180

185181
/**
186-
* Write a string column with its value into the buffer.
182+
* Writes a string column with its value into the buffer.
187183
*
188184
* @param {string} name - Column name.
189185
* @param {string} value - Column value, accepts only string values.
@@ -205,7 +201,7 @@ abstract class SenderBufferBase implements SenderBuffer {
205201
}
206202

207203
/**
208-
* Write a boolean column with its value into the buffer.
204+
* Writes a boolean column with its value into the buffer.
209205
*
210206
* @param {string} name - Column name.
211207
* @param {boolean} value - Column value, accepts only boolean values.
@@ -225,18 +221,25 @@ abstract class SenderBufferBase implements SenderBuffer {
225221
}
226222

227223
/**
228-
* Write a float column with its value into the buffer.
224+
* Writes a float column with its value into the buffer.
229225
*
230226
* @param {string} name - Column name.
231227
* @param {number} value - Column value, accepts only number values.
232228
* @return {Sender} Returns with a reference to this sender.
233229
*/
234230
abstract floatColumn(name: string, value: number): SenderBuffer;
235231

232+
/**
233+
* Writes an array column with its values into the buffer.
234+
*
235+
* @param {string} name - Column name.
236+
* @param {unknown[]} value - Column value, accepts only arrays.
237+
* @return {Sender} Returns with a reference to this sender.
238+
*/
236239
abstract arrayColumn(name: string, value: unknown[]): SenderBuffer;
237240

238241
/**
239-
* Write an integer column with its value into the buffer.
242+
* Writes an integer column with its value into the buffer.
240243
*
241244
* @param {string} name - Column name.
242245
* @param {number} value - Column value, accepts only number values.
@@ -257,7 +260,7 @@ abstract class SenderBufferBase implements SenderBuffer {
257260
}
258261

259262
/**
260-
* Write a timestamp column with its value into the buffer.
263+
* Writes a timestamp column with its value into the buffer.
261264
*
262265
* @param {string} name - Column name.
263266
* @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
@@ -283,7 +286,7 @@ abstract class SenderBufferBase implements SenderBuffer {
283286
}
284287

285288
/**
286-
* Closing the row after writing the designated timestamp into the buffer.
289+
* Closes the row after writing the designated timestamp into the buffer.
287290
*
288291
* @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
289292
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
@@ -309,7 +312,7 @@ abstract class SenderBufferBase implements SenderBuffer {
309312
}
310313

311314
/**
312-
* Closing the row without writing designated timestamp into the buffer. <br>
315+
* Closes the row without writing designated timestamp into the buffer. <br>
313316
* Designated timestamp will be populated by the server on this record.
314317
*/
315318
atNow() {
@@ -376,7 +379,6 @@ abstract class SenderBufferBase implements SenderBuffer {
376379
this.writeEscaped(name);
377380
this.write("=");
378381
writeValue();
379-
this.assertBufferOverflow();
380382
this.hasColumns = true;
381383
}
382384

@@ -488,15 +490,6 @@ abstract class SenderBufferBase implements SenderBuffer {
488490
throw new Error(`Unsupported array type [type=${type}]`);
489491
}
490492
}
491-
492-
private assertBufferOverflow() {
493-
if (this.position > this.bufferSize) {
494-
// should never happen, if checkCapacity() is correctly used
495-
throw new Error(
496-
`Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`,
497-
);
498-
}
499-
}
500493
}
501494

502495
export { SenderBufferBase };

src/buffer/bufferv1.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class SenderBufferV1 extends SenderBufferBase {
1313
}
1414

1515
/**
16-
* Write a float column with its value into the buffer using v1 serialization (text format).
16+
* Writes a float column with its value into the buffer using v1 serialization (text format).
1717
*
1818
* @param {string} name - Column name.
1919
* @param {number} value - Column value, accepts only number values.

src/buffer/bufferv2.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class SenderBufferV2 extends SenderBufferBase {
2222
}
2323

2424
/**
25-
* Write a float column with its value into the buffer using v2 serialization (binary format).
25+
* Writes a float column with its value into the buffer using v2 serialization (binary format).
2626
*
2727
* @param {string} name - Column name.
2828
* @param {number} value - Column value, accepts only number values.

src/buffer/index.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,15 @@ interface SenderBuffer {
6161
toBufferNew(pos?: number): Buffer | null;
6262

6363
/**
64-
* Write the table name into the buffer.
64+
* Writes the table name into the buffer.
6565
*
6666
* @param {string} table - Table name.
6767
* @return {Sender} Returns with a reference to this sender.
6868
*/
6969
table(table: string): SenderBuffer;
7070

7171
/**
72-
* Write a symbol name and value into the buffer.
72+
* Writes a symbol name and value into the buffer.
7373
*
7474
* @param {string} name - Symbol name.
7575
* @param {unknown} value - Symbol value, toString() is called to extract the actual symbol value from the parameter.
@@ -78,7 +78,7 @@ interface SenderBuffer {
7878
symbol(name: string, value: unknown): SenderBuffer;
7979

8080
/**
81-
* Write a string column with its value into the buffer.
81+
* Writes a string column with its value into the buffer.
8282
*
8383
* @param {string} name - Column name.
8484
* @param {string} value - Column value, accepts only string values.
@@ -87,7 +87,7 @@ interface SenderBuffer {
8787
stringColumn(name: string, value: string): SenderBuffer;
8888

8989
/**
90-
* Write a boolean column with its value into the buffer.
90+
* Writes a boolean column with its value into the buffer.
9191
*
9292
* @param {string} name - Column name.
9393
* @param {boolean} value - Column value, accepts only boolean values.
@@ -96,7 +96,7 @@ interface SenderBuffer {
9696
booleanColumn(name: string, value: boolean): SenderBuffer;
9797

9898
/**
99-
* Write a float column with its value into the buffer.
99+
* Writes a float column with its value into the buffer.
100100
*
101101
* @param {string} name - Column name.
102102
* @param {number} value - Column value, accepts only number values.
@@ -107,7 +107,7 @@ interface SenderBuffer {
107107
arrayColumn(name: string, value: unknown[]): SenderBuffer;
108108

109109
/**
110-
* Write an integer column with its value into the buffer.
110+
* Writes an integer column with its value into the buffer.
111111
*
112112
* @param {string} name - Column name.
113113
* @param {number} value - Column value, accepts only number values.
@@ -117,7 +117,7 @@ interface SenderBuffer {
117117
intColumn(name: string, value: number): SenderBuffer;
118118

119119
/**
120-
* Write a timestamp column with its value into the buffer.
120+
* Writes a timestamp column with its value into the buffer.
121121
*
122122
* @param {string} name - Column name.
123123
* @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
@@ -131,15 +131,15 @@ interface SenderBuffer {
131131
): SenderBuffer;
132132

133133
/**
134-
* Closing the row after writing the designated timestamp into the buffer.
134+
* Closes the row after writing the designated timestamp into the buffer.
135135
*
136136
* @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
137137
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
138138
*/
139139
at(timestamp: number | bigint, unit: TimestampUnit): void;
140140

141141
/**
142-
* Closing the row without writing designated timestamp into the buffer. <br>
142+
* Closes the row without writing designated timestamp into the buffer. <br>
143143
* Designated timestamp will be populated by the server on this record.
144144
*/
145145
atNow(): void;

src/options.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
import { readFileSync } from "node:fs";
12
import { PathOrFileDescriptor } from "fs";
23
import { Agent } from "undici";
34
import http from "http";
45
import https from "https";
56

67
import { Logger } from "./logging";
7-
import { fetchJson } from "./utils";
8+
import { fetchJson, isBoolean, isInteger } from "./utils";
9+
import { DEFAULT_REQUEST_TIMEOUT } from "./transport/http/base";
810

911
const HTTP_PORT = 9000;
1012
const TCP_PORT = 9009;
@@ -234,8 +236,21 @@ class SenderOptions {
234236

235237
const url = `${options.protocol}://${options.host}:${options.port}/settings`;
236238
const settings: {
237-
config: { LINE_PROTO_SUPPORT_VERSION: number[] };
238-
} = await fetchJson(url);
239+
config: { [LINE_PROTO_SUPPORT_VERSION]: number[] };
240+
} = await fetchJson(
241+
url,
242+
isInteger(options.request_timeout, 1)
243+
? options.request_timeout
244+
: DEFAULT_REQUEST_TIMEOUT,
245+
new Agent({
246+
connect: {
247+
ca: options.tls_ca ? readFileSync(options.tls_ca) : undefined,
248+
rejectUnauthorized: isBoolean(options.tls_verify)
249+
? options.tls_verify
250+
: true,
251+
},
252+
}),
253+
);
239254
const supportedVersions: string[] = (
240255
settings.config[LINE_PROTO_SUPPORT_VERSION] ?? []
241256
).map((version: unknown) => String(version));

src/sender.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class Sender {
183183
}
184184

185185
/**
186-
* Write the table name into the buffer of the sender.
186+
* Writes the table name into the buffer of the sender.
187187
*
188188
* @param {string} table - Table name.
189189
* @return {Sender} Returns with a reference to this sender.
@@ -194,7 +194,7 @@ class Sender {
194194
}
195195

196196
/**
197-
* Write a symbol name and value into the buffer of the sender.
197+
* Writes a symbol name and value into the buffer of the sender.
198198
*
199199
* @param {string} name - Symbol name.
200200
* @param {unknown} value - Symbol value, toString() is called to extract the actual symbol value from the parameter.
@@ -206,7 +206,7 @@ class Sender {
206206
}
207207

208208
/**
209-
* Write a string column with its value into the buffer of the sender.
209+
* Writes a string column with its value into the buffer of the sender.
210210
*
211211
* @param {string} name - Column name.
212212
* @param {string} value - Column value, accepts only string values.
@@ -218,7 +218,7 @@ class Sender {
218218
}
219219

220220
/**
221-
* Write a boolean column with its value into the buffer of the sender.
221+
* Writes a boolean column with its value into the buffer of the sender.
222222
*
223223
* @param {string} name - Column name.
224224
* @param {boolean} value - Column value, accepts only boolean values.
@@ -230,7 +230,7 @@ class Sender {
230230
}
231231

232232
/**
233-
* Write a float column with its value into the buffer of the sender.
233+
* Writes a float column with its value into the buffer of the sender.
234234
*
235235
* @param {string} name - Column name.
236236
* @param {number} value - Column value, accepts only number values.
@@ -247,7 +247,7 @@ class Sender {
247247
}
248248

249249
/**
250-
* Write an integer column with its value into the buffer of the sender.
250+
* Writes an integer column with its value into the buffer of the sender.
251251
*
252252
* @param {string} name - Column name.
253253
* @param {number} value - Column value, accepts only number values.
@@ -259,7 +259,7 @@ class Sender {
259259
}
260260

261261
/**
262-
* Write a timestamp column with its value into the buffer of the sender.
262+
* Writes a timestamp column with its value into the buffer of the sender.
263263
*
264264
* @param {string} name - Column name.
265265
* @param {number | bigint} value - Epoch timestamp, accepts numbers or BigInts.
@@ -276,7 +276,7 @@ class Sender {
276276
}
277277

278278
/**
279-
* Closing the row after writing the designated timestamp into the buffer of the sender.
279+
* Closes the row after writing the designated timestamp into the buffer of the sender.
280280
*
281281
* @param {number | bigint} timestamp - Designated epoch timestamp, accepts numbers or BigInts.
282282
* @param {string} [unit=us] - Timestamp unit. Supported values: 'ns' - nanoseconds, 'us' - microseconds, 'ms' - milliseconds. Defaults to 'us'.
@@ -289,7 +289,7 @@ class Sender {
289289
}
290290

291291
/**
292-
* Closing the row without writing designated timestamp into the buffer of the sender. <br>
292+
* Closes the row without writing designated timestamp into the buffer of the sender. <br>
293293
* Designated timestamp will be populated by the server on this record.
294294
*/
295295
async atNow() {

src/transport/http/base.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,9 @@ abstract class HttpTransportBase implements SenderTransport {
135135
abstract send(data: Buffer): Promise<boolean>;
136136
}
137137

138-
export { HttpTransportBase, RETRIABLE_STATUS_CODES, HTTP_NO_CONTENT };
138+
export {
139+
HttpTransportBase,
140+
RETRIABLE_STATUS_CODES,
141+
HTTP_NO_CONTENT,
142+
DEFAULT_REQUEST_TIMEOUT,
143+
};

src/utils.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { Agent } from "undici";
2+
13
type ArrayPrimitive = "number" | "boolean" | "string" | null;
24

35
type TimestampUnit = "ns" | "us" | "ms";
@@ -109,13 +111,26 @@ function validateArray(data: unknown[], dimensions: number[]): ArrayPrimitive {
109111
* Fetches JSON data from a URL.
110112
* @template T - The expected type of the JSON response
111113
* @param url - The URL to fetch from
114+
* @param agent - HTTP agent to be used for the request
115+
* @param timeout - Request timeout, query will be aborted if not finished in time
112116
* @returns Promise resolving to the parsed JSON data
113117
* @throws Error if the request fails or returns a non-OK status
114118
*/
115-
async function fetchJson<T>(url: string): Promise<T> {
119+
async function fetchJson<T>(
120+
url: string,
121+
timeout: number,
122+
agent: Agent,
123+
): Promise<T> {
124+
const controller = new AbortController();
125+
const { signal } = controller;
126+
setTimeout(() => controller.abort(), timeout);
127+
116128
let response: globalThis.Response;
117129
try {
118-
response = await fetch(url);
130+
response = await fetch(url, {
131+
dispatcher: agent,
132+
signal,
133+
});
119134
} catch (error) {
120135
throw new Error(`Failed to load ${url} [error=${error}]`);
121136
}

0 commit comments

Comments
 (0)