diff --git a/README.md b/README.md index c0e03d1..611d24f 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,86 @@ This project provides tools to convert HEP packets into GigAPI Line Protocol. - Maintains the same column structure and table as hep - Parses SIP payload data to extract useful fields - Can be used as a library or as a standalone server -- Supports both TCP and UDP for HEP packet reception +- Supports all of the following for HEP packet reception: + - UDP + - TCP + - HTTP/2 (optional, POST endpoint for raw HEPv3 binary) - Batch processing for efficient InfluxDB writes - Optional file output for debugging or offline processing +## Quickstart: All Protocols (Local Development) + +```js +import HepToInfluxDBServer from './hep-server.js'; + +const server = new HepToInfluxDBServer({ + hepPort: 9060, // UDP/TCP port + hepBindAddress: '0.0.0.0', + influxDbUrl: 'http://localhost:7971', + influxDbDatabase: 'hep', + batchSize: 1000, + flushInterval: 5000, + debug: true, + // Enable HTTP/2 HEPv3 ingestion (optional) + http2Port: 8080, // HTTP/2 port + http2BindAddress: '0.0.0.0', // HTTP/2 bind address + http2Endpoint: '/test/api', // HTTP/2 POST endpoint +}); + +server.initialize().catch(console.error); +``` + +- **UDP/TCP**: Send HEPv3 packets to `hepPort` (default: 9060) +- **HTTP/2**: POST raw HEPv3 binary to `http://localhost:8080/test/api` (see below) + +### HTTP/2 HEPv3 Ingestion Example + +If enabled, you can POST raw HEPv3 binary packets to the configured HTTP/2 endpoint: + +```http +POST /test/api HTTP/2 +Host: localhost:8080 +Content-Type: application/octet-stream + + +``` + +Example with curl (using HTTP/2): + +```bash +curl --http2-prior-knowledge -X POST \ + -H "Content-Type: application/octet-stream" \ + --data-binary @packet.bin \ + http://localhost:8080/test/api +``` + +## Configuration Options + +| Option | Description | Default | +|------------------|-----------------------------------------------------|-------------| +| hepPort | Port to listen for HEP packets (UDP/TCP) | 9060 | +| hepBindAddress | Address to bind HEP server (UDP/TCP) | 0.0.0.0 | +| influxDbUrl | InfluxDB server URL | http://localhost:7971 | +| influxDbDatabase | InfluxDB database name | hep | +| batchSize | Number of records to batch before sending | 1000 | +| flushInterval | Maximum time between flushes (ms) | 5000 | +| maxBufferSize | Maximum buffer size before forced flush | 10000 | +| debug | Enable debug logging | false | +| writeToFile | Save Line Protocol to files | false | +| outputDir | Directory for output files | ./data | +| http2Port | Port for HTTP/2 HEPv3 ingestion (optional) | undefined | +| http2BindAddress | Address for HTTP/2 server (optional) | undefined | +| http2Endpoint | HTTP/2 POST endpoint path (optional) | undefined | + +- **To enable HTTP/2 ingestion, set all three: `http2Port`, `http2BindAddress`, and `http2Endpoint`.** +- If not set, HTTP/2 server will not start. + +## Testing All Sockets Locally + +- **UDP**: Use a HEP generator (e.g., hepgen) or a tool that can send HEPv3 packets to `localhost:9060` (UDP). +- **TCP**: Use a HEP generator or netcat to send HEPv3 packets to `localhost:9060` (TCP). +- **HTTP/2**: Use the curl example above, or any HTTP/2 client, to POST a raw HEPv3 binary to `http://localhost:8080/test/api`. + ## Components 1. **hep-proto**: Core library for converting HEP packets to Line Protocol @@ -121,13 +197,36 @@ const server = new HepToInfluxDBServer({ influxDbDatabase: 'hep', batchSize: 1000, flushInterval: 5000, - debug: true + debug: true, + // Enable HTTP/2 HEPv3 ingestion (optional) + http2Port: 8080, // HTTP/2 port + http2BindAddress: '0.0.0.0', // HTTP/2 bind address + http2Endpoint: '/test/api', // HTTP/2 POST endpoint }); server.initialize().catch(console.error); ``` -## Configuration Options +### HTTP/2 HEPv3 Ingestion Example + +If enabled, you can POST raw HEPv3 binary packets to the configured HTTP/2 endpoint: + +```http +POST /test/api HTTP/2 +Host: localhost:8080 +Content-Type: application/octet-stream + + +``` + +Example with curl (using HTTP/2): + +```bash +curl --http2-prior-knowledge -X POST \ + -H "Content-Type: application/octet-stream" \ + --data-binary @packet.bin \ + http://localhost:8080/test/api +``` ### HepToInfluxDBServer diff --git a/bun.lock b/bun.lock new file mode 100644 index 0000000..a32a1fa --- /dev/null +++ b/bun.lock @@ -0,0 +1,93 @@ +{ + "lockfileVersion": 1, + "workspaces": { + "": { + "name": "hep2gig", + "dependencies": { + "@duckdb/node-api": "^1.2.2-alpha.18", + "axios": "^1.8.4", + "hep-js": "^1.0.22", + "parsip": "^1.0.7", + }, + }, + }, + "packages": { + "@duckdb/node-api": ["@duckdb/node-api@1.2.2-alpha.19", "", { "dependencies": { "@duckdb/node-bindings": "1.2.2-alpha.19" } }, "sha512-qH/PqK+Kp7TDZJU5SdS9+2jrYOJGHME4NcDsgrcbZk7fKTlXsDI8f8F0CnPxGCsBD35B+BEgphz9TTLoj1/vNA=="], + + "@duckdb/node-bindings": ["@duckdb/node-bindings@1.2.2-alpha.19", "", { "optionalDependencies": { "@duckdb/node-bindings-darwin-arm64": "1.2.2-alpha.19", "@duckdb/node-bindings-darwin-x64": "1.2.2-alpha.19", "@duckdb/node-bindings-linux-arm64": "1.2.2-alpha.19", "@duckdb/node-bindings-linux-x64": "1.2.2-alpha.19", "@duckdb/node-bindings-win32-x64": "1.2.2-alpha.19" } }, "sha512-EZpUPCPNJaQVmI3jWa/SIrs7lV9SJYS5a1bV3SRSQ6ZBJYFMrr2Mik2RjCVepA9cIwD2KIrXaWV7jiSVM29zJA=="], + + "@duckdb/node-bindings-darwin-arm64": ["@duckdb/node-bindings-darwin-arm64@1.2.2-alpha.19", "", { "os": "darwin", "cpu": "arm64" }, "sha512-r+MDv7XFxl4eCZEthmHWV2mxnczUFVc+w2cH0KzdTWmCgtjT3jpDhEulDekOG9SP9lFvfsTRTRgKatE2TTqu6w=="], + + "@duckdb/node-bindings-darwin-x64": ["@duckdb/node-bindings-darwin-x64@1.2.2-alpha.19", "", { "os": "darwin", "cpu": "x64" }, "sha512-WibHtKNQVDFr8AQumML9HLdgkYIRiC0B+MSrLvfojcrYmupoHB0Nwyn/T2UoTXNnTsn3NOrZECHUZGwRFmmF9g=="], + + "@duckdb/node-bindings-linux-arm64": ["@duckdb/node-bindings-linux-arm64@1.2.2-alpha.19", "", { "os": "linux", "cpu": "arm64" }, "sha512-TfQ/k4YMAsDFpI5Y/nvA7yXs7KqzKsOZX4XzCyqWdfNPDQvheW+wRCxJVUE8kWqniMep62EkvbuX94yFtTZb8A=="], + + "@duckdb/node-bindings-linux-x64": ["@duckdb/node-bindings-linux-x64@1.2.2-alpha.19", "", { "os": "linux", "cpu": "x64" }, "sha512-KDGD50H/sA/HWZFVuZhZTVZEKB3N8+ZPhHxxPs67p46WNm/fiQ6/FlaLasQ/gEmOY+I3bWcwjrba4h6LhFXnAQ=="], + + "@duckdb/node-bindings-win32-x64": ["@duckdb/node-bindings-win32-x64@1.2.2-alpha.19", "", { "os": "win32", "cpu": "x64" }, "sha512-jsHu/tORVBfJ6QQ02c1vVKEJM6aVSNwwmFtd1G7n9HGjJeE1Wclewpfp3k7R7qU9vxtQtHoAMHeTVsnC2d3ymQ=="], + + "asynckit": ["asynckit@0.4.0", "", {}, "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="], + + "axios": ["axios@1.10.0", "", { "dependencies": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", "proxy-from-env": "^1.1.0" } }, "sha512-/1xYAC4MP/HEG+3duIhFr4ZQXR4sQXOIe+o6sdqzeykGLx6Upp/1p8MHqhINOvGeP7xyNHe7tsiJByc4SSVUxw=="], + + "binary-parser": ["binary-parser@1.9.2", "", { "dependencies": { "fast-text-encoding": "^1.0.3" } }, "sha512-aQvTapiRIV/x7bAaR2KT3Ptyjff4R8i1zxljgw/Je2kUt0babNPUdtuo7AKN4CWklyzPK4u5e99Z9/3Ib03u9w=="], + + "call-bind-apply-helpers": ["call-bind-apply-helpers@1.0.2", "", { "dependencies": { "es-errors": "^1.3.0", "function-bind": "^1.1.2" } }, "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ=="], + + "combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="], + + "debug": ["debug@4.4.1", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ=="], + + "delayed-stream": ["delayed-stream@1.0.0", "", {}, "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ=="], + + "dunder-proto": ["dunder-proto@1.0.1", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.1", "es-errors": "^1.3.0", "gopd": "^1.2.0" } }, "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A=="], + + "es-define-property": ["es-define-property@1.0.1", "", {}, "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g=="], + + "es-errors": ["es-errors@1.3.0", "", {}, "sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw=="], + + "es-object-atoms": ["es-object-atoms@1.1.1", "", { "dependencies": { "es-errors": "^1.3.0" } }, "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA=="], + + "es-set-tostringtag": ["es-set-tostringtag@2.1.0", "", { "dependencies": { "es-errors": "^1.3.0", "get-intrinsic": "^1.2.6", "has-tostringtag": "^1.0.2", "hasown": "^2.0.2" } }, "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA=="], + + "fast-text-encoding": ["fast-text-encoding@1.0.6", "", {}, "sha512-VhXlQgj9ioXCqGstD37E/HBeqEGV/qOD/kmbVG8h5xKBYvM1L3lR1Zn4555cQ8GkYbJa8aJSipLPndE1k6zK2w=="], + + "follow-redirects": ["follow-redirects@1.15.9", "", {}, "sha512-gew4GsXizNgdoRyqmyfMHyAmXsZDk6mHkSxZFCzW9gwlbtOW44CDtYavM+y+72qD/Vq2l550kMF52DT8fOLJqQ=="], + + "form-data": ["form-data@4.0.3", "", { "dependencies": { "asynckit": "^0.4.0", "combined-stream": "^1.0.8", "es-set-tostringtag": "^2.1.0", "hasown": "^2.0.2", "mime-types": "^2.1.12" } }, "sha512-qsITQPfmvMOSAdeyZ+12I1c+CKSstAFAwu+97zrnWAbIr5u8wfsExUzCesVLC8NgHuRUqNN4Zy6UPWUTRGslcA=="], + + "function-bind": ["function-bind@1.1.2", "", {}, "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA=="], + + "get-intrinsic": ["get-intrinsic@1.3.0", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.2", "es-define-property": "^1.0.1", "es-errors": "^1.3.0", "es-object-atoms": "^1.1.1", "function-bind": "^1.1.2", "get-proto": "^1.0.1", "gopd": "^1.2.0", "has-symbols": "^1.1.0", "hasown": "^2.0.2", "math-intrinsics": "^1.1.0" } }, "sha512-9fSjSaos/fRIVIp+xSJlE6lfwhES7LNtKaCBIamHsjr2na1BiABJPo0mOjjz8GJDURarmCPGqaiVg5mfjb98CQ=="], + + "get-proto": ["get-proto@1.0.1", "", { "dependencies": { "dunder-proto": "^1.0.1", "es-object-atoms": "^1.0.0" } }, "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g=="], + + "gopd": ["gopd@1.2.0", "", {}, "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg=="], + + "has-symbols": ["has-symbols@1.1.0", "", {}, "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ=="], + + "has-tostringtag": ["has-tostringtag@1.0.2", "", { "dependencies": { "has-symbols": "^1.0.3" } }, "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw=="], + + "hasown": ["hasown@2.0.2", "", { "dependencies": { "function-bind": "^1.1.2" } }, "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ=="], + + "hep-js": ["hep-js@1.0.22", "", { "dependencies": { "binary-parser": "^1.3.2", "mixin-deep": "^2.0.1" } }, "sha512-o5VQpr50FLFjJlFIEaq0voYMItHVWUEASnGyVf7bWxV1Ew9agFFEtSjeMDVby1UJEHf2RZkpnferGHPIP6QGDA=="], + + "jwt-decode": ["jwt-decode@4.0.0", "", {}, "sha512-+KJGIyHgkGuIq3IEBNftfhW/LfWhXUIY6OmyVWjliu5KH1y0fw7VQ8YndE2O4qZdMSd9SqbnC8GOcZEy0Om7sA=="], + + "math-intrinsics": ["math-intrinsics@1.1.0", "", {}, "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g=="], + + "mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], + + "mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], + + "mixin-deep": ["mixin-deep@2.0.1", "", {}, "sha512-imbHQNRglyaplMmjBLL3V5R6Bfq5oM+ivds3SKgc6oRtzErEnBUUc5No11Z2pilkUvl42gJvi285xTNswcKCMA=="], + + "ms": ["ms@2.1.3", "", {}, "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="], + + "parsip": ["parsip@1.0.7", "", { "dependencies": { "debug": "^4.3.4", "jwt-decode": "^4.0.0", "sdp-transform": "^2.4.0" } }, "sha512-XFIQaFKVu1vMBYF36mQRn08Ns8RYW+D2NQZqqxDE5ju7CQp5OItmL+g4fxCHW57PV3uvPkck/9aX9A5IVZfIgg=="], + + "proxy-from-env": ["proxy-from-env@1.1.0", "", {}, "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="], + + "sdp-transform": ["sdp-transform@2.15.0", "", { "bin": { "sdp-verify": "checker.js" } }, "sha512-KrOH82c/W+GYQ0LHqtr3caRpM3ITglq3ljGUIb8LTki7ByacJZ9z+piSGiwZDsRyhQbYBOBJgr2k6X4BZXi3Kw=="], + } +} diff --git a/example.js b/example.js index c2fdfdb..35a7363 100644 --- a/example.js +++ b/example.js @@ -85,3 +85,125 @@ async function main() { // Run the example main().catch(console.error); + +// --- Smarter Socket Test Section --- +async function isPortOpen(port, host, protocol = 'tcp') { + if (protocol === 'tcp') { + const net = await import('net'); + return await new Promise((resolve) => { + const socket = net.createConnection({ port, host }); + socket.setTimeout(1000); + socket.on('connect', () => { + socket.destroy(); + resolve(true); + }); + socket.on('timeout', () => { + socket.destroy(); + resolve(false); + }); + socket.on('error', () => { + resolve(false); + }); + }); + } else if (protocol === 'udp') { + // UDP is connectionless, so we just try to send + return true; + } + return false; +} + +async function testAllSocketsSmart() { + const packet = createTestHepPacket(); + const udpPort = 9060; + const tcpPort = 9060; + const http2Port = 8080; + const http2Endpoint = '/test/api'; + const host = '127.0.0.1'; + const results = []; + + // UDP Test + try { + const udpOpen = await isPortOpen(udpPort, host, 'udp'); + if (!udpOpen) { + results.push({ protocol: 'UDP', port: udpPort, status: 'DOWN', details: 'UDP port not open' }); + } else { + const dgram = await import('dgram'); + const udpClient = dgram.createSocket('udp4'); + await new Promise((resolve, reject) => { + udpClient.send(packet, udpPort, host, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + udpClient.close(); + results.push({ protocol: 'UDP', port: udpPort, status: 'SENT', details: 'Packet sent (no response expected)' }); + } + } catch (err) { + results.push({ protocol: 'UDP', port: udpPort, status: 'FAIL', details: err.message }); + } + + // TCP Test + try { + const tcpOpen = await isPortOpen(tcpPort, host, 'tcp'); + if (!tcpOpen) { + results.push({ protocol: 'TCP', port: tcpPort, status: 'DOWN', details: 'TCP port not open' }); + } else { + const net = await import('net'); + await new Promise((resolve, reject) => { + const client = net.createConnection({ port: tcpPort, host }, () => { + client.write(packet); + client.end(); + resolve(); + }); + client.on('error', reject); + }); + results.push({ protocol: 'TCP', port: tcpPort, status: 'SENT', details: 'Packet sent (no response expected)' }); + } + } catch (err) { + results.push({ protocol: 'TCP', port: tcpPort, status: 'FAIL', details: err.message }); + } + + // HTTP/2 Test + try { + const http2Open = await isPortOpen(http2Port, host, 'tcp'); + if (!http2Open) { + results.push({ protocol: 'HTTP/2', port: http2Port, status: 'DOWN', details: 'HTTP/2 port not open' }); + } else { + let fetchImpl = globalThis.fetch; + if (!fetchImpl) fetchImpl = (await import('node-fetch')).default; + const url = `http://${host}:${http2Port}${http2Endpoint}`; + const res = await fetchImpl(url, { + method: 'POST', + headers: { 'Content-Type': 'application/octet-stream' }, + body: packet + }); + const body = await res.text(); + if (res.status === 200) { + results.push({ protocol: 'HTTP/2', port: http2Port, status: 'OK', details: `200 OK: ${body}` }); + } else { + results.push({ protocol: 'HTTP/2', port: http2Port, status: 'FAIL', details: `${res.status}: ${body}` }); + } + } + } catch (err) { + results.push({ protocol: 'HTTP/2', port: http2Port, status: 'FAIL', details: err.message }); + } + + // Print summary table + console.log('\n=== Socket Test Summary ==='); + console.log('| Protocol | Port | Status | Details'); + console.log('|----------|-------|--------|------------------------------------------'); + for (const r of results) { + console.log(`| ${r.protocol.padEnd(8)} | ${String(r.port).padEnd(5)} | ${r.status.padEnd(6)} | ${r.details}`); + } + console.log('===========================================\n'); +} + +// If run with 'test' argument, run the smarter socket test +if (process.argv.includes('test')) { + testAllSocketsSmart().then(() => { + process.exit(0); + }).catch((err) => { + console.error('Socket test error:', err); + process.exit(1); + }); +} diff --git a/hep-server.js b/hep-server.js index abdf05e..b265eba 100644 --- a/hep-server.js +++ b/hep-server.js @@ -23,7 +23,11 @@ class HepToInfluxDBServer { maxBufferSize: config.maxBufferSize || process.env.MAX_BUFFER || 10000, debug: config.debug || false, writeToFile: config.writeToFile || false, - outputDir: config.outputDir || './data' + outputDir: config.outputDir || './data', + // HTTP/2 HEPv3 ingestion options + http2Port: config.http2Port || process.env.HTTP2_PORT, + http2BindAddress: config.http2BindAddress || process.env.HTTP2_HOST, + http2Endpoint: config.http2Endpoint || process.env.HTTP2_ENDPOINT }; this.buffer = []; @@ -54,6 +58,11 @@ class HepToInfluxDBServer { // Start the server await this.startServer(); + // Conditionally start HTTP/2 HEPv3 ingestion server + if (this.config.http2Port && this.config.http2Endpoint) { + this.startHttp2Server(); + } + // Set up the flush interval this.flushIntervalId = setInterval(() => { this.conditionalFlush(); @@ -240,6 +249,43 @@ class HepToInfluxDBServer { }; } + /** + * Start HTTP/2 HEPv3 ingestion server (Bun-specific) + * Only enabled if http2Port and http2Endpoint are set in config + */ + startHttp2Server() { + const port = this.config.http2Port; + const host = this.config.http2BindAddress || '0.0.0.0'; + const endpoint = this.config.http2Endpoint; + this.http2Server = Bun.serve({ + port, + hostname: host, + http2: true, + fetch: async (req) => { + const url = new URL(req.url); + if (req.method === 'POST' && url.pathname === endpoint) { + try { + // Accept only application/octet-stream + if (req.headers.get('content-type') !== 'application/octet-stream') { + return new Response('Unsupported Media Type', { status: 415 }); + } + // Convert request body to Buffer + const data = Buffer.from(await req.arrayBuffer()); + // Dummy socket for logging/statistics + const dummySocket = { remoteAddress: req.headers.get('x-forwarded-for') || req.headers.get('host') }; + this.handleData(data, dummySocket); + return new Response('OK', { status: 200 }); + } catch (err) { + if (this.config.debug) console.error('HEPv3 HTTP/2 error:', err); + return new Response('HEPv3 Error', { status: 400 }); + } + } + return new Response('Not Found', { status: 404 }); + } + }); + console.log(`HEP HTTP/2 Server listening on ${host}:${port}${endpoint}`); + } + /** * Shutdown the server */ @@ -274,6 +320,16 @@ class HepToInfluxDBServer { } } + // Stop HTTP/2 server if running + if (this.http2Server) { + try { + this.http2Server.stop(true); + this.http2Server = null; + } catch (error) { + console.error('Error stopping HTTP/2 server:', error); + } + } + console.log('Server shutdown complete'); // Final stats @@ -287,7 +343,11 @@ class HepToInfluxDBServer { if (require.main === module) { const server = new HepToInfluxDBServer({ debug: true, - writeToFile: false + writeToFile: false, + // Example: enable HTTP/2 HEPv3 ingestion + // http2Port: 8080, + // http2BindAddress: '0.0.0.0', + // http2Endpoint: '/test/api', }); server.initialize().catch(error => { diff --git a/test/hep-server.integration.test.js b/test/hep-server.integration.test.js new file mode 100644 index 0000000..55348f1 --- /dev/null +++ b/test/hep-server.integration.test.js @@ -0,0 +1,123 @@ +import { spawn } from 'child_process'; +import { test, expect, beforeAll, afterAll } from 'bun:test'; +import net from 'net'; +import dgram from 'dgram'; + +const TEST_UDP_PORT = 19060; +const TEST_TCP_PORT = 19060; +const TEST_HTTP2_PORT = 18080; +const TEST_HTTP2_ENDPOINT = '/test/api'; +const HOST = '127.0.0.1'; + +let serverProcess; + +function waitForPort(port, host, timeout = 5000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + function tryConnect() { + const socket = net.createConnection({ port, host }, () => { + socket.destroy(); + resolve(); + }); + socket.on('error', () => { + if (Date.now() - start > timeout) { + reject(new Error(`Timeout waiting for port ${port}`)); + } else { + setTimeout(tryConnect, 100); + } + }); + } + tryConnect(); + }); +} + +beforeAll(async () => { + // Start the server as a child process + serverProcess = spawn('bun', ['hep-server.js'], { + env: { + ...process.env, + PORT: TEST_UDP_PORT, + HOST: HOST, + HTTP2_PORT: TEST_HTTP2_PORT, + HTTP2_HOST: HOST, + HTTP2_ENDPOINT: TEST_HTTP2_ENDPOINT, + DEBUG: '1', + INFLUX_DBURL: 'http://localhost:7971', // dummy + INFLUXB_DBNAME: 'hep', + BATCH_SIZE: '1', // flush immediately for test + FLUSH_INTERVAL: '1000', + }, + stdio: ['ignore', 'pipe', 'pipe'] + }); + + // Wait for all ports to be open + await waitForPort(TEST_UDP_PORT, HOST); + await waitForPort(TEST_TCP_PORT, HOST); + await waitForPort(TEST_HTTP2_PORT, HOST); +}); + +afterAll(async () => { + if (serverProcess) { + serverProcess.kill('SIGINT'); + await new Promise((resolve) => serverProcess.on('exit', resolve)); + } +}); + +function createTestHepPacket() { + const hepjs = require('hep-js'); + const rcinfo = { + type: 'HEP', + version: 3, + payload_type: 1, // SIP + captureId: 2001, + capturePass: 'myHep', + srcIp: '192.168.1.1', + dstIp: '192.168.1.2', + srcPort: 5060, + dstPort: 5060, + timeSeconds: Math.floor(Date.now() / 1000), + timeUseconds: (Date.now() % 1000) * 1000, + proto_type: 1, // SIP + }; + const payload = 'INVITE sip:alice@example.com SIP/2.0\r\n...'; + return hepjs.encapsulate(payload, rcinfo); +} + +test('UDP ingestion', async () => { + const packet = createTestHepPacket(); + const udpClient = dgram.createSocket('udp4'); + await new Promise((resolve, reject) => { + udpClient.send(packet, TEST_UDP_PORT, HOST, (err) => { + udpClient.close(); + if (err) reject(err); + else resolve(); + }); + }); + expect(true).toBe(true); // If no error, test passes +}); + +test('TCP ingestion', async () => { + const packet = createTestHepPacket(); + await new Promise((resolve, reject) => { + const client = net.createConnection({ port: TEST_TCP_PORT, host: HOST }, () => { + client.write(packet); + client.end(); + resolve(); + }); + client.on('error', reject); + }); + expect(true).toBe(true); +}); + +test('HTTP/2 ingestion', async () => { + const packet = createTestHepPacket(); + const url = `http://${HOST}:${TEST_HTTP2_PORT}${TEST_HTTP2_ENDPOINT}`; + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/octet-stream' }, + body: packet + }); + const body = await res.text(); + expect(res.status).toBe(200); + expect(body).toBe('OK'); +}); \ No newline at end of file