diff --git a/.changeset/ecint-6417-alerts-daemon.md b/.changeset/ecint-6417-alerts-daemon.md new file mode 100644 index 00000000..f35c3a4c --- /dev/null +++ b/.changeset/ecint-6417-alerts-daemon.md @@ -0,0 +1,5 @@ +--- +"nansen-cli": minor +--- + +Add alerts daemon for real-time WebSocket alert streaming (`nansen alerts daemon run/start/stop/status/logs`) diff --git a/skills/nansen-alerts-daemon/SKILL.md b/skills/nansen-alerts-daemon/SKILL.md new file mode 100644 index 00000000..6f1678be --- /dev/null +++ b/skills/nansen-alerts-daemon/SKILL.md @@ -0,0 +1,97 @@ +--- +name: nansen-alerts-daemon +description: Manage the alerts daemon — run, start, stop, status, logs. Use when setting up real-time alert streaming via WebSocket or managing the background daemon process. +metadata: + openclaw: + requires: + env: + - NANSEN_API_KEY + bins: + - nansen + primaryEnv: NANSEN_API_KEY + install: + - kind: node + package: nansen-cli + bins: [nansen] +allowed-tools: Bash(nansen:*) +--- + +# Alerts Daemon + +Persistent WebSocket client that connects outbound to Nansen's alert stream. +No public URL or inbound ports needed — works behind any NAT/firewall. + +## Quick Reference + +```bash +nansen alerts daemon run # Foreground, NDJSON on stdout +nansen alerts daemon start --action '' # Background, runs per alert +nansen alerts daemon stop # Stop background daemon +nansen alerts daemon status # Running? Last alert? PID? +nansen alerts daemon logs # Tail daemon log +``` + +## Architecture + +``` +Nansen WS Server ←──outbound── nansen alerts daemon + │ │ + │ push alert JSON ├── stdout (NDJSON) + │ └── --action hook (shell cmd) +``` + +The daemon connects to `wss://api.nansen.ai/v1/smart-alert/stream` using your API key. +On disconnect, it reconnects with exponential backoff (5s → 300s max) and backfills +missed alerts from the `/past-alerts` REST endpoint. + +## Options (run / start) + +| Flag | Description | +|------|-------------| +| `--action ` | Shell command per alert. Alert JSON on stdin. Supports `{alertId}`, `{alertName}`, `{alertType}`, `{firedAt}` placeholders. | +| `--action-env` | Pass alert JSON as `NANSEN_ALERT` env var instead of stdin | +| `--no-backfill` | Skip past-alert replay on (re)connect | +| `--ws-url ` | Override WebSocket URL (e.g. local mock server) | +| `--state-file ` | State JSON path (default: `~/.nansen/alerts-daemon-state.json`) | +| `--pid-file ` | PID file path (default: `~/.nansen/alerts-daemon.pid`) | +| `--log-file ` | Log file path (default: `~/.nansen/alerts-daemon.log`) | + +## Examples + +```bash +# Pipe alerts to jq for filtering +nansen alerts daemon run | jq 'select(.alertType == "sm-token-flows")' + +# Wake up an agent on every alert +nansen alerts daemon start --action 'openclaw inject --message "Alert: {alertName}"' + +# Pass alert data as env var +nansen alerts daemon start --action 'process-alert.sh' --action-env + +# Test with local mock server +node src/daemon/mock-server.js & +nansen alerts daemon run --ws-url ws://localhost:9876/v1/smart-alert/stream + +# Check status +nansen alerts daemon status --pretty +``` + +## Output Format + +Each alert is emitted as a single JSON line (NDJSON): + +```json +{"type":"alert","alertId":"abc123","alertName":"ETH SM Inflow >5M","alertType":"sm-token-flows","firedAt":"2026-03-20T11:46:00Z","data":{...}} +``` + +## State File + +`~/.nansen/alerts-daemon-state.json` tracks `lastAlertAt` for backfill on reconnect. +Written atomically (tmp + rename). Permissions: `0600`. + +## Notes + +- Requires a valid Nansen API key (`nansen login` or `NANSEN_API_KEY` env var) +- Node 22+ has built-in WebSocket; older versions need `npm install ws` +- The `--action` hook runs each alert as a subprocess — not `eval` — preventing shell injection from alert data +- Log files contain only alert metadata (ID, name, type, timestamp), not data payloads diff --git a/src/__tests__/alerts-daemon.test.js b/src/__tests__/alerts-daemon.test.js new file mode 100644 index 00000000..65d788fe --- /dev/null +++ b/src/__tests__/alerts-daemon.test.js @@ -0,0 +1,318 @@ +/** + * Unit tests for AlertsDaemon + * Uses an in-process mock WebSocket server to verify: + * - connection & auth + * - alert dispatch + stdout emission + * - ping/pong keepalive + * - reconnect on close + * - backfill on reconnect + * - state persistence + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; +import { AlertsDaemon } from '../daemon/alerts-daemon.js'; + +// ── Helpers ──────────────────────────────────────────────────────────────────── + +function makeAlert(overrides = {}) { + return { + type: 'alert', + alertId: 'test-alert-001', + alertName: 'Test Alert', + alertType: 'sm-token-flows', + firedAt: new Date().toISOString(), + data: { chain: 'ethereum' }, + ...overrides, + }; +} + +function makeDaemon(opts = {}) { + class MockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn(); + this.close = vi.fn((code) => { + this.readyState = 3; + this.emit('close', code ?? 1000, ''); + }); + setImmediate(() => this.emit('open')); + } + } + + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: async () => ({ alerts: [], count: 0 }), + }); + + const daemon = new AlertsDaemon({ + apiKey: 'test-key', + wsUrl: 'ws://localhost:9876/v1/smart-alert/stream', + restUrl: 'http://localhost:9876/api/v1/smart-alert/past-alerts', + stateFile: '/tmp/test-daemon-state.json', + backfill: false, + WebSocket: MockWS, + fetchFn: mockFetch, + log: vi.fn(), + ...opts, + }); + + return { daemon, MockWS, mockFetch }; +} + +// ── Tests ────────────────────────────────────────────────────────────────────── + +describe('AlertsDaemon', () => { + let originalStdoutWrite; + let stdoutLines = []; + + beforeEach(() => { + stdoutLines = []; + originalStdoutWrite = process.stdout.write.bind(process.stdout); + process.stdout.write = (data) => { + if (typeof data === 'string') stdoutLines.push(data); + return true; + }; + }); + + afterEach(() => { + process.stdout.write = originalStdoutWrite; + }); + + it('throws if no apiKey provided', () => { + expect(() => new AlertsDaemon({})).toThrow('apiKey is required'); + }); + + it('emits "connected" event on WS open + connected message', async () => { + class AutoMockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn(); + this.close = vi.fn((code) => { + this.readyState = 3; + this.emit('close', code ?? 1000, ''); + }); + setImmediate(() => { + this.emit('open'); + setImmediate(() => { + this.emit('message', JSON.stringify({ + type: 'connected', + sessionId: 'sess-abc', + serverTime: new Date().toISOString(), + })); + }); + }); + } + } + + const daemon = new AlertsDaemon({ + apiKey: 'test-key', + wsUrl: 'ws://localhost/stream', + restUrl: 'http://localhost/past-alerts', + stateFile: '/tmp/test-daemon-state2.json', + backfill: false, + WebSocket: AutoMockWS, + fetchFn: vi.fn(), + log: vi.fn(), + }); + + const conn = await new Promise((resolve) => { + daemon.once('connected', resolve); + daemon.start(); + }); + + daemon.stop(); + + expect(conn.sessionId).toBe('sess-abc'); + }); + + it('emits "alert" event and writes JSON to stdout', async () => { + const alert = makeAlert(); + + class AlertMockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn(); + this.close = vi.fn((code) => { this.readyState = 3; this.emit('close', code ?? 1000, ''); }); + setImmediate(() => { + this.emit('open'); + setImmediate(() => this.emit('message', JSON.stringify(alert))); + }); + } + } + + const daemon = new AlertsDaemon({ + apiKey: 'test-key', + wsUrl: 'ws://localhost/stream', + restUrl: 'http://localhost/past-alerts', + stateFile: '/tmp/test-daemon-state3.json', + backfill: false, + WebSocket: AlertMockWS, + fetchFn: vi.fn(), + log: vi.fn(), + }); + + const received = await new Promise((resolve) => { + daemon.once('alert', resolve); + daemon.start(); + }); + + daemon.stop(); + + expect(received.alertId).toBe('test-alert-001'); + expect(received.alertName).toBe('Test Alert'); + + // Should have written NDJSON to stdout + const line = stdoutLines.find((l) => l.includes('test-alert-001')); + expect(line).toBeDefined(); + const parsed = JSON.parse(line.trim()); + expect(parsed.alertId).toBe('test-alert-001'); + }); + + it('sends ping and handles pong', async () => { + vi.useFakeTimers(); + + let wsSendCalls = []; + + class PingMockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn((data) => wsSendCalls.push(JSON.parse(data))); + this.close = vi.fn((code) => { this.readyState = 3; this.emit('close', code ?? 1000, ''); }); + setImmediate(() => this.emit('open')); + } + } + + const daemon = new AlertsDaemon({ + apiKey: 'test-key', + wsUrl: 'ws://localhost/stream', + restUrl: 'http://localhost/past-alerts', + stateFile: '/tmp/test-daemon-state4.json', + backfill: false, + WebSocket: PingMockWS, + fetchFn: vi.fn(), + log: vi.fn(), + }); + + daemon.start(); + + // Advance past the first setImmediate (open), then past one ping interval (30s) + await vi.advanceTimersByTimeAsync(31_000); + + daemon.stop(); + + const pings = wsSendCalls.filter((m) => m.type === 'ping'); + expect(pings.length).toBeGreaterThan(0); + + vi.useRealTimers(); + }); + + it('stops reconnecting on UNAUTHORIZED server error', async () => { + class AuthErrorMockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn(); + this.close = vi.fn((code) => { this.readyState = 3; this.emit('close', code ?? 1000, ''); }); + setImmediate(() => { + this.emit('open'); + setImmediate(() => { + this.emit('message', JSON.stringify({ + type: 'error', + code: 'UNAUTHORIZED', + message: 'Invalid API key', + })); + }); + }); + } + } + + const daemon = new AlertsDaemon({ + apiKey: 'bad-key', + wsUrl: 'ws://localhost/stream', + restUrl: 'http://localhost/past-alerts', + stateFile: '/tmp/test-daemon-state5.json', + backfill: false, + WebSocket: AuthErrorMockWS, + fetchFn: vi.fn(), + log: vi.fn(), + }); + + // Wait for both the error event and for the daemon loop to finish + const errorMsg = await new Promise((resolve) => { + daemon.once('server-error', resolve); + daemon.start(); + }); + + // The daemon sets _running = false synchronously in the error handler, + // but the connect loop needs a microtask tick to process the close event. + // Flush microtasks by awaiting a resolved promise a few times. + for (let i = 0; i < 5; i++) await Promise.resolve(); + + expect(daemon._running).toBe(false); + expect(errorMsg.code).toBe('UNAUTHORIZED'); + + daemon.stop(); // cleanup timers + }); + + it('calls /past-alerts on reconnect when backfill=true', async () => { + const missedAlert = makeAlert({ alertId: 'past-001', alertName: 'Past Alert' }); + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: async () => ({ alerts: [missedAlert], count: 1 }), + }); + + class BackfillMockWS extends EventEmitter { + constructor() { + super(); + this.readyState = 1; + this.send = vi.fn(); + this.close = vi.fn((code) => { this.readyState = 3; this.emit('close', code ?? 1000, ''); }); + setImmediate(() => this.emit('open')); + } + } + + const daemon = new AlertsDaemon({ + apiKey: 'test-key', + wsUrl: 'ws://localhost/stream', + restUrl: 'http://localhost/past-alerts', + stateFile: '/tmp/test-daemon-state6.json', + backfill: true, + WebSocket: BackfillMockWS, + fetchFn: mockFetch, + log: vi.fn(), + }); + + // Seed a lastAlertAt so backfill triggers + daemon._state = { lastAlertAt: '2026-03-20T10:00:00Z' }; + + const backfilledAlerts = []; + daemon.on('alert', (a) => backfilledAlerts.push(a)); + + // Start the daemon — it will connect, backfill, then wait for messages. + // We use a log spy to detect when backfill completes. + const backfillDone = new Promise((resolve) => { + const origLog = daemon._logFn; + daemon._logFn = (level, msg) => { + origLog?.(level, msg); + if (msg.includes('Replaying') || msg.includes('No missed alerts')) resolve(); + }; + }); + + daemon.start(); + + await backfillDone; + + daemon.stop(); + + expect(mockFetch).toHaveBeenCalledWith( + expect.stringContaining('/past-alerts'), + expect.objectContaining({ headers: expect.objectContaining({ Authorization: 'Bearer test-key' }) }) + ); + expect(backfilledAlerts.some((a) => a.alertId === 'past-001')).toBe(true); + }); +}); diff --git a/src/cli.js b/src/cli.js index 940f7c70..4bcb3494 100644 --- a/src/cli.js +++ b/src/cli.js @@ -165,7 +165,7 @@ export function parseArgs(args) { const key = arg.slice(2); const next = args[i + 1]; - if (key === 'pretty' || key === 'help' || key === 'version' || key === 'table' || key === 'no-retry' || key === 'cache' || key === 'no-cache' || key === 'stream' || key === 'enrich' || key === 'full' || key === 'human' || key === 'enabled' || key === 'disabled') { + if (key === 'pretty' || key === 'help' || key === 'version' || key === 'table' || key === 'no-retry' || key === 'cache' || key === 'no-cache' || key === 'stream' || key === 'enrich' || key === 'full' || key === 'human' || key === 'enabled' || key === 'disabled' || key === 'expert' || key === 'json' || key === 'no-backfill' || key === 'action-env') { result.flags[key] = true; } else if (next && (!next.startsWith('-') || /^-\d/.test(next))) { // Try to parse as JSON first (for objects/arrays/booleans), diff --git a/src/commands/alerts.js b/src/commands/alerts.js index a4787185..861887fd 100644 --- a/src/commands/alerts.js +++ b/src/commands/alerts.js @@ -4,6 +4,7 @@ */ import { NansenError, ErrorCode } from '../api.js'; +import { buildDaemonCommand } from './daemon.js'; // ============= Formatting ============= @@ -453,6 +454,7 @@ SUBCOMMANDS: update Update an existing alert toggle Enable or disable an alert delete Delete an alert + daemon Real-time alert streaming (run, start, stop, status, logs) Run: nansen alerts --help`, @@ -687,8 +689,13 @@ USAGE: }, }; + if (sub === 'daemon') { + const daemonHandler = buildDaemonCommand({ log, getApiKey: () => apiInstance?.apiKey }); + return daemonHandler(args.slice(1), apiInstance, flags, options); + } + if (!handlers[sub]) { - throw new NansenError(`Unknown alerts subcommand: ${sub}. Available: list, create, update, toggle, delete`, ErrorCode.UNKNOWN); + throw new NansenError(`Unknown alerts subcommand: ${sub}. Available: list, create, update, toggle, delete, daemon`, ErrorCode.UNKNOWN); } // Subcommand-level help: --help flag or "help" as second positional arg diff --git a/src/commands/daemon.js b/src/commands/daemon.js new file mode 100644 index 00000000..118f3647 --- /dev/null +++ b/src/commands/daemon.js @@ -0,0 +1,221 @@ +/** + * Nansen CLI - Alerts daemon subcommand + * + * nansen alerts daemon [options] + */ + +import fs from 'fs'; +import path from 'path'; +import os from 'os'; +import { AlertsDaemon } from '../daemon/alerts-daemon.js'; + +const NANSEN_DIR = path.join(os.homedir(), '.nansen'); +const DEFAULT_PID_FILE = path.join(NANSEN_DIR, 'alerts-daemon.pid'); +const DEFAULT_LOG_FILE = path.join(NANSEN_DIR, 'alerts-daemon.log'); +const DEFAULT_STATE_FILE = path.join(NANSEN_DIR, 'alerts-daemon-state.json'); + +const DAEMON_HELP = `nansen alerts daemon — Listen to Smart Alert events in real-time + +SUBCOMMANDS: + run Run in foreground (pipe mode — alerts emitted as NDJSON on stdout) + start Start daemon in background (writes PID file) + stop Stop the running daemon + status Show daemon state (running/stopped, last alert, uptime) + logs Tail the daemon log file + +OPTIONS (run / start): + --action Shell command to run per alert. + Alert JSON passed on stdin. Supports {alertId}, {alertName}, + {alertType}, {firedAt} placeholder substitutions. + --action-env Pass alert JSON as NANSEN_ALERT env var instead of stdin + --no-backfill Skip past-alert backfill on (re)connect + --reconnect-delay Base reconnect delay in seconds (default: 5) + --ws-url Override WebSocket server URL + --state-file Path to state JSON (default: ~/.nansen/alerts-daemon-state.json) + --pid-file Path to PID file (default: ~/.nansen/alerts-daemon.pid) + --log-file Path to log file (default: ~/.nansen/alerts-daemon.log) + +EXAMPLES: + # Print all alerts as JSON (pipe to jq, agent, etc.) + nansen alerts daemon run + + # Pipe to OpenClaw + nansen alerts daemon run | openclaw inject + + # Start background daemon, wake up OpenClaw per alert + nansen alerts daemon start --action 'openclaw inject --message "Alert: {alertName}"' + + # Run against local mock server (for development) + nansen alerts daemon run --ws-url ws://localhost:9876/v1/smart-alert/stream + + # Check daemon status + nansen alerts daemon status + + # Stop daemon + nansen alerts daemon stop +`; + +function readPid(pidFile) { + try { + return parseInt(fs.readFileSync(pidFile, 'utf8').trim()); + } catch { + return null; + } +} + +function isProcessRunning(pid) { + if (!pid) return false; + try { + process.kill(pid, 0); // signal 0 = just check existence + return true; + } catch { + return false; + } +} + +export function buildDaemonCommand(deps = {}) { + const { log = console.log, getApiKey } = deps; + + return async (args, _apiInstance, flags, options) => { + const sub = args[0]; + + if (!sub || sub === 'help' || flags.help || flags.h) { + log(DAEMON_HELP); + return; + } + + const pidFile = options['pid-file'] ?? DEFAULT_PID_FILE; + const logFile = options['log-file'] ?? DEFAULT_LOG_FILE; + const stateFile = options['state-file'] ?? DEFAULT_STATE_FILE; + + const handlers = { + // ── run ───────────────────────────────────────────────────────────────── + 'run': async () => { + const apiKey = getApiKey?.() ?? process.env.NANSEN_API_KEY; + if (!apiKey) { + throw new Error('No API key found. Run: nansen login --api-key '); + } + + const wsUrl = options['ws-url']; + const restUrl = wsUrl + ? wsUrl.replace(/^ws/, 'http').replace('/v1/smart-alert/stream', '/api/v1/smart-alert/past-alerts') + : undefined; + + const daemon = new AlertsDaemon({ + apiKey, + wsUrl, + restUrl, + action: options.action, + actionEnv: flags['action-env'], + backfill: !flags['no-backfill'], + stateFile, + logFile: null, // foreground: log to stderr, not file + log: (level, message) => { + process.stderr.write(`[${new Date().toISOString()}] [${level.toUpperCase()}] ${message}\n`); + }, + }); + + // Graceful shutdown + process.on('SIGINT', () => { daemon.stop(); process.exit(0); }); + process.on('SIGTERM', () => { daemon.stop(); process.exit(0); }); + + await daemon.start(); + }, + + // ── start ──────────────────────────────────────────────────────────────── + 'start': async () => { + const pid = readPid(pidFile); + if (isProcessRunning(pid)) { + log(`Daemon already running (PID ${pid})`); + return; + } + + // Spawn detached child + const { spawn } = await import('child_process'); + const argv = [ + ...process.argv.slice(0, 2), // node + script path + 'alerts', 'daemon', 'run', + ...(options['ws-url'] ? ['--ws-url', options['ws-url']] : []), + ...(options.action ? ['--action', options.action] : []), + ...(flags['action-env'] ? ['--action-env'] : []), + ...(flags['no-backfill'] ? ['--no-backfill'] : []), + ...(options['state-file'] ? ['--state-file', options['state-file']] : []), + '--log-file', logFile, + ]; + + const child = spawn(process.execPath, argv.slice(1), { + detached: true, + stdio: 'ignore', + env: process.env, + }); + child.unref(); + + fs.mkdirSync(path.dirname(pidFile), { recursive: true }); + fs.writeFileSync(pidFile, String(child.pid), { mode: 0o600 }); + + log(`Daemon started (PID ${child.pid})`); + log(`Log: ${logFile}`); + log(`State: ${stateFile}`); + }, + + // ── stop ───────────────────────────────────────────────────────────────── + 'stop': () => { + const pid = readPid(pidFile); + if (!isProcessRunning(pid)) { + log('Daemon is not running'); + return; + } + try { + process.kill(pid, 'SIGTERM'); + log(`Daemon stopped (PID ${pid})`); + fs.unlinkSync(pidFile); + } catch (err) { + log(`Failed to stop daemon: ${err.message}`); + } + }, + + // ── status ─────────────────────────────────────────────────────────────── + 'status': () => { + const pid = readPid(pidFile); + const running = isProcessRunning(pid); + + let state = {}; + try { + state = JSON.parse(fs.readFileSync(stateFile, 'utf8')); + } catch {} + + const status = { + running, + pid: running ? pid : null, + lastAlertAt: state.lastAlertAt ?? null, + lastAlertId: state.lastAlertId ?? null, + sessionId: state.sessionId ?? null, + stateFile, + logFile, + pidFile, + }; + + return status; + }, + + // ── logs ───────────────────────────────────────────────────────────────── + 'logs': async () => { + if (!fs.existsSync(logFile)) { + log(`No log file found at ${logFile}. Has the daemon been started?`); + return; + } + // Tail last 50 lines + const content = fs.readFileSync(logFile, 'utf8'); + const lines = content.split('\n').filter(Boolean); + const tail = lines.slice(-50).join('\n'); + log(tail); + }, + }; + + if (!handlers[sub]) { + throw new Error(`Unknown daemon subcommand: ${sub}. Available: run, start, stop, status, logs`); + } + + return handlers[sub](); + }; +} diff --git a/src/daemon/alerts-daemon.js b/src/daemon/alerts-daemon.js new file mode 100644 index 00000000..f440ab68 --- /dev/null +++ b/src/daemon/alerts-daemon.js @@ -0,0 +1,378 @@ +/** + * Nansen CLI - Smart Alerts Daemon + * WebSocket client that connects to the Nansen alerts stream. + * Handles reconnect, backfill, state persistence, and action hooks. + */ + +import { EventEmitter } from 'events'; +import { spawn } from 'child_process'; +import fs from 'fs'; +import path from 'path'; +import os from 'os'; + +// ── Constants ───────────────────────────────────────────────────────────────── + +export const DEFAULT_WS_URL = 'wss://api.nansen.ai/v1/smart-alert/stream'; +export const DEFAULT_REST_URL = 'https://api.nansen.ai/api/v1/smart-alert/past-alerts'; +const DEFAULT_BASE_DELAY_MS = 5_000; +const MAX_DELAY_MS = 300_000; +const PING_INTERVAL_MS = 30_000; +const PONG_TIMEOUT_MS = 10_000; + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +function jitter(maxMs = 2000) { + return Math.floor(Math.random() * maxMs); +} + +function backoffDelay(attempt, baseMs = DEFAULT_BASE_DELAY_MS) { + const raw = baseMs * Math.pow(2, attempt) + jitter(); + return Math.min(raw, MAX_DELAY_MS); +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Atomically write a JSON file: write to .tmp then rename. + * Permissions: 0600 (owner read/write only — state may contain session IDs). + */ +function writeJsonAtomic(filePath, data) { + const tmp = filePath + '.tmp'; + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + fs.writeFileSync(tmp, JSON.stringify(data, null, 2), { mode: 0o600 }); + fs.renameSync(tmp, filePath); +} + +function readJsonSafe(filePath) { + try { + return JSON.parse(fs.readFileSync(filePath, 'utf8')); + } catch { + return null; + } +} + +/** + * Interpolate {placeholder} tokens in a command string using safe alert metadata. + * Alert data payload is NOT interpolated — only top-level string fields. + * This prevents any injection from alert content into the shell command. + */ +function interpolateCommand(template, alert) { + return template + .replace(/\{alertId\}/g, sanitizeShell(alert.alertId ?? '')) + .replace(/\{alertName\}/g, sanitizeShell(alert.alertName ?? '')) + .replace(/\{alertType\}/g, sanitizeShell(alert.alertType ?? '')) + .replace(/\{firedAt\}/g, sanitizeShell(alert.firedAt ?? '')); +} + +/** + * Strip characters that could break a shell command. + * Placeholder substitutions land inside a shell string that we don't fully control, + * so we only allow a safe subset: alphanumeric, dash, underscore, dot, colon, slash, @. + */ +function sanitizeShell(str) { + return String(str).replace(/[^a-zA-Z0-9\-_.:/@ ]/g, ''); +} + +// ── AlertsDaemon ────────────────────────────────────────────────────────────── + +export class AlertsDaemon extends EventEmitter { + /** + * @param {object} opts + * @param {string} opts.apiKey Nansen API key (required) + * @param {string} [opts.wsUrl] WebSocket server URL + * @param {string} [opts.restUrl] /past-alerts endpoint base URL + * @param {string} [opts.action] Shell command template to run per alert + * @param {boolean} [opts.actionEnv] Pass alert JSON as NANSEN_ALERT env var (vs stdin) + * @param {boolean} [opts.backfill] Fetch past alerts on (re)connect (default: true) + * @param {string} [opts.stateFile] Path to state JSON + * @param {string} [opts.logFile] Append logs to this file (null = stderr only) + * @param {function} [opts.log] Custom log function(level, message) + * @param {function} [opts.WebSocket] Injected WebSocket class (for testing) + * @param {function} [opts.fetchFn] Injected fetch (for testing) + */ + constructor(opts = {}) { + super(); + if (!opts.apiKey) throw new Error('apiKey is required'); + + this.apiKey = opts.apiKey; + this.wsUrl = opts.wsUrl ?? DEFAULT_WS_URL; + this.restUrl = opts.restUrl ?? DEFAULT_REST_URL; + this.action = opts.action ?? null; + this.actionEnv = opts.actionEnv ?? false; + this.backfill = opts.backfill ?? true; + this.stateFile = opts.stateFile ?? path.join(os.homedir(), '.nansen', 'alerts-daemon-state.json'); + this.logFile = opts.logFile ?? null; + this._logFn = opts.log ?? null; + this._WebSocket = opts.WebSocket ?? null; + this._fetch = opts.fetchFn ?? globalThis.fetch; + + this._ws = null; + this._running = false; + this._reconnectAttempt = 0; + this._pingTimer = null; + this._pongTimer = null; + this._state = readJsonSafe(this.stateFile) ?? {}; + } + + // ── Lifecycle ──────────────────────────────────────────────────────────────── + + async start() { + if (this._running) return; + this._running = true; + this.log('info', 'Daemon starting'); + await this._connectLoop(); + } + + stop() { + this._running = false; + this._clearTimers(); + if (this._ws) { + try { this._ws.close(1000, 'daemon stopped'); } catch {} + this._ws = null; + } + this.log('info', 'Daemon stopped'); + } + + // ── Connection loop ────────────────────────────────────────────────────────── + + async _connectLoop() { + while (this._running) { + try { + await this._connect(); + } catch (err) { + this.log('error', `Connection error: ${err.message}`); + } + + if (!this._running) break; + + const delay = backoffDelay(this._reconnectAttempt); + this.log('info', `Reconnecting in ${Math.round(delay / 1000)}s (attempt ${this._reconnectAttempt + 1})`); + this._reconnectAttempt++; + await sleep(delay); + } + } + + async _connect() { + // Resolve WebSocket class: injected (tests) > Node 22 built-in > ws package + let WS = this._WebSocket; + if (!WS) { + if (typeof globalThis.WebSocket !== 'undefined') { + WS = globalThis.WebSocket; + } else { + // Dynamically import 'ws' — optional peer dependency + try { + WS = (await import('ws')).default; + } catch { + throw new Error( + 'No WebSocket implementation found. ' + + 'Node 22+ has it built-in. For older Node, run: npm install ws' + ); + } + } + } + + return new Promise((resolve, reject) => { + const ws = new WS(this.wsUrl, { + headers: { Authorization: `Bearer ${this.apiKey}` }, + }); + this._ws = ws; + + ws.on('open', async () => { + this.log('info', `Connected to ${this.wsUrl}`); + this._reconnectAttempt = 0; + this._startPing(); + this._saveState({ startedAt: this._state.startedAt ?? new Date().toISOString() }); + + if (this.backfill && this._state.lastAlertAt) { + try { + await this._fetchPastAlerts(this._state.lastAlertAt); + } catch (err) { + this.log('warn', `Backfill failed: ${err.message}`); + } + } + }); + + ws.on('message', (raw) => { + let msg; + try { + msg = JSON.parse(raw.toString()); + } catch { + this.log('warn', 'Received unparseable message, ignoring'); + return; + } + this._handleMessage(msg); + }); + + ws.on('close', (code, reason) => { + this._clearTimers(); + this.log('info', `Connection closed (code=${code} reason=${reason?.toString() ?? ''})`); + resolve(); // let the loop decide whether to reconnect + }); + + ws.on('error', (err) => { + this.log('error', `WebSocket error: ${err.message}`); + // 'close' will fire after 'error', so we reject to surface it in connectLoop + reject(err); + }); + }); + } + + // ── Message handling ───────────────────────────────────────────────────────── + + _handleMessage(msg) { + switch (msg.type) { + case 'connected': + this._saveState({ sessionId: msg.sessionId }); + this.log('info', `Session: ${msg.sessionId}`); + this.emit('connected', msg); + break; + + case 'alert': + // Log only metadata — not alert data payload (may contain market-sensitive info) + this.log('info', `Alert: [${msg.alertId}] ${msg.alertName} (${msg.alertType}) at ${msg.firedAt}`); + this._saveState({ lastAlertAt: msg.firedAt, lastAlertId: msg.alertId }); + this.emit('alert', msg); + this._dispatchAlert(msg); + break; + + case 'pong': + this._clearPongTimer(); + break; + + case 'error': + this.log('error', `Server error [${msg.code}]: ${msg.message}`); + this.emit('server-error', msg); + if (msg.code === 'UNAUTHORIZED') { + this._running = false; // auth failures are not recoverable + this._ws?.close(); + } + break; + + default: + this.log('debug', `Unknown message type: ${msg.type}`); + } + } + + _dispatchAlert(alert) { + // 1. Always emit as NDJSON on stdout (for pipe mode) + process.stdout.write(JSON.stringify(alert) + '\n'); + + // 2. Run --action hook if configured + if (!this.action) return; + + const cmd = interpolateCommand(this.action, alert); + const alertJson = JSON.stringify(alert); + const env = { ...process.env }; + + if (this.actionEnv) { + env.NANSEN_ALERT = alertJson; + } + + try { + const child = spawn('/bin/sh', ['-c', cmd], { + env, + stdio: ['pipe', 'inherit', 'inherit'], + }); + + if (!this.actionEnv) { + child.stdin.write(alertJson); + child.stdin.end(); + } + + child.on('error', (err) => { + this.log('error', `Action hook error: ${err.message}`); + }); + + child.on('close', (code) => { + if (code !== 0) { + this.log('warn', `Action hook exited ${code} for alert ${alert.alertId}`); + } + }); + } catch (err) { + this.log('error', `Failed to spawn action hook: ${err.message}`); + } + } + + // ── Backfill ───────────────────────────────────────────────────────────────── + + async _fetchPastAlerts(since) { + this.log('info', `Backfilling since ${since}`); + const url = `${this.restUrl}?since=${encodeURIComponent(since)}&limit=50`; + + const res = await this._fetch(url, { + headers: { Authorization: `Bearer ${this.apiKey}` }, + }); + + if (!res.ok) { + throw new Error(`past-alerts ${res.status} ${res.statusText}`); + } + + const body = await res.json(); + const alerts = body.alerts ?? []; + + if (alerts.length === 0) { + this.log('info', 'No missed alerts in backfill window'); + return; + } + + this.log('info', `Replaying ${alerts.length} missed alert(s)`); + for (const alert of alerts) { + this.emit('alert', alert); + this._dispatchAlert(alert); + } + } + + // ── Keepalive ──────────────────────────────────────────────────────────────── + + _startPing() { + this._clearTimers(); + this._pingTimer = setInterval(() => { + if (this._ws?.readyState === 1 /* OPEN */) { + this._ws.send(JSON.stringify({ type: 'ping', ts: Date.now() })); + this._pongTimer = setTimeout(() => { + this.log('warn', 'Pong timeout — forcing reconnect'); + this._ws?.close(); + }, PONG_TIMEOUT_MS); + } + }, PING_INTERVAL_MS); + } + + _clearPongTimer() { + if (this._pongTimer) { clearTimeout(this._pongTimer); this._pongTimer = null; } + } + + _clearTimers() { + if (this._pingTimer) { clearInterval(this._pingTimer); this._pingTimer = null; } + this._clearPongTimer(); + } + + // ── State ──────────────────────────────────────────────────────────────────── + + _saveState(patch) { + this._state = { ...this._state, ...patch }; + try { + writeJsonAtomic(this.stateFile, this._state); + } catch (err) { + this.log('warn', `State file write failed: ${err.message}`); + } + } + + // ── Logging ────────────────────────────────────────────────────────────────── + + log(level, message) { + const line = `[${new Date().toISOString()}] [${level.toUpperCase()}] ${message}`; + if (this._logFn) { + this._logFn(level, message); + } else { + process.stderr.write(line + '\n'); + } + if (this.logFile) { + try { fs.appendFileSync(this.logFile, line + '\n'); } catch {} + } + this.emit('log', { level, message, line }); + } +} + +export default AlertsDaemon; diff --git a/src/daemon/mock-server.js b/src/daemon/mock-server.js new file mode 100644 index 00000000..63317bf2 --- /dev/null +++ b/src/daemon/mock-server.js @@ -0,0 +1,116 @@ +/** + * Nansen CLI - Mock WebSocket server for local daemon testing. + * + * Usage: + * node src/daemon/mock-server.js [--port 9876] [--interval 10] + * + * Fires a fake alert every --interval seconds. Useful for developing + * and testing the daemon without a real Nansen WS backend. + */ + +import { WebSocketServer } from 'ws'; +import http from 'http'; + +const port = parseInt(process.argv.find((a, i, arr) => arr[i - 1] === '--port') ?? '9876'); +const intervalSec = parseInt(process.argv.find((a, i, arr) => arr[i - 1] === '--interval') ?? '10'); + +const MOCK_ALERTS = [ + { + type: 'alert', + alertId: 'mock-001', + alertName: 'ETH SM Inflow >5M', + alertType: 'sm-token-flows', + firedAt: null, // filled at fire time + data: { + chain: 'ethereum', + token: { address: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', symbol: 'WETH' }, + inflow_1h: 6_200_000, + netflow_1h: 4_100_000, + }, + }, + { + type: 'alert', + alertId: 'mock-002', + alertName: 'Large USDC Transfer', + alertType: 'common-token-transfer', + firedAt: null, + data: { + chain: 'ethereum', + event: 'send', + usdValue: 2_500_000, + from: '0xabc...1234', + to: '0xdef...5678', + token: { address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48', symbol: 'USDC' }, + }, + }, +]; + +// Simple HTTP server to handle /api/v1/smart-alert/past-alerts +const httpServer = http.createServer((req, res) => { + if (req.url?.startsWith('/api/v1/smart-alert/past-alerts')) { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + alerts: [], + count: 0, + oldest: null, + })); + return; + } + res.writeHead(404); + res.end('Not found'); +}); + +const wss = new WebSocketServer({ server: httpServer, path: '/v1/smart-alert/stream' }); + +wss.on('connection', (ws, req) => { + const auth = req.headers['authorization']; + if (!auth?.startsWith('Bearer ')) { + ws.send(JSON.stringify({ type: 'error', code: 'UNAUTHORIZED', message: 'Missing API key' })); + ws.close(1008, 'Unauthorized'); + return; + } + + const sessionId = `mock-sess-${Date.now()}`; + console.log(`[mock-server] Client connected, session=${sessionId}`); + + ws.send(JSON.stringify({ + type: 'connected', + sessionId, + serverTime: new Date().toISOString(), + })); + + // Handle pings + ws.on('message', (raw) => { + try { + const msg = JSON.parse(raw.toString()); + if (msg.type === 'ping') { + ws.send(JSON.stringify({ type: 'pong', ts: msg.ts })); + } + } catch {} + }); + + // Fire mock alerts on interval + let alertIndex = 0; + const interval = setInterval(() => { + if (ws.readyState !== ws.OPEN) { + clearInterval(interval); + return; + } + const template = MOCK_ALERTS[alertIndex % MOCK_ALERTS.length]; + const alert = { ...template, firedAt: new Date().toISOString() }; + console.log(`[mock-server] Firing alert: ${alert.alertId} — ${alert.alertName}`); + ws.send(JSON.stringify(alert)); + alertIndex++; + }, intervalSec * 1000); + + ws.on('close', () => { + clearInterval(interval); + console.log(`[mock-server] Client disconnected, session=${sessionId}`); + }); +}); + +httpServer.listen(port, () => { + console.log(`[mock-server] Listening on ws://localhost:${port}/v1/smart-alert/stream`); + console.log(`[mock-server] Firing alerts every ${intervalSec}s`); + console.log(`[mock-server] REST: http://localhost:${port}/api/v1/smart-alert/past-alerts`); +}); diff --git a/src/schema.json b/src/schema.json index bc0c64a5..1cc890e6 100644 --- a/src/schema.json +++ b/src/schema.json @@ -704,6 +704,104 @@ "delete": { "description": "Delete an alert. Usage: nansen alerts delete ", "options": {} + }, + "daemon": { + "description": "Real-time alert streaming via WebSocket", + "subcommands": { + "run": { + "description": "Run daemon in foreground (alerts emitted as NDJSON on stdout)", + "options": { + "action": { + "type": "string", + "description": "Shell command to run per alert. Supports {alertId}, {alertName}, {alertType}, {firedAt} placeholders." + }, + "action-env": { + "type": "boolean", + "description": "Pass alert JSON as NANSEN_ALERT env var instead of stdin" + }, + "no-backfill": { + "type": "boolean", + "description": "Skip past-alert backfill on (re)connect" + }, + "ws-url": { + "type": "string", + "description": "Override WebSocket server URL" + }, + "state-file": { + "type": "string", + "description": "Path to state JSON (default: ~/.nansen/alerts-daemon-state.json)" + } + } + }, + "start": { + "description": "Start daemon in background (writes PID file)", + "options": { + "action": { + "type": "string", + "description": "Shell command to run per alert" + }, + "action-env": { + "type": "boolean", + "description": "Pass alert JSON as NANSEN_ALERT env var instead of stdin" + }, + "no-backfill": { + "type": "boolean", + "description": "Skip past-alert backfill on (re)connect" + }, + "ws-url": { + "type": "string", + "description": "Override WebSocket server URL" + }, + "state-file": { + "type": "string", + "description": "Path to state JSON" + }, + "pid-file": { + "type": "string", + "description": "Path to PID file (default: ~/.nansen/alerts-daemon.pid)" + }, + "log-file": { + "type": "string", + "description": "Path to log file (default: ~/.nansen/alerts-daemon.log)" + } + } + }, + "stop": { + "description": "Stop the running daemon", + "options": { + "pid-file": { + "type": "string", + "description": "Path to PID file" + } + } + }, + "status": { + "description": "Show daemon state (running/stopped, last alert, uptime)", + "options": { + "state-file": { + "type": "string", + "description": "Path to state JSON" + }, + "pid-file": { + "type": "string", + "description": "Path to PID file" + }, + "log-file": { + "type": "string", + "description": "Path to log file" + } + } + }, + "logs": { + "description": "Tail the daemon log file", + "options": { + "log-file": { + "type": "string", + "description": "Path to log file" + } + } + } + } } } },