Skip to content

Commit 942d22b

Browse files
committed
feat: added event for per-request tracking
1 parent 4e0b251 commit 942d22b

File tree

5 files changed

+136
-23
lines changed

5 files changed

+136
-23
lines changed

src/chain.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,19 @@ export interface HandlerOpts {
2727
ipFamily?: number;
2828
dnsLookup?: typeof dns['lookup'];
2929
customTag?: unknown;
30+
requestId: string;
31+
id: number;
3032
}
3133

3234
interface ChainOpts {
3335
request: { url?: string };
3436
sourceSocket: Socket;
3537
head?: Buffer;
3638
handlerOpts: HandlerOpts;
37-
server: EventEmitter & { log: (connectionId: unknown, str: string) => void };
39+
server: EventEmitter & {
40+
log: (connectionId: unknown, str: string) => void,
41+
emit: (event: string, ...args: any[]) => boolean,
42+
};
3843
isPlain: boolean;
3944
}
4045

@@ -166,6 +171,14 @@ export const chain = (
166171
// We need to enable flowing, otherwise the socket would remain open indefinitely.
167172
// Nothing would consume the data, we just want to close the socket.
168173
targetSocket.on('close', () => {
174+
const { requestId, id: connectionId } = handlerOpts;
175+
176+
server.emit('requestFinished', {
177+
id: requestId,
178+
request,
179+
connectionId,
180+
customTag,
181+
});
169182
sourceSocket.resume();
170183

171184
if (sourceSocket.writable) {

src/direct.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,19 @@ export interface HandlerOpts {
1111
localAddress?: string;
1212
ipFamily?: number;
1313
dnsLookup?: typeof dns['lookup'];
14+
customTag?: unknown;
15+
requestId: string;
16+
id: number;
1417
}
1518

1619
interface DirectOpts {
17-
request: { url?: string };
20+
request: { url?: string, [key: string]: any };
1821
sourceSocket: Socket;
1922
head: Buffer;
20-
server: EventEmitter & { log: (connectionId: unknown, str: string) => void };
23+
server: EventEmitter & {
24+
log: (connectionId: unknown, str:string) => void,
25+
emit: (event: string, ...args: any[]) => boolean,
26+
};
2127
handlerOpts: HandlerOpts;
2228
}
2329

@@ -79,6 +85,15 @@ export const direct = (
7985
// We need to enable flowing, otherwise the socket would remain open indefinitely.
8086
// Nothing would consume the data, we just want to close the socket.
8187
targetSocket.on('close', () => {
88+
const { requestId, customTag, id: connectionId } = handlerOpts;
89+
90+
server.emit('requestFinished', {
91+
id: requestId,
92+
request,
93+
connectionId,
94+
customTag,
95+
});
96+
8297
sourceSocket.resume();
8398

8499
if (sourceSocket.writable) {

src/forward.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type dns from 'node:dns';
2+
import type { EventEmitter } from 'node:events';
23
import http from 'node:http';
34
import https from 'node:https';
45
import stream from 'node:stream';
@@ -29,6 +30,10 @@ export interface HandlerOpts {
2930
localAddress?: string;
3031
ipFamily?: number;
3132
dnsLookup?: typeof dns['lookup'];
33+
requestId: string;
34+
customTag?: unknown;
35+
id: number;
36+
server: EventEmitter;
3237
}
3338

3439
/**
@@ -122,6 +127,22 @@ export const forward = async (
122127

123128
: http.request(origin!, options as unknown as http.RequestOptions, requestCallback);
124129

130+
response.once('close', () => {
131+
const {
132+
requestId,
133+
customTag,
134+
id: connectionId,
135+
server,
136+
} = handlerOpts;
137+
138+
server.emit('requestFinished', {
139+
id: requestId,
140+
request,
141+
connectionId,
142+
customTag,
143+
});
144+
});
145+
125146
client.once('socket', (socket: SocketWithPreviousStats) => {
126147
// Socket can be re-used by multiple requests.
127148
// That's why we need to track the previous stats.

src/server.ts

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/* eslint-disable no-use-before-define */
22
import { Buffer } from 'node:buffer';
3+
import { randomUUID } from 'node:crypto';
34
import type dns from 'node:dns';
45
import { EventEmitter } from 'node:events';
56
import http from 'node:http';
@@ -47,9 +48,22 @@ export type ConnectionStats = {
4748
trgRxBytes: number | null;
4849
};
4950

51+
export type RequestStats = {
52+
/** Total bytes received from the client. */
53+
srcRxBytes: number,
54+
/** Total bytes sent to the client. */
55+
srcTxBytes: number,
56+
/** Total bytes received from the target. */
57+
trgRxBytes: number | null,
58+
/** Total bytes sent to the target. */
59+
trgTxBytes: number | null,
60+
};
61+
5062
type HandlerOpts = {
5163
server: Server;
5264
id: number;
65+
requestId: string;
66+
startTime: number;
5367
srcRequest: http.IncomingMessage;
5468
srcResponse: http.ServerResponse | null;
5569
srcHead: Buffer | null;
@@ -75,6 +89,18 @@ export type PrepareRequestFunctionOpts = {
7589
isHttp: boolean;
7690
};
7791

92+
export type RequestBypassedData = {
93+
id: string;
94+
request: http.IncomingMessage;
95+
connectionId: number;
96+
customTag?: unknown;
97+
};
98+
99+
export type RequestFinishedData = RequestBypassedData & {
100+
stats: RequestStats;
101+
response?: http.IncomingMessage;
102+
};
103+
78104
export type PrepareRequestFunctionResult = {
79105
customResponseFunction?: CustomResponseOpts['customResponseFunction'];
80106
customConnectServer?: http.Server | null;
@@ -93,8 +119,11 @@ export type PrepareRequestFunction = (opts: PrepareRequestFunctionOpts) => Promi
93119

94120
/**
95121
* Represents the proxy server.
96-
* It emits the 'requestFailed' event on unexpected request errors, with the following parameter `{ error, request }`.
97-
* It emits the 'connectionClosed' event when connection to proxy server is closed, with parameter `{ connectionId, stats }`.
122+
*
123+
* It emits the `requestFailed` event on unexpected request errors, with the following parameter `{ error, request }`.
124+
* It emits the `connectionClosed` event when connection to proxy server is closed, with parameter `{ connectionId, stats }`.
125+
* It emits the `requestBypassed` event when a request is bypassed, with parameter `RequestBypassedData`.
126+
* It emits the `requestFinished` event when a request is finished, with parameter `RequestFinishedData`.
98127
*/
99128
export class Server extends EventEmitter {
100129
port: number;
@@ -271,13 +300,20 @@ export class Server extends EventEmitter {
271300
async onRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
272301
try {
273302
const handlerOpts = await this.prepareRequestHandling(request);
303+
274304
handlerOpts.srcResponse = response;
275305

276306
const { proxyChainId } = request.socket as Socket;
277307

278308
if (handlerOpts.customResponseFunction) {
279309
this.log(proxyChainId, 'Using handleCustomResponse()');
280310
await handleCustomResponse(request, response, handlerOpts as CustomResponseOpts);
311+
this.emit('requestBypassed', {
312+
id: handlerOpts.requestId,
313+
request,
314+
connectionId: handlerOpts.id,
315+
customTag: handlerOpts.customTag,
316+
});
281317
return;
282318
}
283319

@@ -310,6 +346,12 @@ export class Server extends EventEmitter {
310346
if (handlerOpts.customConnectServer) {
311347
socket.unshift(head); // See chain.ts for why we do this
312348
await customConnect(socket, handlerOpts.customConnectServer);
349+
this.emit('requestBypassed', {
350+
id: handlerOpts.requestId,
351+
request,
352+
connectionId: handlerOpts.id,
353+
customTag: handlerOpts.customTag,
354+
});
313355
return;
314356
}
315357

@@ -336,9 +378,15 @@ export class Server extends EventEmitter {
336378
* @see {prepareRequestHandling}
337379
*/
338380
getHandlerOpts(request: http.IncomingMessage): HandlerOpts {
381+
const requestId = randomUUID();
382+
// Casing does not matter, but we do it to avoid breaking changes.
383+
request.headers['request-id'] = requestId;
384+
339385
const handlerOpts: HandlerOpts = {
340386
server: this,
341387
id: (request.socket as Socket).proxyChainId!,
388+
requestId,
389+
startTime: Date.now(),
342390
srcRequest: request,
343391
srcHead: null,
344392
trgParsed: null,
@@ -504,20 +552,31 @@ export class Server extends EventEmitter {
504552
* @param error
505553
*/
506554
failRequest(request: http.IncomingMessage, error: NodeJS.ErrnoException): void {
507-
const { proxyChainId } = request.socket as Socket;
555+
this.emit('requestFailed', {
556+
request,
557+
error,
558+
});
508559

509-
if (error.name === 'RequestError') {
510-
const typedError = error as RequestError;
560+
const { srcResponse } = (request as any).handlerOpts as HandlerOpts;
511561

512-
this.log(proxyChainId, `Request failed (status ${typedError.statusCode}): ${error.message}`);
513-
this.sendSocketResponse(request.socket, typedError.statusCode, typedError.headers, error.message);
514-
} else {
515-
this.log(proxyChainId, `Request failed with error: ${error.stack || error}`);
516-
this.sendSocketResponse(request.socket, 500, {}, 'Internal error in proxy server');
517-
this.emit('requestFailed', { error, request });
562+
if (!request.socket) {
563+
return;
518564
}
519565

520-
this.log(proxyChainId, 'Closing because request failed with error');
566+
if (request.socket.destroyed) {
567+
return;
568+
}
569+
570+
// If the request was not handled yet, we need to close the socket.
571+
// The client will get an empty response.
572+
if (srcResponse && !srcResponse.headersSent) {
573+
// We need to wait for the client to send the full request, otherwise it may get ECONNRESET.
574+
// This is particularly important for HTTP CONNECT, because the client sends the first data packet
575+
// along with the request headers.
576+
request.on('end', () => request.socket.end());
577+
// If the client never sends the full request, the socket will timeout and close.
578+
request.resume();
579+
}
521580
}
522581

523582
/**
@@ -607,22 +666,23 @@ export class Server extends EventEmitter {
607666
}
608667

609668
/**
610-
* Gets data transfer statistics of a specific proxy connection.
669+
* Returns the statistics of a specific connection.
670+
* @param connectionId The ID of the connection.
671+
* @returns The statistics object, or undefined if the connection does not exist.
611672
*/
612673
getConnectionStats(connectionId: number): ConnectionStats | undefined {
613674
const socket = this.connections.get(connectionId);
614-
if (!socket) return undefined;
615675

616-
const targetStats = getTargetStats(socket);
676+
if (!socket) return;
617677

618-
const result = {
678+
const { bytesWritten, bytesRead } = getTargetStats(socket);
679+
680+
return {
619681
srcTxBytes: socket.bytesWritten,
620682
srcRxBytes: socket.bytesRead,
621-
trgTxBytes: targetStats.bytesWritten,
622-
trgRxBytes: targetStats.bytesRead,
683+
trgTxBytes: bytesWritten,
684+
trgRxBytes: bytesRead,
623685
};
624-
625-
return result;
626686
}
627687

628688
/**

src/tcp_tunnel_tools.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from 'node:crypto';
12
import net from 'node:net';
23
import { URL } from 'node:url';
34

@@ -71,6 +72,9 @@ export async function createTunnel(
7172
handlerOpts: {
7273
upstreamProxyUrlParsed: parsedProxyUrl,
7374
ignoreUpstreamProxyCertificate: options?.ignoreProxyCertificate ?? false,
75+
requestId: randomUUID(),
76+
customTag: undefined,
77+
id: -1,
7478
},
7579
server: server as net.Server & { log: typeof log },
7680
isPlain: true,

0 commit comments

Comments
 (0)