Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ cargo install leash-harness
leash list # built-in stacks
leash run sim-mcp # MCP stdio for LLM agents
leash run sim-http # localhost HTTP + WebSocket
leash run sim-stream-hub # localhost TCP JSONL stream hub
leash serve mcp-http # localhost MCP JSON control surface
```

Expand Down Expand Up @@ -78,6 +79,9 @@ leash run sim-http
leash agent-send "inspect the battery"
leash agent-interactive

# Run a TCP JSONL stream hub for external module processes
leash run sim-stream-hub

# Run as a daemon and inspect JSONL logs
leash run sim-http --daemon
leash log sim-http --json --module http --lines 20
Expand Down Expand Up @@ -466,6 +470,13 @@ increment hub rejection status; the listener keeps serving later peers. This is
the first cross-process stream orchestration layer, not a distributed runtime
supervisor or remote hardware control surface.

Run it from the CLI with:

```bash
leash serve stream-hub --profile sim --listen 127.0.0.1:9970
leash run sim-stream-hub
```

## Run Logs and Resource Samples

Daemon runs write structured JSONL logs under the Leash state directory. Each
Expand Down Expand Up @@ -535,7 +546,8 @@ See [issues](https://github.com/specdog/leash/issues) for the full plan. Highlig
- [x] Stream processing helpers: latest-value backpressure, quality filters, timestamp pairing
- [x] Agent input channels: one-shot CLI, interactive CLI, and localhost web input
- [x] TCP JSONL stream framing for cross-process modules
- [ ] Long-lived cross-process and network transport orchestration
- [x] Runnable TCP JSONL stream hub for cross-process module links
- [ ] External worker lifecycle and supervision
- [ ] MAVLink drone + manipulator adapters
- [x] Localhost command center dashboard
- [x] Viewer-ready visualization frames
Expand Down
7 changes: 7 additions & 0 deletions examples/network-transport/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Leash runtime streams default to in-process `local-pubsub` or deterministic
`memory` backends. External module processes can use the smaller TCP JSONL
boundary without changing those defaults.

Run the localhost hub:

```bash
leash serve stream-hub --profile sim --listen 127.0.0.1:9970
leash run sim-stream-hub
```

Each line is one `NetworkStreamFrame`:

```json
Expand Down
2 changes: 2 additions & 0 deletions scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ flowchart TB
all --> http["smoke-http.sh\nHTTP routes, telemetry streams, policy denial"]
all --> mcp["smoke-mcp.sh\nstdio MCP initialize, tools, health"]
all --> mcphttp["smoke-mcp-http.sh\nlocalhost MCP HTTP, CLI, planner, and patrol calls"]
all --> streamhub["smoke-stream-hub.sh\nTCP JSONL stream hub"]
all --> replayhttp["smoke-replay-http.sh\nHTTP replay observe"]
all --> replaymcp["smoke-replay-mcp.sh\nMCP replay observe"]
all --> physical["smoke-physical-gate.sh\nphysical profile refuses without gate"]
Expand All @@ -22,6 +23,7 @@ flowchart TB
- `smoke-http.sh`: HTTP, WebSocket/SSE, visualization frame, map/costmap contracts, external clients, agent input, capture, drive, and policy checks.
- `smoke-mcp.sh`: stdio MCP initialization and tool calls.
- `smoke-mcp-http.sh`: localhost MCP HTTP routes, `leash mcp` CLI calls, sim planner set/status calls, and sim patrol start/status/stop calls.
- `smoke-stream-hub.sh`: starts the localhost TCP JSONL stream hub, sends valid frames, and proves an invalid peer does not kill the listener.
- `smoke-replay-http.sh`: replay mode over HTTP.
- `smoke-replay-mcp.sh`: replay mode over MCP.
- `smoke-physical-gate.sh`: proves physical startup fails without explicit actuation.
Expand Down
11 changes: 10 additions & 1 deletion scripts/smoke-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const checks = [
argv: ["bash", "scripts/smoke-mcp-http.sh"],
proof: "HTTP MCP tool list, health/stop calls, CLI status/modules, key=value/JSON direct calls, planner, patrol, and spatial-memory calls passed",
},
{
name: "stream-hub",
argv: ["bash", "scripts/smoke-stream-hub.sh"],
proof: "TCP JSONL stream hub accepted valid frames and kept serving after an invalid peer",
},
{
name: "replay-http-observe",
argv: ["bash", "scripts/smoke-replay-http.sh"],
Expand All @@ -79,11 +84,15 @@ const checks = [
validate: (stdout) => {
const stacks = JSON.parse(stdout);
const names = stacks.map((stack) => stack.name);
for (const required of ["sim-http", "sim-mcp", "bridge-compat-http", "waveshare-ugv-http"]) {
for (const required of ["sim-http", "sim-mcp", "sim-stream-hub", "bridge-compat-http", "waveshare-ugv-http"]) {
if (!names.includes(required)) {
throw new Error(`missing stack ${required}`);
}
}
const streamHub = stacks.find((stack) => stack.name === "sim-stream-hub");
if (!streamHub || streamHub.transport.kind !== "stream-hub") {
throw new Error("sim-stream-hub stack did not declare stream-hub transport");
}
const physical = stacks.find((stack) => stack.name === "waveshare-ugv-http");
if (!physical.hardware_required) {
throw new Error("waveshare stack did not declare hardware_required");
Expand Down
108 changes: 108 additions & 0 deletions scripts/smoke-stream-hub.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env bash
set -euo pipefail

port="${LEASH_STREAM_HUB_SMOKE_PORT:-18084}"
log_file="$(mktemp -t leash-stream-hub-smoke.XXXXXX.log)"
timeout_secs="${LEASH_SMOKE_TIMEOUT_SECS:-60}"

cleanup() {
if [[ -n "${server_pid:-}" ]] && kill -0 "$server_pid" 2>/dev/null; then
kill "$server_pid" 2>/dev/null || true
wait "$server_pid" 2>/dev/null || true
fi
rm -f "$log_file"
}
trap cleanup EXIT

cargo run --quiet -- serve stream-hub --profile sim --listen "127.0.0.1:$port" >"$log_file" 2>&1 &
server_pid=$!

ready=false
for _ in $(seq 1 $((timeout_secs * 10))); do
if PORT="$port" node <<'EOF' >/dev/null 2>&1
const net = require("node:net");
const socket = net.createConnection({ host: "127.0.0.1", port: Number(process.env.PORT) });
socket.setTimeout(250);
socket.on("connect", () => socket.end());
socket.on("close", () => process.exit(0));
socket.on("timeout", () => socket.destroy(new Error("timeout")));
socket.on("error", () => process.exit(1));
EOF
then
ready=true
break
fi
if ! kill -0 "$server_pid" 2>/dev/null; then
echo "stream hub smoke server exited before readiness" >&2
cat "$log_file" >&2
exit 1
fi
sleep 0.1
done

if [[ "$ready" != true ]]; then
echo "stream hub smoke server was not ready after ${timeout_secs}s" >&2
cat "$log_file" >&2
exit 1
fi

LOG_FILE="$log_file" PORT="$port" node <<'EOF'
const fs = require("node:fs");
const net = require("node:net");

const port = Number(process.env.PORT);

function sendLines(lines) {
return new Promise((resolve, reject) => {
const socket = net.createConnection({ host: "127.0.0.1", port }, () => {
for (const line of lines) socket.write(`${line}\n`);
socket.end();
});
socket.setTimeout(2000);
socket.on("timeout", () => socket.destroy(new Error("tcp jsonl send timeout")));
socket.on("error", reject);
socket.on("close", resolve);
});
}

function frame(stream, seq) {
return JSON.stringify({
schema_version: "leash-stream-jsonl-v1",
stream,
payload: { seq, source: "smoke-stream-hub" },
});
}

(async () => {
const firstLine = fs.readFileSync(process.env.LOG_FILE, "utf8").trim().split(/\n+/)[0];
const startup = JSON.parse(firstLine);
if (startup.ok !== true) throw new Error("stream hub startup ok was not true");
if (startup.transport !== "stream-hub") throw new Error(`unexpected transport: ${startup.transport}`);
if (startup.profile !== "sim") throw new Error(`unexpected profile: ${startup.profile}`);
if (startup.listen !== `127.0.0.1:${port}`) throw new Error(`unexpected listen: ${startup.listen}`);
if (startup.stream_transport !== "local-pubsub") {
throw new Error(`unexpected stream transport: ${startup.stream_transport}`);
}

await sendLines([frame("telemetry", 1), frame("telemetry", 2)]);
await sendLines([
JSON.stringify({
schema_version: "old",
stream: "telemetry",
payload: { seq: 0, source: "bad-peer" },
}),
]);
await sendLines([frame("telemetry", 3)]);
})().catch((error) => {
console.error(error.stack || String(error));
process.exit(1);
});
EOF

if ! kill -0 "$server_pid" 2>/dev/null; then
echo "stream hub smoke server exited after client traffic" >&2
cat "$log_file" >&2
exit 1
fi

echo "stream hub smoke ok: 127.0.0.1:$port"
51 changes: 49 additions & 2 deletions src/bin/leash.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#[cfg(feature = "http")]
use std::io::{self, Write};
use std::{collections::BTreeMap, fmt, net::SocketAddr, path::PathBuf, time::Duration};

Expand All @@ -17,7 +16,7 @@ use leash_harness::{
module::default_module_graph,
replay::{scaled_delay, ReplayEvent, ReplayEventKind, ReplayRecording, REPLAY_FORMAT_VERSION},
stack::{built_in_stacks, find_stack, Stack, StackTransport},
transport::StreamTransportBackend,
transport::{spawn_tcp_jsonl_stream_hub, StreamTransportBackend},
types::RunLogEntry,
Harness, HarnessConfig, Profile, TelemetryStreamFrame,
};
Expand Down Expand Up @@ -98,6 +97,7 @@ enum Transport {
McpHttp(McpHttpServeArgs),
#[cfg(feature = "http")]
Http(HttpServeArgs),
StreamHub(StreamHubServeArgs),
}

#[derive(Debug, Args)]
Expand Down Expand Up @@ -196,6 +196,15 @@ struct McpHttpServeArgs {
listen: SocketAddr,
}

#[derive(Debug, Args)]
struct StreamHubServeArgs {
#[command(flatten)]
runtime: RuntimeArgs,

#[arg(long, default_value = "127.0.0.1:9970")]
listen: SocketAddr,
}

#[derive(Debug, Args)]
struct HttpTarget {
#[arg(long, env = "LEASH_URL", default_value = "http://127.0.0.1:8000")]
Expand Down Expand Up @@ -408,6 +417,11 @@ async fn main() -> Result<()> {
let harness = Harness::new(config)?;
leash_harness::http::serve_http(harness, listen).await?;
}
Transport::StreamHub(args) => {
let config =
config_from_args(args.runtime, Some(args.listen), cli.config.clone(), None)?;
serve_stream_hub(config).await?;
}
},
Command::Record(args) => {
let output = record_stream(args, cli.config.clone()).await?;
Expand Down Expand Up @@ -726,10 +740,43 @@ async fn run_stack(args: RunArgs, config_path: Option<PathBuf>) -> Result<()> {
bail!("MCP stacks require the 'mcp' feature");
}
}
StackTransport::StreamHub => {
if args.daemon {
bail!("daemon mode is only supported for HTTP stacks");
}
serve_stream_hub(config).await?;
}
}
Ok(())
}

#[derive(Debug, Serialize)]
struct StreamHubServeOutput {
ok: bool,
transport: &'static str,
profile: String,
listen: String,
stream_transport: String,
}

async fn serve_stream_hub(config: HarnessConfig) -> Result<()> {
let listen = config.listen;
let output = StreamHubServeOutput {
ok: true,
transport: "stream-hub",
profile: config.profile.as_str().to_string(),
listen: listen.to_string(),
stream_transport: config.stream_transport.as_str().to_string(),
};
let harness = Harness::new(config)?;
let hub = spawn_tcp_jsonl_stream_hub(listen, harness.stream_transport()).await?;
println!("{}", serde_json::to_string(&output)?);
io::stdout().flush()?;
tokio::signal::ctrl_c().await?;
hub.shutdown().await?;
Ok(())
}

#[derive(Debug)]
struct RunSelection {
#[cfg_attr(not(feature = "http"), allow(dead_code))]
Expand Down
4 changes: 4 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ impl Harness {
self.stream_transport.subscribe(stream)
}

pub fn stream_transport(&self) -> Arc<dyn StreamTransport> {
self.stream_transport.clone()
}

pub fn capability_registry(&self) -> CapabilityRegistry {
CapabilityRegistry::new(self.clone())
}
Expand Down
33 changes: 33 additions & 0 deletions src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use crate::{
pub enum StackTransport {
Http,
Mcp,
StreamHub,
}

impl StackTransport {
pub fn as_str(self) -> &'static str {
match self {
Self::Http => "http",
Self::Mcp => "mcp",
Self::StreamHub => "stream-hub",
}
}
}
Expand Down Expand Up @@ -187,6 +189,7 @@ pub fn built_in_stacks() -> Vec<Stack> {
feature_stacks(vec![
sim_http_stack(),
sim_mcp_stack(),
sim_stream_hub_stack(),
bridge_compat_http_stack(),
waveshare_ugv_http_stack(),
])
Expand Down Expand Up @@ -282,6 +285,27 @@ fn sim_mcp_stack() -> Stack {
}
}

fn sim_stream_hub_stack() -> Stack {
Stack {
name: "sim-stream-hub".to_string(),
description: "Simulation TCP JSONL stream hub for external module links".to_string(),
profile: Profile::Sim,
transport: TransportBinding {
kind: StackTransport::StreamHub,
listen: Some(socket("127.0.0.1:9970")),
},
required_features: strings(&["sim"]),
hardware_required: false,
adapter: simulation_adapter_profile(),
config_overrides: PartialHarnessConfig {
listen: Some(socket("127.0.0.1:9970")),
..PartialHarnessConfig::default()
},
modules: module_refs(Profile::Sim),
command: "leash run sim-stream-hub".to_string(),
}
}

fn bridge_compat_http_stack() -> Stack {
Stack {
name: "bridge-compat-http".to_string(),
Expand Down Expand Up @@ -645,6 +669,7 @@ mod tests {
let stacks = built_in_stacks();
assert!(stacks.iter().any(|stack| stack.name == "sim-http"));
assert!(stacks.iter().any(|stack| stack.name == "sim-mcp"));
assert!(stacks.iter().any(|stack| stack.name == "sim-stream-hub"));
assert!(stacks
.iter()
.any(|stack| stack.name == "bridge-compat-http"));
Expand Down Expand Up @@ -698,6 +723,14 @@ mod tests {
assert_eq!(sim.category, AdapterCategory::Simulation);
assert!(sim.required_gates.is_empty());

let stream_hub = stacks
.iter()
.find(|stack| stack.name == "sim-stream-hub")
.unwrap();
assert_eq!(stream_hub.transport.kind, StackTransport::StreamHub);
assert!(!stream_hub.hardware_required);
assert_eq!(stream_hub.transport.listen, Some(socket("127.0.0.1:9970")));

#[cfg(feature = "mavlink-drone")]
{
let drone = stacks
Expand Down
Loading