Skip to content

Conversation

@ericallam
Copy link
Member

@ericallam ericallam commented Oct 24, 2025

This PR introduces an upgraded Realtime streams backend and SDK that makes streams more reliable (and resumable) with increased or the removal of limits. We've also improved the visibily of streams via the run dashboard.

New limits

View the below limits table for more details:

Limit Streams 1.0 Streams 2.0
Maximum stream length 2000 Unlimited
Number of active streams per run 5 Unlimited
Maximum streams per run 10 Unlimited
Maximum stream TTL 1 day 28 days
Maximum stream size 10MB 300 MiB

Additionally, previously only a single client stream could be sent to a Realtime stream. Now, you can send multiple client streams to a single Realtime stream.

These new limits only apply when using the new streams backend powered by S2, and must be enabled explicitly when triggering a run by setting the new unstable_v2RealtimeStreams future flag when configuring your trigger client (or alternatively you can set the TRIGGER_V2_REALTIME_STREAMS=1 environment variable in your backend code):

import { auth } from "@trigger.dev/sdk";

auth.configure({
  future: {
    unstable_v2RealtimeStreams: true,
  },
});

Reliability improvements

When appending to a stream, the backend will now reliably resume appending from the last chunk index if there is a lost connection. Additionally, we've improved the reliability of reading from a stream by automatically resuming failed reads from the last chunk index if there is a lost connection.

This means that both sides of the stream will be much more reliable and will not lose data even when faced with network issues or other disruptions.

SDK improvements

We've moved the stream logic into their own dedicated namespace in the SDK instead of being mixed in with the other metadata methods:

import { streams, metadata, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string }) => {
    const stream = // ... get a stream from somewhere
      // Before:
      await metadata.stream("my-stream", stream);

    // After:
    await streams.append("my-stream", stream);
  },
});

You can now append to a stream using the streams.append method, which returns a result that can be used to wait until the stream is complete:

import { streams, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string }) => {
    const myAIStream = // ... get a stream from somewhere

    const { stream, waitUntilComplete } = await streams.append("ai-stream", myAIStream);

    // A. Iterate over the stream which is an async iterable and a ReadableStream
    for await (const chunk of stream) {
      console.log(chunk);
    }

    // B. Wait until the stream is complete
    await waitUntilComplete();

    return {
      message: "Stream completed successfully",
    };
  },
});

When calling streams.append from inside a task, the stream is automatically associated with the current run. You can also optionally specify a target run ID to append to a stream on a different run:

import { streams, task } from "@trigger.dev/sdk";

const myTask = task({
  id: "my-task",
  run: async (payload: { message: string; otherRunId?: string }, { ctx }) => {
    const myAIStream = // ... get a stream from somewhere

    // Append to a stream on the root run
    const { stream, waitUntilComplete } = await streams.append("ai-stream", myAIStream, { target: ctx.run.rootTaskRunId });

    // Append to a stream on the parent run
    const { stream, waitUntilComplete } = await streams.append("ai-stream", myAIStream, { target: ctx.run.parentTaskRunId });

    // Append to a stream on another run by ID
    const { stream, waitUntilComplete } = await streams.append("ai-stream", myAIStream, { target: payload.otherRunId });
  },
});

This means that, if you specify a target run ID, you can append to a stream outside of a task:

import { streams } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import {
  convertToModelMessages,
  streamText,
  UIMessage,
  createUIMessageStreamResponse,
} from "ai";

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

export async function POST(req: Request) {
  const { messages, runId }: { messages: UIMessage[]; runId: string } =
    await req.json();

  const result = streamText({
    model: openai("gpt-4.1"),
    system: "You are a helpful assistant.",
    messages: convertToModelMessages(messages),
  });

  const { stream } = await streams.append(
    "ai-stream",
    result.toUIMessageStream(),
    { target: runId }
  );

  return createUIMessageStreamResponse({
    stream,
  });
}

We've also added a new streams.read method to read from a stream:

import { streams } from "@trigger.dev/sdk";

const stream = await streams.read(runId, "my-stream");

for await (const chunk of stream) {
  console.log(chunk);
}

You can also specify a timeout and start index to read from:

import { streams } from "@trigger.dev/sdk";

const stream = await streams.read(runId, "my-stream", {
  timeoutInSeconds: 10, // Will stop the stream if no data is received within 10 seconds
  startIndex: 10, // Will start reading from the 10th chunk
});

New useRealtimeStream hook

We've added a new useRealtimeStream hook to subscribe to a stream by its run ID and stream key:

"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { Streamdown } from "streamdown";

export function Streams({ accessToken, runId, startIndex }: { accessToken: string; runId: string; startIndex?: number }) {
  // parts will be typed as `string[]`
  const { parts, error } = useRealtimeStream<string>(runId, "stream", {
    accessToken, // Pass in a public access token to authenticate the request
    onData: (data) => {
      console.log(data); // Optionally, you can listen to the data as it comes in
    },
    timeoutInSeconds: 600, // Will stop the stream if no data is received within 600 seconds (default is 60 seconds)
    startIndex, // Will start reading from the xth chunk if provided (default is 0)
    throttleInMs: 50, // Will throttle the stream updates to 50ms (default is 16ms)
  });

  if (error) return <div className="text-red-600 font-semibold">Error: {error.message}</div>;

  if (!parts) return <div className="text-gray-600">Loading...</div>;

  const stream = parts.join("");

  return (
    <div className="space-y-4">
      <div className="text-sm font-medium text-gray-700">
        <span className="font-semibold">Run:</span> {runId}
      </div>
      <div className="prose prose-sm max-w-none text-gray-900">
        <Streamdown isAnimating={true}>{stream}</Streamdown>
      </div>
    </div>
  );
}

Dashboard improvements

We're now surfacing streams in the runs dashboard that will allow you to view the stream data in real-time:

CleanShot.2025-10-24.at.17.19.22.mp4

ericallam and others added 30 commits October 23, 2025 15:36
…ing resumable streams. We also will now send invisible "ping" packets to keep connected clients alive when there are no real data packets to send, which will be especially helpful to older clients
… the stream keys. Also added a new db:seed script to seed a fresh database for local development with reference projects setup
@changeset-bot
Copy link

changeset-bot bot commented Oct 24, 2025

🦋 Changeset detected

Latest commit: 2246e6e

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 24 packages
Name Type
@trigger.dev/sdk Patch
@trigger.dev/python Patch
d3-chat Patch
references-d3-openai-agents Patch
references-nextjs-realtime Patch
references-realtime-streams Patch
@trigger.dev/build Patch
@trigger.dev/core Patch
@trigger.dev/react-hooks Patch
@trigger.dev/redis-worker Patch
@trigger.dev/rsc Patch
@trigger.dev/schema-to-json Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
trigger.dev Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/redis Patch
@internal/replication Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/zod-worker Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 24, 2025

Walkthrough

Adds a comprehensive realtime streams subsystem: database schema and Prisma migrations (TaskRun fields), server implementations for Redis v1 and S2 v2 realtime streams, new types/interfaces, SDK APIs and client flags for stream creation and subscription, worker integrations, stream writers for v1/v2, manager/no-op implementations, React UI components and hooks for viewing streams, route handlers for stream endpoints (including loader/action changes), environment variables and nginx/toxiproxy configs, seed script updates, and many tests for stream writers and Redis behavior. Also removes the relay-based realtime module and updates related wiring across services and packages.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

  • Heterogeneous, large-scope changes across backend, SDK, runtime, and UI.
  • Areas needing focused review:
    • Retry/backoff, resume-from-chunk, and buffering logic in StreamsWriterV1/V2 and MetadataStream.
    • RedisRealtimeStreams and S2RealtimeStreams implementations and their initialization/headers.
    • Removal of relayRealtimeStreams and migration of responsibilities to new code paths (v1StreamsGlobal).
    • API surface changes in packages/core and trigger-sdk (client config, future flags, new exports).
    • Database migrations and TaskRun schema additions plus propagation through run engine and trigger flows.
    • New public types/interfaces and their compatibility (realtimeStreams manager, streams API, waitUntil signature changes).
    • Routes that switched from ingestion to streaming (loader/action changes) and SSE Last-Event-ID/timeout handling.
    • Tests for StreamsWriterV1 and RedisRealtimeStreams (ensure test assumptions match implementations).

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The PR description provides excellent and comprehensive detail about what's being changed, including a detailed limits comparison table, reliability improvements, SDK refactoring with code examples, and new React hooks. However, it does not follow the required template structure. The description is missing the formal "Closes #" statement, the ✅ Checklist section with steps verifying contribution guide compliance and testing, an explicit Testing section describing test steps, a formal Changelog section, and Screenshots/media references (though a video is mentioned inline). While the substantive content quality is high, the template structure and required sections are absent. To meet the repository's PR description requirements, the author should restructure the description to include: (1) a formal issue reference using "Closes #" at the top, (2) the complete ✅ Checklist with all items checked or explicitly noted, (3) a Testing section explicitly describing the steps taken to validate this change, and (4) a formal Changelog section summarizing what has changed. The existing detailed content about limits, SDK improvements, and dashboard features can be reorganized under these required sections to satisfy the template while preserving the comprehensive context already provided.
Docstring Coverage ⚠️ Warning Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title Check ✅ Passed The PR title "feat(realtime): Realtime streams 2.0" directly relates to the primary objective of this changeset, which is to introduce a new Realtime streams 2.0 backend powered by S2. The title uses conventional commit format with a clear scope (realtime) and descriptive message that accurately captures the main feature being introduced. The title is concise and specific enough that a teammate scanning the git history would understand this changeset introduces a major upgrade to the realtime streams system with improved reliability, expanded limits, and new SDK APIs.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/realtime-streams-2

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 442979f and 2246e6e.

📒 Files selected for processing (1)
  • packages/core/test/runStream.test.ts (2 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/test/runStream.test.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • packages/core/test/runStream.test.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/test/runStream.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks

Files:

  • packages/core/test/runStream.test.ts
🧬 Code graph analysis (1)
packages/core/test/runStream.test.ts (2)
packages/core/src/v3/apiClient/runStream.ts (1)
  • SSEStreamPart (175-179)
packages/core/src/v3/apiClient/index.ts (1)
  • SSEStreamPart (146-146)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
packages/core/test/runStream.test.ts (2)

15-28: LGTM! Test implementation correctly updated for SSEStreamPart structure.

The test implementation properly wraps chunks in the new SSEStreamPart format with sequential IDs and incremented timestamps. The artificial timestamp generation (Date.now() + i) ensures unique, ordered timestamps for test scenarios, which is appropriate for unit testing.


1-513: Excellent test coverage for the streaming functionality.

The test suite comprehensively covers the RunSubscription streaming behavior, including edge cases like stream reuse and multiple simultaneous streams. The test structure is clean, follows Vitest best practices, and aligns with the coding guidelines (no mocks, descriptive blocks, proper async handling).


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 27

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
apps/webapp/app/models/organization.server.ts (2)

55-69: Remove unused features variable.

Since v3Enabled is now hardcoded to true, the features variable computed on line 55 is no longer used and should be removed.

Apply this diff to remove the dead code:

- const features = featuresForUrl(new URL(env.APP_ORIGIN));
-
  const organization = await prisma.organization.create({
    data: {
      title,
      slug: uniqueOrgSlug,
      companySize,
      maximumConcurrencyLimit: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
      members: {
        create: {
          userId: userId,
          role: "ADMIN",
        },
      },
      v3Enabled: true,
    },

30-30: Use env export instead of direct process.env access.

This line directly accesses process.env.BLOCKED_USERS, which violates the coding guideline for this file path.

As per coding guidelines

Update env.server.ts to include BLOCKED_USERS if needed, then access it via the env export:

- if (typeof process.env.BLOCKED_USERS === "string" && process.env.BLOCKED_USERS.includes(userId)) {
+ if (typeof env.BLOCKED_USERS === "string" && env.BLOCKED_USERS.includes(userId)) {
packages/trigger-sdk/src/v3/index.ts (1)

19-23: Fix type re-export ordering for Context (compile error risk).

export type { Context }; appears before the import type { Context } ..., which can fail. Prefer a direct re-export.

Apply this diff:

-export type { Context };
-
-import type { Context } from "./shared.js";
+export type { Context } from "./shared.js";

The new export * from "./streams.js"; is good.

packages/core/src/v3/runMetadata/manager.ts (1)

318-328: Early return may break when value is a ReadableStream. Convert to AsyncIterable.

You cast value to AsyncIterable, but ReadableStream isn’t AsyncIterable. Wrap it to avoid runtime issues when runId is undefined.

Apply:

-    const $value = value as AsyncIterable<T>;
-
-    if (!this.runId) {
-      return $value;
-    }
+    const toAsyncIterable = (input: AsyncIterable<T> | ReadableStream<T>): AsyncIterable<T> => {
+      if (Symbol.asyncIterator in (input as any)) {
+        return input as AsyncIterable<T>;
+      }
+      const stream = input as ReadableStream<T>;
+      return {
+        async *[Symbol.asyncIterator]() {
+          const reader = stream.getReader();
+          try {
+            while (true) {
+              const { done, value } = await reader.read();
+              if (done) return;
+              yield value as T;
+            }
+          } finally {
+            try {
+              reader.releaseLock();
+            } catch {}
+          }
+        },
+      };
+    };
+
+    if (!this.runId) {
+      return toAsyncIterable(value);
+    }

If you already have a shared utility for this conversion, prefer importing it instead.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

276-313: Ingest must split by newline to preserve chunk boundaries and correct chunkIndex

reader.read() returns arbitrary string segments, not line-delimited chunks. Writing each read as one Redis entry corrupts chunk boundaries and breaks resume (chunkIndex). Accumulate and split on '\n', increment chunkIndex per line.

-      const textStream = stream.pipeThrough(new TextDecoderStream());
-      const reader = textStream.getReader();
-
-      while (true) {
-        const { done, value } = await reader.read();
-
-        if (done || !value) {
-          break;
-        }
-
-        // Write each chunk with its index and clientId
-        this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
-          streamKey,
-          runId,
-          clientId,
-          chunkIndex: currentChunkIndex,
-          resumeFromChunk: startChunk,
-          value,
-        });
-
-        await redis.xadd(
-          streamKey,
-          "MAXLEN",
-          "~",
-          String(env.REALTIME_STREAM_MAX_LENGTH),
-          "*",
-          "clientId",
-          clientId,
-          "chunkIndex",
-          currentChunkIndex.toString(),
-          "data",
-          value
-        );
-
-        currentChunkIndex++;
-      }
+      const textStream = stream.pipeThrough(new TextDecoderStream());
+      const reader = textStream.getReader();
+      let buffer = "";
+
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        if (value) buffer += value;
+
+        const lines = buffer.split("\n");
+        buffer = lines.pop() ?? "";
+
+        for (const line of lines) {
+          const trimmed = line.trim();
+          if (trimmed.length === 0) continue;
+
+          this.logger.debug("[RedisRealtimeStreams][ingestData] Writing line", {
+            streamKey,
+            runId,
+            clientId,
+            chunkIndex: currentChunkIndex,
+            resumeFromChunk: startChunk,
+          });
+
+          await redis.xadd(
+            streamKey,
+            "MAXLEN",
+            "~",
+            String(env.REALTIME_STREAM_MAX_LENGTH),
+            "*",
+            "clientId",
+            clientId,
+            "chunkIndex",
+            currentChunkIndex.toString(),
+            "data",
+            trimmed
+          );
+
+          currentChunkIndex++;
+        }
+      }
+
+      // Flush any trailing data without newline
+      if (buffer.trim().length > 0) {
+        await redis.xadd(
+          streamKey,
+          "MAXLEN",
+          "~",
+          String(env.REALTIME_STREAM_MAX_LENGTH),
+          "*",
+          "clientId",
+          clientId,
+          "chunkIndex",
+          currentChunkIndex.toString(),
+          "data",
+          buffer
+        );
+        currentChunkIndex++;
+      }
packages/react-hooks/src/hooks/useRealtime.ts (3)

86-98: Fix SWR keying and reset completion when runId changes (prevents mixed/stale state).

  • Keys for run/error/complete omit runId, so switching runId within the same hook instance can mix states and suppress onComplete for subsequent runs.
  • Also reset hasCalledOnCompleteRef and complete when runId changes.

Apply:

-  const { data: run, mutate: mutateRun } = useSWR<RealtimeRun<TTask>>([idKey, "run"], null);
+  const { data: run, mutate: mutateRun } = useSWR<RealtimeRun<TTask>>([idKey, runId, "run"], null);

-  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
-    [idKey, "error"],
+  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
+    [idKey, runId, "error"],
     null
   );

-  const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
-    [idKey, "complete"],
+  const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
+    [idKey, runId, "complete"],
     null
   );

And add a small reset effect:

   const hasCalledOnCompleteRef = useRef(false);
+  useEffect(() => {
+    hasCalledOnCompleteRef.current = false;
+    setIsComplete(false);
+    setError(undefined);
+  }, [runId]);

Also applies to: 147-155


232-258: Isolate per-run stream caches and reset on run change.

  • streams, run, error, and complete SWR keys in useRealtimeRunWithStreams omit runId → cross-run mixing.
  • Ensure new run starts with fresh stream buffers and completion state.
-  const { data: streams, mutate: mutateStreams } = useSWR<StreamResults<TStreams>>(
-    [idKey, "streams"],
+  const { data: streams, mutate: mutateStreams } = useSWR<StreamResults<TStreams>>(
+    [idKey, runId, "streams"],
     null,
     { fallbackData: initialStreamsFallback }
   );

-  const { data: run, mutate: mutateRun } = useSWR<RealtimeRun<TTask>>([idKey, "run"], null);
+  const { data: run, mutate: mutateRun } = useSWR<RealtimeRun<TTask>>([idKey, runId, "run"], null);

-  const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
-    [idKey, "complete"],
+  const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
+    [idKey, runId, "complete"],
     null
   );

-  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
-    [idKey, "error"],
+  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
+    [idKey, runId, "error"],
     null
   );

Reset when runId changes:

   const hasCalledOnCompleteRef = useRef(false);
+  useEffect(() => {
+    hasCalledOnCompleteRef.current = false;
+    setIsComplete(false);
+    // fresh buffer for new run
+    mutateStreams({} as StreamResults<TStreams>);
+  }, [runId]);

Also applies to: 311-319


408-423: Key SWR caches by tag/batch to avoid cross-subscription bleed.

  • Keys for runs/error in useRealtimeRunsWithTag and useRealtimeBatch omit identifiers (tag/createdAt, batchId).
  • This can merge different subscriptions into one cache.
