Skip to content

Conversation

ericallam
Copy link
Member

No description provided.

Copy link

changeset-bot bot commented Aug 28, 2025

⚠️ No Changeset found

Latest commit: 8e6a8d3

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Aug 28, 2025

Walkthrough

The change replaces scan operations to use a duplicated Redis connection (redis.duplicate()) instead of the primary client within concurrency-related streams. currentConcurrencyScanStream and processConcurrencySet now create a duplicate, use it to drive scanStream/sscanStream, and return the stream along with the duplicate Redis instance. After scanning completes (or on error), the duplicate connection is closed via redis.quit(). Error handling is added around scan operations, capturing scanError and logging failures in both scanConcurrencySets and processConcurrencySet. These updates are internal; no exported/public API signatures were changed.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/redis-scan-duplcates

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbit in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbit in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbit gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbit read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbit help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbit ignore or @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbit summary or @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbit or @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)

1956-2019: Mirror the same abort-safe completion for per-set SSCAN; fix log message; always quit in finally.

Without closing on abort/close, this can also hang. Align handling with scanConcurrencySets.

Apply this diff:

   private async processConcurrencySet(concurrencyKey: string) {
-    const redis = this.redis.duplicate();
-
-    const stream = redis.sscanStream(concurrencyKey, {
+    const redis = this.redis.duplicate();
+    const stream = redis.sscanStream(concurrencyKey, {
       count: 100,
     });
 
     const { promise, resolve, reject } = promiseWithResolvers<void>();
 
     stream.on("end", () => {
       resolve();
     });
 
     stream.on("error", (error) => {
       this.logger.error("Error in sscanStream for concurrency set", {
         concurrencyKey,
         error,
       });
 
       reject(error);
     });
 
+    // Resolve if the stream is destroyed/closed or the RunQueue aborts.
+    stream.on("close", () => resolve());
+    const onAbort = () => {
+      stream.destroy();
+      resolve();
+    };
+    this.abortController.signal.addEventListener("abort", onAbort, { once: true });
+
     stream.on("data", async (runIds: string[]) => {
       stream.pause();
@@
-    const [scanError] = await tryCatch(promise);
-
-    if (scanError) {
-      this.logger.error("Error scanning concurrency sets", {
-        error: scanError,
-      });
-    }
-
-    await redis.quit();
-
-    return promise;
+    try {
+      await promise;
+    } catch (scanError) {
+      this.logger.error("Error scanning concurrency set", { concurrencyKey, error: scanError });
+    } finally {
+      this.abortController.signal.removeEventListener("abort", onAbort);
+      await redis.quit();
+    }
 
     return promise;
   }
🧹 Nitpick comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)

1919-1924: Avoid accidental mid-string prefix removal when normalizing keys.

replace() may remove the first occurrence anywhere in the key. Prefer startsWith + slice to strip only a true prefix.

Apply this diff:

-      const uniqueKeys = Array.from(new Set<string>(keys)).map((key) =>
-        key.replace(redis.options.keyPrefix ?? "", "")
-      );
+      const prefix = redis.options.keyPrefix ?? "";
+      const uniqueKeys = Array.from(new Set<string>(keys)).map((key) =>
+        prefix && key.startsWith(prefix) ? key.slice(prefix.length) : key
+      );
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b0b0df6 and 8e6a8d3.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/run-queue/index.ts (4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (22)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)

912-931: Good switch to a duplicated Redis client for SCAN.

Using a dedicated duplicate for scanStream avoids interference with the primary client. Returning it to the caller for explicit lifecycle management is clear.

Comment on lines +1943 to 1954
const [scanError] = await tryCatch(promise);

if (scanError) {
this.logger.error("Error scanning concurrency sets", {
error: scanError,
});
}

await redis.quit();

return promise;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure scan always completes on abort and return concrete stats; close client in finally.

If the AbortSignal fires before any data/end/error, the promise can hang. Also, returning the already-settled promise is awkward and drops stats.

Apply this diff:

-    const [scanError] = await tryCatch(promise);
-
-    if (scanError) {
-      this.logger.error("Error scanning concurrency sets", {
-        error: scanError,
-      });
-    }
-
-    await redis.quit();
-
-    return promise;
+    const onAbort = () => {
+      stream.destroy();
+      resolve(stats);
+    };
+    this.abortController.signal.addEventListener("abort", onAbort, { once: true });
+
+    try {
+      const result = await promise;
+      return result;
+    } catch (scanError) {
+      this.logger.error("Error scanning concurrency sets", { error: scanError });
+      throw scanError;
+    } finally {
+      this.abortController.signal.removeEventListener("abort", onAbort);
+      await redis.quit();
+    }

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant