feat: add BatchWorkpool for high-throughput task processing#167
feat: add BatchWorkpool for high-throughput task processing#167sethconvex wants to merge 34 commits intomainfrom
Conversation
📝 WalkthroughWalkthroughAdds a new BatchWorkpool feature: client-side BatchWorkpool API, backend batch component and schema, extensive tests, and example integrations (example pools, actions, pipelines, and Convex config updates). Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant API as Batch API<br/>(enqueue/enqueueBatch)
participant DB as Database
participant Executor as Executor<br/>(_runExecutorLoop)
participant Handler as Handler<br/>(registered action)
participant Backend as Backend<br/>(complete/fail/onComplete)
Client->>API: enqueue(name, args, onComplete)
API->>DB: insert task (pending)
API-->>Client: return taskId
loop Executor poll cycle
Executor->>DB: listPending & claimByIds
Executor->>Handler: executeHandler(args) (concurrent)
Handler-->>Executor: result / error
alt success
Executor->>Backend: completeBatch(items)
Backend->>DB: delete task / persist result
Backend->>API: schedule dispatchOnCompleteBatch
else failure
Executor->>Backend: failBatch(items)
Backend->>DB: update readyAt / attempt or delete when exhausted
end
end
sequenceDiagram
participant User
participant Runner as Runner<br/>(runStandard/runBatch)
participant DB as Jobs DB
participant Workpool as Workpool<br/>(standard or batch)
participant Pipeline as Pipeline<br/>(onComplete handlers)
participant Action as Action<br/>(translate/count)
User->>Runner: start N jobs
Runner->>DB: insert jobs (status: started)
Runner->>Workpool: enqueue(task, onComplete -> pipeline.step)
par per job
Workpool->>Action: translateToSpanish(sentence)
Action-->>Workpool: spanish
Workpool->>Pipeline: onComplete(jobId, spanish)
Pipeline->>DB: update spanish, status
Pipeline->>Workpool: enqueue(next)
Workpool->>Action: translateToEnglish(spanish)
Action-->>Workpool: english
Workpool->>Pipeline: onComplete(jobId, english)
Pipeline->>DB: update backToEnglish, status
Pipeline->>Workpool: enqueue(count)
Workpool->>Action: countLetters(english)
Action-->>Workpool: count
Workpool->>Pipeline: onComplete(jobId, count)
Pipeline->>DB: update letterCount, completedAt, status: completed
end
User->>DB: query progress/results
DB-->>User: aggregated metrics / results
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
commit: |
330122b to
1651f80
Compare
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@example/convex/standardActions.ts`:
- Around line 4-28: translateToSpanish (and translateToEnglish plus their pooled
counterparts) currently assumes the OpenAI fetch succeeded and directly accesses
data.choices[0].message.content; add proper HTTP and payload validation: after
fetch check response.ok and if not read response.text() (or json()) and throw a
descriptive Error including status and body, then parse JSON and validate that
data.choices is an array with at least one element and that
choices[0].message.content exists before returning it; if validation fails throw
a clear error indicating missing choices in the OpenAI response. Use the
function names translateToSpanish / translateToEnglish and their counterparts in
pooledActions to locate and update each handler.
In `@src/client/pool.ts`:
- Around line 147-166: action() currently registers the handler in this.registry
and also creates a standalone internalAction via internalActionGeneric using
opts.args as the validator, but the pooled executor invokes the registered
handler directly with task.args (from enqueue()) without running those
validators, allowing malformed args through; to fix, store the validator
alongside the handler in the registry when action() is called (e.g.,
registry.set(name, { handler, validator: opts.args })), and update the pooled
executor path that currently calls the raw handler with task.args to first
validate task.args against the stored validator (throw or reject on validation
failure) before invoking the handler; ensure enqueue() still writes v.any() but
the pooled runner enforces validation using the validator saved by action().
- Around line 48-51: Remove the unused imports RunResult, LogLevel, and
EnqueueOptions from the import list (they are declared but not referenced) and
delete the now-stale eslint-disable directive present near the top-level of the
file (the lingering /* eslint-disable ... */ comment around the same area);
ensure only actually used types like RetryBehavior, RunMutationCtx, and
RunQueryCtx remain imported and then run the linter/CI to confirm the pipeline
error is resolved.
- Around line 221-255: The executor can crash if a task's completion/failure
mutation rejects, leaking inFlight claims; to fix, ensure per-task promises
never reject by wrapping the handler(...).then(...).catch(...) chain in a
promise that always resolves (e.g., swallow/log errors and return a neutral
value) before storing it in inFlight, and surround the outer worker loop with a
try/finally that calls releaseClaims for any remaining inFlight keys;
specifically update the block building p (the promise from handler and
ctx.runMutation of self.component.pool.complete / self.component.pool.fail) so
it never rejects, keep inFlight.set(task._id, p) as before, and add a
try/finally around the while loop that invokes releaseClaims(ctx,
[...inFlight.keys()], ...) to guarantee cleanup even if Promise.race rejects.
- Around line 177-178: Remove the unnecessary `const self = this` alias in the
`executor()` method and update the handler passed to `internalActionGeneric` to
use `this` directly (the handler is already an arrow function so it lexically
captures `this`); find references to `self.` inside that handler (and any other
inner arrow handlers in `executor`) and replace them with `this.`, or
alternatively bind the function if you convert it to a non-arrow—ensure
`internalActionGeneric`'s handler uses `this` rather than the aliased `self`.
In `@src/component/pool.test.ts`:
- Around line 15-19: Add an afterEach that flushes scheduled functions and
restores timers to prevent cross-test leakage: after tests that call
setupTest()/t and use enqueue/ensureExecutors (which schedules via
ctx.scheduler.runAfter), add an afterEach(async () => { await
t.finishAllScheduledFunctions(vi.runAllTimers); vi.useRealTimers(); }). Place
this after the setupTest/describe block so t is available to clean up between
tests.
In `@src/component/pool.ts`:
- Line 17: Remove the unused import vResult and the unused variable assignment
poolStatus: delete the import of vResult from the top of the module and remove
the poolStatus assignment block (the variable and its unused computation),
ensuring no other code references vResult or poolStatus; then run
TypeScript/ESLint checks and adjust any remaining references if they exist.
- Around line 429-450: ensureExecutors currently only starts a single executor
per call which prevents ramping up to config.maxWorkers when multiple slots are
free; update ensureExecutors (in MutationCtx) to loop while
config.activeExecutors < config.maxWorkers, increment activeExecutors for each
started executor via ctx.db.patch (use the same config._id and update field) and
schedule each executor with ctx.scheduler.runAfter(handle, {}) until maxWorkers
is reached or no more pending work; ensure you re-read or atomically increment
activeExecutors to avoid races when patching.
- Around line 175-200: The complete (and similarly fail) mutation currently
calls callOnComplete before removing the task, which allows retries to re-run
the onComplete handler; update the handler for complete (and fail) so you first
atomically PATCH the task record to a terminal marker (e.g., set task.status =
"completed" or set a completedAt timestamp) using ctx.db.update/patch, then only
if the update succeeds and the terminal marker was not already set call
callOnComplete, and finally delete the task; reference the complete mutation,
fail mutation, callOnComplete, and the ctx.db.delete/ctx.db.update operations
when implementing the change to ensure retries skip re-invocation.
🧹 Nitpick comments (11)
src/client/pool.ts (1)
382-394:getPoolConfigisasyncbut returnsundefinedsynchronously whenexecutorFnRefis null.This is fine functionally, but consider that callers
enqueueandenqueueBatchbothawaitthis. WhenexecutorFnRefis not set,poolConfigisundefined, meaningensureExecutorson the component side won't start any executors for those enqueues. This is documented as intentional for pipeline onComplete callbacks, but it could be a subtle foot-gun if a user forgetssetExecutorRef.Consider logging a warning when
executorFnRefis null to aid debugging.example/convex/schema.ts (1)
10-21: Consider usingv.union(v.literal(...))formodeandstatusfields.Both fields document their expected values in comments but use
v.string(), which provides no runtime validation. Since the component schema (src/component/schema.ts) already usesv.union(v.literal(...))for thepoolTasks.statusfield, the same pattern would be consistent here.This is example code, so not critical.
example/convex/standardActions.ts (1)
4-61: Significant code duplication withpooledActions.ts.The
translateToSpanish,translateToEnglish, andcountLettershandlers are copy-pasted betweenstandardActions.tsandpooledActions.ts. Consider extracting the shared logic into helper functions (e.g., acallOpenAI(prompt, sentence)helper) and importing them in both files. This is example code so not urgent, but it would reduce maintenance burden if the example evolves.example/convex/pooledActions.ts (1)
5-29: Missing error handling on the OpenAI fetch response.Neither
response.oknor HTTP status is checked before parsing. If the API returns a non-2xx response (rate limit, auth failure, server error),data.choices[0]will throw an unhelpfulTypeErrorinstead of a descriptive error. Same applies totranslateToEnglish(Line 34). The standard actions instandardActions.tsshare this gap, but since the pooled mode retries on failure, a clear error message matters more here.🛡️ Suggested improvement
const data = await response.json(); + if (!response.ok) { + throw new Error(`OpenAI API error (${response.status}): ${JSON.stringify(data)}`); + } return data.choices[0].message.content;src/component/pool.test.ts (2)
249-315: Tests don't verifyonCompletecallback invocation.The
completetests verify task deletion, but none assert that theonCompletemutation is actually called with the correctresultandcontext. Similarly,failandcanceltests don't verifyonCompleteinvocation. SincecallOnCompleteis a critical part of the pipeline chaining mechanism, this is a meaningful coverage gap.Would you like me to open an issue to track adding
onCompletecallback verification tests?
602-623: Missing test forexecutorDonewithstartMore: trueandreleaseClaims.
executorDoneis only tested withstartMore: false. ThestartMore: truepath triggersensureExecutors, which schedules new executor actions — worth verifying.- The
releaseClaimsmutation has no dedicated test coverage at all. It's a meaningful part of the executor soft-deadline lifecycle.Would you like me to draft test cases for these?
src/component/pool.ts (5)
251-261:countPendinguses.collect().lengthwhich loads all pending documents into memory.For a high-throughput pool that might have thousands of pending tasks, collecting all documents just to count them is wasteful. Consider using a running count field on
poolConfigthat increments/decrements as tasks are enqueued and claimed, or at minimum add a.take()upper bound to avoid unbounded memory use.
452-473: Error swallowing incallOnCompletesilently drops pipeline failures.While the try/catch matches existing workpool behavior (per the comment), a
console.erroris the only signal when anonCompletehandler fails. In a pooled pipeline, a failedonCompletemeans the entire downstream chain is silently dropped — the task is deleted, no retry occurs, and the job stays stuck in an intermediate status forever. Consider at minimum emitting a structured log with the task name and args for observability, or surfacing this as a dead-letter entry.
63-96:enqueueandenqueueBatchduplicate the upsert + insert + ensureExecutors pattern.
enqueueBatchessentially reimplementsenqueuein a loop. Consider havingenqueueBatchdelegate to a shared internal helper (or callenqueue's core logic) to keep the insert + optional-field handling in one place.Also applies to: 98-133
348-378:sweepStaleClaimsprocesses at most 100 tasks per invocation with no continuation.If more than 100 claims are stale, the remainder won't be swept until the next cron/manual trigger. Consider scheduling a follow-up sweep when the batch is full:
♻️ Suggested continuation
return stale.length; + // If we hit the batch limit, there may be more stale claims + // Schedule another sweep immediately + if (stale.length === 100) { + await ctx.scheduler.runAfter(0, api.pool.sweepStaleClaims, {}); + } }, });
302-318:canceldoesn't guard against canceling an already-completed task's side effects.If a task has already been completed (and deleted) by the executor,
cancelcorrectly no-ops on line 306–308. However, if a task is inclaimedstate and the executor is concurrently running the handler, canceling it will invokeonCompletewithkind: "canceled"and delete the task, but the executor's in-flight handler will continue running. When it finishes,completeorfailwill no-op (task gone), but the handler's side effects (e.g., OpenAI API calls) will have already occurred. This is inherent to the design but worth documenting.
| export const translateToSpanish = internalAction({ | ||
| args: { sentence: v.string() }, | ||
| handler: async (_ctx, { sentence }) => { | ||
| const response = await fetch("https://api.openai.com/v1/chat/completions", { | ||
| method: "POST", | ||
| headers: { | ||
| Authorization: `Bearer ${process.env.OPENAI_API_KEY}`, | ||
| "Content-Type": "application/json", | ||
| }, | ||
| body: JSON.stringify({ | ||
| model: "gpt-4o-mini", | ||
| messages: [ | ||
| { | ||
| role: "system", | ||
| content: | ||
| "Translate the following to Spanish. Return only the translation.", | ||
| }, | ||
| { role: "user", content: sentence }, | ||
| ], | ||
| }), | ||
| }); | ||
| const data = await response.json(); | ||
| return data.choices[0].message.content; | ||
| }, | ||
| }); |
There was a problem hiding this comment.
Missing HTTP response error handling — data.choices[0] will throw on API errors.
Neither translateToSpanish nor translateToEnglish check response.ok before accessing data.choices[0].message.content. If the OpenAI API returns a 4xx/5xx (rate limit, invalid key, etc.), choices will be undefined, producing a confusing TypeError instead of a meaningful error message. The same issue exists in example/convex/pooledActions.ts.
🛡️ Proposed fix (apply to both actions and their pooled counterparts)
const data = await response.json();
+ if (!response.ok) {
+ throw new Error(`OpenAI API error ${response.status}: ${data.error?.message ?? JSON.stringify(data)}`);
+ }
return data.choices[0].message.content;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export const translateToSpanish = internalAction({ | |
| args: { sentence: v.string() }, | |
| handler: async (_ctx, { sentence }) => { | |
| const response = await fetch("https://api.openai.com/v1/chat/completions", { | |
| method: "POST", | |
| headers: { | |
| Authorization: `Bearer ${process.env.OPENAI_API_KEY}`, | |
| "Content-Type": "application/json", | |
| }, | |
| body: JSON.stringify({ | |
| model: "gpt-4o-mini", | |
| messages: [ | |
| { | |
| role: "system", | |
| content: | |
| "Translate the following to Spanish. Return only the translation.", | |
| }, | |
| { role: "user", content: sentence }, | |
| ], | |
| }), | |
| }); | |
| const data = await response.json(); | |
| return data.choices[0].message.content; | |
| }, | |
| }); | |
| export const translateToSpanish = internalAction({ | |
| args: { sentence: v.string() }, | |
| handler: async (_ctx, { sentence }) => { | |
| const response = await fetch("https://api.openai.com/v1/chat/completions", { | |
| method: "POST", | |
| headers: { | |
| Authorization: `Bearer ${process.env.OPENAI_API_KEY}`, | |
| "Content-Type": "application/json", | |
| }, | |
| body: JSON.stringify({ | |
| model: "gpt-4o-mini", | |
| messages: [ | |
| { | |
| role: "system", | |
| content: | |
| "Translate the following to Spanish. Return only the translation.", | |
| }, | |
| { role: "user", content: sentence }, | |
| ], | |
| }), | |
| }); | |
| const data = await response.json(); | |
| if (!response.ok) { | |
| throw new Error(`OpenAI API error ${response.status}: ${data.error?.message ?? JSON.stringify(data)}`); | |
| } | |
| return data.choices[0].message.content; | |
| }, | |
| }); |
🤖 Prompt for AI Agents
In `@example/convex/standardActions.ts` around lines 4 - 28, translateToSpanish
(and translateToEnglish plus their pooled counterparts) currently assumes the
OpenAI fetch succeeded and directly accesses data.choices[0].message.content;
add proper HTTP and payload validation: after fetch check response.ok and if not
read response.text() (or json()) and throw a descriptive Error including status
and body, then parse JSON and validate that data.choices is an array with at
least one element and that choices[0].message.content exists before returning
it; if validation fails throw a clear error indicating missing choices in the
OpenAI response. Use the function names translateToSpanish / translateToEnglish
and their counterparts in pooledActions to locate and update each handler.
There was a problem hiding this comment.
This is example code demonstrating the library — intentionally kept simple. Users will add error handling appropriate for their use case.
There was a problem hiding this comment.
@sethconvex, understood! I appreciate the clarification that these examples are intentionally kept minimal for demonstration purposes. Users will indeed add appropriate error handling when adapting the code for their production use cases.
✏️ Learnings added
Learnt from: sethconvex
Repo: get-convex/workpool PR: 167
File: example/convex/standardActions.ts:4-11
Timestamp: 2026-02-14T00:53:20.220Z
Learning: In the workpool repository, example code in the `example/convex/` directory (such as standardActions.ts and batchActions.ts) is intentionally kept simple and illustrative. Error handling (like response.ok checks for API calls) is omitted by design, with the expectation that users will add appropriate error handling for their specific use cases.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
| action< | ||
| DataModel extends GenericDataModel, | ||
| Args extends DefaultFunctionArgs = any, | ||
| Returns = any, | ||
| >( | ||
| name: string, | ||
| opts: { | ||
| args: Record<string, Validator<any, any, any>>; | ||
| handler: ( | ||
| ctx: GenericActionCtx<DataModel>, | ||
| args: Args, | ||
| ) => Promise<Returns>; | ||
| }, | ||
| ): RegisteredAction<"internal", Args, Returns> { | ||
| this.registry.set(name, opts.handler as HandlerFn); | ||
| return internalActionGeneric({ | ||
| args: opts.args as any, | ||
| handler: opts.handler as any, | ||
| }); | ||
| } |
There was a problem hiding this comment.
action() registers handler in instance registry but also generates a standalone internalAction.
The generated internalAction at line 162-165 can be invoked independently (e.g., via the standard Workpool), bypassing the pooled executor entirely. This is mentioned in the docstring as intentional for workflow's step.runAction(). However, note that opts.args is used for the standalone action's validator but is not used when the pooled executor invokes the handler (line 221 passes task.args directly without validation).
Tasks enqueued via enqueue() store args as v.any() in the DB (per schema), so a malformed args object could reach the handler unvalidated in pooled mode.
🤖 Prompt for AI Agents
In `@src/client/pool.ts` around lines 147 - 166, action() currently registers the
handler in this.registry and also creates a standalone internalAction via
internalActionGeneric using opts.args as the validator, but the pooled executor
invokes the registered handler directly with task.args (from enqueue()) without
running those validators, allowing malformed args through; to fix, store the
validator alongside the handler in the registry when action() is called (e.g.,
registry.set(name, { handler, validator: opts.args })), and update the pooled
executor path that currently calls the raw handler with task.args to first
validate task.args against the stored validator (throw or reject on validation
failure) before invoking the handler; ensure enqueue() still writes v.any() but
the pooled runner enforces validation using the validator saved by action().
There was a problem hiding this comment.
By design. The action() method registers the handler for use by the executor loop while also exposing it as a standalone internalAction for direct invocation. The args schema on the standalone action matches what the user passes to enqueue, so it does validate.
There was a problem hiding this comment.
@sethconvex, understood. I've noted that the args validation occurs at the enqueue() call site when the standalone action schema validates the input, and the pooled executor then processes those already-validated args from the task queue.
This aligns with the existing learning that argument validation bypass in batch mode is intentional, since the args originate from the user's own enqueue() call and represent a trusted boundary.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation, argument validation bypass in batch mode is intentional by design, since the args come from the user's own `enqueue()` call and represent a trusted boundary.
src/component/pool.test.ts
Outdated
| describe("pool", () => { | ||
| async function setupTest() { | ||
| const t = convexTest(schema, modules); | ||
| return t; | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the actual afterEach implementation in pool.test.ts
cat src/component/pool.test.ts | head -50Repository: get-convex/workpool
Length of output: 1191
🏁 Script executed:
# Check how other test files handle afterEach
rg -B2 -A8 "afterEach" --type=ts -g '**/*.test.*' | head -100Repository: get-convex/workpool
Length of output: 4629
🏁 Script executed:
# Search for convex-test cleanup methods/patterns
rg "finishAllScheduled|finishInProgress|close\b|teardown|cleanup" --type=ts | grep -i "convex\|test" | head -20Repository: get-convex/workpool
Length of output: 596
🏁 Script executed:
# Check if convex-test package provides cleanup utilities
fd -e ts -e js | xargs grep -l "convexTest" | head -5Repository: get-convex/workpool
Length of output: 204
🏁 Script executed:
# Look at example test to understand the finishAllScheduledFunctions pattern
cat example/convex/example.test.ts | head -150Repository: get-convex/workpool
Length of output: 2951
🏁 Script executed:
# Search for actual scheduler.runAfter calls in pool.ts to see what gets scheduled
rg "scheduler.runAfter\|ctx.scheduler" src/component/pool.ts -B2 -A2Repository: get-convex/workpool
Length of output: 45
🏁 Script executed:
# Check if there are any test isolation issues documented or if tests fail without cleanup
rg "finishAllScheduled|test.*isolation\|flaky\|isolation" --type=ts src/component/ | head -20Repository: get-convex/workpool
Length of output: 45
🏁 Script executed:
# Check pool.ts to see what functions schedule work via scheduler
cat src/component/pool.ts | grep -B5 -A5 "runAfter\|scheduler"Repository: get-convex/workpool
Length of output: 387
🏁 Script executed:
# Search for scheduler usage in any component file
rg "ctx.scheduler" src/component/ -A3Repository: get-convex/workpool
Length of output: 6744
🏁 Script executed:
# Check if fresh convexTest instances are completely isolated or if state leaks
# Look at how convex-test handles teardown between instances
rg "convexTest\(" src/component/pool.test.ts -A10Repository: get-convex/workpool
Length of output: 258
🏁 Script executed:
# Check if any tests in pool.test.ts actually trigger the scheduler code (enqueue -> ensureExecutors)
grep -n "enqueue\|ensureExecutor" src/component/pool.test.ts | head -20Repository: get-convex/workpool
Length of output: 1236
🏁 Script executed:
# Compare: does stats.test.ts also schedule functions and skip cleanup?
grep -n "scheduler\|runAfter" src/component/stats.test.tsRepository: get-convex/workpool
Length of output: 45
🏁 Script executed:
# Check if there are any known test failures or flakiness related to test isolation
rg "flaky\|isolation\|leak" --type=ts --type=md src/component/ example/Repository: get-convex/workpool
Length of output: 45
🏁 Script executed:
# Let's verify if a test actually fails or has isolation issues without cleanup
# by checking the test output or looking for any documented issues
git log --oneline --all | head -20Repository: get-convex/workpool
Length of output: 179
🏁 Script executed:
# Check git history to see if there were any test isolation issues with pool.test.ts
git log --oneline -- src/component/pool.test.ts | head -10Repository: get-convex/workpool
Length of output: 119
🏁 Script executed:
# Final check: see if the convex-test documentation or comments mention whether
# fresh instances automatically clean up scheduled functions
rg "finishAllScheduled|cleanup|teardown|isolation" --type=ts -B3 -A3 | grep -i "convex\|test\|scheduled" | head -30Repository: get-convex/workpool
Length of output: 2024
Add scheduled function cleanup to afterEach for test isolation.
The afterEach hook currently only restores real timers. However, enqueue (called in 20+ tests) triggers ensureExecutors() which schedules executor actions via ctx.scheduler.runAfter(0, handle, {}). These scheduled functions should be flushed before the next test runs to prevent cross-test state leakage.
The example/convex/example.test.ts file demonstrates the correct pattern:
afterEach(async () => {
await t.finishAllScheduledFunctions(vi.runAllTimers);
vi.useRealTimers();
});Add this cleanup to pool.test.ts for consistency and proper test isolation.
🤖 Prompt for AI Agents
In `@src/component/pool.test.ts` around lines 15 - 19, Add an afterEach that
flushes scheduled functions and restores timers to prevent cross-test leakage:
after tests that call setupTest()/t and use enqueue/ensureExecutors (which
schedules via ctx.scheduler.runAfter), add an afterEach(async () => { await
t.finishAllScheduledFunctions(vi.runAllTimers); vi.useRealTimers(); }). Place
this after the setupTest/describe block so t is available to clean up between
tests.
There was a problem hiding this comment.
Tests pass without afterEach cleanup — convex-test creates isolated environments per test. Adding cleanup would be purely cosmetic.
There was a problem hiding this comment.
@sethconvex, thanks for clarifying! If convex-test provides per-test isolation without requiring explicit cleanup between tests, then the afterEach is indeed unnecessary. I appreciate you confirming that the tests pass and the framework handles this automatically.
✏️ Learnings added
Learnt from: sethconvex
Repo: get-convex/workpool PR: 167
File: src/component/pool.test.ts:0-0
Timestamp: 2026-02-14T00:53:32.089Z
Learning: In the workpool repository using convex-test, afterEach cleanup with `t.finishAllScheduledFunctions(vi.runAllTimers)` is not required in test files like `src/component/pool.test.ts` because convex-test creates isolated environments per test, making such cleanup purely cosmetic.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
src/component/pool.ts
Outdated
| retryBehavior, | ||
| type RunResult, | ||
| vOnCompleteFnContext, | ||
| vResult, |
There was a problem hiding this comment.
Remove unused import vResult and unused variable poolStatus.
Static analysis flags these:
vResult(Line 17) is imported but never used.poolStatus(Lines 23–29) is assigned but never referenced.
🧹 Proposed fix
import {
type OnCompleteArgs,
retryBehavior,
type RunResult,
vOnCompleteFnContext,
- vResult,
} from "./shared.js";-const poolStatus = v.union(
- v.literal("pending"),
- v.literal("claimed"),
- v.literal("completed"),
- v.literal("failed"),
- v.literal("canceled"),
-);Also applies to: 23-29
🧰 Tools
🪛 GitHub Check: Test and lint
[warning] 17-17:
'vResult' is defined but never used. Allowed unused vars must match /^_/u
🤖 Prompt for AI Agents
In `@src/component/pool.ts` at line 17, Remove the unused import vResult and the
unused variable assignment poolStatus: delete the import of vResult from the top
of the module and remove the poolStatus assignment block (the variable and its
unused computation), ensuring no other code references vResult or poolStatus;
then run TypeScript/ESLint checks and adjust any remaining references if they
exist.
There was a problem hiding this comment.
Pool mode is not part of this PR — those files were in a separate branch. The comments on pool.ts are not applicable here.
There was a problem hiding this comment.
@sethconvex, thank you for the clarification! You're absolutely right—I apologize for the confusion. I mistakenly reviewed src/component/pool.ts, which is not part of this PR. I'll disregard that comment.
Thanks for catching that! 🙏
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@example/convex/pooledActions.ts`:
- Around line 8-27: The fetch call in translateToSpanish (and similarly in
translateToEnglish) lacks response error handling, so accessing
data.choices[0].message.content can throw on non-2xx results; update both
functions to check response.ok after the fetch, parse the error body or text
when not ok, and throw or return a clear error including HTTP status and error
details instead of blindly using data.choices; also validate that parsed data
has choices[0].message.content before returning and throw a descriptive error if
the shape is unexpected.
🧹 Nitpick comments (6)
example/convex/schema.ts (1)
8-21: Consider usingv.union(v.literal(...))formodeandstatusfields.Both
modeandstatusare typed asv.string()with valid values documented only in comments. Usingv.union(v.literal("standard"), v.literal("pooled"))and similar forstatuswould provide schema-level validation, preventing invalid values from being inserted. This is example code though, so it's a minor suggestion.src/component/schema.ts (1)
21-27:"completed","failed", and"canceled"statuses appear to be dead code in practice.Based on the test suite and the client-side executor logic, tasks are deleted from
poolTasksupon completion, final failure, and cancellation — they never transition to"completed","failed", or"canceled". These status literals inflate the schema without being used. If they're kept for future use (e.g., retaining finished task history), a comment noting that would help.src/component/pool.test.ts (2)
602-623: Missing test forexecutorDonewithstartMore: true.Only the
startMore: falsepath is tested. ThestartMore: truecase likely triggers self-scheduling of a new executor, which is a meaningful behavioral difference worth covering — especially since it touches theensureExecutorslogic.
488-501: Returning"finished"for non-existent tasks is a design choice worth documenting.The
statusquery treats a deleted/non-existent task ID as"finished". This is reasonable for the common case (completed tasks are deleted), but could mask bugs where a caller passes a completely invalid ID. A comment in the component-sidestatusquery explaining this trade-off would help future maintainers.src/client/pool.ts (2)
54-54: Branded typePoolTaskIdisn't enforced —__isPoolTaskIdbrand is never applied.
PoolTaskIdis defined asstring & { __isPoolTaskId: true }, but the brand is never actually assigned. On line 329, the castid as unknown as PoolTaskIdcreates the illusion of type safety, but any plain string can be passed wherePoolTaskIdis expected without a compiler error (since the brand property is never checked). This is a cosmetic concern — the branding pattern only works if consumers never bypass it, but the double-cast here sets a precedent that undermines it.
256-257:Promise.raceon settled promises is a no-op — considerPromise.anyor filtering.When a task completes and its promise resolves, it remains in the
inFlightmap until.finally()runs. On the next loop iteration,Promise.racewill immediately resolve with the already-settled promise's value (undefined), causing a tight spin untilfinallycallbacks execute andinFlightis cleaned up. In practice,.finally()runs synchronously after.then/.catch, so this is likely fine, but it's worth noting.
Add BatchWorkpool class that enables running many task handlers concurrently inside a single long-lived Convex action, reducing per-task overhead for high-volume workloads like LLM calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1651f80 to
8a9108d
Compare
Example app comparing standard workpool mode (1 action per task) with batch mode (many handlers per executor action) using a 3-step translation pipeline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
8a9108d to
9fe61f3
Compare
…orkpool Enable workflow integration by adding methods to check if a function is batch-registered and to enqueue tasks with a pre-computed onComplete handle string instead of a FunctionReference. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/client/batch.ts`:
- Around line 269-284: The executor currently double-schedules work: calling
component.batch.executorDone({ startMore: true }) and also calling
ctx.scheduler.runAfter(0, executorRef, {}) causes two executors to be spawned
while activeExecutors/ensureExecutors only accounts for one; remove the
self-schedule path to fix this by deleting or disabling the block that calls
getExecutorRef() / ctx.scheduler.runAfter(0, executorRef, {}) so only
executorDone(startMore: true) drives new executor creation (or alternately,
invert the design and have executorDone never trigger scheduling and rely solely
on the self-schedule, but do not keep both). Ensure references in the file to
executorRef, getExecutorRef, and ctx.scheduler.runAfter are updated/removed
accordingly.
In `@src/component/batch.test.ts`:
- Around line 23-30: Add timer cleanup to the test teardown: in the afterEach
block that currently calls vi.useRealTimers(), also call vi.clearAllTimers() to
remove any pending fake-timer state from vi.useFakeTimers(); for tests that
schedule work (e.g., the "lazy-init" test which triggers ensureExecutors),
ensure those tests explicitly await the scheduled work by calling await
t.finishAllScheduledFunctions(vi.runAllTimers) where they need to verify
completion instead of relying on teardown.
In `@src/component/batch.ts`:
- Around line 293-309: The cancel handler currently allows cancelling tasks in
any state which can race with an in-progress executor; update cancel (the
mutation handler named cancel) to first check task.status and only proceed if it
is "pending" (or not "claimed") — if task.status === "claimed" (or non-pending)
return early (or surface an error) so you do not call callOnComplete or
ctx.db.delete for an actively running task; ensure you reference the same
task.onComplete handling and ctx.db.delete logic only inside the guarded branch
to match complete/fail semantics.
- Around line 33-50: The configure mutation duplicates the upsert logic in the
private helper upsertBatchConfig; refactor configure to delegate to
upsertBatchConfig to remove the DRY violation: replace the inlined upsert in
configure with a call to upsertBatchConfig(ctx, args) (or to a small exported
wrapper that forwards ctx and args), ensuring upsertBatchConfig continues to set
executorHandle, maxWorkers, claimTimeoutMs on patch and to insert
activeExecutors: 0 when creating a new record so behavior remains identical.
🧹 Nitpick comments (11)
example/convex/batchActions.ts (1)
64-69: Module-level side effect for executor wiring — verify import order is guaranteed.
batch.setExecutorRef(internal.batchActions.executor)at line 69 runs as a module-level side effect. This works becausetest.tsexplicitly imports"./batchActions"to ensure the registration occurs before enqueue calls. However, this pattern is fragile — if another module enqueues batch tasks without importingbatchActions, the executor reference won't be set.Consider adding a defensive check inside
batch.enqueue()(in the client) that throws a clear error if the executor reference hasn't been configured yet.example/convex/test.ts (2)
87-116:collect()loads all jobs into memory — fine for the example, but worth noting.The
progressquery fetches all jobs for a mode via.collect()and then filters in-memory. This is fine for the small-scale example but wouldn't scale to thousands of jobs. If this ever moves beyond example code, consider using separate index queries per status.
146-154:resethas no pagination — may hit Convex mutation limits with large datasets.Deleting all records in a single mutation via
collect()+ loop could exceed the mutation document write limit (8192 in Convex) if the table grows large. For a test utility this is acceptable, but adding a comment noting the limitation would be helpful.example/convex/pipeline.ts (1)
14-16: Error details from the result are discarded on failure.All six pipeline handlers check
result.kind !== "success"and setstatus: "failed", but the error information from theresultobject (e.g.,result.error) is not stored in the job record. This makes debugging failed jobs harder since you can't inspect what went wrong.♻️ Suggested improvement (example for one handler, apply to all)
if (result.kind !== "success") { - await ctx.db.patch(context.jobId, { status: "failed" }); + await ctx.db.patch(context.jobId, { + status: "failed", + completedAt: Date.now(), + }); return; }Note: You'd also need to add an
errorfield to thejobsschema to store the error message if desired.Also applies to: 37-39, 60-62, 77-79, 100-102, 123-125
src/component/batch.test.ts (1)
488-501: Treating non-existent tasks as "finished" is a deliberate design choice — consider documenting it.Returning
{ state: "finished" }for a non-existent task (line 500) means callers can't distinguish between "completed and deleted" and "never existed / wrong ID". This is a reasonable simplification for the batch use case (since completed tasks are deleted), but it could mask bugs where callers pass invalid IDs. A brief doc comment on thestatusfunction inbatch.tswould help.src/component/schema.ts (1)
15-45: Remove or clarify unused terminal status literals in the schema.The schema includes
"completed","failed", and"canceled"as status union literals, but these statuses are never written to the database. Tasks are deleted rather than updated to any terminal state:
- On successful completion: task is deleted
- On failure: task is deleted
- On cancellation: task is deleted
The
statusquery includes defensive case statements for these values (returning"finished"), but they can never be reached. Either remove these literals from the union, or add a clarifying comment (e.g., "Reserved for future 'keep history' mode") to avoid misleading future readers.src/component/batch.ts (3)
242-252:countPendingloads all pending tasks into memory just to count them.
.collect()fetches every matching document. With high-throughput batch workloads this could pull thousands of documents into memory unnecessarily.Consider using an aggregation or limiting the query if only a "there is work" check is needed. Alternatively, maintain a counter in
batchConfig.
278-288: Deadswitchcases for statuses that are never persisted.Tasks are deleted on completion, failure (terminal), and cancellation — they never remain in the DB with
status = "completed" | "failed" | "canceled". Thecase "completed"/case "failed"/case "canceled"branches (lines 283–286) are unreachable dead code. This is misleading because it implies those are valid stored states.Removing these cases (or replacing with a
default: throw) makes the intent clearer and catches future bugs if the status set changes.
443-464:callOnCompleteswallows errors silently — consider surfacing failures.The
catchon line 460 logs the error but the caller has no indication theonCompletecallback failed. For pipeline-style usage whereonCompleteenqueues the next step, a silently-swallowed error means the pipeline stalls with no trace beyond a console log.Consider at least storing the error on the task (before deletion) or emitting a structured log entry that monitoring can alert on.
src/client/batch.ts (2)
244-253: Busy-polling withsetTimeoutwhen pending tasks exist but none are ready.When
inFlight.size === 0,canClaimis true, butclaimBatchreturned nothing (tasks have futurereadyAt), the loop queriescountPendingand if > 0, sleeps forpollInterval(default 500ms) then loops. This repeatedly callscountPendingevery 500ms, consuming query bandwidth.For production workloads with many retrying tasks on backoff, consider using a longer interval or returning the earliest
readyAtfrom the component so the executor can sleep precisely until the next task is ready.
440-452:createFunctionHandleis called on everyenqueue— consider caching.
getBatchConfig()callscreateFunctionHandle(this.executorFnRef)on every enqueue. SinceexecutorFnRefdoesn't change aftersetExecutorRef, the result can be cached to avoid repeated async work.♻️ Suggested optimization
+ private cachedExecutorHandle: string | null = null; + private async getBatchConfig() { if (!this.executorFnRef) return undefined; - const executorHandle = await createFunctionHandle(this.executorFnRef); + if (!this.cachedExecutorHandle) { + this.cachedExecutorHandle = await createFunctionHandle(this.executorFnRef); + } return { - executorHandle, + executorHandle: this.cachedExecutorHandle, maxWorkers: this.options.maxWorkers ?? 10, claimTimeoutMs: this.options.claimTimeoutMs ?? 120_000, }; }
| beforeEach(async () => { | ||
| vi.useFakeTimers(); | ||
| t = await setupTest(); | ||
| }); | ||
|
|
||
| afterEach(() => { | ||
| vi.useRealTimers(); | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
fd batch.test.tsRepository: get-convex/workpool
Length of output: 90
🏁 Script executed:
cat -n src/component/batch.test.tsRepository: get-convex/workpool
Length of output: 23360
🌐 Web query:
convex-test finishAllScheduledFunctions afterEach cleanup best practices
💡 Result:
When to use t.finishAllScheduledFunctions
Use await t.finishAllScheduledFunctions(vi.runAllTimers) in tests that trigger chains of scheduled work (a function schedules another function, etc.) so the test doesn’t finish while jobs are still pending/in-progress. This is the pattern shown in the Convex convex-test docs. (docs.convex.dev)
Recommended Vitest cleanup (afterEach) for scheduled-function tests
The two most common leaks are fake timers and pending timers. A solid baseline is:
import { afterEach, vi } from "vitest";
afterEach(() => {
// Don’t leak fake timers into other tests
vi.useRealTimers();
// Don’t leak pending timer state
vi.clearAllTimers();
// If you use spies/mocks elsewhere
vi.restoreAllMocks();
});Convex’s convex-test scheduled-function examples explicitly reset timers back to real timers at the end of the test. (docs.convex.dev)
Best practice pattern: try/finally inside the test
If a test might throw before cleanup, prefer try/finally so timers always reset:
test("...", async () => {
vi.useFakeTimers();
try {
const t = convexTest(schema, modules);
await t.mutation(api.scheduler.someChainStarter);
await t.finishAllScheduledFunctions(vi.runAllTimers); // drains chained schedules
// assertions...
} finally {
vi.useRealTimers();
vi.clearAllTimers();
}
});If you’re testing against a real local backend (not convex-test)
Because scheduled functions are persisted in the database, a good afterEach is to clear tables and cancel scheduled jobs so tests don’t affect each other; Convex’s local-backend testing guide shows querying _scheduled_functions and canceling them during cleanup. (stack.convex.dev)
Citations:
- 1: https://docs.convex.dev/testing/convex-test
- 2: https://docs.convex.dev/testing/convex-test
- 3: https://stack.convex.dev/testing-with-local-oss-backend?utm_source=openai
Consider adding timer cleanup in afterEach.
The afterEach block restores real timers but doesn't clear pending timer state from vi.useFakeTimers(). Add vi.clearAllTimers() to prevent fake timer state from leaking across tests.
Additionally, tests that trigger scheduled functions (e.g., ensureExecutors in the "lazy-init" test) should explicitly call await t.finishAllScheduledFunctions(vi.runAllTimers) within the test if they need to verify the scheduled work completes, rather than relying on cleanup hooks.
♻️ Suggested cleanup
afterEach(() => {
+ vi.clearAllTimers();
vi.useRealTimers();
});🤖 Prompt for AI Agents
In `@src/component/batch.test.ts` around lines 23 - 30, Add timer cleanup to the
test teardown: in the afterEach block that currently calls vi.useRealTimers(),
also call vi.clearAllTimers() to remove any pending fake-timer state from
vi.useFakeTimers(); for tests that schedule work (e.g., the "lazy-init" test
which triggers ensureExecutors), ensure those tests explicitly await the
scheduled work by calling await t.finishAllScheduledFunctions(vi.runAllTimers)
where they need to verify completion instead of relying on teardown.
There was a problem hiding this comment.
Tests pass without timer cleanup — each test gets an isolated environment from convex-test. The fake timers are scoped to the test function.
There was a problem hiding this comment.
@sethconvex Thank you for clarifying! You're right—if convex-test provides isolated test environments where fake timers are scoped to each test function, then the additional cleanup (vi.clearAllTimers()) isn't necessary. I appreciate you explaining how the framework handles test isolation.
✏️ Learnings added
Learnt from: sethconvex
Repo: get-convex/workpool PR: 167
File: src/component/batch.test.ts:23-30
Timestamp: 2026-02-14T00:53:27.016Z
Learning: In the workpool repository, tests using `convex-test` get isolated test environments where fake timers (via `vi.useFakeTimers()`) are scoped to the test function. Timer cleanup with `vi.clearAllTimers()` in `afterEach` is not required because `convex-test` handles test isolation automatically.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
| export const cancel = mutation({ | ||
| args: { taskId: v.id("batchTasks") }, | ||
| handler: async (ctx, { taskId }) => { | ||
| const task = await ctx.db.get(taskId); | ||
| if (!task) { | ||
| return; | ||
| } | ||
|
|
||
| // Call onComplete with canceled result | ||
| if (task.onComplete) { | ||
| const runResult: RunResult = { kind: "canceled" }; | ||
| await callOnComplete(ctx, taskId, task.onComplete, runResult); | ||
| } | ||
|
|
||
| await ctx.db.delete(taskId); | ||
| }, | ||
| }); |
There was a problem hiding this comment.
cancel doesn't guard against cancelling a task that's actively being executed.
Unlike complete/fail, cancel has no status check. If a task is "claimed" (actively running in an executor), cancel will invoke onComplete({ kind: "canceled" }) and delete the task. When the executor's handler finishes, complete/fail will find the task gone and silently return — which is safe. However, callOnComplete with "canceled" fires while the handler is still running, potentially causing the onComplete callback (e.g., enqueuing the next pipeline step) to race with the handler's own side effects.
Consider documenting this explicitly or restricting cancellation to "pending" tasks only if at-most-once onComplete semantics matter.
🤖 Prompt for AI Agents
In `@src/component/batch.ts` around lines 293 - 309, The cancel handler currently
allows cancelling tasks in any state which can race with an in-progress
executor; update cancel (the mutation handler named cancel) to first check
task.status and only proceed if it is "pending" (or not "claimed") — if
task.status === "claimed" (or non-pending) return early (or surface an error) so
you do not call callOnComplete or ctx.db.delete for an actively running task;
ensure you reference the same task.onComplete handling and ctx.db.delete logic
only inside the guarded branch to match complete/fail semantics.
There was a problem hiding this comment.
By design. Cancelling a claimed task is safe: the executor will find the task deleted when it tries to call complete/fail, which is a no-op. Adding a guard here would create a race condition where the task could finish between the guard check and the delete.
There was a problem hiding this comment.
@sethconvex Thank you for the clarification! You're absolutely right—TOCTOU race conditions occur when checking state and using results of that check, and adding a status guard would create exactly that window. The delete-first approach with idempotent complete/fail handlers is cleaner and avoids the race entirely. I appreciate the explanation! 👍
✏️ Learnings added
Learnt from: sethconvex
Repo: get-convex/workpool PR: 167
File: src/component/batch.ts:465-485
Timestamp: 2026-02-14T00:53:46.936Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts), the `cancel` mutation intentionally does not guard against cancelling claimed (actively running) tasks. This is safe by design: the executor's `complete`/`fail` calls handle missing tasks gracefully (early return on null), and adding a status check before deletion would introduce a TOCTOU race condition between the check and the delete. The delete-first approach with idempotent cleanup is the correct pattern.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
There was a problem hiding this comment.
note: deleting also reads the task, so checking the task's status would not incur a conflict any more than exists currently - but it could maybe dual-fire if it completes and then canceled in short order?
There was a problem hiding this comment.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
Wrap the executor loop in try/finally so releaseClaims always runs, even if an unexpected error crashes the executor. Also catch errors from the fail mutation to prevent unhandled rejections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/client/batch.ts`:
- Around line 264-265: The loop currently awaits
Promise.race([...inFlight.values()]) which can block past softDeadline if all
in-flight tasks are long-running; modify the race in the executor loop to
include a timeout promise based on softDeadline (e.g., create a timeoutPromise
that resolves when Math.max(0, softDeadline - Date.now()) elapses) and use
Promise.race([...inFlight.values(), timeoutPromise]) so the loop re-checks
Date.now() < softDeadline without waiting indefinitely; ensure the
timeoutPromise is created each iteration and does not swallow errors from
inFlight tasks so existing task resolution/rejection behavior of the inFlight
Map is preserved.
🧹 Nitpick comments (1)
src/client/batch.ts (1)
449-461:createFunctionHandleis called on every enqueue — consider caching.
getBatchConfig()callscreateFunctionHandle(this.executorFnRef)on everyenqueue/enqueueBatchinvocation. Since the executor ref doesn't change aftersetExecutorRef, the resolved handle can be cached after the first call.♻️ Proposed caching
+ private cachedBatchConfig: { + executorHandle: string; + maxWorkers: number; + claimTimeoutMs: number; + } | null = null; + private async getBatchConfig() { if (!this.executorFnRef) return undefined; + if (this.cachedBatchConfig) return this.cachedBatchConfig; const executorHandle = await createFunctionHandle(this.executorFnRef); - return { + this.cachedBatchConfig = { executorHandle, maxWorkers: this.options.maxWorkers ?? 10, claimTimeoutMs: this.options.claimTimeoutMs ?? 120_000, }; + return this.cachedBatchConfig; }
- Add maxHeapMB option with runtime detection (448 MB for Node, 48 MB for Convex runtime) to stop claiming when memory is high - Raise maxConcurrencyPerWorker default from 200 to 1000 - Cap claimBatch to 200 per mutation to avoid oversized transactions - Remove batchTasks query from ensureExecutors to prevent OCC conflicts with concurrent claimBatch calls Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@coderabbitai Thanks for the thorough review. Status update on the findings: Already fixed:
Intentional design decisions:
Will fix:
Won't fix (example code / by design):
cc @cursor for a look |
|
Tip For best results, initiate chat on the files or code changes.
Great to see the critical issues around unhandled rejections and OCC conflicts are already resolved. The stress test results (3000 jobs, 47s, 0 failures) give solid confidence in the staggered executor lifecycle approach—that's a nice validation of the design. I fully understand the rationale on the "won't fix" items:
Looking forward to seeing the fixes for the double-scheduling and soft deadline enforcement when you get to them. Those will tighten up the executor lifecycle nicely. Thanks again for the thorough engagement with the review! The PR is shaping up really well. 🐇 ✨🐰✨ ✏️ Learnings added
|
- Remove self-scheduling from executor (only executorDone triggers replacement) to prevent inflating activeExecutors count - Add deadline timer to Promise.race so executor exits at soft deadline even when all in-flight tasks are slow - Deduplicate configure mutation by delegating to upsertBatchConfig Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@example/convex/test.ts`:
- Around line 146-154: The local variable named "batch" inside the reset
mutation shadows the imported BatchWorkpool (from ./setup) and can confuse
linters/readers; rename the local const (e.g., jobBatch, jobsBatch, or
fetchedJobs) used in reset's handler and update its usages (batch.length and
loop for (const job of ...)) accordingly so the imported BatchWorkpool
identifier is no longer shadowed; target the reset mutation's handler and the
const batch declaration and its references for the rename.
In `@src/client/batch.ts`:
- Around line 218-228: The Convex runtime guard (if (maxHeapMB > 0 && isNode))
makes memory back-pressure a no-op because process.memoryUsage is unavailable
when isNode is false; add an explicit warning when an executor starts with
maxHeapMB > 0 but isNode is false (check maxHeapMB and isNode where maxHeapMB is
set/used, e.g., alongside the existing guard around process.memoryUsage or in
the executor initialization) and emit a clear log (console.warn or the existing
logger) stating that memory pressure is disabled in this runtime so operators
know the setting is not enforced.
- Around line 315-327: The post-loop logic calling
ctx.runQuery(component.batch.countPending, ...) and then
ctx.runMutation(component.batch.executorDone, { startMore: remaining > 0 }) must
be made best-effort so failures don't leave activeExecutors inflated: compute
remaining with a protected call (wrap ctx.runQuery in try/catch and default
remaining = 0 on error), then call ctx.runMutation(component.batch.executorDone,
...) inside its own try/catch so any throw is logged/ignored, and ensure the
decrement of activeExecutors cannot be skipped (i.e., move the
activeExecutors-adjusting logic into a finally-equivalent safe mutation or
ensureExecutors path). Specifically update the code around countPending and
executorDone to use protected calls (try/catch) and a guaranteed cleanup call to
avoid leaving activeExecutors stuck, referencing component.batch.countPending,
component.batch.executorDone, ctx.runQuery and ctx.runMutation.
In `@src/component/batch.ts`:
- Around line 230-240: The countPending handler currently loads all pending
documents into memory via .collect(); change the query in countPending (the
handler inside export const countPending) to use .first() on
ctx.db.query("batchTasks").withIndex("by_status_readyAt", q =>
q.eq("status","pending")) and return 0 or 1 (or alternatively add a new
hasPending query that returns boolean), so the executor loop callers that only
need presence (=== 0 vs > 0) avoid reading the full result set into memory.
🧹 Nitpick comments (6)
src/component/batch.ts (2)
154-179:completeandfailcallonCompletethendelete— safe in Convex's transactional model but worth a comment.In Convex mutations,
ctx.runMutation(handle, ...)andctx.db.delete(...)execute within the same transaction, so OCC retries replay the entire handler atomically. This means double-invocation ofonCompleteon retry is not a concern here. A brief inline comment noting transactional safety would help future readers.Also applies to: 217-226
266-276:statusswitch includes unreachable branches for"completed","failed", and"canceled".Tasks in terminal states are deleted (by
complete,fail, andcancel), soctx.db.get(taskId)returnsnullbefore these cases are reached — the!taskguard on line 263 already covers them. These branches are dead code. Not harmful, but they imply a richer state model than actually exists.src/client/batch.ts (2)
296-302: Deadline timersetTimeoutis never cleared when a task finishes first.Each loop iteration creates a new
setTimeoutviadeadlineTimer. If a task resolves before the timer fires, the timer stays in the event loop until it naturally expires. Over many fast iterations this accumulates timers, though they're individually harmless and cleaned up when the action exits.♻️ Optional cleanup
const timeToDeadline = softDeadline - Date.now(); if (timeToDeadline <= 0) break; - const deadlineTimer = new Promise<void>((r) => - setTimeout(r, timeToDeadline), - ); + let timerId: ReturnType<typeof setTimeout>; + const deadlineTimer = new Promise<void>((r) => { + timerId = setTimeout(r, timeToDeadline); + }); await Promise.race([...inFlight.values(), deadlineTimer]); + clearTimeout(timerId!);
127-128:componentispublic— consider making itreadonlyto prevent accidental reassignment.♻️ Proposed fix
- public component: ComponentApi; + public readonly component: ComponentApi; private options: BatchWorkpoolOptions;example/convex/test.ts (2)
60-84:runBatchenqueues tasks one-by-one — consider usingenqueueBatchto reduce mutation overhead.The
BatchWorkpoolexposesenqueueBatch()for exactly this purpose. Enqueueing 3,000 tasks sequentially in a single mutation (each callingctx.runMutation(component.batch.enqueue, ...)) generates 3,000 cross-component mutation calls. Batching them would reduce this significantly.♻️ Sketch using enqueueBatch
export const runBatch = mutation({ args: { count: v.number() }, handler: async (ctx, { count }) => { const startedAt = Date.now(); + const tasks = []; for (let i = 0; i < count; i++) { const sentence = SENTENCES[i % SENTENCES.length]; const jobId = await ctx.db.insert("jobs", { sentence, mode: "batch", status: "pending", startedAt, }); - await batch.enqueue( - ctx, - "translateToSpanish", - { sentence }, - { + tasks.push({ + name: "translateToSpanish", + args: { sentence }, + options: { onComplete: internal.pipeline.batchAfterSpanish, context: { jobId }, }, - ); + }); } + await batch.enqueueBatch(ctx, tasks); return { started: count, mode: "batch" }; }, });
1-7: Side-effect import"./batchActions"is load-bearing but looks unused.If someone removes this import thinking it's dead code, all batch handler registrations break silently. Consider adding a more prominent comment or a runtime check that the registry is populated before enqueueing.
src/client/batch.ts
Outdated
| if (maxHeapMB > 0 && isNode) { | ||
| const heapUsed = process.memoryUsage().heapUsed; | ||
| memoryOk = heapUsed < maxHeapBytes; | ||
| if (!memoryOk && canClaim && inFlight.size > 0) { | ||
| const heapMB = (heapUsed / (1024 * 1024)).toFixed(1); | ||
| console.log( | ||
| `[batch] memory pressure: ${heapMB} MB heap, ` + | ||
| `${inFlight.size} in flight — waiting for tasks to complete`, | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Memory back-pressure is silently a no-op on the Convex runtime.
process.memoryUsage is unavailable in the Convex runtime, so isNode is false and the if (maxHeapMB > 0 && isNode) guard on line 218 skips the check entirely. The default maxHeapMB of 48 MB (line 207) for Convex runtime is set but never enforced. Users setting maxHeapMB won't get the expected protection.
Consider logging a warning at executor start when !isNode && maxHeapMB > 0, so operators know memory pressure isn't active.
🤖 Prompt for AI Agents
In `@src/client/batch.ts` around lines 218 - 228, The Convex runtime guard (if
(maxHeapMB > 0 && isNode)) makes memory back-pressure a no-op because
process.memoryUsage is unavailable when isNode is false; add an explicit warning
when an executor starts with maxHeapMB > 0 but isNode is false (check maxHeapMB
and isNode where maxHeapMB is set/used, e.g., alongside the existing guard
around process.memoryUsage or in the executor initialization) and emit a clear
log (console.warn or the existing logger) stating that memory pressure is
disabled in this runtime so operators know the setting is not enforced.
There was a problem hiding this comment.
Documented behavior. Memory back-pressure is a best-effort optimization — the comment and log message explain that performance.measureUserMemory is not available in the Convex runtime. The executor loop functions correctly without it; memory pressure just means it won't voluntarily yield early. The 10-minute Convex action timeout provides the hard limit.
There was a problem hiding this comment.
@sethconvex, thank you for clarifying! I understand that memory back-pressure is intentionally a best-effort optimization that only functions in Node.js environments, and the executor is designed to work correctly without it. The 10-minute Convex action timeout provides the ultimate safety boundary.
I appreciate the explanation that this behavior is documented in comments/logs within the codebase. Since this is a deliberate design decision, I'm satisfied with the approach.
✏️ Learnings added
Learnt from: sethconvex
Repo: get-convex/workpool PR: 167
File: src/client/batch.ts:0-0
Timestamp: 2026-02-14T00:53:49.059Z
Learning: In the BatchWorkpool implementation (src/client/batch.ts), memory back-pressure via `maxHeapMB` is intentionally a best-effort optimization that only works in Node.js environments (where `process.memoryUsage()` is available). In the Convex runtime, the memory check is skipped by design, and the executor relies on the 10-minute action timeout as the hard limit. This behavior is documented as intentional.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
- Extract _runExecutorLoop + _ExecutorDeps for testable executor logic - Cache createFunctionHandle result in getBatchConfig (perf) - Add 16 executor loop tests: soft deadline release, claim deadline drain, memory pressure pause/resume, error handling (unknown handler, double failure, crash recovery), concurrency limits, retry re-claim, undefined→null coercion, exit conditions with polling - Add 7 component tests: exponential backoff timing, full retry cycle, terminal failure, mixed stale/fresh sweep, release+re-claim flow, cancel during claimed state, executorDone replacement - Add BatchWorkpool docs to README with setup, usage, and comparison table Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Each task gets a random slot (0..maxWorkers-1) on enqueue. Each executor claims only from its own slot partition via the by_slot_status_readyAt index, eliminating OCC conflicts between concurrent claimBatch calls. Overflow handling ensures tasks from a prior higher maxWorkers setting aren't orphaned. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Eliminate OCC conflicts and rate limiting under high-concurrency load: - Split claimBatch into listPending (query) + claimByIds (point reads) to avoid index range scan conflicts with concurrent enqueues - Return onComplete data from completeBatch/failBatch instead of scheduling via ctx.scheduler.runAfter, dispatching directly from the executor action - Add dispatchOnCompleteBatch component mutation that processes multiple onComplete handlers in a single transaction, reducing mutation count from O(N) to O(N/batchSize) - Add non-blocking background drainer for onComplete dispatch with backoff and retry on transient errors - Add retry with backoff for completeBatch/failBatch/executorDone transient errors - Update test:reset to also clear component batchConfig and batchTasks Tested: 20K tasks through 3-stage pipeline, 20 workers, 0 failures, 0 rate limit errors, ~30s avg completion time. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… handling - executorDone(startMore=true) now starts ALL missing executors, not just the exiting slot. Prevents tasks from getting stuck in slots with no active executor. - countPending uses global scan (all slots + claimed check) so executors detect work in any slot, not just their own. - Expanded transient error detection: "no available workers", "couldn't be completed", "timed out" now retry instead of dropping. - Catch countPending failures (timeouts) and treat as "still pending" to prevent executors from exiting prematurely. - Removed stagger delay (slots partition work, no contention). - ONCOMPLETE_BATCH_SIZE back to 50 (200 exhausted mutation workers). Tested: 20K tasks, 3-stage pipeline, 20 workers → 0 failures, 0 stuck tasks, ~29s avg completion, ~70s total. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Wrap exit-time countPending in try-catch (treat failures as "work remaining" to avoid premature executor shutdown) - Increase ONCOMPLETE_CONCURRENCY from 3 to 5 per executor - Add "couldn't be completed" and "timed out" to transient errors Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ling - Fix enqueueByHandle to respect configSentThisTx flag, preventing redundant batchConfig sends that cause OCC contention - Add per-item try-catch in dispatchOnCompleteBatch so one failing handler doesn't block the entire batch. Failures are logged and counted in the return value. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When countPending fails (timeout, overload), treat as "no work" instead of "maybe work". This prevents cascading executor restarts that all fail at countPending and spin indefinitely. If work truly remains, sweepStaleClaims will recover orphaned tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace mock sleep-based translations with real Claude Sonnet 4 API calls. Adds exponential backoff retry (up to 8 attempts) for 429/529/5xx errors. Tested with 20K tasks through 3-stage pipeline: 99.985% success rate, ~40K real API calls in ~6 minutes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When Convex redeploys, all running executor actions are killed but their claimed tasks and activeSlots entries remain stale. The new _watchdog internal mutation self-schedules every 30s and handles three failure modes: 1. Stale claims: releases tasks claimed longer than claimTimeoutMs 2. Dead executors: detects slots marked active but with pending tasks older than 30s (an alive executor would have claimed them) 3. Missing executors: starts new executors for any unoccupied slots The watchdog is started from _maybeStartExecutors (deduped via watchdogScheduledAt) and reschedules itself while work remains. Scheduled functions survive redeploys, so the watchdog fires even after all executor actions are killed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Optimize executor loop for maximum I/O-bound throughput:
- Increase maxClaimBatch from 200 → maxConcurrency (1000): fill all
concurrency slots in a single claim cycle instead of 5 iterations
- Reduce deadlineTimer from 200ms → 50ms: 4x faster loop reaction to
completed tasks and pipeline stage transitions
- Add concurrent flushBuffers (FLUSH_CONCURRENCY=3): parallel
completeBatch calls instead of sequential
- Increase FLUSH_BATCH_SIZE from 100 → 200: fewer flush round-trips
- Make flushBuffers non-blocking: claiming continues while completions
flush in background
- Switch example executor to Node.js runtime ("use node"): 512MB memory
vs 64MB, prevents OOM with 1000 concurrent fetch() calls
Benchmark: 20,000 Anthropic Sonnet 4 API pipelines (translate →
translate back → count letters = 40K real API calls)
Before: ~6 min wall-clock, ~103s avg job duration
After: ~75s wall-clock, ~41s avg job duration, 0 failures
- 1 OCC error (vs dozens before)
- 0 "too many concurrent commits" errors
- 6,135 Anthropic 429 rate-limit retries, all successful
- Remaining bottleneck is purely Anthropic API rate limits
Standard mode comparison (100 actions, 1 task each):
- 3K tasks: ~4 min (measured)
- 20K tasks: ~27 min estimated (linear extrapolation)
Batch mode (20 executors × 1000 concurrent tasks):
- 3K tasks: 47s (measured)
- 20K tasks: 75s (measured) — saturates API rate limits
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comprehensive document covering 15 architectural choices, tradeoffs, pros/cons, and alternatives for the batch execution mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@cursor review this PR |
|
Skipping Bugbot: Bugbot is disabled for this repository |
- Fix mock type in claimMocks helper (TS2348) - Fix deferred.resolve() call to pass void, not string (TS2345) - Use internal.batch._watchdog instead of api.batch._watchdog for internalMutation access in tests (TS2339) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove deprecated claimBatch mutation (never released, replaced by listPending + claimByIds two-step pattern) - Remove claimBatch from component type definitions - Replace all test usages with a claimBatch() helper that calls listPending + claimByIds - Fix progress query to count non-terminal statuses (small sets) instead of completed (large set) to avoid 32K document read limit at 40K+ scale Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@example/convex/setup.ts`:
- Around line 4-7: Update the inline comment to match the configured
maxParallelism for the exported constant "standard": the Workpool instantiation
using new Workpool(components.standardPool, { maxParallelism: 100, logLevel:
"INFO" }) currently has a comment stating "maxParallelism 10"—change the comment
to reflect 100 (or alternatively change the maxParallelism value to 10 if the
intended behaviour is 10) so the comment and the Workpool config (symbol:
standard) stay consistent.
In `@example/convex/test.ts`:
- Around line 130-176: The progress query currently hardcodes total = 20000
which can make inProgress negative or misleading when callers like runBatch or
runStandard use a different count; update progress (the progress handler) to
obtain the true total (either by reading a persisted run metadata total for the
current mode/run or by accepting total as an argument) and compute inProgress =
Math.max(0, total - completed - failed) to clamp negatives, ensuring references
to total and inProgress in the returned object reflect the real run size.
In `@src/client/batch.ts`:
- Around line 455-458: The instance flag configSentThisTx is incorrect because
it persists across invocations; change to tracking per-invocation by replacing
the boolean with a WeakSet keyed by the execution context (e.g., use a
WeakSet<Ctx> named configSentForCtx) and update all uses of configSentThisTx
(where batchConfig is conditionally sent) to check/add the current ctx in that
WeakSet; ensure resetConfig clears cachedBatchConfig and also removes the ctx
entry (or reconstructs the WeakSet) so subsequent invocations will resend
config; update references near cachedBatchConfig, resetConfig, and places around
the blocks noted (lines referenced: the areas around 455-458, 569-599, 604-636,
691-716) to use the new WeakSet logic.
- Around line 255-304: flushBuffers currently drops batches when
deps.completeBatch or deps.failBatch reject with a non-transient error; update
flushBuffers to never drop those items: for each non-transient failure in the
completion and failure loops, either (1) call a release function (e.g.,
add/implement deps.releaseBatchClaims(batch) and invoke it for batches[i]) to
explicitly unclaim the tasks, or (2) move the batches into a retry bucket
structure (e.g., retryCompletionBuffer/retryFailureBuffer) that tracks retry
counts and applies capped retries with backoff before giving up and releasing
claims; apply the same change for both the completionBuffer handling
(deps.completeBatch) and failureBuffer handling (deps.failBatch), and ensure
drainOnComplete/flushRunning logic honors the retry bucket to avoid endless
immediate retries.
In `@src/component/batch.ts`:
- Around line 593-617: executorDone currently restarts all missing slots and
does a non-atomic read-modify-write on activeSlots, causing duplicate scheduling
and races; change it to only restart the exiting slot and make the activeSlots
update safer: in executorDone(handler) remove the slot from config.activeSlots,
and if startMore is true only schedule ctx.scheduler.runAfter(0,
config.executorHandle, { slot }) when that specific slot is not already present
in the active set, then patch using ctx.db.patch(config._id, { activeSlots:
newSlots }); to avoid races wrap the patch in a simple retry-on-conflict loop
(re-read batchConfig, recompute newSlots, reapply) so concurrent updates don’t
clobber each other.
🧹 Nitpick comments (3)
example/convex/test.ts (1)
35-59:runStandardwill likely hit platform limits for largecount— consider chunking/scheduling like batch mode.This is a mutation that does
countDB inserts +countenqueues sequentially; for large N it’s very likely to time out or exceed write limits (especially if you try to compare “standard vs batch” at 20K).
Please confirm the current Convex mutation write/time limits and expected maxcountfor this harness.♻️ Possible approach: chunk + self-schedule (mirrors runBatch)
export const runStandard = mutation({ args: { count: v.number() }, handler: async (ctx, { count }) => { const startedAt = Date.now(); - for (let i = 0; i < count; i++) { + const chunk = Math.min(count, CHUNK_SIZE); + for (let i = 0; i < chunk; i++) { const sentence = SENTENCES[i % SENTENCES.length]; const jobId = await ctx.db.insert("jobs", { sentence, mode: "standard", status: "pending", startedAt, }); await standard.enqueueAction( ctx, internal.standardActions.translateToSpanish, { sentence }, { onComplete: internal.pipeline.standardAfterSpanish, context: { jobId }, }, ); } + const remaining = count - chunk; + if (remaining > 0) { + await ctx.scheduler.runAfter(0, internal.test._runStandardChunk, { + remaining, + startedAt, + }); + } return { started: count, mode: "standard" }; }, });(You’d add an internal mutation
_runStandardChunksimilar to_runBatchChunk.)src/component/batch.test.ts (2)
13-13: Alignmodulesglob with the existing setup helper to avoid missing modules.
import.meta.glob("./**/*.ts")may exclude.jsmodules that convex-test might need, depending on how this repo is executed/compiled. There’s already a precedent insrc/component/setup.test.tsusing./**/*.*s.
Please double-check which extensions convex-test expects in this repo’s test runtime.♻️ Suggested change
-const modules = import.meta.glob("./**/*.ts"); +const modules = import.meta.glob("./**/*.*s");
75-101: Avoid repeated(t.finishAllScheduledFunctions as any)(...)— wrap in a typed helper.The
as anycalls make it easy for the convex-test API to drift unnoticed and it’s duplicated in multiple tests. Consider extracting a small helper (and ideally calling the documented signature directly).
Please confirm convex-testfinishAllScheduledFunctionssignature for version0.0.41.Also applies to: 832-876, 878-907
| export const reset = mutation({ | ||
| handler: async (ctx) => { | ||
| const jobs = await ctx.db.query("jobs").take(1000); | ||
| for (const job of jobs) { | ||
| await ctx.db.delete(job._id); | ||
| } | ||
| // Also reset the batch component's config and tasks | ||
| if (jobs.length === 0) { | ||
| await ctx.runMutation(components.batchPool.batch.resetConfig); | ||
| const result = await ctx.runMutation( | ||
| components.batchPool.batch.resetTasks, | ||
| ); | ||
| if (result.more) return { deleted: result.deleted, more: true }; | ||
| } | ||
| return { deleted: jobs.length, more: jobs.length === 1000 }; | ||
| }, | ||
| }); |
There was a problem hiding this comment.
reset requires an extra call to reset component config/tasks after deleting the final page of jobs.
Right now config/tasks are reset only when jobs.length === 0 at the start of reset. If the final deletion batch returns < 1000 jobs, you’ll return without resetting component state until the next call.
♻️ One way to reset when we just deleted the last page
export const reset = mutation({
handler: async (ctx) => {
const jobs = await ctx.db.query("jobs").take(1000);
for (const job of jobs) {
await ctx.db.delete(job._id);
}
// Also reset the batch component's config and tasks
- if (jobs.length === 0) {
+ if (jobs.length === 0 || jobs.length < 1000) {
await ctx.runMutation(components.batchPool.batch.resetConfig);
const result = await ctx.runMutation(
components.batchPool.batch.resetTasks,
);
if (result.more) return { deleted: result.deleted, more: true };
}
return { deleted: jobs.length, more: jobs.length === 1000 };
},
});There was a problem hiding this comment.
Example code — the reset function is a dev utility for clearing test data between runs. It iterates in pages and the component config is reset via batch.reset() at the end.
There was a problem hiding this comment.
@sethconvex, got it — I understand the pattern now. The multi-call approach makes sense for a dev utility: keep calling until more: false, then one final call triggers the component cleanup when jobs.length === 0. Thanks for the clarification! 👍
🧠 Learnings used
Learnt from: sethconvex
Repo: get-convex/workpool PR: 0
File: :0-0
Timestamp: 2026-02-13T07:29:09.647Z
Learning: In the workpool BatchWorkpool implementation (src/component/batch.ts and src/client/batch.ts), the `ensureExecutors` function intentionally starts one executor per call rather than using a while loop. This creates staggered executor lifecycles and was validated by stress testing (3000 jobs, 47s, 0 failures).
| async function flushBuffers() { | ||
| if (flushRunning) return; | ||
| flushRunning = true; | ||
| try { | ||
| // Flush completions with concurrency — fire multiple completeBatch | ||
| // calls in parallel to reduce total flush time from O(N) to O(N/concurrency). | ||
| while (completionBuffer.length > 0) { | ||
| const batches: { taskId: string; result: unknown }[][] = []; | ||
| for (let i = 0; i < FLUSH_CONCURRENCY && completionBuffer.length > 0; i++) { | ||
| batches.push(completionBuffer.splice(0, FLUSH_BATCH_SIZE)); | ||
| } | ||
| const results = await Promise.allSettled( | ||
| batches.map((batch) => deps.completeBatch(batch)), | ||
| ); | ||
| for (let i = 0; i < results.length; i++) { | ||
| const r = results[i]; | ||
| if (r.status === "fulfilled") { | ||
| onCompleteBuffer.push(...r.value); | ||
| } else if (isTransientError(r.reason)) { | ||
| completionBuffer.unshift(...batches[i]); | ||
| } else { | ||
| console.error(`[batch] completeBatch failed for ${batches[i].length} items:`, r.reason); | ||
| } | ||
| } | ||
| } | ||
| while (failureBuffer.length > 0) { | ||
| const batches: { taskId: string; error: string }[][] = []; | ||
| for (let i = 0; i < FLUSH_CONCURRENCY && failureBuffer.length > 0; i++) { | ||
| batches.push(failureBuffer.splice(0, FLUSH_BATCH_SIZE)); | ||
| } | ||
| const results = await Promise.allSettled( | ||
| batches.map((batch) => deps.failBatch(batch)), | ||
| ); | ||
| for (let i = 0; i < results.length; i++) { | ||
| const r = results[i]; | ||
| if (r.status === "fulfilled") { | ||
| onCompleteBuffer.push(...r.value); | ||
| } else if (isTransientError(r.reason)) { | ||
| failureBuffer.unshift(...batches[i]); | ||
| } else { | ||
| console.error(`[batch] failBatch failed for ${batches[i].length} items:`, r.reason); | ||
| } | ||
| } | ||
| } | ||
| // Kick off background drain (non-blocking) | ||
| void drainOnComplete(); | ||
| } finally { | ||
| flushRunning = false; | ||
| } | ||
| } |
There was a problem hiding this comment.
Don’t drop completion/failure items on non-transient flush errors — tasks can get stranded in claimed.
Right now if deps.completeBatch(...) / deps.failBatch(...) rejects with a non-transient error, the corresponding items are removed from the buffer and only logged. That leaves tasks in “claimed” state until the stale-claim sweeper runs, and if the error is systemic it can create large numbers of stuck/duplicated tasks.
Consider a “fail-safe” path for non-transient errors: either (a) release those claims explicitly, or (b) keep them in a retry bucket with capped retries/backoff and release on exhaustion.
🛡️ Minimal fail-safe idea: release claims for the affected batch on hard failure
for (let i = 0; i < results.length; i++) {
const r = results[i];
if (r.status === "fulfilled") {
onCompleteBuffer.push(...r.value);
} else if (isTransientError(r.reason)) {
completionBuffer.unshift(...batches[i]);
} else {
console.error(`[batch] completeBatch failed for ${batches[i].length} items:`, r.reason);
+ // Fail-safe: don’t strand tasks in "claimed" forever if we can’t record completion.
+ // This may cause re-execution (at-least-once semantics), but avoids deadlocks.
+ const taskIds = batches[i].map((it) => it.taskId);
+ deps.releaseClaims(taskIds).catch((e: unknown) => {
+ console.error(`[batch] failed to release claims after completeBatch failure:`, e);
+ });
}
}(Repeat similarly for failBatch.)
🤖 Prompt for AI Agents
In `@src/client/batch.ts` around lines 255 - 304, flushBuffers currently drops
batches when deps.completeBatch or deps.failBatch reject with a non-transient
error; update flushBuffers to never drop those items: for each non-transient
failure in the completion and failure loops, either (1) call a release function
(e.g., add/implement deps.releaseBatchClaims(batch) and invoke it for
batches[i]) to explicitly unclaim the tasks, or (2) move the batches into a
retry bucket structure (e.g., retryCompletionBuffer/retryFailureBuffer) that
tracks retry counts and applies capped retries with backoff before giving up and
releasing claims; apply the same change for both the completionBuffer handling
(deps.completeBatch) and failureBuffer handling (deps.failBatch), and ensure
drainOnComplete/flushRunning logic honors the retry bucket to avoid endless
immediate retries.
There was a problem hiding this comment.
Tested this approach in a 40K-task benchmark. Releasing claims on non-transient flush errors causes cascading re-execution: tasks get re-claimed, handlers re-run, doubling API load and triggering a failure cascade (10,000 failures). The current behavior (dropping the completion/failure items) means the executor loses track of those tasks, but the stale-claim sweeper reclaims them after claimTimeoutMs (2 min). This is safer than immediate re-execution.
There was a problem hiding this comment.
Rate Limit Exceeded
@sethconvex have exceeded the limit for the number of chat messages per hour. Please wait 1 minutes and 55 seconds before sending another message.
src/client/batch.ts
Outdated
| private cachedBatchConfig: | ||
| | { executorHandle: string; maxWorkers: number; claimTimeoutMs: number } | ||
| | undefined; | ||
| private configSentThisTx = false; |
There was a problem hiding this comment.
configSentThisTx is never reset (instance-level) but comment says “per mutation” — likely breaks lazy-init after resets.
configSentThisTx is an instance field that flips to true and stays there forever. In typical Convex deployments, module state can be reused across invocations, so this can prevent batchConfig from ever being sent again—e.g. after resetConfig, or if config is missing in a new environment.
A safer approach is to track “config already sent” per invocation (keyed by ctx), not per BatchWorkpool instance.
🛠️ Suggested fix: track config-sent per ctx via WeakSet
export class BatchWorkpool {
public component: ComponentApi;
private options: BatchWorkpoolOptions;
private registry = new Map<string, HandlerFn>();
@@
private cachedBatchConfig:
| { executorHandle: string; maxWorkers: number; claimTimeoutMs: number }
| undefined;
- private configSentThisTx = false;
+ private configSentInCtx = new WeakSet<object>();
@@
async enqueue(
ctx: RunMutationCtx,
@@
): Promise<BatchTaskId> {
// Only pass batchConfig on first enqueue per mutation to avoid
// OCC contention on the batchConfig singleton.
- const batchConfig = this.configSentThisTx
+ const batchConfig = this.configSentInCtx.has(ctx as any)
? undefined
: await this.getBatchConfig();
@@
const id = await ctx.runMutation(this.component.batch.enqueue, {
name,
args,
slot,
onComplete,
retryBehavior,
batchConfig,
});
- if (batchConfig) this.configSentThisTx = true;
+ if (batchConfig) this.configSentInCtx.add(ctx as any);
return id as unknown as BatchTaskId;
}
@@
async enqueueBatch(
ctx: RunMutationCtx,
@@
): Promise<BatchTaskId[]> {
- const batchConfig = this.configSentThisTx
+ const batchConfig = this.configSentInCtx.has(ctx as any)
? undefined
: await this.getBatchConfig();
@@
const ids = await ctx.runMutation(this.component.batch.enqueueBatch, {
tasks: resolvedTasks,
batchConfig,
});
- if (batchConfig) this.configSentThisTx = true;
+ if (batchConfig) this.configSentInCtx.add(ctx as any);
return ids as unknown as BatchTaskId[];
}
@@
async enqueueByHandle(
ctx: RunMutationCtx,
@@
): Promise<BatchTaskId> {
- const batchConfig = this.configSentThisTx
+ const batchConfig = this.configSentInCtx.has(ctx as any)
? undefined
: await this.getBatchConfig();
@@
const id = await ctx.runMutation(this.component.batch.enqueue, {
name,
args,
slot,
onComplete: options?.onComplete,
retryBehavior,
batchConfig,
});
- if (batchConfig) this.configSentThisTx = true;
+ if (batchConfig) this.configSentInCtx.add(ctx as any);
return id as unknown as BatchTaskId;
}Also applies to: 569-599, 604-636, 691-716
🤖 Prompt for AI Agents
In `@src/client/batch.ts` around lines 455 - 458, The instance flag
configSentThisTx is incorrect because it persists across invocations; change to
tracking per-invocation by replacing the boolean with a WeakSet keyed by the
execution context (e.g., use a WeakSet<Ctx> named configSentForCtx) and update
all uses of configSentThisTx (where batchConfig is conditionally sent) to
check/add the current ctx in that WeakSet; ensure resetConfig clears
cachedBatchConfig and also removes the ctx entry (or reconstructs the WeakSet)
so subsequent invocations will resend config; update references near
cachedBatchConfig, resetConfig, and places around the blocks noted (lines
referenced: the areas around 455-458, 569-599, 604-636, 691-716) to use the new
WeakSet logic.
There was a problem hiding this comment.
Tested WeakSet approach (resetting per-ctx) in a 40K-task benchmark. It caused 9,248 failures: every pipeline callback gets a fresh ctx, so every batch.enqueue() re-sent batchConfig → 80K _maybeStartExecutors mutations all hitting the batchConfig singleton → massive OCC storm. The boolean is intentionally instance-level to avoid this. After initial configuration on first enqueue, the executor handle and config don't change, so re-sending is unnecessary.
There was a problem hiding this comment.
Rate Limit Exceeded
@sethconvex have exceeded the limit for the number of chat messages per hour. Please wait 1 minutes and 49 seconds before sending another message.
Replace per-slot scanning (20 queries) with single cross-slot queries using by_status_claimedAt index in both _watchdog and countPending. Increase executorDone retries from 5 to 10 with exponential backoff to handle 20 concurrent writers to the batchConfig singleton. 40K test: 40,000/40,000 completed, 0 failures (~4m40s wall clock) vs previous: 37,650/40,000, 2,350 failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…scheduling, flush claim release 1. configSentThisTx → WeakSet<ctx>: track config-sent per invocation context instead of a persistent boolean that never resets across invocations. 2. executorDone(startMore) only restarts the exiting slot instead of all missing slots. Prevents duplicate scheduling when many executors exit concurrently. Watchdog reconciles other missing slots. 3. flushBuffers releases claims on non-transient completeBatch/failBatch errors instead of silently dropping items (tasks stayed claimed forever). Also reverts AIMD rate limiter in example (brute-force retry was faster), keeps Retry-After header support. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@cursor review this PR |
|
Skipping Bugbot: Bugbot is disabled for this repository |
- Handlers return {text, fetchStart, fetchEnd} with per-fetch timestamps
- Pipeline stores fetch1Start/End, fetch2Start/End on jobs table
- New test:concurrency query computes concurrent fetches per time bucket
- flushBuffers releases claims on non-transient errors (CodeRabbit fix)
- Revert configSentThisTx WeakSet and executorDone single-slot restart
(both caused regressions in pipeline workloads)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
7fcf056 to
c2a691e
Compare
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cover slot isolation, claim race conditions, completeBatch/failBatch edge cases, retry data preservation, idempotency, error handling, large batch operations, lifecycle transitions, and executor resilience. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cover empty batch operations, cancel with onComplete, sweep without config, countPending edge cases, maxAttempts=1, watchdog sweep limit, transient retry paths, flush concurrency guard, and more. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… executor interference When a stale executor's claim is swept and re-claimed by another executor, the stale executor could still complete, fail, or release the task — corrupting the other executor's work. Fix by threading claimedAt as a claim ownership token through the entire pipeline: claimByIds returns it, completeBatch/ failBatch/releaseClaims verify it matches before acting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When all executors finish and the watchdog stops, a subsequent enqueue without batchConfig (due to configSentThisTx=true on the cached BatchWorkpool instance) leaves activeSlots empty — the task sits with no executor to process it. Also adds tests verifying releaseClaims passes correct claimedAt values at soft deadline, and memory pressure behavior on Node vs non-Node. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When configSentThisTx skips batchConfig, the component's enqueue now schedules _ensureExecutors which reads config from DB and starts any missing executor slots + restarts the watchdog. Fixes task starvation when all executors finish and the watchdog stops before new work arrives. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nfigSentThisTx Single idempotent _ensureExecutors mutation handles both lazy config init and executor startup. Removes client-side configSentThisTx state that caused executor starvation across mutation boundaries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add echo, simulatedWork (200ms), and slowWork (2s) handlers for stress testing without external API calls. Add runStress mutation for enqueuing large batches. Set example to 5 workers x 1000 concurrency. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@BATCH_DESIGN_ANALYSIS.md`:
- Around line 93-95: The "all-or-nothing per batch" statement contradicts the
explanation that each onComplete handler is run via ctx.runMutation(fnHandle) as
a separate sub-transaction; update the wording to remove the contradiction by
explicitly stating that onComplete handlers are executed independently (each via
ctx.runMutation(fnHandle)), so a thrown error in one handler does not roll back
other handlers' writes but is currently logged/swallowed — clarify the exact
semantics and suggested behavior for error handling/logging for onComplete to
make this explicit.
In `@example/convex/test.ts`:
- Around line 130-173: The progress query handler currently truncates per-status
counts with take(10000) which can undercount large runs; update the logic in
progress (handler) to detect truncation by checking if any docs.length === 10000
for statuses and set an approximate flag (e.g., approximateCounts: true) when
truncation occurs, expose that flag in the returned object, and consider
extracting the hard limit into a named constant (e.g., STATUS_TAKE_LIMIT) so
both the check and the query use the same value; optionally do the same
detection for the completed sample (take(500)) and include a separate
approximateSample flag if sample was truncated.
- Around line 204-262: The handler in concurrency ignores the cursor because
both branches call query.take(PAGE_SIZE); change to use the query.paginate API
so pagination advances: call query.paginate({ cursor, limit: PAGE_SIZE }) (or
the equivalent paginate signature), assign the returned page.items to jobs
(instead of using .take()), and derive hasMore from the returned page
cursor/hasMore (e.g., Boolean(page.nextCursor)) while keeping PAGE_SIZE,
jobsProcessed and the rest of the event bucketing logic unchanged; update any
references to jobs accordingly in the concurrency handler.
In `@README.md`:
- Around line 374-388: Tighten the phrasing in the README section describing
BatchWorkpool by replacing informal phrasing like "A few executor actions
(default: 10)" and "A few executor actions run as long-lived Convex actions"
with a clearer, slightly more formal term such as "Several executor actions
(default: 10)" or "A small number of executor actions (default: 10)"; update any
other occurrences of "a few" referring to executor actions or executors in the
BatchWorkpool description so the wording is consistent and reads more smoothly
while keeping the defaults and technical details unchanged (references:
BatchWorkpool, executor actions, executors).
In `@src/client/batch.ts`:
- Around line 352-370: The code currently sets claimedAtMap.set(task._id,
task.claimedAt) before verifying a handler from deps.getHandler(task.name), so
entries are never removed when handler is missing; either move the
claimedAtMap.set call to after the handler existence check inside the loop or
ensure you delete the map entry on the unknown-handler path (e.g., before
pushing to failureBuffer) so that claimedAtMap entries are cleared for tasks
with no registered handler; reference task._id, claimedAtMap, deps.getHandler,
handler, failureBuffer and inFlight.delete when applying the change.
| // Check progress for a mode — count non-terminal statuses (small) to avoid 32K limit | ||
| export const progress = query({ | ||
| args: { mode: v.string(), total: v.optional(v.number()) }, | ||
| handler: async (ctx, { mode, total: totalArg }) => { | ||
| const total = totalArg ?? 40000; | ||
|
|
||
| // Count small sets: pending, step1, step2, failed | ||
| const statuses = ["pending", "step1", "step2", "failed"] as const; | ||
| const counts: Record<string, number> = {}; | ||
| for (const status of statuses) { | ||
| const docs = await ctx.db | ||
| .query("jobs") | ||
| .withIndex("by_mode_status", (q) => | ||
| q.eq("mode", mode).eq("status", status), | ||
| ) | ||
| .take(10000); | ||
| counts[status] = docs.length; | ||
| } | ||
|
|
||
| const failed = counts.failed; | ||
| const inProgress = counts.pending + counts.step1 + counts.step2; | ||
| const completed = total - inProgress - failed; | ||
|
|
||
| // Duration stats from sample of completed jobs | ||
| const sample = await ctx.db | ||
| .query("jobs") | ||
| .withIndex("by_mode_status", (q) => | ||
| q.eq("mode", mode).eq("status", "completed"), | ||
| ) | ||
| .take(500); | ||
| const durations = sample.map((j) => j.completedAt! - j.startedAt); | ||
| const avgDuration = durations.length | ||
| ? durations.reduce((a, b) => a + b, 0) / durations.length | ||
| : 0; | ||
| const maxDuration = durations.length ? Math.max(...durations) : 0; | ||
|
|
||
| return { | ||
| total, | ||
| completed, | ||
| failed, | ||
| inProgress, | ||
| avgDurationMs: Math.round(avgDuration), | ||
| maxDurationMs: maxDuration, | ||
| }; |
There was a problem hiding this comment.
Progress counts are truncated at 10k per status.
With take(10000) per status, runs with >10k items in any status will undercount and the completed metric becomes misleading, even though the response looks exact. Consider surfacing an approximation flag when truncation occurs.
🧾 Suggested tweak to mark approximate counts
- const statuses = ["pending", "step1", "step2", "failed"] as const;
- const counts: Record<string, number> = {};
+ const statuses = ["pending", "step1", "step2", "failed"] as const;
+ const counts: Record<string, number> = {};
+ const LIMIT = 10000;
+ let approximate = false;
for (const status of statuses) {
const docs = await ctx.db
.query("jobs")
.withIndex("by_mode_status", (q) =>
q.eq("mode", mode).eq("status", status),
)
- .take(10000);
- counts[status] = docs.length;
+ .take(LIMIT + 1);
+ if (docs.length > LIMIT) approximate = true;
+ counts[status] = Math.min(docs.length, LIMIT);
}
@@
return {
total,
completed,
failed,
inProgress,
+ approximate,
avgDurationMs: Math.round(avgDuration),
maxDurationMs: maxDuration,
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Check progress for a mode — count non-terminal statuses (small) to avoid 32K limit | |
| export const progress = query({ | |
| args: { mode: v.string(), total: v.optional(v.number()) }, | |
| handler: async (ctx, { mode, total: totalArg }) => { | |
| const total = totalArg ?? 40000; | |
| // Count small sets: pending, step1, step2, failed | |
| const statuses = ["pending", "step1", "step2", "failed"] as const; | |
| const counts: Record<string, number> = {}; | |
| for (const status of statuses) { | |
| const docs = await ctx.db | |
| .query("jobs") | |
| .withIndex("by_mode_status", (q) => | |
| q.eq("mode", mode).eq("status", status), | |
| ) | |
| .take(10000); | |
| counts[status] = docs.length; | |
| } | |
| const failed = counts.failed; | |
| const inProgress = counts.pending + counts.step1 + counts.step2; | |
| const completed = total - inProgress - failed; | |
| // Duration stats from sample of completed jobs | |
| const sample = await ctx.db | |
| .query("jobs") | |
| .withIndex("by_mode_status", (q) => | |
| q.eq("mode", mode).eq("status", "completed"), | |
| ) | |
| .take(500); | |
| const durations = sample.map((j) => j.completedAt! - j.startedAt); | |
| const avgDuration = durations.length | |
| ? durations.reduce((a, b) => a + b, 0) / durations.length | |
| : 0; | |
| const maxDuration = durations.length ? Math.max(...durations) : 0; | |
| return { | |
| total, | |
| completed, | |
| failed, | |
| inProgress, | |
| avgDurationMs: Math.round(avgDuration), | |
| maxDurationMs: maxDuration, | |
| }; | |
| // Check progress for a mode — count non-terminal statuses (small) to avoid 32K limit | |
| export const progress = query({ | |
| args: { mode: v.string(), total: v.optional(v.number()) }, | |
| handler: async (ctx, { mode, total: totalArg }) => { | |
| const total = totalArg ?? 40000; | |
| // Count small sets: pending, step1, step2, failed | |
| const statuses = ["pending", "step1", "step2", "failed"] as const; | |
| const counts: Record<string, number> = {}; | |
| const LIMIT = 10000; | |
| let approximate = false; | |
| for (const status of statuses) { | |
| const docs = await ctx.db | |
| .query("jobs") | |
| .withIndex("by_mode_status", (q) => | |
| q.eq("mode", mode).eq("status", status), | |
| ) | |
| .take(LIMIT + 1); | |
| if (docs.length > LIMIT) approximate = true; | |
| counts[status] = Math.min(docs.length, LIMIT); | |
| } | |
| const failed = counts.failed; | |
| const inProgress = counts.pending + counts.step1 + counts.step2; | |
| const completed = total - inProgress - failed; | |
| // Duration stats from sample of completed jobs | |
| const sample = await ctx.db | |
| .query("jobs") | |
| .withIndex("by_mode_status", (q) => | |
| q.eq("mode", mode).eq("status", "completed"), | |
| ) | |
| .take(500); | |
| const durations = sample.map((j) => j.completedAt! - j.startedAt); | |
| const avgDuration = durations.length | |
| ? durations.reduce((a, b) => a + b, 0) / durations.length | |
| : 0; | |
| const maxDuration = durations.length ? Math.max(...durations) : 0; | |
| return { | |
| total, | |
| completed, | |
| failed, | |
| inProgress, | |
| approximate, | |
| avgDurationMs: Math.round(avgDuration), | |
| maxDurationMs: maxDuration, | |
| }; |
🤖 Prompt for AI Agents
In `@example/convex/test.ts` around lines 130 - 173, The progress query handler
currently truncates per-status counts with take(10000) which can undercount
large runs; update the logic in progress (handler) to detect truncation by
checking if any docs.length === 10000 for statuses and set an approximate flag
(e.g., approximateCounts: true) when truncation occurs, expose that flag in the
returned object, and consider extracting the hard limit into a named constant
(e.g., STATUS_TAKE_LIMIT) so both the check and the query use the same value;
optionally do the same detection for the completed sample (take(500)) and
include a separate approximateSample flag if sample was truncated.
| For high-throughput workloads (hundreds or thousands of concurrent tasks), the | ||
| `BatchWorkpool` provides a fundamentally different execution model. Instead of | ||
| one Convex action per task, a small number of long-lived "executor" actions each | ||
| run many task handlers concurrently inside a single action. | ||
|
|
||
| This dramatically reduces scheduling overhead and allows far higher effective | ||
| concurrency. In benchmarks, 3000 tasks completed in ~47 seconds with batch mode | ||
| vs an estimated ~10 minutes with standard mode. | ||
|
|
||
| ### How it works | ||
|
|
||
| - A few executor actions (default: 10) run as long-lived Convex actions | ||
| - Each executor claims pending tasks from a shared queue and runs their handlers | ||
| concurrently (default: up to 1000 per executor) | ||
| - Handlers are plain async functions that call `ctx.runQuery`/`ctx.runMutation` |
There was a problem hiding this comment.
Tighten phrasing (“a few” reads smoother).
✏️ Suggested edit
-`BatchWorkpool` provides a fundamentally different execution model. Instead of
-one Convex action per task, a small number of long-lived "executor" actions each
-run many task handlers concurrently inside a single action.
+`BatchWorkpool` provides a fundamentally different execution model. Instead of
+one Convex action per task, a few long-lived "executor" actions each run many
+task handlers concurrently inside a single action.🧰 Tools
🪛 LanguageTool
[style] ~376-~376: Specify a number, remove phrase, use “a few”, or use “some”
Context: ... Instead of one Convex action per task, a small number of long-lived "executor" actions each run ...
(SMALL_NUMBER_OF)
🤖 Prompt for AI Agents
In `@README.md` around lines 374 - 388, Tighten the phrasing in the README section
describing BatchWorkpool by replacing informal phrasing like "A few executor
actions (default: 10)" and "A few executor actions run as long-lived Convex
actions" with a clearer, slightly more formal term such as "Several executor
actions (default: 10)" or "A small number of executor actions (default: 10)";
update any other occurrences of "a few" referring to executor actions or
executors in the BatchWorkpool description so the wording is consistent and
reads more smoothly while keeping the defaults and technical details unchanged
(references: BatchWorkpool, executor actions, executors).
| for (const task of batch) { | ||
| claimedAtMap.set(task._id, task.claimedAt); | ||
| const handler = deps.getHandler(task.name); | ||
| if (!handler) { | ||
| failureBuffer.push({ taskId: task._id, error: `Unknown handler: ${task.name}`, claimedAt: task.claimedAt }); | ||
| continue; | ||
| } | ||
|
|
||
| const taskClaimedAt = task.claimedAt; | ||
| const p = handler(task.args) | ||
| .then((result) => { | ||
| completionBuffer.push({ taskId: task._id, result: result ?? null, claimedAt: taskClaimedAt }); | ||
| }) | ||
| .catch((err: unknown) => { | ||
| failureBuffer.push({ taskId: task._id, error: String(err), claimedAt: taskClaimedAt }); | ||
| }) | ||
| .finally(() => { | ||
| inFlight.delete(task._id); | ||
| claimedAtMap.delete(task._id); |
There was a problem hiding this comment.
Clear claimedAtMap entries for unknown handlers.
When a task has no registered handler, the claimedAtMap entry is never removed. In long-lived executors with many unknown tasks, this can accumulate. Move the map update below the handler check (or delete on the missing-handler path).
🧹 Suggested fix
for (const task of batch) {
- claimedAtMap.set(task._id, task.claimedAt);
const handler = deps.getHandler(task.name);
if (!handler) {
failureBuffer.push({ taskId: task._id, error: `Unknown handler: ${task.name}`, claimedAt: task.claimedAt });
continue;
}
+ claimedAtMap.set(task._id, task.claimedAt);
const taskClaimedAt = task.claimedAt;🤖 Prompt for AI Agents
In `@src/client/batch.ts` around lines 352 - 370, The code currently sets
claimedAtMap.set(task._id, task.claimedAt) before verifying a handler from
deps.getHandler(task.name), so entries are never removed when handler is
missing; either move the claimedAtMap.set call to after the handler existence
check inside the loop or ensure you delete the map entry on the unknown-handler
path (e.g., before pushing to failureBuffer) so that claimedAtMap entries are
cleared for tasks with no registered handler; reference task._id, claimedAtMap,
deps.getHandler, handler, failureBuffer and inFlight.delete when applying the
change.
Add echo/slowWork handlers to standardActions and runStandardStress mutation for head-to-head benchmarking against batch mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@example/convex/test.ts`:
- Around line 43-68: runStandard currently loops over all items in one mutation
and can exceed Convex write limits; change it to chunk the work like runBatch by
batching iterations in slices of CHUNK_SIZE (use the same CHUNK_SIZE constant)
and performing inserts/enqueueAction calls per chunk. Inside runStandard (around
the handler that calls assertNonNegativeInt, ctx.db.insert, and
standard.enqueueAction with internal.standardActions.translateToSpanish and
onComplete internal.pipeline.standardAfterSpanish), iterate over Math.ceil(count
/ CHUNK_SIZE) chunks, for each chunk loop i from start to end indexes and
perform the same insert+enqueueAction operations, ensuring you only perform up
to CHUNK_SIZE items per mutation so Convex write limits aren't exceeded.
🧹 Nitpick comments (7)
package.json (1)
80-80: Pin the Convex prerelease to ensure reproducible CI builds.Using
^1.31.8-alpha.17allows npm to pull in future 1.31.8 stable releases and any newer alpha versions, which breaks reproducibility. The version exists and is the latest alpha; pin it exactly unless floating is explicitly intended.♻️ Suggested pin
- "convex": "^1.31.8-alpha.17", + "convex": "1.31.8-alpha.17",example/convex/batchActions.ts (1)
40-42:Retry-Afterheader can be an HTTP-date, causingparseFloatto returnNaN.Per RFC 9110,
Retry-Aftercan be either a delay-seconds value or an HTTP-date string.parseFloat("Fri, 14 Feb 2026 ...")returnsNaN, andsetTimeout(r, NaN)fires immediately (treated as 0), so the retry happens with no delay instead of respecting the header. Since this is example code, it's not critical, but worth noting.🛡️ Suggested guard
const retryAfter = resp.headers.get("retry-after"); - const delay = retryAfter - ? parseFloat(retryAfter) * 1000 - : Math.min(1000 * Math.pow(2, attempt), 30000); + let delay: number; + if (retryAfter) { + const parsed = parseFloat(retryAfter); + delay = Number.isFinite(parsed) + ? parsed * 1000 + : Math.max(0, new Date(retryAfter).getTime() - Date.now()); + } else { + delay = Math.min(1000 * Math.pow(2, attempt), 30000); + }src/client/batch.ts (2)
161-170:isTransientErrorrelies on fragile string matching.These error messages are internal Convex platform strings. If the platform changes wording, retries would silently stop. Consider also matching on error codes if the platform provides them, and/or adding a catch-all for unknown errors with limited retries to avoid silently dropping retryable failures.
211-244:drainOnCompletebusy-waits with 10ms sleeps when concurrency is saturated.When
available <= 0(line 217), the loop spins at 10ms intervals. Under sustained load, this burns CPU cycles. A more efficient approach would be toawaitone of the in-flight dispatch promises rather than polling. This is minor since the 10ms yield is short and the executor has bigger work to do, but worth noting for future optimization.src/component/batch.ts (2)
691-705: Stuck-executor heuristic: 30s threshold may false-positive on high-contention workloads.If an executor's slot has many tasks and the executor is legitimately busy running them (all slots at maxConcurrency), pending tasks could age past 30s before being claimed. The watchdog would then mark the slot as "dead" and restart it, creating a briefly-duplicate executor. This is self-correcting (the duplicate exits when it finds no claimable work), but it does add unnecessary scheduler load.
This is unlikely to be a practical problem given the system's design, but worth documenting the assumption that executors should claim pending work within 30s.
697-700: Pending-task sample usesby_status_claimedAtindex — pending tasks have noclaimedAt.Pending tasks have
claimedAt: undefined. This index orders bystatusthenclaimedAt. For pending tasks (noclaimedAt), the ordering falls back to document insertion order, which happens to work for "oldest first" sampling. However, this is somewhat incidental — if aby_status_readyAtindex exists and better expresses the intent, consider using it here for clarity.example/convex/test.ts (1)
425-441:countJobsByModeStatususesctx: any— consider typing the context parameter.Using
anybypasses type safety. Since this is a query handler context, it should accept the query context type.♻️ Suggested typing
+import type { QueryCtx } from "./_generated/server"; + async function countJobsByModeStatus( - ctx: any, + ctx: QueryCtx, mode: "standard" | "batch", status: "pending" | "step1" | "step2" | "failed", ) {
| export const runStandard = mutation({ | ||
| args: { count: v.number() }, | ||
| handler: async (ctx, { count }) => { | ||
| assertNonNegativeInt(count, "count"); | ||
| const startedAt = Date.now(); | ||
| for (let i = 0; i < count; i++) { | ||
| const sentence = SENTENCES[i % SENTENCES.length]; | ||
| const jobId = await ctx.db.insert("jobs", { | ||
| sentence, | ||
| mode: "standard", | ||
| status: "pending", | ||
| startedAt, | ||
| }); | ||
| await standard.enqueueAction( | ||
| ctx, | ||
| internal.standardActions.translateToSpanish, | ||
| { sentence }, | ||
| { | ||
| onComplete: internal.pipeline.standardAfterSpanish, | ||
| context: { jobId }, | ||
| }, | ||
| ); | ||
| } | ||
| return { started: count, mode: "standard" }; | ||
| }, | ||
| }); |
There was a problem hiding this comment.
runStandard doesn't chunk — may hit Convex write limits for large count.
Unlike runBatch which chunks at CHUNK_SIZE = 2000, runStandard processes all count items in a single mutation. Each iteration performs at least 2 writes (job insert + enqueueAction component mutation). For count > ~4000, this could exceed Convex's per-mutation write limit.
Consider adding the same chunking pattern used in runBatch:
♻️ Suggested chunking
export const runStandard = mutation({
args: { count: v.number() },
handler: async (ctx, { count }) => {
assertNonNegativeInt(count, "count");
const startedAt = Date.now();
- for (let i = 0; i < count; i++) {
+ const chunk = Math.min(count, CHUNK_SIZE);
+ for (let i = 0; i < chunk; i++) {
const sentence = SENTENCES[i % SENTENCES.length];
// ... same body ...
}
+ const remaining = count - chunk;
+ if (remaining > 0) {
+ await ctx.scheduler.runAfter(0, internal.test._runStandardChunk, {
+ remaining,
+ startedAt,
+ });
+ }
return { started: count, mode: "standard" };
},
});🤖 Prompt for AI Agents
In `@example/convex/test.ts` around lines 43 - 68, runStandard currently loops
over all items in one mutation and can exceed Convex write limits; change it to
chunk the work like runBatch by batching iterations in slices of CHUNK_SIZE (use
the same CHUNK_SIZE constant) and performing inserts/enqueueAction calls per
chunk. Inside runStandard (around the handler that calls assertNonNegativeInt,
ctx.db.insert, and standard.enqueueAction with
internal.standardActions.translateToSpanish and onComplete
internal.pipeline.standardAfterSpanish), iterate over Math.ceil(count /
CHUNK_SIZE) chunks, for each chunk loop i from start to end indexes and perform
the same insert+enqueueAction operations, ensuring you only perform up to
CHUNK_SIZE items per mutation so Convex write limits aren't exceeded.
| export const dispatchOnCompleteBatch = mutation({ | ||
| args: { | ||
| items: v.array(vOnCompleteItem), | ||
| }, | ||
| returns: v.number(), | ||
| handler: async (ctx, { items }) => { | ||
| let failures = 0; | ||
| for (const item of items) { | ||
| try { | ||
| await ctx.runMutation( | ||
| item.fnHandle as FunctionHandle<"mutation">, | ||
| { | ||
| workId: item.workId, | ||
| context: item.context, | ||
| result: item.result, | ||
| }, | ||
| ); | ||
| } catch (err: unknown) { | ||
| failures++; | ||
| // Log but continue — don't let one bad handler block the rest. | ||
| // The failed handler's writes are not applied, but the rest are. | ||
| console.error( | ||
| `[batch] onComplete handler failed for workId=${item.workId}:`, | ||
| String(err), | ||
| ); | ||
| } | ||
| } | ||
| return failures; | ||
| }, | ||
| }); |
There was a problem hiding this comment.
Clarify: individual onComplete failures within dispatchOnCompleteBatch are silently swallowed.
The try/catch on each ctx.runMutation call (line 388) logs the error but the failed handler's onComplete is permanently lost — there's no retry mechanism or dead-letter queue for these. For pipeline workflows where onComplete enqueues the next stage, a transient Convex error here would silently drop a pipeline step.
Consider whether permanently-failed onComplete items should be re-queued to the onCompleteBuffer on the client side (by returning them as failures alongside the success count), or at minimum document this as an at-most-once guarantee for onComplete dispatch in batch mode.
ianmacartney
left a comment
There was a problem hiding this comment.
Super cool model, and I think a compelling one for many tasks (especially doing pure fetch calls).
Given that it's a totally different implementation, let's make it a different component.
The big concerns are that it would blow through all available query/mutation capacity, starving other work (though more & more these "background" tasks are in the interaction path for users, so they aren't pure deferrable async jobs).
In particular, having a batch job that calls runQuery / runMutation just after calling an LLM, I suspect would tank performance.
There's also a lot of places where it could easily exceed read/write bandwidth. I have a PR up adding a scenario test with large args, and was going to add one for large onComplete context & large return values. I'd suggest evaluating those to see where the edges are. Especially places like grabbing tasks / completing tasks - if they hit a batch that's too big, it grinds the whole thing to a halt
| "@vitest/coverage-v8": "4.0.18", | ||
| "chokidar-cli": "3.0.0", | ||
| "convex": "1.31.7", | ||
| "convex": "^1.31.8-alpha.17", |
|
|
||
| ### Alternatives | ||
| - **Micro-batching:** Instead of long-lived executors, schedule short-lived actions that each process a small batch (e.g., 10 tasks). Lower complexity, still some cold start savings. | ||
| - **Action pooling at Convex platform level:** If Convex added native support for "warm" action pools, this entire approach would be unnecessary. |
There was a problem hiding this comment.
we already pre-warm actions, but we don't pre-load the code - the isolate still needs to run all imports which can be slow
| ### Pros | ||
| - **Eliminates cold start overhead.** Standard mode pays cold start cost per task. Batch mode pays it once per executor (~10-20s on dev deployments). | ||
| - **Amortizes mutation cost.** Results reported in batches of 200 instead of 1-at-a-time, reducing mutation count from O(N) to O(N/200). | ||
| - **High concurrency.** Each executor runs up to 1000 concurrent handlers via Promise.all. With 20 executors, that's 20,000 concurrent tasks using only 20 action slots. |
There was a problem hiding this comment.
one concern is that there is a limit to how many queries/mutations/actions you can run. If you have all your executors calling ctx.runMutation, then you could be totally swamping the mutation bandwidth. This is part of the rationale behind limiting the workpool throughput - to save resources for foreground activities.
| // ─── Configure ────────────────────────────────────────────────────────────── | ||
|
|
||
| /** | ||
| * Upsert batch configuration. Called lazily on first enqueue. |
There was a problem hiding this comment.
i'm starting to think folks should set it once on deploy but not pass the arguments otherwise - that allows pausing / modifying during runtime without a deploy
| await ctx.scheduler.runAfter( | ||
| 0, | ||
| internal.batch._ensureExecutors, | ||
| args.batchConfig ? { batchConfig: args.batchConfig } : {}, | ||
| ); |
There was a problem hiding this comment.
note: this incurs a mutation for every enqueue, potentially backlogging the scheduler when many things are enqueued quickly (e.g. thousands of webhooks come back for a recently sent email)
| // by another executor) from completing someone else's claim. | ||
| if (claimedAt !== undefined && task.claimedAt !== claimedAt) return null; | ||
|
|
||
| await ctx.db.delete(taskId); |
There was a problem hiding this comment.
note: each task could have up to 1MB of arguments, so deleting a bunch in one transaction could be tricky - I'm currently adding size calculations to limit each batch dynamically - should have a PR up soon if I don't already
(delete somewhat paradoxically consumes read bandwidth for the full document)
| await ctx.runMutation( | ||
| item.fnHandle as FunctionHandle<"mutation">, | ||
| { | ||
| workId: item.workId, | ||
| context: item.context, | ||
| result: item.result, | ||
| }, | ||
| ); |
There was a problem hiding this comment.
note: this means each of them will share read/write bandwidth & OCC dependencies - so if any of them conflict it all runs again. e.g. a batch size of 100 limits each one to 160kb of reads/ 80kb of writes.
Might be better to have a single onComplete that expects to take an array, so they could share common reads, defer more processing via scheduler dynamically, etc. ?
| export const cancel = mutation({ | ||
| args: { taskId: v.id("batchTasks") }, | ||
| handler: async (ctx, { taskId }) => { | ||
| const task = await ctx.db.get(taskId); | ||
| if (!task) { | ||
| return; | ||
| } | ||
|
|
||
| // Call onComplete with canceled result | ||
| if (task.onComplete) { | ||
| const runResult: RunResult = { kind: "canceled" }; | ||
| await callOnComplete(ctx, taskId, task.onComplete, runResult); | ||
| } | ||
|
|
||
| await ctx.db.delete(taskId); | ||
| }, | ||
| }); |
There was a problem hiding this comment.
note: deleting also reads the task, so checking the task's status would not incur a conflict any more than exists currently - but it could maybe dual-fire if it completes and then canceled in short order?
| const staleClaims = await ctx.db | ||
| .query("batchTasks") | ||
| .withIndex("by_status_claimedAt", (q) => | ||
| q.eq("status", "claimed").lt("claimedAt", claimCutoff), | ||
| ) | ||
| .take(500); |
There was a problem hiding this comment.
A couple concerns here:
- There will be an outage/downtime for at least the claimCutoff period every time we deploy funrun / they push.
- 500 items means each one can only have 32kb of args (which is smaller than some embedding vectors). The watchdog will likely run out of memory
Summary
Adds
BatchWorkpool, a new execution mode where a small number of long-lived executor actions each run many task handlers concurrently. This eliminates the 1-action-per-task overhead of standard mode and enables much higher effective concurrency.Key design decisions
complete,fail, andreleasemutations verify the calling executor still owns the task, preventing duplicate completions from stale executors.What's included
src/client/batch.ts—BatchWorkpoolclass withaction(),executor(),enqueue(),enqueueBatch(),enqueueByHandle()src/component/batch.ts— Component-side mutations: enqueue, claim, complete, fail, release, executor lifecyclesrc/component/schema.ts—batchTaskstable with slot-partitioned indexes,batchConfigsingletonsrc/client/batch.test.ts— 100+ client-side unit tests (executor loop, retry, backoff, memory pressure)src/component/batch.test.ts— 100+ component-side unit tests (claim ownership, slot isolation, edge cases)example/— Full comparison app with standard vs batch modes, stress test handlersREADME.md— BatchWorkpool documentation and usage guideBATCH_DESIGN_ANALYSIS.md— Detailed architecture rationaleBenchmarks: Standard vs Batch mode
Head-to-head comparison on the same Convex dev deployment. Each task runs a 2-second simulated handler (
slowWork).Standard mode processes ~100 tasks every ~2s (limited by maxParallelism and per-action scheduling overhead). Batch mode runs all tasks concurrently across 5 long-lived executors, completing 1K tasks in a single 2s cycle and 10K tasks in ~4 cycles.
Additional stress tests (batch mode only)
Zero OCC errors across all test configurations, validating the slot-based executor partitioning design.
Test plan
npm test)npm run build)🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Documentation
Latest branch benchmark (2026-02-14)
Additional apples-to-apples operational comparison on this branch:
example:enqueueABunchOfActions× 67 (~2010 actions)test:runBatch(count=2000, mock translation enabled)Derived from this run:
Artifact:
benchmark_results/standard_vs_batch_comparison_2026-02-14.txt