@@ useRealtimeRunsWithTag
-  const { data: runs, mutate: mutateRuns } = useSWR<RealtimeRun<TTask>[]>([idKey, "run"], null, {
+  const tagKey = Array.isArray(tag) ? [...tag].sort().join("|") : tag;
+  const { data: runs, mutate: mutateRuns } = useSWR<RealtimeRun<TTask>[]>([idKey, "runs", tagKey, options?.createdAt], null, {
     fallbackData: [],
   });
-  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
-    [idKey, "error"],
+  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
+    [idKey, "error", tagKey, options?.createdAt],
     null
   );

@@ useRealtimeBatch
-  const { data: runs, mutate: mutateRuns } = useSWR<RealtimeRun<TTask>[]>([idKey, "run"], null, {
+  const { data: runs, mutate: mutateRuns } = useSWR<RealtimeRun<TTask>[]>([idKey, "runs", batchId], null, {
     fallbackData: [],
   });
-  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
-    [idKey, "error"],
+  const { data: error = undefined, mutate: setError } = useSWR<undefined | Error>(
+    [idKey, "error", batchId],
     null
   );

Also applies to: 506-520

🧹 Nitpick comments (41)
apps/webapp/app/assets/icons/SnakedArrowIcon.tsx (1)

1-20: Extract props type for consistency.

The component is well-structured and follows React best practices for icon components. Consider extracting the props definition to an explicit type to align with project conventions of using explicit types over inline definitions.

-export function SnakedArrowIcon({ className }: { className?: string }) {
+type SnakedArrowIconProps = { className?: string };
+
+export function SnakedArrowIcon({ className }: SnakedArrowIconProps) {
apps/webapp/app/assets/icons/MoveToBottomIcon.tsx (1)

1-27: Consider aligning structure with MoveToTopIcon for consistency.

The implementation is clean and follows the coding guidelines. However, MoveToTopIcon includes a <g clipPath> wrapper and <defs> section that are absent here. While the clipPath appears unnecessary (it defines a 24×24 rectangle matching the viewBox, likely an SVG export artifact), the structural inconsistency between paired components could cause confusion.

Consider either:

  1. Adding the clipPath/defs to match MoveToTopIcon for consistency, or
  2. Removing them from MoveToTopIcon to match this cleaner approach (preferred if they serve no functional purpose).
docker/config/nginx.conf (1)

2-2: Consider adding worker_connections for completeness.

While an empty events block is valid, for a more production-representative config, consider adding worker_connections 1024; or similar.

-events {}
+events {
+  worker_connections 1024;
+}
packages/core/src/v3/schemas/common.ts (2)

342-342: Consider constraining realtimeStreamsVersion to valid values.

The field accepts any string, but the summary indicates specific values like "v1" (default) and "v2" are used to coordinate between Redis-based and S2-based streaming backends. Using a literal union would prevent invalid values and improve type safety.

Apply this diff to constrain the field to known versions:

-      realtimeStreamsVersion: z.string().optional(),
+      realtimeStreamsVersion: z.enum(["v1", "v2"]).optional(),

If additional versions are anticipated, consider defining a shared schema constant like RealtimeStreamsVersion that can be reused across the codebase.


342-342: Add documentation for realtimeStreamsVersion.

The field lacks a JSDoc comment explaining its purpose or valid values. Documenting this would help developers understand when and how to set this field.

Apply this diff to add documentation:

     z.object({
       traceContext: z.record(z.unknown()).optional(),
+      /** Specifies the realtime streaming infrastructure version. Defaults to "v1" (Redis-based). Use "v2" for S2-based streaming. */
       realtimeStreamsVersion: z.string().optional(),
     })
packages/trigger-sdk/src/v3/streams.ts (2)

113-155: Harden readStream(): always close span; fix falsy startIndex check.

  • If apiClient.fetchStream() rejects before invoking callbacks, the span won’t end.
  • startIndex uses a falsy check, so 0 is ignored.

Apply this diff:

-  return await apiClient.fetchStream(runId, key, {
-    signal: options?.signal,
-    timeoutInSeconds: options?.timeoutInSeconds ?? 60,
-    lastEventId: options?.startIndex ? (options.startIndex - 1).toString() : undefined,
-    onComplete: () => {
-      span.end();
-    },
-    onError: (error) => {
-      span.recordException(error);
-      span.setStatus({ code: SpanStatusCode.ERROR });
-      span.end();
-    },
-  });
+  try {
+    return await apiClient.fetchStream(runId, key, {
+      signal: options?.signal,
+      timeoutInSeconds: options?.timeoutInSeconds ?? 60,
+      lastEventId:
+        options?.startIndex != null ? (options.startIndex - 1).toString() : undefined,
+      onComplete: () => {
+        span.end();
+      },
+      onError: (error) => {
+        span.recordException(error);
+        span.setStatus({ code: SpanStatusCode.ERROR });
+        span.end();
+      },
+    });
+  } catch (error) {
+    span.recordException(error as any);
+    span.setStatus({ code: SpanStatusCode.ERROR });
+    span.end();
+    throw error;
+  }

1-13: Type‑only import for AsyncIterableStream.

Import AsyncIterableStream as a type to avoid bundling it at runtime.

-  AsyncIterableStream,
+  type AsyncIterableStream,
packages/trigger-sdk/src/v3/shared.ts (1)

1299-1308: Align public wrapper requestOptions types with internals (API parity).

Internals accept TriggerApiRequestOptions (with clientConfig), but public wrappers still type requestOptions as ApiRequestOptions:

  • triggerAndWait(...) (Lines 433-455)
  • batchTriggerAndWait(...) (Lines 481-492)

This prevents callers from passing clientConfig to those wrappers.

Apply these diffs:

-export function triggerAndWait<TTask extends AnyTask>(
+export function triggerAndWait<TTask extends AnyTask>(
   id: TaskIdentifier<TTask>,
   payload: TaskPayload<TTask>,
   options?: TriggerAndWaitOptions,
-  requestOptions?: ApiRequestOptions
+  requestOptions?: TriggerApiRequestOptions
 ): TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>> {
 export async function batchTriggerAndWait<TTask extends AnyTask>(
   id: TaskIdentifier<TTask>,
   items: Array<BatchItem<TaskPayload<TTask>>>,
   options?: BatchTriggerAndWaitOptions,
-  requestOptions?: ApiRequestOptions
+  requestOptions?: TriggerApiRequestOptions
 ): Promise<BatchResult<TaskIdentifier<TTask>, TaskOutput<TTask>>> {

Would you like a quick repo script to list any remaining exports that still use ApiRequestOptions where clientConfig should be supported?

Also applies to: 1372-1380

packages/core/src/v3/runMetadata/types.ts (1)

33-35: Make StreamInstance generic and iterable (aligns with usage).

Today it only guarantees wait(). Most call sites will also need to iterate. Recommend composing the iterable shape and making it generic.

Apply:

-export interface StreamInstance {
-  wait(): Promise<void>;
-}
+export interface StreamInstance<T = unknown> extends AsyncIterable<T> {
+  wait(): Promise<void>;
+}

Follow-up: update implementers to implements StreamInstance<T>.

apps/webapp/app/env.server.ts (2)

201-201: Guard against zero/negative timeout.

Use a minimum to avoid disabled/negative timeouts from misconfiguration.

-REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce.number().int().default(60000), // 1 minute
+REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce.number().int().min(1).default(60000), // 1 minute

1206-1215: S2 config: add basic numeric mins and a refine to ensure required vars when v2 is active.

  • Prevent accidental zero values; optionally enforce presence of BASIN/ACCESS_TOKEN when REALTIME_STREAM_VERSION === "v2".
-REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
-REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
-REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
-WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
+REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().min(1).default(100),
+REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().min(0).default(10),
+REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().min(1).default(60),
+WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().min(1).default(600_000),

Option (near the schema end): add a cross-field check

.superRefine((v, ctx) => {
  if (v.REALTIME_STREAM_VERSION === "v2") {
    if (!v.REALTIME_STREAMS_S2_BASIN) ctx.addIssue({ code: z.ZodIssueCode.custom, path:["REALTIME_STREAMS_S2_BASIN"], message:"required when REALTIME_STREAM_VERSION=v2" });
    if (!v.REALTIME_STREAMS_S2_ACCESS_TOKEN) ctx.addIssue({ code: z.ZodIssueCode.custom, path:["REALTIME_STREAMS_S2_ACCESS_TOKEN"], message:"required when REALTIME_STREAM_VERSION=v2" });
  }
})
packages/core/src/v3/runMetadata/s2MetadataStream.ts (3)

56-56: Type conformance: implement the generic StreamInstance.

If StreamInstance becomes generic and iterable, update the implements clause.

-export class S2MetadataStream<T = any> implements StreamInstance {
+export class S2MetadataStream<T = any> implements StreamInstance<T> {

72-73: Use isomorphic timer type (avoid NodeJS.Timeout).

Prevents TS friction in isomorphic builds.

-  private flushInterval: NodeJS.Timeout | null = null;
+  private flushInterval: ReturnType<typeof setInterval> | null = null;

175-218: Retry scope and recursion.

retryCount is a class field shared across flushes; recursion re-enters flush() and adds more promises. Prefer local attempt counting with a loop to avoid cross-flush interference and deep recursion. Also consider a maxBatchSize to bound memory before flush.

Would you like me to draft a non-recursive, per-flush retry loop and an optional maxBatchSize option?

apps/webapp/app/services/realtime/types.ts (1)

3-7: All implementers and call sites have been updated; style preference (type over interface) remains valid.

Breaking changes verified:

  • S2RealtimeStreams and RedisRealtimeStreams both implement updated signatures with clientId, optional resumeFromChunk, and new getLastChunkIndex method
  • Routes and tests use streamResponse(request, runId, streamId, signal, options) correctly
  • SSE lastEventId mapping verified: "Last-Event-ID" header properly used across packages/core and implemented in s2realtimeStreams and redisRealtimeStreams

Style: Per guidelines, convert to type aliases:

-export interface StreamIngestor {
+export type StreamIngestor = {
   initializeStream(runId: string, streamId: string): Promise<{ responseHeaders?: Record<string, string> }>;
   ingestData(stream: ReadableStream<Uint8Array>, runId: string, streamId: string, clientId: string, resumeFromChunk?: number): Promise<Response>;
   getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
-}
+}

-export interface StreamResponder {
+export type StreamResponder = {
   streamResponse(request: Request, runId: string, streamId: string, signal: AbortSignal, options?: StreamResponseOptions): Promise<Response>;
-}
+}
apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (2)

4-4: Prefer isomorphic Web Crypto API over Node.js crypto module.

The coding guidelines specify preferring isomorphic code over Node.js specific imports. Replace randomUUID from node:crypto with the Web Crypto API's crypto.randomUUID() which works in both Node.js and browser environments.

Apply this diff:

-import { randomUUID } from "node:crypto";
+// Use Web Crypto API (isomorphic)

Then update line 73:

-    const id = randomUUID();
+    const id = crypto.randomUUID();

Based on coding guidelines.


200-237: Remove unused method or clarify its purpose.

The s2ReadOnce method is defined but never called within this file. If it's intended for future use or external consumption, consider adding a comment explaining its purpose. Otherwise, remove it to avoid code clutter.

Additionally, line 213 contains a citation artifact ":contentReference[oaicite:9]{index=9}" that should be removed from the comment.

If the method is not needed, apply this diff:

-  private async s2ReadOnce(
-    stream: string,
-    opts: {
-      seq_num?: number;
-      timestamp?: number;
-      tail_offset?: number;
-      clamp?: boolean;
-      count?: number;
-      bytes?: number;
-      until?: number;
-      wait?: number;
-    }
-  ): Promise<S2ReadResponse> {
-    // GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads). :contentReference[oaicite:9]{index=9}
-    const qs = new URLSearchParams();
-    if (opts.seq_num != null) qs.set("seq_num", String(opts.seq_num));
-    if (opts.timestamp != null) qs.set("timestamp", String(opts.timestamp));
-    if (opts.tail_offset != null) qs.set("tail_offset", String(opts.tail_offset));
-    if (opts.clamp != null) qs.set("clamp", String(opts.clamp));
-    if (opts.count != null) qs.set("count", String(opts.count));
-    if (opts.bytes != null) qs.set("bytes", String(opts.bytes));
-    if (opts.until != null) qs.set("until", String(opts.until));
-    if (opts.wait != null) qs.set("wait", String(opts.wait));
-
-    const res = await fetch(`${this.baseUrl}/streams/${encodeURIComponent(stream)}/records?${qs}`, {
-      method: "GET",
-      headers: {
-        Authorization: `Bearer ${this.token}`,
-        Accept: "application/json",
-        "S2-Format": "raw",
-      },
-    });
-    if (!res.ok) {
-      const text = await res.text().catch(() => "");
-      throw new Error(`S2 read failed: ${res.status} ${res.statusText} ${text}`);
-    }
-    return (await res.json()) as S2ReadResponse;
-  }

Or if keeping it, update the comment on line 213:

-    // GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads). :contentReference[oaicite:9]{index=9}
+    // GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads)
internal-packages/run-engine/src/engine/index.ts (1)

339-396: Plumbed realtimeStreamsVersion looks good; consider narrowing the type.

The propagation into TriggerParams and TaskRun.create is correct. To prevent typos at compile time, narrow TriggerParams.realtimeStreamsVersion to a string union (e.g., "v1" | "v2") and let Prisma’s default "v1" apply when undefined.

Also applies to: 471-476

packages/cli-v3/src/entryPoints/dev-run-worker.ts (1)

144-151: Realtime manager initialization looks correct.

Base URL selection and debug flag wiring are fine (the final ?? false is redundant but harmless).

-  (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
-    false
+  ["1", "true"].includes((getEnvVar("TRIGGER_STREAMS_DEBUG") ?? "").toLowerCase())
packages/core/src/v3/apiClientManager/index.ts (1)

101-113: Update auth error copy to mention TRIGGER_ACCESS_TOKEN

Messages still reference TRIGGER_SECRET_KEY only. Recommend including TRIGGER_ACCESS_TOKEN (preferred) to reduce support churn. Example: “Set TRIGGER_API_URL and TRIGGER_ACCESS_TOKEN (or TRIGGER_SECRET_KEY).”

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts (1)

425-445: Stricter type guards for entityId/entityMetadata; tolerate missing id

Current code casts entityId/entityMetadata without verifying type; mapping block also requires entityId to be string, dropping metadata otherwise.

Apply:

@@
-  ): { entityType: string; entityId?: string; entityMetadata?: string } | undefined {
+  ): { entityType: string; entityId?: string; entityMetadata?: string } | undefined {
@@
-    const entityType = attributes[SemanticInternalAttributes.ENTITY_TYPE];
-    const entityId = attributes[SemanticInternalAttributes.ENTITY_ID];
-    const entityMetadata = attributes[SemanticInternalAttributes.ENTITY_METADATA];
+    const entityType = attributes[SemanticInternalAttributes.ENTITY_TYPE];
+    const rawEntityId = attributes[SemanticInternalAttributes.ENTITY_ID];
+    const rawEntityMetadata = attributes[SemanticInternalAttributes.ENTITY_METADATA];
@@
-    return {
-      entityType,
-      entityId: entityId as string | undefined,
-      entityMetadata: entityMetadata as string | undefined,
-    };
+    return {
+      entityType,
+      entityId: typeof rawEntityId === "string" ? rawEntityId : undefined,
+      entityMetadata: typeof rawEntityMetadata === "string" ? rawEntityMetadata : undefined,
+    };

And in span detail mapping, allow entity without id (keep metadata if provided):

@@
-      if (
+      if (
         parsedMetadata &&
         "entity" in parsedMetadata &&
         typeof parsedMetadata.entity === "object" &&
         parsedMetadata.entity &&
         "entityType" in parsedMetadata.entity &&
-        typeof parsedMetadata.entity.entityType === "string" &&
-        "entityId" in parsedMetadata.entity &&
-        typeof parsedMetadata.entity.entityId === "string"
+        typeof parsedMetadata.entity.entityType === "string"
       ) {
         span.entity = {
-          id: parsedMetadata.entity.entityId,
+          id:
+            "entityId" in parsedMetadata.entity &&
+            typeof parsedMetadata.entity.entityId === "string"
+              ? parsedMetadata.entity.entityId
+              : undefined,
           type: parsedMetadata.entity.entityType,
           metadata:
             "entityMetadata" in parsedMetadata.entity &&
             parsedMetadata.entity.entityMetadata &&
             typeof parsedMetadata.entity.entityMetadata === "string"
               ? parsedMetadata.entity.entityMetadata
               : undefined,
         };
       }

This improves resilience without changing persisted shapes. [Based on learnings]

Also applies to: 1109-1110, 1149-1155

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1)

83-84: Avoid importing components from route modules; move RealtimeStreamViewer to shared component

Importing from another route binds route bundles together and risks subtle loader side effects. Extract RealtimeStreamViewer into a shared component (e.g., components/runs/RealtimeStreamViewer.tsx) and import from there.

Example:

- import { RealtimeStreamViewer } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route";
+ import { RealtimeStreamViewer } from "~/components/runs/RealtimeStreamViewer";

No behavior change; improves bundling and maintainability. [As per coding guidelines]

Also applies to: 1147-1154

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (1)

36-37: Threading looks good; consider stricter schema/coercion for version

Propagation to logs and TriggerTaskService is correct.

To reduce invalid inputs, consider constraining/normalizing the header:

-  "x-trigger-realtime-streams-version": z.string().nullish(),
+  "x-trigger-realtime-streams-version": z
+    .enum(["1", "2"]) // or ["v1","v2"]
+    .optional()
+    .nullable(),

Or coerce/trim:

- const { ..., "x-trigger-realtime-streams-version": realtimeStreamsVersion } = headers;
+ const { ..., "x-trigger-realtime-streams-version": realtimeStreamsVersionRaw } = headers;
+ const realtimeStreamsVersion =
+   typeof realtimeStreamsVersionRaw === "string"
+     ? realtimeStreamsVersionRaw.trim()
+     : undefined;

Also applies to: 67-68, 114-114, 129-130

apps/webapp/test/redisRealtimeStreams.test.ts (1)

235-261: Deduplicate SSE parsing in tests with a small helper.

Multiple tests reimplement the same parsing; extract to a shared helper for clarity and maintenance.

Example helper (place at top of file or a local test util):

async function readSseEvents(
  stream: ReadableStream<Uint8Array>,
  maxEvents: number
): Promise<Array<{ id?: string; data?: string }>> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  const out: Array<{ id?: string; data?: string }> = [];
  try {
    while (out.length < maxEvents) {
      const { value, done } = await reader.read();
      if (done) break;
      if (!value) continue;
      const text = decoder.decode(value);
      for (const evt of text.split("\n\n").filter(Boolean)) {
        const entry: { id?: string; data?: string } = {};
        for (const line of evt.split("\n")) {
          if (line.startsWith("id: ")) entry.id = line.slice(4).trim();
          if (line.startsWith("data: ")) entry.data = line.slice(6).trim();
        }
        if (entry.id || entry.data) out.push(entry);
      }
    }
  } finally {
    reader.releaseLock();
  }
  return out;
}

Also applies to: 457-481, 1109-1126, 1199-1231, 1309-1331, 1379-1401, 1411-1414

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)

57-87: Avoid duplicate streamId entries when creating streams.

Using push can add duplicates if PUT is retried. Prefer idempotency: check presence or use a write that guarantees uniqueness.

One approach:

-      const updatedRun = await prisma.taskRun.update({
+      const updatedRun = await prisma.taskRun.update({
         where: {
           friendlyId: targetId,
           runtimeEnvironmentId: authentication.environment.id,
         },
-        data: {
-          realtimeStreams: {
-            push: params.streamId,
-          },
-        },
+        data: {
+          realtimeStreams: {
+            set: prisma.raw`ARRAY(SELECT DISTINCT UNNEST(realtime_streams || ${params.streamId}))`,
+          },
+        },

Or read-modify-write with a unique Set in application code. Please choose the variant consistent with your DB/provider.

packages/core/src/v3/runMetadata/manager.ts (1)

305-307: Provide a clearer migration error message.

Make it actionable for callers adopting the new API.

-    throw new Error("This needs to use the new realtime streams API");
+    throw new Error(
+      "fetchStream has been removed. Use realtimeStreams.append(key, source, { target?, signal? }) and consume the returned .stream instead."
+    );
packages/core/src/v3/realtimeStreams/manager.ts (1)

21-26: Map key may collide across runs

Using key alone risks collisions if multiple streams with the same key are appended within a run or future reuse. Consider ${runId}:${key} to avoid accidental overwrites.

packages/core/test/metadataStream.test.ts (1)

561-607: Reduce flakiness in backoff-based timing assertions

The elapsed >= ~5s assertion can be flaky under load. Prefer asserting retry count and order; if timing is required, widen tolerance or control timers with vi.useFakeTimers().

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

16-21: Simplify types and remove unused variant

StreamChunk defines a "legacy-data" variant that is never enqueued. Either emit it where applicable or remove the variant to reduce cognitive load.

Also applies to: 205-235

packages/cli-v3/src/entryPoints/managed-run-worker.ts (2)

312-314: Pass the same wait‑until timeout into waitForAllStreams()

Avoids having two different timeouts (manager default 60s vs TRIGGER_WAIT_UNTIL_TIMEOUT_MS).

-const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
+const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
 const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
@@
   waitUntil.register({
-    requiresResolving: () => standardRealtimeStreamsManager.hasActiveStreams(),
-    promise: () => standardRealtimeStreamsManager.waitForAllStreams(),
+    requiresResolving: () => standardRealtimeStreamsManager.hasActiveStreams(),
+    promise: () => standardRealtimeStreamsManager.waitForAllStreams(waitUntilTimeoutInMs),
   });

Also applies to: 143-145


135-141: Tighten TRIGGER_STREAMS_DEBUG parsing

The nullish coalescing after a boolean OR is redundant. Slightly clearer parse:

-  (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
-    false
+  ["1", "true"].includes((getEnvVar("TRIGGER_STREAMS_DEBUG") ?? "").toLowerCase())
packages/core/src/v3/runMetadata/metadataStream.ts (2)

211-215: Use a one‑shot abort listener to prevent listener accumulation

Multiple retries can add multiple listeners.

-      if (this.options.signal) {
-        this.options.signal.addEventListener("abort", () => {
-          req.destroy(new Error("Request aborted"));
-        });
-      }
+      if (this.options.signal) {
+        this.options.signal.addEventListener(
+          "abort",
+          () => req.destroy(new Error("Request aborted")),
+          { once: true }
+        );
+      }

316-322: Minor cleanups: unused helpers/fields and guideline alignment

  • getChunksFromBuffer is unused; remove or wire it up.
  • currentChunkIndex appears unused externally.
  • Consider consolidating node:http/https to isomorphic fetch for consistency with guidelines.

Would you like me to submit a follow‑up PR that:

  • removes dead code (getChunksFromBuffer/currentChunkIndex),
  • switches to fetch() with AbortSignal and streaming body?

Also applies to: 337-349, 27-27, 33-33

packages/core/src/v3/apiClient/runStream.ts (1)

501-521: Handle pipeTo rejections and aborts to avoid unhandled promise rejections

The background pipeline isn’t awaited; errors will surface as unhandled rejections. Also, writes may race after outer controller closes.

-                subscription.subscribe().then((stream) => {
-                  stream
+                subscription.subscribe().then((stream) => {
+                  const writable = new WritableStream({
+                    write(chunk) {
+                      // Guard against late writes after close/abort
+                      if (!controller.desiredSize && this.options?.abortController?.signal?.aborted) {
+                        return;
+                      }
+                      controller.enqueue(chunk);
+                    },
+                  });
+                  stream
                     .pipeThrough(
                       new TransformStream({
                         transform(chunk, controller) {
                           controller.enqueue({
                             type: streamKey,
                             chunk: chunk.chunk as TStreams[typeof streamKey],
                             run,
                           });
                         },
                       })
                     )
-                    .pipeTo(
-                      new WritableStream({
-                        write(chunk) {
-                          controller.enqueue(chunk);
-                        },
-                      })
-                    );
-                });
+                    .pipeTo(writable)
+                    .catch((err) => {
+                      // Suppress if aborting; otherwise log
+                      if (!this.options.abortController.signal.aborted) {
+                        console.warn("Stream pipeline error:", err);
+                      }
+                    });
+                }).catch(() => {});
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx (3)

363-371: Use a stable key for list items

Index keys cause unnecessary re‑renders and bugs on reconnection. Prefer the SSE id.

-            {chunks.map((chunk, index) => (
+            {chunks.map((chunk, index) => (
               <StreamChunkLine
-                key={index}
+                key={chunk.id ?? index}
                 chunk={chunk}
                 lineNumber={firstLineNumber + index}
                 maxLineNumberWidth={maxLineNumberWidth}
               />
             ))}

131-142: Clipboard write should be awaited and errors handled

Clipboard can fail (permissions, insecure context). Provide feedback.

-  const onCopied = useCallback(
+  const onCopied = useCallback(
     (event: React.MouseEvent<HTMLButtonElement>) => {
       event.preventDefault();
       event.stopPropagation();
-      navigator.clipboard.writeText(getCompactText());
-      setCopied(true);
-      setTimeout(() => {
-        setCopied(false);
-      }, 1500);
+      (async () => {
+        try {
+          await navigator.clipboard.writeText(getCompactText());
+          setCopied(true);
+          setTimeout(() => setCopied(false), 1500);
+        } catch (e) {
+          console.error("Clipboard error", e);
+        }
+      })();
     },
     [getCompactText]
   );

447-502: Optional: deduplicate chunks by id to handle reconnect replays

Reconnects may replay overlapping events. Avoid duplicates.

   useEffect(() => {
     const abortController = new AbortController();
-    let reader: ReadableStreamDefaultReader<SSEStreamPart<unknown>> | null = null;
+    let reader: ReadableStreamDefaultReader<SSEStreamPart<unknown>> | null = null;
+    const seen = new Set<string>();
@@
-          if (value !== undefined) {
-            setChunks((prev) => [
-              ...prev,
-              {
-                id: value.id,
-                data: value.chunk,
-                timestamp: value.timestamp,
-              },
-            ]);
-          }
+          if (value !== undefined && !seen.has(value.id)) {
+            seen.add(value.id);
+            setChunks((prev) => [
+              ...prev,
+              { id: value.id, data: value.chunk, timestamp: value.timestamp },
+            ]);
+          }
packages/react-hooks/src/hooks/useRealtime.ts (4)

860-882: Avoid race conditions in throttled flushes by updating refs alongside mutate.

  • Current code builds next state from existing*Ref.current but doesn’t update the ref, relying on SWR to propagate. Under tight flushes, this can cause lost or duplicated chunks.
@@ processRealtimeRunWithStreams onFlush
-    // Apply all updates
+    // Apply all updates
     for (const [type, chunks] of Object.entries(updatesByType)) {
-      // @ts-ignore
-      nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks];
+      // @ts-expect-error type key narrowing
+      nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks];
     }
-
-    mutateStreamData(nextStreamData);
+    // keep ref in sync to avoid re-reading stale buffers on next flush
+    existingDataRef.current = nextStreamData;
+    mutateStreamData(nextStreamData);
@@ processRealtimeStream throttle flush
-    const streamQueue = createThrottledQueue<TPart>(async (parts) => {
-      mutatePartsData([...existingPartsRef.current, ...parts]);
-    }, throttleInMs);
+    const streamQueue = createThrottledQueue<TPart>(async (parts) => {
+      const next = [...existingPartsRef.current, ...parts];
+      existingPartsRef.current = next;
+      mutatePartsData(next);
+    }, throttleInMs);

Also applies to: 939-941


127-128: Minor cleanup: simplify defaulting and drop no-op .finally.

  • Use nullish coalescing for stopOnCompletion.
  • Replace .finally(() => {}) with void triggerRequest().
-        typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true
+        options?.stopOnCompletion ?? true
-    triggerRequest().finally(() => {});
+    void triggerRequest();

Also applies to: 290-293, 166-171, 330-335, 474-479, 571-576, 746-751


855-858: Tighten types to remove @ts-ignore around stream updates.

  • Constrain the stream key to a string and type chunks by key; casts become localized and safer.
-  type StreamUpdate = {
-    type: keyof TStreams;
-    chunk: any;
-  };
+  type StreamUpdate = {
+    type: keyof TStreams & string;
+    chunk: TStreams[keyof TStreams];
+  };
@@
-  const updatesByType = updates.reduce(
-      (acc, update) => {
+  const updatesByType = updates.reduce(
+      (acc: Partial<Record<keyof TStreams, any[]>>, update) => {
         if (!acc[update.type]) {
           acc[update.type] = [];
         }
         acc[update.type].push(update.chunk);
         return acc;
       },
-      {} as Record<keyof TStreams, any[]>
+      {} as Partial<Record<keyof TStreams, any[]>>
     );
@@
-    for (const [type, chunks] of Object.entries(updatesByType)) {
-      // @ts-ignore
-      nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks];
+    for (const [type, chunks] of Object.entries(updatesByType) as Array<[keyof TStreams & string, any[]]>) {
+      nextStreamData[type] = [
+        ...(((existingDataRef.current as any)[type] as any[]) || []),
+        ...chunks,
+      ] as any;
     }
@@
-      streamQueue.add({
-        type: part.type,
-        // @ts-ignore
-        chunk: part.chunk,
-      });
+      streamQueue.add({ type: part.type as keyof TStreams & string, chunk: part.chunk as any });

Also applies to: 876-883, 888-893


15-24: Avoid duplicating enabled in options.

UseRealtimeRunOptions extends UseApiClientOptions which already includes enabled?. Remove the duplicate to prevent API ambiguity.

 export type UseRealtimeRunOptions = UseApiClientOptions & {
   id?: string;
-  enabled?: boolean;
   /**
    * The number of milliseconds to throttle the stream updates.
    *
    * @default 16
    */
   throttleInMs?: number;
 };

As per coding guidelines.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/cli-v3/src/entryPoints/managed-run-worker.ts (1)

600-608: Cancel should drain/abort active realtime streams.

Currently CANCEL only aborts the run and flushes usage/tracing/metadata. Active streams may keep running. Explicitly wait (and abort on timeout) to avoid leaks.

Apply:

   CANCEL: async ({ timeoutInMs }) => {
     _isCancelled = true;
     _cancelController.abort("run cancelled");
     await callCancelHooks(timeoutInMs);
+    // Ensure realtime streams finish or get aborted before flushing
+    try {
+      await standardRealtimeStreamsManager.waitForAllStreams(
+        typeof timeoutInMs === "number" ? timeoutInMs : waitUntilTimeoutInMs
+      );
+    } catch (err) {
+      console.warn("Failed waiting for realtime streams on cancel", err);
+    }
     if (_executionMeasurement) {
       usage.stop(_executionMeasurement);
     }
     await flushAll(timeoutInMs);
   },
♻️ Duplicate comments (4)
packages/core/test/streamsWriterV1.test.ts (1)

792-805: Fix generator-without-yield lint in empty stream test

Keep semantics but satisfy the linter with a dead-code yield.

   it("should handle empty stream (no chunks)", async () => {
     async function* generateChunks() {
-      // Yields nothing
-      return;
+      // Yields nothing; keep generator shape for linters
+      if (false) {
+        // never executed; placates useYield rule
+        yield undefined as never;
+      }
     }
packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (1)

237-266: Propagate buffer task failures to the request loop

Check captured buffer error and fail fast instead of looping indefinitely.

       const processStream = async () => {
         try {
           let lastSentIndex = startFromChunk - 1;

           while (true) {
+            // Surface background errors
+            if ((this as any)._bufferError) {
+              req.destroy();
+              throw (this as any)._bufferError;
+            }
             // Send all chunks that are available in buffer
             while (lastSentIndex < this.highestBufferedIndex) {
               lastSentIndex++;
               const chunk = this.ringBuffer.find((c) => c.index === lastSentIndex);

               if (chunk) {
                 const stringified = JSON.stringify(chunk.data) + "\n";
                 req.write(stringified);
                 this.currentChunkIndex = lastSentIndex + 1;
               }
             }

             // If stream is complete and we've sent all buffered chunks, we're done
             if (this.streamComplete && lastSentIndex >= this.highestBufferedIndex) {
               req.end();
               break;
             }

             // Wait a bit for more chunks to be buffered
             await this.delay(10);
           }
         } catch (error) {
           reject(error);
         }
       };
apps/webapp/test/redisRealtimeStreams.test.ts (1)

1360-1366: Resolved: correct streamResponse options object.

Passing { lastEventId } matches the signature and avoids type errors. Looks good.

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)

50-51: Resolved: no silent fallback to v1.

Throwing on missing S2 config for v2 prevents empty reads and data loss. Good.

🧹 Nitpick comments (20)
packages/core/src/v3/waitUntil/manager.ts (1)

6-6: Constructor implements centralized timeout strategy.

The timeout is now stored at the instance level with a sensible default of 60 seconds, aligning with the PR's centralized timeout approach.

Consider adding validation for the timeout parameter to prevent unexpected behavior:

-  constructor(private timeoutInMs: number = 60_000) {}
+  constructor(private timeoutInMs: number = 60_000) {
+    if (timeoutInMs <= 0) {
+      throw new Error("timeoutInMs must be a positive number");
+    }
+  }
packages/core/src/v3/apiClient/runStream.ts (2)

387-402: Consider URL encoding the path parameters.

The runId and streamKey are inserted directly into the URL path. If these contain special characters, the URL could be malformed.

Apply this diff to add URL encoding:

-    const url = `${options?.baseUrl ?? this.baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
+    const url = `${options?.baseUrl ?? this.baseUrl}/realtime/v1/streams/${encodeURIComponent(runId)}/${encodeURIComponent(streamKey)}`;

742-757: Consider using Zod for metadata stream validation.

The type checking for metadata.$$streams is verbose and could be simplified with Zod validation, which is heavily used in packages/core per coding guidelines.

Define a schema at the top of the file:

import { z } from "zod";

const MetadataStreamsSchema = z.object({
  $$streams: z.array(z.string()).min(1),
});

Then simplify the function:

 function getStreamsFromRunShape(run: AnyRunShape): string[] {
-  const metadataStreams =
-    run.metadata &&
-    "$$streams" in run.metadata &&
-    Array.isArray(run.metadata.$$streams) &&
-    run.metadata.$$streams.length > 0 &&
-    run.metadata.$$streams.every((stream) => typeof stream === "string")
-      ? run.metadata.$$streams
-      : undefined;
+  if (run.metadata) {
+    const result = MetadataStreamsSchema.safeParse(run.metadata);
+    if (result.success) {
+      return result.data.$$streams;
+    }
+  }

-  if (metadataStreams) {
-    return metadataStreams;
-  }
-
   return run.realtimeStreams;
 }

As per coding guidelines.

packages/core/src/v3/streams/asyncIterableStream.ts (1)

106-135: Harden async-iterable detection and lock release

Use a callable check for Symbol.asyncIterator and guard releaseLock to avoid edge errors. Optional: cancel the reader when iteration stops early.

 export function ensureAsyncIterable<T>(
   input: AsyncIterable<T> | ReadableStream<T>
 ): AsyncIterable<T> {
   // If it's already an AsyncIterable, return it as-is
-  if (Symbol.asyncIterator in input) {
+  if (typeof (input as any)[Symbol.asyncIterator] === "function") {
     return input as AsyncIterable<T>;
   }
@@
       const reader = readableStream.getReader();
       try {
         while (true) {
           const { done, value } = await reader.read();
           if (done) {
             break;
           }
           if (value !== undefined) {
             yield value;
           }
         }
       } finally {
-        reader.releaseLock();
+        try {
+          reader.releaseLock();
+        } catch {}
+        // Optionally: await reader.cancel().catch(() => {});
       }
     },
   };
 }
packages/core/src/v3/realtimeStreams/streamsWriterV2.ts (4)

72-72: Use env-agnostic timer type

Avoid Node-only NodeJS.Timeout for isomorphic compatibility.

-  private flushInterval: NodeJS.Timeout | null = null;
+  private flushInterval: ReturnType<typeof setInterval> | null = null;

160-179: Avoid unbounded prefetch; make tee source backpressure-aware

Reading the entire AsyncIterable in start can buffer unboundedly if consumers stall/abort. Drive reads from pull instead and support cancellation.

-  private createTeeStreams() {
-    const readableSource = new ReadableStream<T>({
-      start: async (controller) => {
-        try {
-          let count = 0;
-
-          for await (const value of this.options.source) {
-            controller.enqueue(value);
-            count++;
-          }
-
-          controller.close();
-        } catch (error) {
-          controller.error(error);
-        }
-      },
-    });
-    return readableSource.tee();
-  }
+  private createTeeStreams() {
+    const self = this;
+    const iterator = this.options.source[Symbol.asyncIterator]();
+    const readableSource = new ReadableStream<T>({
+      async pull(controller) {
+        if (self.aborted) {
+          controller.close();
+          return;
+        }
+        const { value, done } = await iterator.next();
+        if (done) {
+          controller.close();
+          return;
+        }
+        controller.enqueue(value);
+      },
+      async cancel() {
+        await iterator.return?.().catch(() => {});
+      },
+    });
+    return readableSource.tee();
+  }

181-225: Release reader lock after buffering completes

Prevent lingering locks by releasing in a finally block.

   private startBuffering(): void {
@@
-    this.bufferReaderTask = (async () => {
+    this.bufferReaderTask = (async () => {
       try {
         let chunkCount = 0;
@@
         }
-      } catch (error) {
+      } catch (error) {
         this.logError("[S2MetadataStream] Error in buffering task:", error);
         throw error;
-      }
+      } finally {
+        try {
+          this.streamReader?.releaseLock();
+        } catch {}
+        this.streamReader = null;
+      }
     })();

295-296: Prune resolved flush promises to cap memory

Keep flushPromises bounded by removing settled entries.

-    this.flushPromises.push(flushPromise);
+    this.flushPromises.push(flushPromise);
+    flushPromise.finally(() => {
+      this.flushPromises = this.flushPromises.filter((p) => p !== flushPromise);
+    });
packages/core/src/v3/realtimeStreams/manager.ts (1)

57-63: Combine abort signals without relying on AbortSignal.any

When AbortSignal.any is unavailable, the current fallback ignores the user signal. Merge signals manually.

-    const combinedSignal = options?.signal
-      ? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
-      : abortController.signal;
+    const combinedSignal = options?.signal
+      ? mergeAbortSignals([options.signal, abortController.signal])
+      : abortController.signal;

Add helper near the bottom of the module:

function mergeAbortSignals(signals: AbortSignal[]): AbortSignal {
  const controller = new AbortController();
  const onAbort = () => controller.abort();
  for (const s of signals) {
    if (s.aborted) return AbortSignal.abort();
    s.addEventListener("abort", onAbort, { once: true });
  }
  return controller.signal;
}
packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (2)

331-341: Minor: exponential backoff caps are good; consider logging retries for observability

Optional: add debug logs on retries/timeouts to aid support.


357-369: Unused helper

getChunksFromBuffer() isn't used. Remove or integrate to avoid dead code.

packages/cli-v3/src/entryPoints/managed-run-worker.ts (1)

135-140: Minor: simplify TRIGGER_STREAMS_DEBUG parsing.

The ?? false is redundant and AbortSignal.any support varies. Prefer explicit boolean parsing.

Apply:

-  (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
-    false
+  ["1", "true"].includes((getEnvVar("TRIGGER_STREAMS_DEBUG") ?? "").toLowerCase())
apps/webapp/test/redisRealtimeStreams.test.ts (2)

240-261: Harden SSE parsing to handle chunk splits.

Decoding and split("\n\n") per read can drop events split across reads. Buffer between reads.

Add a tiny helper and reuse:

function createSSECollector() {
  let buf = "";
  return (chunk: Uint8Array, out: (evt: { id?: string; data?: string }) => void) => {
    buf += new TextDecoder().decode(chunk);
    const parts = buf.split("\n\n");
    buf = parts.pop() ?? "";
    for (const evt of parts) {
      let id: string | undefined;
      let data: string | undefined;
      for (const line of evt.split("\n")) {
        if (line.startsWith("id: ")) id = line.slice(4).trim();
        if (line.startsWith("data: ")) data = line.slice(6);
      }
      out({ id, data });
    }
  };
}

Use per test:

const collect = createSSECollector();
const events: string[] = [];
while (events.length < 3) {
  const { value, done } = await reader.read();
  if (done) break;
  collect(value!, (e) => e.data && events.push(e.data));
}

Also applies to: 462-482, 1199-1231, 1374-1401


1141-1146: Reduce timing brittleness in inactivity test.

The 4–8s window is tied to an internal BLOCK=5000ms. If that constant changes, this test flakes.

  • Assert only that streamClosed is true and at least one BLOCK cycle elapsed, or
  • Expose/read block time from implementation and derive expectations dynamically.
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)

25-52: Memoize S2RealtimeStreams to avoid per-request re-instantiation.

Constructing a new client each call adds overhead. Cache by (basin, streamPrefix).

Example:

+const s2Cache = new Map<string, S2RealtimeStreams>();
 export function getRealtimeStreamInstance(
   environment: AuthenticatedEnvironment,
   streamVersion: string
 ): StreamIngestor & StreamResponder {
   if (streamVersion === "v1") {
     return v1RealtimeStreams;
   } else {
     if (env.REALTIME_STREAMS_S2_BASIN && env.REALTIME_STREAMS_S2_ACCESS_TOKEN) {
-      return new S2RealtimeStreams({
+      const streamPrefix = [
+        "org",
+        environment.organization.id,
+        "env",
+        environment.slug,
+        environment.id,
+      ].join("/");
+      const key = `${env.REALTIME_STREAMS_S2_BASIN}|${streamPrefix}`;
+      const cached = s2Cache.get(key);
+      if (cached) return cached;
+      const inst = new S2RealtimeStreams({
         basin: env.REALTIME_STREAMS_S2_BASIN,
         accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN,
-        streamPrefix: [
-          "org",
-          environment.organization.id,
-          "env",
-          environment.slug,
-          environment.id,
-        ].join("/"),
+        streamPrefix,
         logLevel: env.REALTIME_STREAMS_S2_LOG_LEVEL,
         flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS,
         maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES,
         s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS,
       });
+      s2Cache.set(key, inst);
+      return inst;
     }
     throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing");
   }
 }
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2)

121-147: Use the run’s configured version for HEAD; avoid header-driven selection.

HEAD with v2 will currently throw. Select the instance from the run’s version (and consider returning 501 for v2).

Add version to select and use it:

@@ findResource select:
         select: {
           id: true,
           friendlyId: true,
+          realtimeStreamsVersion: true,
           parentTaskRun: {
@@
-    const clientId = request.headers.get("X-Client-Id") || "default";
-    const streamVersion = request.headers.get("X-Stream-Version") || "v1";
-
-    const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
+    const clientId = request.headers.get("X-Client-Id") || "default";
+    const streamVersion = run.realtimeStreamsVersion || "v1";
+    if (streamVersion !== "v1") {
+      return new Response("HEAD last-chunk is only available for v1", { status: 501 });
+    }
+    const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v1");

Also applies to: 170-177


57-87: Potential duplication in realtimeStreams array.

Using push can add the same streamId multiple times.

Prefer a unique set semantics (e.g., read existing array, de-dup, then set), or enforce uniqueness at the db layer.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (3)

245-251: Add SSE-friendly headers (buffering/CORS/version).

Improve proxy compatibility and parity with v2.

Apply:

   return new Response(stream, {
     headers: {
       "Content-Type": "text/event-stream",
       "Cache-Control": "no-cache",
       Connection: "keep-alive",
+      "X-Accel-Buffering": "no",
+      "Access-Control-Expose-Headers": "*",
+      "X-Stream-Version": "v1",
     },
   });

237-241: Use structured logger in cleanup.

Avoid bare console in server code.

Apply:

-  await redis.quit().catch(console.error);
+  await redis.quit().catch((err) =>
+    (/* this */ self ?? { logger }).logger.error(
+      "[RedisRealtimeStreams][streamResponse] Error during cleanup",
+      { err }
+    )
+  );

Note: if self not in scope here, capture const log = this.logger; before and use it.


288-296: Avoid logging raw chunk data.

Raw payloads can be large/sensitive. Log size/indices instead.

Apply:

-        this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
+        this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
           streamKey,
           runId,
           clientId,
           chunkIndex: currentChunkIndex,
           resumeFromChunk: startChunk,
-          value,
+          size: value.length,
         });
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3eef678 and 442979f.

⛔ Files ignored due to path filters (1)
  • references/realtime-streams/src/trigger/streams.ts is excluded by !references/**
📒 Files selected for processing (17)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (6 hunks)
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2 hunks)
  • apps/webapp/test/redisRealtimeStreams.test.ts (1 hunks)
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts (5 hunks)
  • packages/cli-v3/src/entryPoints/managed-run-worker.ts (5 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (8 hunks)
  • packages/core/src/v3/apiClientManager/index.ts (1 hunks)
  • packages/core/src/v3/realtimeStreams/manager.ts (1 hunks)
  • packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (1 hunks)
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts (1 hunks)
  • packages/core/src/v3/realtimeStreams/types.ts (1 hunks)
  • packages/core/src/v3/streams/asyncIterableStream.ts (1 hunks)
  • packages/core/src/v3/waitUntil/manager.ts (2 hunks)
  • packages/core/src/v3/waitUntil/types.ts (1 hunks)
  • packages/core/test/streamsWriterV1.test.ts (1 hunks)
  • packages/trigger-sdk/src/v3/streams.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • packages/core/src/v3/apiClientManager/index.ts
  • packages/core/src/v3/realtimeStreams/types.ts
  • packages/trigger-sdk/src/v3/streams.ts
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • packages/core/src/v3/waitUntil/types.ts
  • packages/cli-v3/src/entryPoints/managed-run-worker.ts
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • packages/core/src/v3/streams/asyncIterableStream.ts
  • packages/core/src/v3/waitUntil/manager.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
  • packages/core/src/v3/realtimeStreams/manager.ts
  • packages/core/src/v3/apiClient/runStream.ts
  • packages/core/test/streamsWriterV1.test.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • packages/core/src/v3/waitUntil/types.ts
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • packages/core/src/v3/streams/asyncIterableStream.ts
  • packages/core/src/v3/waitUntil/manager.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
  • packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
  • packages/core/src/v3/realtimeStreams/manager.ts
  • packages/core/src/v3/apiClient/runStream.ts
  • packages/core/test/streamsWriterV1.test.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json

Files:

  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly

Files:

  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/app/**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead

Files:

  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
**/*.test.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • packages/core/test/streamsWriterV1.test.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (AGENTS.md)

**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks

Files:

  • packages/core/test/streamsWriterV1.test.ts
  • apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}: Do not import app/env.server.ts into tests, either directly or indirectly
Tests should only import classes/functions from files under apps/webapp/app/**/*.ts

Files:

  • apps/webapp/test/redisRealtimeStreams.test.ts
🧬 Code graph analysis (11)
packages/core/src/v3/waitUntil/types.ts (1)
packages/core/src/v3/waitUntil/index.ts (1)
  • WaitUntilManager (39-41)
packages/cli-v3/src/entryPoints/managed-run-worker.ts (5)
packages/core/src/v3/apiClientManager-api.ts (1)
  • apiClientManager (5-5)
packages/core/src/v3/run-metadata-api.ts (1)
  • runMetadata (5-5)
packages/core/src/v3/realtimeStreams/manager.ts (1)
  • StandardRealtimeStreamsManager (16-139)
packages/core/src/v3/realtime-streams-api.ts (1)
  • realtimeStreams (5-5)
packages/core/src/v3/waitUntil/manager.ts (1)
  • StandardWaitUntilManager (3-40)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2)
apps/webapp/app/services/realtime/types.ts (2)
  • StreamIngestor (2-17)
  • StreamResponder (25-33)
apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (1)
  • S2RealtimeStreams (32-246)
packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (1)
packages/core/src/v3/realtimeStreams/types.ts (1)
  • StreamsWriter (23-25)
packages/core/src/v3/realtimeStreams/streamsWriterV2.ts (1)
packages/core/src/v3/realtimeStreams/types.ts (1)
  • StreamsWriter (23-25)
packages/core/src/v3/realtimeStreams/manager.ts (6)
packages/core/src/v3/realtimeStreams/types.ts (3)
  • RealtimeStreamsManager (10-16)
  • RealtimeAppendStreamOptions (4-8)
  • RealtimeStreamInstance (18-21)
packages/core/src/v3/apiClient/index.ts (1)
  • headers (1202-1213)
packages/core/src/v3/streams/asyncIterableStream.ts (3)
  • ensureAsyncIterable (107-135)
  • AsyncIterableStream (1-1)
  • createAsyncIterableStreamFromAsyncIterable (52-97)
packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (1)
  • StreamsWriterV1 (26-466)
packages/core/src/v3/realtimeStreams/streamsWriterV2.ts (1)
  • StreamsWriterV2 (56-392)
packages/core/src/v3/task-context-api.ts (1)
  • taskContext (5-5)
packages/core/src/v3/apiClient/runStream.ts (1)
packages/core/src/v3/apiClient/index.ts (4)
  • SSEStreamPart (146-146)
  • SSEStreamSubscription (136-136)
  • headers (1202-1213)
  • AnyRunShape (139-139)
packages/core/test/streamsWriterV1.test.ts (1)
packages/core/src/v3/realtimeStreams/streamsWriterV1.ts (1)
  • StreamsWriterV1 (26-466)
apps/webapp/test/redisRealtimeStreams.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
  • redisTest (167-167)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
  • RedisRealtimeStreams (23-413)
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (4)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
  • getRealtimeStreamInstance (25-52)
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
  • loader (11-71)
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)
  • createLoaderApiRoute (97-287)
apps/webapp/app/db.server.ts (1)
  • $replica (103-106)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (3)
packages/core/src/logger.ts (3)
  • Logger (19-139)
  • LogLevel (15-15)
  • error (69-79)
apps/webapp/app/services/realtime/types.ts (3)
  • StreamIngestor (2-17)
  • StreamResponder (25-33)
  • StreamResponseOptions (19-22)
apps/webapp/app/env.server.ts (1)
  • env (1219-1219)
🪛 Biome (2.1.2)
packages/core/test/streamsWriterV1.test.ts

[error] 792-795: This generator function doesn't contain yield.

(lint/correctness/useYield)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (12)
packages/core/src/v3/waitUntil/types.ts (2)

3-3: Type signature refined for deferred promise timeout.

The updated signature now explicitly passes timeoutInMs to deferred promise functions, making the timeout contract clearer.


8-8: API change verified – all call sites properly updated.

All references to blockUntilSettled() in the codebase confirm the timeout parameter has been removed and call sites updated accordingly. The sole call site at packages/core/src/v3/workers/taskExecutor.ts:1099 correctly invokes the method with no arguments, matching the new interface signature.

packages/core/src/v3/waitUntil/manager.ts (1)

16-31: Implementation correctly uses instance-level timeout.

The refactored blockUntilSettled method properly uses the stored timeout for both deferred promise invocation (line 22) and the race timeout (line 27). Memory management is sound with promises being cleared after settlement.

packages/core/src/v3/apiClient/runStream.ts (8)

1-19: LGTM!

The import additions are appropriate for the streaming infrastructure enhancements. All imported modules are actively used in the file.


51-51: LGTM!

The realtimeStreams field addition aligns with the PR objectives for Streams 2.0.


156-179: LGTM!

The new types and interface changes provide a solid foundation for the enhanced streaming API with resume support and timeout handling.


202-213: LGTM!

The subscribe method correctly returns a ReadableStream and properly wires up the connection and cancellation logic.


344-375: LGTM!

The retry logic with exponential backoff is well-implemented, properly handles abort signals, and includes sensible max retry limits.


765-765: LGTM!

The radix parameter has been correctly added to parseInt, addressing the previous review comment.


759-771: LGTM!

The Redis stream ID parsing is correct with proper fallback handling and NaN checks.


590-590: LGTM!

The realtimeStreams field is correctly added with a safe fallback to an empty array.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

50-81: LGTM on unified inactivity threshold and pings.

Threshold computed once and reused in both branches; periodic pings help keep connections alive.

Also applies to: 162-178

Comment on lines +88 to +116
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
let resumeFromChunkNumber: number | undefined = undefined;
if (resumeFromChunk) {
const parsed = parseInt(resumeFromChunk, 10);
if (isNaN(parsed) || parsed < 0) {
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
status: 400,
});
}
resumeFromChunkNumber = parsed;
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);

return realtimeStream.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Guard: server-side ingest should be v1 only.

Using header-selected version can route v2 to Redis path; S2 ingest is client-side only and will throw. Reject non-v1 upfront.

Apply:

-      const clientId = request.headers.get("X-Client-Id") || "default";
-      const streamVersion = request.headers.get("X-Stream-Version") || "v1";
+      const clientId = request.headers.get("X-Client-Id") || "default";
+      const streamVersion = (request.headers.get("X-Stream-Version") || "v1").toLowerCase();
+      if (streamVersion !== "v1") {
+        return new Response("Server-side ingest is only supported for v1 streams", { status: 400 });
+      }
@@
-      const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
+      const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v1");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";
if (!request.body) {
return new Response("No body provided", { status: 400 });
}
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
let resumeFromChunkNumber: number | undefined = undefined;
if (resumeFromChunk) {
const parsed = parseInt(resumeFromChunk, 10);
if (isNaN(parsed) || parsed < 0) {
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
status: 400,
});
}
resumeFromChunkNumber = parsed;
}
const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
return realtimeStream.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = (request.headers.get("X-Stream-Version") || "v1").toLowerCase();
if (streamVersion !== "v1") {
return new Response("Server-side ingest is only supported for v1 streams", { status: 400 });
}
if (!request.body) {
return new Response("No body provided", { status: 400 });
}
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
let resumeFromChunkNumber: number | undefined = undefined;
if (resumeFromChunk) {
const parsed = parseInt(resumeFromChunk, 10);
if (isNaN(parsed) || parsed < 0) {
return new Response(`Invalid X-Resume-From-Chunk header value: ${resumeFromChunk}`, {
status: 400,
});
}
resumeFromChunkNumber = parsed;
}
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v1");
return realtimeStream.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
🤖 Prompt for AI Agents
In apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts around
lines 88 to 116, the code currently accepts a client-provided X-Stream-Version
which can route non-v1 (e.g. v2) to server-side ingest; enforce server-side
ingest is v1 only by validating the resolved streamVersion and rejecting any
request where streamVersion !== "v1" (return a 400 with a clear message), or
alternatively ignore the header and force streamVersion = "v1" before calling
getRealtimeStreamInstance; implement one of these fixes so only v1 reaches
ingestData.

Comment on lines +135 to +141
const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager(
apiClientManager.clientOrThrow(),
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
false
);
runMetadata.setGlobalManager(runMetadataManager);
realtimeStreams.setGlobalManager(standardRealtimeStreamsManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Upstream: make AbortSignal composition in StandardRealtimeStreamsManager portable.

If AbortSignal.any is unavailable, user-provided signals won’t propagate. Add a manual combiner.

Outside this file (packages/core/src/v3/realtimeStreams/manager.ts):

-    const combinedSignal = options?.signal
-      ? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
-      : abortController.signal;
+    let combinedSignal = abortController.signal;
+    if (options?.signal) {
+      if (typeof (AbortSignal as any).any === "function") {
+        combinedSignal = (AbortSignal as any).any([options.signal, abortController.signal]);
+      } else {
+        const combo = new AbortController();
+        const onAbort = () => combo.abort();
+        options.signal.addEventListener("abort", onAbort, { once: true });
+        abortController.signal.addEventListener("abort", onAbort, { once: true });
+        combinedSignal = combo.signal;
+      }
+    }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 311 to 314
waitUntil.register({
requiresResolving: () => runMetadataManager.hasActiveStreams(),
promise: () => runMetadataManager.waitForAllStreams(),
requiresResolving: () => standardRealtimeStreamsManager.hasActiveStreams(),
promise: (timeoutInMs) => standardRealtimeStreamsManager.waitForAllStreams(timeoutInMs),
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Multiple concurrent streams can be dropped due to manager key collisions.

waitUntil relies on StandardRealtimeStreamsManager.hasActiveStreams(), but that manager tracks streams by the stream key only. With Streams 2.0 allowing multiple client streams per key/run, later appends overwrite earlier ones, so wait/abort may skip streams.

Proposed core fix (packages/core/src/v3/realtimeStreams/manager.ts):

-  private activeStreams = new Map<
-    string,
-    { wait: () => Promise<void>; abortController: AbortController }
-  >();
+  // Track each append independently
+  private activeStreams = new Set<{
+    label: string; // e.g., `${runId}:${key}`
+    wait: () => Promise<void>;
+    abortController: AbortController;
+  }>();

@@
-    const streamInstance =
+    const streamInstance =
       parsedResponse.version === "v1"
         ? new StreamsWriterV1({ /* ... */ })
         : new StreamsWriterV2({ /* ... */ });
-
-    this.activeStreams.set(key, { wait: () => streamInstance.wait(), abortController });
-
-    // Clean up when stream completes
-    streamInstance.wait().finally(() => this.activeStreams.delete(key));
+    const label = `${runId}:${key}`;
+    const ref = { label, wait: () => streamInstance.wait(), abortController };
+    this.activeStreams.add(ref);
+    // Clean up when stream completes
+    streamInstance.wait().finally(() => this.activeStreams.delete(ref));
@@
-    if (this.activeStreams.size === 0) {
+    if (this.activeStreams.size === 0) {
       return;
     }
-    const promises = Array.from(this.activeStreams.values()).map((stream) => stream.wait());
+    const promises = Array.from(this.activeStreams.values()).map((s) => s.wait());
@@
-      const abortedCount = this.activeStreams.size;
-      for (const [key, streamInfo] of this.activeStreams.entries()) {
-        streamInfo.abortController.abort();
-        this.activeStreams.delete(key);
-      }
+      const abortedCount = this.activeStreams.size;
+      for (const s of Array.from(this.activeStreams.values())) {
+        s.abortController.abort();
+        this.activeStreams.delete(s);
+      }

Also make AbortSignal composition robust (see next comment).

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +283 to +285
const data = safeParseJSON(chunk.data) as {
records: Array<{ body: string; seq_num: number; timestamp: number }>;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate batch data structure before casting.

The type assertion is unsafe and could cause runtime errors if the response format doesn't match expectations. Per coding guidelines, Zod validation should be used.

Apply this diff to add safe validation:

+import { z } from "zod";
+
+const BatchDataSchema = z.object({
+  records: z.array(
+    z.object({
+      body: z.string(),
+      seq_num: z.number(),
+      timestamp: z.number(),
+    })
+  ),
+});

Then update the transform:

              } else {
                if (chunk.event === "batch") {
-                  const data = safeParseJSON(chunk.data) as {
-                    records: Array<{ body: string; seq_num: number; timestamp: number }>;
-                  };
+                  const parsed = safeParseJSON(chunk.data);
+                  const result = BatchDataSchema.safeParse(parsed);
+                  
+                  if (!result.success) {
+                    console.error("Invalid batch data format:", result.error);
+                    return;
+                  }
+                  
+                  const data = result.data;

                  for (const record of data.records) {

As per coding guidelines.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In packages/core/src/v3/apiClient/runStream.ts around lines 283 to 285, the code
unsafely asserts chunk.data to a specific shape; replace that with Zod
validation: define a Zod schema for { records: Array<{ body: string; seq_num:
number; timestamp: number }> }, call schema.safeParse(chunk.data), and handle
the result — on success use the parsed value (typed) in the transform, on
failure log/emit an error or skip the batch per existing error-handling
conventions (do not cast directly). Ensure you export/import zod if needed and
update the transform to consume the validated typed data instead of the asserted
object.

Comment on lines +501 to +521
subscription.subscribe().then((stream) => {
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk.chunk as TStreams[typeof streamKey],
run,
});
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add error handling for stream subscription and processing.

The promise chain lacks error handling. If subscription.subscribe() rejects or the stream processing fails, it will cause an unhandled promise rejection.

Apply this diff to add error handling:

                // Start stream processing in the background
                subscription.subscribe().then((stream) => {
                  stream
                    .pipeThrough(
                      new TransformStream({
                        transform(chunk, controller) {
                          controller.enqueue({
                            type: streamKey,
                            chunk: chunk.chunk as TStreams[typeof streamKey],
                            run,
                          });
                        },
                      })
                    )
                    .pipeTo(
                      new WritableStream({
                        write(chunk) {
                          controller.enqueue(chunk);
                        },
                      })
-                    );
-                });
+                    )
+                    .catch((error) => {
+                      console.error(`Error processing stream ${streamKey}:`, error);
+                    });
+                }).catch((error) => {
+                  console.error(`Error subscribing to stream ${streamKey}:`, error);
+                  activeStreams.delete(streamKey);
+                });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
subscription.subscribe().then((stream) => {
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk.chunk as TStreams[typeof streamKey],
run,
});
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
);
});
subscription.subscribe().then((stream) => {
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk.chunk as TStreams[typeof streamKey],
run,
});
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
)
.catch((error) => {
console.error(`Error processing stream ${streamKey}:`, error);
});
}).catch((error) => {
console.error(`Error subscribing to stream ${streamKey}:`, error);
activeStreams.delete(streamKey);
});
🤖 Prompt for AI Agents
In packages/core/src/v3/apiClient/runStream.ts around lines 501 to 521, the code
calls subscription.subscribe() and starts piping the returned stream but has no
rejection or streaming error handling; update the code to attach a .catch
handler to subscription.subscribe() to catch subscribe promise rejections and
call controller.error(...) with the error, and also attach .catch handlers to
the stream.pipeTo(...) promise (and/or the pipeThrough transform) to handle
runtime stream errors by calling controller.error(...) or controller.enqueue an
error object; ensure all async paths that can reject are handled so there are no
unhandled promise rejections.

Comment on lines +23 to +30
private activeStreams = new Map<
string,
{ wait: () => Promise<void>; abortController: AbortController }
>();

reset(): void {
this.activeStreams.clear();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix activeStreams key collisions (multiple streams per key overwrite each other)

Keying by key drops earlier streams when appending multiple to the same run/key. Use a composite/unique id so all are tracked.

-  private activeStreams = new Map<
-    string,
-    { wait: () => Promise<void>; abortController: AbortController }
-  >();
+  private activeStreams = new Map<
+    string,
+    { wait: () => Promise<void>; abortController: AbortController }
+  >();
+  private streamSeq = 0;

And when registering/cleaning up (see lines 88-92):

-    this.activeStreams.set(key, { wait: () => streamInstance.wait(), abortController });
+    const instanceId = `${runId}:${key}:${++this.streamSeq}`;
+    this.activeStreams.set(instanceId, { wait: () => streamInstance.wait(), abortController });
@@
-    streamInstance.wait().finally(() => this.activeStreams.delete(key));
+    streamInstance.wait().finally(() => this.activeStreams.delete(instanceId));

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In packages/core/src/v3/realtimeStreams/manager.ts around lines 23-30 (and
registration/cleanup at ~88-92): the current Map keyed only by `key` lets
multiple streams for the same run/key overwrite each other. Change the tracking
to use a unique composite id per stream (e.g., `${key}:${streamStreamId}` where
streamStreamId is a UUID or incrementing counter) or make the value a nested Map
of streamId→{wait, abortController} so multiple entries per key are preserved.
On register return or persist that unique id, and in the cleanup logic (lines
~88-92) use the same unique id to remove only the specific stream (or remove
from the nested Map), and only clear the top-level key when its nested map is
empty; reset() can still clear everything.

Comment on lines +83 to +103
this.bufferReaderTask = (async () => {
try {
let chunkIndex = 0;
while (true) {
const { done, value } = await this.streamReader!.read();

if (done) {
this.streamComplete = true;
break;
}

// Add to ring buffer
this.addToRingBuffer(chunkIndex, value);
this.highestBufferedIndex = chunkIndex;
chunkIndex++;
}
} catch (error) {
throw error;
}
})();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid unhandled promise rejection in background buffer task; surface errors to sender

The buffer task rethrows and is never awaited, risking unhandledrejection. Capture the error and have the send loop propagate it; also guard with abort and release the reader lock.

   private startBuffering(): void {
     this.streamReader = this.serverStream.getReader();

-    this.bufferReaderTask = (async () => {
+    this.bufferReaderTask = (async () => {
       try {
         let chunkIndex = 0;
         while (true) {
+          if (this.options.signal?.aborted) {
+            this.streamComplete = true;
+            break;
+          }
           const { done, value } = await this.streamReader!.read();

           if (done) {
             this.streamComplete = true;
             break;
           }

           // Add to ring buffer
           this.addToRingBuffer(chunkIndex, value);
           this.highestBufferedIndex = chunkIndex;
           chunkIndex++;
         }
-      } catch (error) {
-        throw error;
+      } catch (error) {
+        // capture; let sender surface it deterministically
+        (this as any)._bufferError = error;
+        this.streamComplete = true;
+      } finally {
+        try {
+          this.streamReader?.releaseLock();
+        } catch {}
+        this.streamReader = null;
       }
     })();
+    // prevent unhandled rejection warnings
+    this.bufferReaderTask.catch(() => {});
   }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants