Background container start: async worker for container starts #1051
Conversation
Container starts are now enqueued to a Redis Stream and processed by a dedicated container-worker service with configurable parallelism. The POST /docker endpoint returns immediately with a start_id, and clients poll GET /docker/status for progress. This decouples the web server from long-running container operations and improves reliability under concurrent load.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
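For reference, here is a minimal sketch of the enqueue/poll flow described above, using redis-py. The stream name matches the summary below, but the status-key layout, helper names, and signatures are illustrative assumptions, not the PR's exact code.

```python
import uuid

import redis

CONTAINER_STARTS_STREAM = "container:starts"  # stream the container-worker consumes


def enqueue_start(r: redis.Redis, user_id: int, challenge_id: int) -> str:
    """Record a queued status and push the start request onto the stream."""
    start_id = uuid.uuid4().hex
    r.hset(f"container:start:{start_id}", mapping={
        "status": "queued",
        "user_id": str(user_id),
        "error": "",
    })
    r.xadd(CONTAINER_STARTS_STREAM, {
        "start_id": start_id,
        "user_id": str(user_id),
        "challenge_id": str(challenge_id),
    })
    return start_id


def get_start_status(r: redis.Redis, start_id: str) -> dict | None:
    """Return the status hash the worker updates, or None if the ID is unknown."""
    data = r.hgetall(f"container:start:{start_id}")
    return {k.decode(): v.decode() for k, v in data.items()} if data else None
```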
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e21e5eaaf6
| "challenge_index": str(dojo_challenge.challenge_index), | ||
| "practice": str(int(bool(practice))), | ||
| "as_user_id": str(as_user_id) if as_user_id is not None else "", |
Preserve workspace-token user in queued start
The queued payload only includes as_user_id, but that field is only populated in the admin as_user branch. If a request uses X-Workspace-Token, as_user is set earlier yet as_user_id stays None, so this stream entry sends an empty as_user_id and the worker can only start the challenge as the current user. That changes mounts/flags for token-based starts compared to the synchronous path. Please propagate the token user ID into the payload (or resolve it in the worker) so workspace-token launches target the intended user.
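One possible shape of the fix, sketched with the names visible in the diff. The assumption is that as_user already holds the workspace-token user at this point, so the payload derives the ID from it rather than from the admin-only as_user_id; build_start_payload is a hypothetical helper used only to keep the sketch self-contained.

```python
def build_start_payload(dojo_challenge, practice, as_user=None):
    """Build the stream payload, deriving as_user_id from whichever branch set as_user."""
    # as_user is assumed to be set by both the admin as_user branch and the
    # X-Workspace-Token branch (an assumption about the surrounding code).
    effective_user_id = as_user.id if as_user is not None else None
    return {
        "challenge_index": str(dojo_challenge.challenge_index),
        "practice": str(int(bool(practice))),
        "as_user_id": str(effective_user_id) if effective_user_id is not None else "",
    }
```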
try:
    process_start(r, consumer_name, message_id, message_data)
except Exception as e:
    logger.error(f"Unexpected error processing {message_id}: {e}", exc_info=True)
finally:
    r.xack(CONTAINER_STARTS_STREAM, CONSUMER_GROUP, message_id)
Avoid ACKing failed start jobs
This loop always XACKs in finally even if process_start throws (e.g., transient DB/Redis error before status is updated). In that case the stream entry is dropped, the status key can remain stuck at queued, and the user will poll until timeout with no automatic retry. Consider only ACKing after process_start succeeds (or after you write a terminal failed status), and leaving the message pending on unexpected exceptions so another worker can retry.
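A sketch of the ack-after-success pattern being suggested. CONTAINER_STARTS_STREAM, CONSUMER_GROUP, and process_start are the names from the diff; handle_message is a hypothetical wrapper used only to make the retry behavior explicit.

```python
def handle_message(r, consumer_name, message_id, message_data):
    """Only XACK once process_start finishes; failures stay pending for retry."""
    try:
        process_start(r, consumer_name, message_id, message_data)
    except Exception as e:
        # Leave the entry in the pending list so XAUTOCLAIM or another worker
        # can pick it up again instead of silently dropping the start.
        logger.error(f"Unexpected error processing {message_id=}: {e}", exc_info=True)
    else:
        r.xack(CONTAINER_STARTS_STREAM, CONSUMER_GROUP, message_id)
```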
        return cached
    find_command = ["/bin/find", "/dev", "-type", "c"]
    # When using certain logging drivers (like Splunk), docker.containers.run() returns None
    # Use detach=True and logs() to capture output instead
Let’s not remove existing comments just for the sake of removing comments; instead, let’s avoid adding unnecessary comments in code we write going forward.
def remove_container(user):
    # Just in case our container is still running on the other docker container, let's make sure we try to kill both
See other comment about killing comments
    else:
        raise RuntimeError(f"Workspace failed to become ready.")


def docker_locked(func):
Do we have tests that exercise this “locked” behavior?
Yes — test_concurrent_start_same_user in test/test_container_worker.py exercises this: it starts a challenge, then immediately attempts a second start for the same user and asserts the second is rejected with the lock-contention error. A rough sketch of its shape is below.
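The sketch below is an approximation of that test's structure; the fixture name, the start_challenge helper, and its wait parameter are assumptions about test/utils.py rather than the actual code.

```python
def test_concurrent_start_same_user(random_user_session):
    # Kick off a start but don't wait for it to reach "ready"...
    first = start_challenge(random_user_session, wait=False)
    assert first["success"]

    # ...then immediately try a second start for the same user and expect
    # the per-user docker lock to reject it.
    second = start_challenge(random_user_session, wait=False)
    assert not second["success"]
    assert "lock" in second["error"].lower()
```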
dojo_plugin/api/v1/docker.py (Outdated)
    status = get_start_status(redis_client, start_id)

    if status is None:
        return {"success": False, "error": "Unknown start ID"}
Should this be a 404 HTTP response? Does that make more semantic sense? (Although, is that something we'd need to handle on the frontend?)
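For example (a sketch, not the PR's exact code): Flask/Flask-RESTX view methods accept a (body, status) tuple, so the unknown-ID branch could become:

```python
status = get_start_status(redis_client, start_id)
if status is None:
    # 404 signals "no such start" at the HTTP layer; the frontend's polling
    # loop would need to treat it as a terminal state rather than retrying.
    return {"success": False, "error": "Unknown start ID"}, 404
```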
dojo_plugin/api/v1/docker.py (Outdated)
    user = get_current_user()
    if status.get("user_id") != user.id:
        return {"success": False, "error": "Unknown start ID"}
The error message here is the same as above, but the actual issue is a mismatched user_id?
Intentional — returning the same "Unknown start ID" prevents leaking information about whether a start_id exists for a different user. If we returned a distinct message, an attacker could enumerate valid start_ids.
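In other words, both checks collapse to one uniform response so callers cannot distinguish the two cases (a sketch, combined with the 404 suggestion above):

```python
status = get_start_status(redis_client, start_id)
user = get_current_user()

# Unknown IDs and IDs belonging to another user look identical to the caller,
# so valid start_ids cannot be enumerated across accounts.
if status is None or status.get("user_id") != user.id:
    return {"success": False, "error": "Unknown start ID"}, 404
```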
RETRY_DELAY = 2


def get_redis_client():
This function is duplicated in a lot of places; we should move it to utils or use one that (likely) already exists.
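A minimal sketch of the shared-helper idea. The follow-up commit actually reuses the existing client from background_stats; the env-var-based factory below is only illustrative, and the REDIS_URL default is an assumption.

```python
import os
from functools import lru_cache

import redis


@lru_cache(maxsize=1)
def get_redis_client() -> redis.Redis:
    """Build one process-wide client so workers and the plugin share a single setup."""
    return redis.Redis.from_url(
        os.environ.get("REDIS_URL", "redis://redis:6379"),
        decode_responses=True,
    )
```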
| "error": None, "user_id": user_id, | ||
| }) | ||
|
|
||
| logger.info(f"Starting challenge for user {user_id} start={start_id} (attempt {attempt}/{MAX_ATTEMPTS})") |
Big change: for all the logging in this PR, use the = format of f-strings so that Splunk can easily capture the fields.
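For example, the f-string `=` specifier renders key=value pairs that Splunk's automatic field extraction picks up:

```python
# Before: positional interpolation, which Splunk cannot split into fields.
logger.info(f"Starting challenge for user {user_id} start={start_id} (attempt {attempt}/{MAX_ATTEMPTS})")

# After: the `=` form emits user_id=..., start_id=..., attempt=... automatically
# (string values are rendered with repr quotes, which Splunk still extracts).
logger.info(f"Starting challenge {user_id=} {start_id=} {attempt=} max_attempts={MAX_ATTEMPTS}")
```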
            break

    try:
        if autoclaim_counter >= 12:
Should this be a global?
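The follow-up commit extracts it into AUTOCLAIM_INTERVAL_TICKS; roughly the shape below, where the 60-second idle threshold and helper name are illustrative assumptions and the stream/group/consumer names come from the diff.

```python
AUTOCLAIM_INTERVAL_TICKS = 12  # run XAUTOCLAIM roughly once every 12 poll-loop ticks


def maybe_autoclaim(r, consumer_name, tick: int) -> int:
    """Every AUTOCLAIM_INTERVAL_TICKS iterations, claim entries stuck pending too long."""
    if tick < AUTOCLAIM_INTERVAL_TICKS:
        return tick + 1
    # Claim entries another consumer left pending for over 60s (threshold is illustrative).
    r.xautoclaim(CONTAINER_STARTS_STREAM, CONSUMER_GROUP, consumer_name, min_idle_time=60_000)
    return 0
```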
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
Now that several components set up logging this way, let’s DRY it up so the setup lives in one place.
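Something along these lines — the helper name matches the follow-up commit's setup_worker_logging(), but its module location and exact behavior are assumptions:

```python
import logging

WORKER_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def setup_worker_logging(name: str, level: int = logging.INFO) -> logging.Logger:
    """Shared stream-handler setup so every worker logs in the same format."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid stacking duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(WORKER_LOG_FORMAT))
        logger.addHandler(handler)
    return logger
```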
SPEC.md (Outdated)
@@ -0,0 +1,228 @@
# Background Container Start
Don’t commit the spec; just commit the code.
…C.md

- Restore removed comments on remove_container and get_available_devices
- Fix workspace-token as_user_id not propagating to worker payload
- Move XACK after process_start (don't ACK failed jobs)
- Return 404 for unknown/unauthorized start IDs
- DRY get_redis_client: import from background_stats
- Use = format f-strings in all new logging for Splunk
- Extract AUTOCLAIM_INTERVAL_TICKS constant
- DRY worker logging setup into setup_worker_logging()
- Handle 404 in frontend status polling
- Remove SPEC.md from repo

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
- Container starts are enqueued to a Redis Stream (container:starts) and processed by a dedicated container-worker service with configurable parallelism (CONTAINER_WORKERS, default 8)
- POST /docker returns immediately with a start_id; clients poll GET /docker/status?id= for progress (queued → starting → ready/failed)

Test plan
- container-worker service starts and stays running (dojo compose ps)
- ./deploy.sh -t passes existing tests with the updated async start_challenge() in test/utils.py
- test/test_container_worker.py (async flow, status progression, concurrent lock, unknown ID)

🤖 Generated with Claude Code