diff --git a/README.md b/README.md index 2c1e528..7b2402d 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ cargo install leash-harness leash list # built-in stacks leash run sim-mcp # MCP stdio for LLM agents leash run sim-http # localhost HTTP + WebSocket +leash run sim-stream-hub # localhost TCP JSONL stream hub leash serve mcp-http # localhost MCP JSON control surface ``` @@ -78,6 +79,9 @@ leash run sim-http leash agent-send "inspect the battery" leash agent-interactive +# Run a TCP JSONL stream hub for external module processes +leash run sim-stream-hub + # Run as a daemon and inspect JSONL logs leash run sim-http --daemon leash log sim-http --json --module http --lines 20 @@ -466,6 +470,13 @@ increment hub rejection status; the listener keeps serving later peers. This is the first cross-process stream orchestration layer, not a distributed runtime supervisor or remote hardware control surface. +Run it from the CLI with: + +```bash +leash serve stream-hub --profile sim --listen 127.0.0.1:9970 +leash run sim-stream-hub +``` + ## Run Logs and Resource Samples Daemon runs write structured JSONL logs under the Leash state directory. Each @@ -535,7 +546,8 @@ See [issues](https://github.com/specdog/leash/issues) for the full plan. Highlig - [x] Stream processing helpers: latest-value backpressure, quality filters, timestamp pairing - [x] Agent input channels: one-shot CLI, interactive CLI, and localhost web input - [x] TCP JSONL stream framing for cross-process modules -- [ ] Long-lived cross-process and network transport orchestration +- [x] Runnable TCP JSONL stream hub for cross-process module links +- [ ] External worker lifecycle and supervision - [ ] MAVLink drone + manipulator adapters - [x] Localhost command center dashboard - [x] Viewer-ready visualization frames diff --git a/examples/network-transport/README.md b/examples/network-transport/README.md index cc7e414..e3102c2 100644 --- a/examples/network-transport/README.md +++ b/examples/network-transport/README.md @@ -4,6 +4,13 @@ Leash runtime streams default to in-process `local-pubsub` or deterministic `memory` backends. External module processes can use the smaller TCP JSONL boundary without changing those defaults. +Run the localhost hub: + +```bash +leash serve stream-hub --profile sim --listen 127.0.0.1:9970 +leash run sim-stream-hub +``` + Each line is one `NetworkStreamFrame`: ```json diff --git a/scripts/README.md b/scripts/README.md index 280e4ec..ff5f8d8 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -8,6 +8,7 @@ flowchart TB all --> http["smoke-http.sh\nHTTP routes, telemetry streams, policy denial"] all --> mcp["smoke-mcp.sh\nstdio MCP initialize, tools, health"] all --> mcphttp["smoke-mcp-http.sh\nlocalhost MCP HTTP, CLI, planner, and patrol calls"] + all --> streamhub["smoke-stream-hub.sh\nTCP JSONL stream hub"] all --> replayhttp["smoke-replay-http.sh\nHTTP replay observe"] all --> replaymcp["smoke-replay-mcp.sh\nMCP replay observe"] all --> physical["smoke-physical-gate.sh\nphysical profile refuses without gate"] @@ -22,6 +23,7 @@ flowchart TB - `smoke-http.sh`: HTTP, WebSocket/SSE, visualization frame, map/costmap contracts, external clients, agent input, capture, drive, and policy checks. - `smoke-mcp.sh`: stdio MCP initialization and tool calls. - `smoke-mcp-http.sh`: localhost MCP HTTP routes, `leash mcp` CLI calls, sim planner set/status calls, and sim patrol start/status/stop calls. +- `smoke-stream-hub.sh`: starts the localhost TCP JSONL stream hub, sends valid frames, and proves an invalid peer does not kill the listener. - `smoke-replay-http.sh`: replay mode over HTTP. - `smoke-replay-mcp.sh`: replay mode over MCP. - `smoke-physical-gate.sh`: proves physical startup fails without explicit actuation. diff --git a/scripts/smoke-all.sh b/scripts/smoke-all.sh index f46a53c..30ee4b2 100755 --- a/scripts/smoke-all.sh +++ b/scripts/smoke-all.sh @@ -53,6 +53,11 @@ const checks = [ argv: ["bash", "scripts/smoke-mcp-http.sh"], proof: "HTTP MCP tool list, health/stop calls, CLI status/modules, key=value/JSON direct calls, planner, patrol, and spatial-memory calls passed", }, + { + name: "stream-hub", + argv: ["bash", "scripts/smoke-stream-hub.sh"], + proof: "TCP JSONL stream hub accepted valid frames and kept serving after an invalid peer", + }, { name: "replay-http-observe", argv: ["bash", "scripts/smoke-replay-http.sh"], @@ -79,11 +84,15 @@ const checks = [ validate: (stdout) => { const stacks = JSON.parse(stdout); const names = stacks.map((stack) => stack.name); - for (const required of ["sim-http", "sim-mcp", "bridge-compat-http", "waveshare-ugv-http"]) { + for (const required of ["sim-http", "sim-mcp", "sim-stream-hub", "bridge-compat-http", "waveshare-ugv-http"]) { if (!names.includes(required)) { throw new Error(`missing stack ${required}`); } } + const streamHub = stacks.find((stack) => stack.name === "sim-stream-hub"); + if (!streamHub || streamHub.transport.kind !== "stream-hub") { + throw new Error("sim-stream-hub stack did not declare stream-hub transport"); + } const physical = stacks.find((stack) => stack.name === "waveshare-ugv-http"); if (!physical.hardware_required) { throw new Error("waveshare stack did not declare hardware_required"); diff --git a/scripts/smoke-stream-hub.sh b/scripts/smoke-stream-hub.sh new file mode 100755 index 0000000..cc6bb55 --- /dev/null +++ b/scripts/smoke-stream-hub.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash +set -euo pipefail + +port="${LEASH_STREAM_HUB_SMOKE_PORT:-18084}" +log_file="$(mktemp -t leash-stream-hub-smoke.XXXXXX.log)" +timeout_secs="${LEASH_SMOKE_TIMEOUT_SECS:-60}" + +cleanup() { + if [[ -n "${server_pid:-}" ]] && kill -0 "$server_pid" 2>/dev/null; then + kill "$server_pid" 2>/dev/null || true + wait "$server_pid" 2>/dev/null || true + fi + rm -f "$log_file" +} +trap cleanup EXIT + +cargo run --quiet -- serve stream-hub --profile sim --listen "127.0.0.1:$port" >"$log_file" 2>&1 & +server_pid=$! + +ready=false +for _ in $(seq 1 $((timeout_secs * 10))); do + if PORT="$port" node <<'EOF' >/dev/null 2>&1 +const net = require("node:net"); +const socket = net.createConnection({ host: "127.0.0.1", port: Number(process.env.PORT) }); +socket.setTimeout(250); +socket.on("connect", () => socket.end()); +socket.on("close", () => process.exit(0)); +socket.on("timeout", () => socket.destroy(new Error("timeout"))); +socket.on("error", () => process.exit(1)); +EOF + then + ready=true + break + fi + if ! kill -0 "$server_pid" 2>/dev/null; then + echo "stream hub smoke server exited before readiness" >&2 + cat "$log_file" >&2 + exit 1 + fi + sleep 0.1 +done + +if [[ "$ready" != true ]]; then + echo "stream hub smoke server was not ready after ${timeout_secs}s" >&2 + cat "$log_file" >&2 + exit 1 +fi + +LOG_FILE="$log_file" PORT="$port" node <<'EOF' +const fs = require("node:fs"); +const net = require("node:net"); + +const port = Number(process.env.PORT); + +function sendLines(lines) { + return new Promise((resolve, reject) => { + const socket = net.createConnection({ host: "127.0.0.1", port }, () => { + for (const line of lines) socket.write(`${line}\n`); + socket.end(); + }); + socket.setTimeout(2000); + socket.on("timeout", () => socket.destroy(new Error("tcp jsonl send timeout"))); + socket.on("error", reject); + socket.on("close", resolve); + }); +} + +function frame(stream, seq) { + return JSON.stringify({ + schema_version: "leash-stream-jsonl-v1", + stream, + payload: { seq, source: "smoke-stream-hub" }, + }); +} + +(async () => { + const firstLine = fs.readFileSync(process.env.LOG_FILE, "utf8").trim().split(/\n+/)[0]; + const startup = JSON.parse(firstLine); + if (startup.ok !== true) throw new Error("stream hub startup ok was not true"); + if (startup.transport !== "stream-hub") throw new Error(`unexpected transport: ${startup.transport}`); + if (startup.profile !== "sim") throw new Error(`unexpected profile: ${startup.profile}`); + if (startup.listen !== `127.0.0.1:${port}`) throw new Error(`unexpected listen: ${startup.listen}`); + if (startup.stream_transport !== "local-pubsub") { + throw new Error(`unexpected stream transport: ${startup.stream_transport}`); + } + + await sendLines([frame("telemetry", 1), frame("telemetry", 2)]); + await sendLines([ + JSON.stringify({ + schema_version: "old", + stream: "telemetry", + payload: { seq: 0, source: "bad-peer" }, + }), + ]); + await sendLines([frame("telemetry", 3)]); +})().catch((error) => { + console.error(error.stack || String(error)); + process.exit(1); +}); +EOF + +if ! kill -0 "$server_pid" 2>/dev/null; then + echo "stream hub smoke server exited after client traffic" >&2 + cat "$log_file" >&2 + exit 1 +fi + +echo "stream hub smoke ok: 127.0.0.1:$port" diff --git a/src/bin/leash.rs b/src/bin/leash.rs index 43d865d..154e1f0 100644 --- a/src/bin/leash.rs +++ b/src/bin/leash.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "http")] use std::io::{self, Write}; use std::{collections::BTreeMap, fmt, net::SocketAddr, path::PathBuf, time::Duration}; @@ -17,7 +16,7 @@ use leash_harness::{ module::default_module_graph, replay::{scaled_delay, ReplayEvent, ReplayEventKind, ReplayRecording, REPLAY_FORMAT_VERSION}, stack::{built_in_stacks, find_stack, Stack, StackTransport}, - transport::StreamTransportBackend, + transport::{spawn_tcp_jsonl_stream_hub, StreamTransportBackend}, types::RunLogEntry, Harness, HarnessConfig, Profile, TelemetryStreamFrame, }; @@ -98,6 +97,7 @@ enum Transport { McpHttp(McpHttpServeArgs), #[cfg(feature = "http")] Http(HttpServeArgs), + StreamHub(StreamHubServeArgs), } #[derive(Debug, Args)] @@ -196,6 +196,15 @@ struct McpHttpServeArgs { listen: SocketAddr, } +#[derive(Debug, Args)] +struct StreamHubServeArgs { + #[command(flatten)] + runtime: RuntimeArgs, + + #[arg(long, default_value = "127.0.0.1:9970")] + listen: SocketAddr, +} + #[derive(Debug, Args)] struct HttpTarget { #[arg(long, env = "LEASH_URL", default_value = "http://127.0.0.1:8000")] @@ -408,6 +417,11 @@ async fn main() -> Result<()> { let harness = Harness::new(config)?; leash_harness::http::serve_http(harness, listen).await?; } + Transport::StreamHub(args) => { + let config = + config_from_args(args.runtime, Some(args.listen), cli.config.clone(), None)?; + serve_stream_hub(config).await?; + } }, Command::Record(args) => { let output = record_stream(args, cli.config.clone()).await?; @@ -726,10 +740,43 @@ async fn run_stack(args: RunArgs, config_path: Option) -> Result<()> { bail!("MCP stacks require the 'mcp' feature"); } } + StackTransport::StreamHub => { + if args.daemon { + bail!("daemon mode is only supported for HTTP stacks"); + } + serve_stream_hub(config).await?; + } } Ok(()) } +#[derive(Debug, Serialize)] +struct StreamHubServeOutput { + ok: bool, + transport: &'static str, + profile: String, + listen: String, + stream_transport: String, +} + +async fn serve_stream_hub(config: HarnessConfig) -> Result<()> { + let listen = config.listen; + let output = StreamHubServeOutput { + ok: true, + transport: "stream-hub", + profile: config.profile.as_str().to_string(), + listen: listen.to_string(), + stream_transport: config.stream_transport.as_str().to_string(), + }; + let harness = Harness::new(config)?; + let hub = spawn_tcp_jsonl_stream_hub(listen, harness.stream_transport()).await?; + println!("{}", serde_json::to_string(&output)?); + io::stdout().flush()?; + tokio::signal::ctrl_c().await?; + hub.shutdown().await?; + Ok(()) +} + #[derive(Debug)] struct RunSelection { #[cfg_attr(not(feature = "http"), allow(dead_code))] diff --git a/src/runtime.rs b/src/runtime.rs index 034865e..158d849 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -402,6 +402,10 @@ impl Harness { self.stream_transport.subscribe(stream) } + pub fn stream_transport(&self) -> Arc { + self.stream_transport.clone() + } + pub fn capability_registry(&self) -> CapabilityRegistry { CapabilityRegistry::new(self.clone()) } diff --git a/src/stack.rs b/src/stack.rs index 1786f10..3522a5c 100644 --- a/src/stack.rs +++ b/src/stack.rs @@ -15,6 +15,7 @@ use crate::{ pub enum StackTransport { Http, Mcp, + StreamHub, } impl StackTransport { @@ -22,6 +23,7 @@ impl StackTransport { match self { Self::Http => "http", Self::Mcp => "mcp", + Self::StreamHub => "stream-hub", } } } @@ -187,6 +189,7 @@ pub fn built_in_stacks() -> Vec { feature_stacks(vec![ sim_http_stack(), sim_mcp_stack(), + sim_stream_hub_stack(), bridge_compat_http_stack(), waveshare_ugv_http_stack(), ]) @@ -282,6 +285,27 @@ fn sim_mcp_stack() -> Stack { } } +fn sim_stream_hub_stack() -> Stack { + Stack { + name: "sim-stream-hub".to_string(), + description: "Simulation TCP JSONL stream hub for external module links".to_string(), + profile: Profile::Sim, + transport: TransportBinding { + kind: StackTransport::StreamHub, + listen: Some(socket("127.0.0.1:9970")), + }, + required_features: strings(&["sim"]), + hardware_required: false, + adapter: simulation_adapter_profile(), + config_overrides: PartialHarnessConfig { + listen: Some(socket("127.0.0.1:9970")), + ..PartialHarnessConfig::default() + }, + modules: module_refs(Profile::Sim), + command: "leash run sim-stream-hub".to_string(), + } +} + fn bridge_compat_http_stack() -> Stack { Stack { name: "bridge-compat-http".to_string(), @@ -645,6 +669,7 @@ mod tests { let stacks = built_in_stacks(); assert!(stacks.iter().any(|stack| stack.name == "sim-http")); assert!(stacks.iter().any(|stack| stack.name == "sim-mcp")); + assert!(stacks.iter().any(|stack| stack.name == "sim-stream-hub")); assert!(stacks .iter() .any(|stack| stack.name == "bridge-compat-http")); @@ -698,6 +723,14 @@ mod tests { assert_eq!(sim.category, AdapterCategory::Simulation); assert!(sim.required_gates.is_empty()); + let stream_hub = stacks + .iter() + .find(|stack| stack.name == "sim-stream-hub") + .unwrap(); + assert_eq!(stream_hub.transport.kind, StackTransport::StreamHub); + assert!(!stream_hub.hardware_required); + assert_eq!(stream_hub.transport.listen, Some(socket("127.0.0.1:9970"))); + #[cfg(feature = "mavlink-drone")] { let drone = stacks