Skip to content

Add BatchEnqueue for pipelined multi-task enqueue#1094

Open
enilsen16 wants to merge 5 commits intohibiken:masterfrom
fanatics-live:batch-enqueue
Open

Add BatchEnqueue for pipelined multi-task enqueue#1094
enilsen16 wants to merge 5 commits intohibiken:masterfrom
fanatics-live:batch-enqueue

Conversation

@enilsen16
Copy link
Copy Markdown

Adds BatchEnqueue to the Broker interface and RDB implementation that sends multiple enqueueCmd Lua script invocations in a single Redis pipeline round-trip. Also adds BatchEnqueueContext to the Client as the public API, returning per-task results for partial-success handling.

Ref: #1069

Adds BatchEnqueue to the Broker interface and RDB implementation that
sends multiple enqueueCmd Lua script invocations in a single Redis
pipeline round-trip. Also adds BatchEnqueueContext to the Client as
the public API, returning per-task results for partial-success handling.

Ref: hibiken#1069
BatchEnqueueContext had a time comparison bug where `now` was captured
before the loop but `processAt` was set to time.Now() inside
composeOptions during each iteration, causing all immediate tasks to be
incorrectly classified as scheduled and rejected.

Fix: move `now` capture inside the loop, after composeOptions.

Additionally, extend BatchEnqueueContext to support scheduled tasks in
the same pipeline. Tasks with a future ProcessAt are now routed to
scheduleCmd (ZADD to scheduled set) instead of being rejected. Only
unique and group tasks remain unsupported.

Changes:
- Add BatchEnqueueItem type pairing TaskMessage with optional ProcessAt
- Update Broker interface, RDB, and testbroker to use BatchEnqueueItem
- Route immediate tasks to enqueueCmd, scheduled tasks to scheduleCmd
- Return correct TaskState (Pending vs Scheduled) in results
- Add tests for immediate, scheduled, and mixed batch scenarios
Exposes the opts field so callers can read a task's options to merge
with additional per-task options at batch time.
@kamikazechaser
Copy link
Copy Markdown
Collaborator

Please document any atomicity guarantees e.g. what kind of failures would or would not reject the entire batch.

// single Redis pipeline round-trip. It returns the number of newly enqueued
// messages (tasks whose IDs already exist in Redis are silently skipped).
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
// Each item is either enqueued immediately or scheduled based on its ProcessAt field.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a clear warning on silent skipping of duplicates on its own line e.g.

WARNING: tasks whose IDs already exist in Redis are silently skipped

Add comprehensive doc comments to BatchEnqueueContext, the Broker
interface method, and the RDB implementation explaining that the batch
uses a Redis pipeline (not MULTI/EXEC), so partial success is possible
and individual Lua scripts are atomic but the batch is not.

Fix a bug where Script.Run inside a pipeline only sends EVALSHA without
the automatic EVAL fallback that non-pipeline calls get. On a fresh
Redis (or after SCRIPT FLUSH), this caused NOSCRIPT errors for every
pipeline-batched script invocation. The fix preloads the required Lua
scripts before building the pipeline.

Also roll back the in-memory queuesPublished cache when the pipeline
fails, preventing stale entries from suppressing future SADD calls.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants