Task 2: MatrixRunner backend + APIs Integration for multiple jobs #89
Conversation
Walkthrough
A new matrix job management feature called "MatrixRunner" was introduced to the backend, including a persistent state mechanism and multiple new API endpoints for task control and monitoring. Supporting code and a state file were added.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant FlaskAPI
    participant MatrixRunner
    participant StateFile
    Client->>FlaskAPI: POST /api/matrix-runner/start (params)
    FlaskAPI->>MatrixRunner: generate(param_dict)
    MatrixRunner->>StateFile: save()
    FlaskAPI-->>Client: job count
    Client->>FlaskAPI: POST /api/matrix-runner/next
    FlaskAPI->>MatrixRunner: next_job()
    MatrixRunner->>StateFile: save()
    FlaskAPI-->>Client: next job info
    Client->>FlaskAPI: POST /api/matrix-runner/complete (job_id)
    FlaskAPI->>MatrixRunner: complete_job(job_id)
    MatrixRunner->>StateFile: save()
    FlaskAPI-->>Client: status
    Client->>FlaskAPI: POST /api/matrix-runner/pause/resume
    FlaskAPI->>MatrixRunner: pause()/resume()
    MatrixRunner->>StateFile: save()
    FlaskAPI-->>Client: status
    Client->>FlaskAPI: GET /api/matrix-runner/status
    FlaskAPI->>MatrixRunner: status()
    FlaskAPI-->>Client: summary
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~35 minutes
Reviewer's Guide
This PR introduces a MatrixRunner utility to generate and track grid-based image-generation jobs with pause/resume and JSON persistence, and wires it into the existing backend via dedicated API endpoints for sweep control and status monitoring.

Sequence diagram for MatrixRunner API job lifecycle

```mermaid
sequenceDiagram
    actor User
    participant API
    participant MatrixRunner
    User->>API: POST /api/matrix-runner/start
    API->>MatrixRunner: generate(param_dict)
    MatrixRunner-->>API: jobs initialized
    API-->>User: total_jobs
    User->>API: POST /api/matrix-runner/next
    API->>MatrixRunner: next_job()
    MatrixRunner-->>API: next job (marked running)
    API-->>User: job details
    User->>API: POST /api/matrix-runner/complete
    API->>MatrixRunner: complete_job(job_id)
    MatrixRunner-->>API: job marked done
    API-->>User: status ok
```
Class diagram for MatrixRunner job management

```mermaid
classDiagram
    class MatrixRunner {
        - state_file: str
        - jobs: list
        - index: int
        - paused: bool
        + __init__(state_file)
        + generate(param_dict)
        + next_job()
        + complete_job(job_id)
        + pause()
        + resume()
        + save()
        + load()
    }
    MatrixRunner --> "*" Job
    class Job {
        + id: int
        + status: str
        + [dynamic parameters: seeds, steps, samplers, ...]
    }
```
Hey @nataliarodriguez-uc - I've reviewed your changes - here's some feedback:
- Wrap MatrixRunner state modifications and file writes in a thread lock to prevent race conditions when multiple API requests invoke next_job/complete_job concurrently.
- Move the persisted state file out of the source directory (or at least add it to .gitignore) so you don’t accidentally commit runtime state into version control.
- Remove duplicate imports and any unused dependencies in task_runner.py to keep the module clean and maintainable.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Wrap MatrixRunner state modifications and file writes in a thread lock to prevent race conditions when multiple API requests invoke next_job/complete_job concurrently.
- Move the persisted state file out of the source directory (or at least add it to .gitignore) so you don’t accidentally commit runtime state into version control.
- Remove duplicate imports and any unused dependencies in task_runner.py to keep the module clean and maintainable.
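For the first point, a minimal sketch of what the locking could look like, assuming a shared `threading.RLock` (pause handling and persistence are omitted; this is not the PR's code):

```python
import threading

class MatrixRunner:
    def __init__(self, state_file="matrix_jobs.json"):
        self.state_file = state_file
        self.jobs, self.index, self.paused = [], 0, False
        # RLock so methods that persist while holding the lock can nest safely.
        self._lock = threading.RLock()

    def next_job(self):
        # Serialize cursor advancement so two requests never receive the same job.
        with self._lock:
            while self.index < len(self.jobs):
                job = self.jobs[self.index]
                self.index += 1
                if job["status"] == "pending":
                    job["status"] = "running"
                    return job
            return None
```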
## Individual Comments
### Comment 1
<location> `dream_layer_backend/dream_layer_backend_utils/task_runner.py:59` </location>
<code_context>
+ return job
+ return None # No more jobs
+
+ def complete_job(self, job_id):
+ """Mark job as done"""
+ for job in self.jobs:
</code_context>
<issue_to_address>
complete_job does not handle the case where job_id is not found.
Currently, if job_id is missing, no feedback is given. Adding a warning or exception would improve error visibility.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
def complete_job(self, job_id):
"""Mark job as done"""
for job in self.jobs:
if job["id"] == job_id:
job["status"] = "done"
break
self.save()
=======
def complete_job(self, job_id):
"""Mark job as done. Raises ValueError if job_id not found."""
found = False
for job in self.jobs:
if job["id"] == job_id:
job["status"] = "done"
found = True
break
if not found:
raise ValueError(f"Job with id {job_id} not found.")
self.save()
>>>>>>> REPLACE
</suggested_fix>
### Comment 2
<location> `dream_layer_backend/dream_layer_backend_utils/task_runner.py:77` </location>
<code_context>
+ self.save()
+
+ # --- Persistence ---
+ def save(self):
+ with open(self.state_file, "w") as f:
+ json.dump(
</code_context>
<issue_to_address>
save does not handle file write errors.
Wrap the file write in a try/except block and log errors to prevent process crashes.
Suggested implementation:
```python
def save(self):
try:
with open(self.state_file, "w") as f:
json.dump(
{"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
)
except Exception as e:
import logging
logging.error(f"Failed to save state to {self.state_file}: {e}")
```
If `logging` is already imported at the top of the file, you can remove the `import logging` line inside the except block.
</issue_to_address>
### Comment 3
<location> `dream_layer_backend/dream_layer.py:447` </location>
<code_context>
+ data = request.json or {}
+ # accept lists/ranges for seeds, steps, samplers, etc.
+ # keep only list-valued keys for the sweep
+ param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
+ runner.generate(param_dict) # deterministic expansion
+ return jsonify({"status":"success","total_jobs": len(runner.jobs)})
</code_context>
<issue_to_address>
Only list-valued keys are used for job generation, which may ignore scalar parameters.
Scalar values from the API client are currently excluded from job generation. To ensure all parameters are considered, normalize scalars to single-item lists before processing.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
# accept lists/ranges for seeds, steps, samplers, etc.
# keep only list-valued keys for the sweep
param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
runner.generate(param_dict) # deterministic expansion
return jsonify({"status":"success","total_jobs": len(runner.jobs)})
=======
# accept lists/ranges for seeds, steps, samplers, etc.
# normalize scalars to single-item lists for the sweep
param_dict = {k: v if isinstance(v, list) else [v] for k, v in data.items()}
runner.generate(param_dict) # deterministic expansion
return jsonify({"status":"success","total_jobs": len(runner.jobs)})
>>>>>>> REPLACE
</suggested_fix>
```python
    def complete_job(self, job_id):
        """Mark job as done"""
        for job in self.jobs:
            if job["id"] == job_id:
                job["status"] = "done"
                break
        self.save()
```
suggestion: complete_job does not handle the case where job_id is not found.
Currently, if job_id is missing, no feedback is given. Adding a warning or exception would improve error visibility.
```diff
 def complete_job(self, job_id):
-    """Mark job as done"""
+    """Mark job as done. Raises ValueError if job_id not found."""
+    found = False
     for job in self.jobs:
         if job["id"] == job_id:
             job["status"] = "done"
+            found = True
             break
+    if not found:
+        raise ValueError(f"Job with id {job_id} not found.")
     self.save()
```
```python
        self.save()

    # --- Persistence ---
    def save(self):
```
suggestion: save does not handle file write errors.
Wrap the file write in a try/except block and log errors to prevent process crashes.
Suggested implementation:
```python
    def save(self):
        try:
            with open(self.state_file, "w") as f:
                json.dump(
                    {"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
                )
        except Exception as e:
            import logging
            logging.error(f"Failed to save state to {self.state_file}: {e}")
```
If `logging` is already imported at the top of the file, you can remove the `import logging` line inside the except block.
```python
        # accept lists/ranges for seeds, steps, samplers, etc.
        # keep only list-valued keys for the sweep
        param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
        runner.generate(param_dict)  # deterministic expansion
        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
```
suggestion: Only list-valued keys are used for job generation, which may ignore scalar parameters.
Scalar values from the API client are currently excluded from job generation. To ensure all parameters are considered, normalize scalars to single-item lists before processing.
```diff
 # accept lists/ranges for seeds, steps, samplers, etc.
-# keep only list-valued keys for the sweep
-param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
+# normalize scalars to single-item lists for the sweep
+param_dict = {k: v if isinstance(v, list) else [v] for k, v in data.items()}
 runner.generate(param_dict)  # deterministic expansion
 return jsonify({"status":"success","total_jobs": len(runner.jobs)})
```
```python
        job = runner.next_job()
        if not job:
            return jsonify({"status":"empty"})
```
issue (code-quality): We've found these issues:
- Use named expression to simplify assignment and conditional (`use-named-expression`)
- Lift code into else after jump in control flow (`reintroduce-else`)
- Swap if/else branches (`swap-if-else-branches`)
Actionable comments posted: 7
🧹 Nitpick comments (3)
dream_layer_backend/dream_layer.py (3)
125-128: Persisting state under the code dir is fine; ensure it's ignored and not committed
Good location, but pair this with a fixed .gitignore and remove the tracked JSON as discussed.
If you'd prefer a writable data dir, consider `os.path.join(parent_dir, "Dream_Layer_Resources", "state")`.
463-474: Consider delegating status aggregation to the runner under lock
Avoids races and keeps the logic encapsulated.
Example:

```python
# in MatrixRunner
def summary(self):
    with self._lock:
        counts = {"pending": 0, "running": 0, "done": 0}
        for j in self.jobs:
            s = j.get("status")
            if s in counts:
                counts[s] += 1
        return {"total_jobs": len(self.jobs), **counts, "paused": self.paused}
```

Then in the endpoint:

```python
return jsonify({"status": "ok", **runner.summary()})
```
475-482: Endpoint OK; optionally use 204 No Content when empty
Returning 204 improves semantics; the current response is acceptable.

```diff
-    if not job:
-        return jsonify({"status":"empty"})
+    if not job:
+        return ("", 204)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- .gitignore (1 hunks)
- dream_layer_backend/dream_layer.py (3 hunks)
- dream_layer_backend/dream_layer_backend_utils/task_runner.py (1 hunks)
- dream_layer_backend/matrix_runner_state.json (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-16T18:40:41.273Z
Learnt from: divyaprakash0426
PR: DreamLayer-AI/DreamLayer#40
File: docker/Dockerfile.backend.dev:4-6
Timestamp: 2025-07-16T18:40:41.273Z
Learning: The DreamLayer project follows an iterative approach to Docker development, where basic Docker setup is established first, and missing dependencies (like PyYAML) are addressed in subsequent iterations when related services (like ComfyUI) are added to the Docker files.
Applied to files:
.gitignore
🪛 Ruff (0.12.2)
dream_layer_backend/dream_layer_backend_utils/task_runner.py
11-11: `random` imported but unused. Remove unused import: `random` (F401)
14-14: `flask.Flask` imported but unused. Remove unused import (F401)
14-14: `flask.request` imported but unused. Remove unused import (F401)
14-14: `flask.jsonify` imported but unused. Remove unused import (F401)
15-15: `flask_cors.CORS` imported but unused. Remove unused import: `flask_cors.CORS` (F401)
16-16: `PIL.Image` imported but unused. Remove unused import: `PIL.Image` (F401)
17-17: `numpy` imported but unused. Remove unused import: `numpy` (F401)
21-21: Redefinition of unused `itertools` from line 13. Remove definition: `itertools` (F811)
22-22: Redefinition of unused `json` from line 10. Remove definition: `json` (F811)
23-23: Redefinition of unused `os` from line 9. Remove definition: `os` (F811)
🔇 Additional comments (3)
dream_layer_backend/dream_layer.py (3)
13-13: Import looks good. Clean, targeted import of the new runner.
453-457: Pause endpoint LGTM
458-462: Resume endpoint LGTM
```
# DreamLayer logs and test logs
logs
test-logs
test-logsdream_layer_backend/matrix_runner_state.json
```
💡 Verification agent
🧩 Analysis chain
Fix malformed .gitignore entry; split into two lines
The entry concatenated two paths, so neither is ignored. Restore `test-logs` and add the state file on its own line.
Apply:

```diff
-test-logsdream_layer_backend/matrix_runner_state.json
+test-logs
+dream_layer_backend/matrix_runner_state.json
```
Then ensure the state file is untracked:
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Context around the malformed line:"
nl -ba .gitignore | sed -n '205,220p'
echo -e "\nIs the state file tracked?"
git ls-files -v dream_layer_backend/matrix_runner_state.json || true
echo -e "\nIf listed, untrack it:"
git rm --cached -f dream_layer_backend/matrix_runner_state.json || true
Length of output: 229
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Matched lines in .gitignore:"
grep -n 'test-logsdream_layer_backend' .gitignore || true
echo -e "\nContext around the malformed entry:"
grep -C2 'test-logsdream_layer_backend' .gitignore || true
echo -e "\nTracking status of the state file:"
git ls-files -v dream_layer_backend/matrix_runner_state.json || echo "Not tracked"
Length of output: 618
Fix malformed .gitignore entry for test logs and state file
The entry at line 213 in `.gitignore` currently reads as a single concatenated path and therefore ignores neither target. Split it into two separate lines:

```diff
@@ -213,1 +213,2 @@
-test-logsdream_layer_backend/matrix_runner_state.json
+test-logs
+dream_layer_backend/matrix_runner_state.json
```

Then untrack the state file (it is currently tracked; `git ls-files -v` shows `H dream_layer_backend/matrix_runner_state.json`):

```bash
git rm --cached -f dream_layer_backend/matrix_runner_state.json
```
🤖 Prompt for AI Agents
In the .gitignore file at line 213, the entry combines two paths into one, causing neither to be ignored. Split this line into two separate lines, one for test-logs and one for dream_layer_backend/matrix_runner_state.json. Then, to untrack the currently tracked state file dream_layer_backend/matrix_runner_state.json, run the command git rm --cached -f dream_layer_backend/matrix_runner_state.json.
```python
import os
import json
import random
import logging
import itertools
from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image
import numpy as np
```
🛠️ Refactor suggestion
Remove unused and duplicate imports; keep this module framework-agnostic
Multiple imports are unused and some are duplicated, tripping Ruff (F401/F811) and coupling this utility to Flask/PIL/NumPy unnecessarily.
```diff
-import os
-import json
-import random
-import logging
-import itertools
-from flask import Flask, request, jsonify
-from flask_cors import CORS
-from PIL import Image
-import numpy as np
+import os
+import json
+import logging
+import itertools
@@
-import itertools
-import json
-import os
```

Optionally, add a module logger (`logger = logging.getLogger(__name__)`) or remove `import logging` if it isn't needed.
Also applies to: 21-24
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 9
to 18 and 21 to 24, remove all unused imports including flask, flask_cors, PIL,
numpy, and any duplicates to keep the module framework-agnostic and avoid Ruff
warnings. Also, if logging is not used, remove the import logging statement and
any logger initialization. Only keep imports that are actually used in the code.
```python
class MatrixRunner:
    def __init__(self, state_file="matrix_jobs.json"):
        self.state_file = state_file
        self.jobs = []
        self.index = 0
        self.paused = False
        self.load()

    # --- Core Job Management ---
```
🛠️ Refactor suggestion
Add thread-safety and atomic persistence to prevent race conditions and file corruption
Concurrent `/next` and `/complete` calls can return duplicate jobs or corrupt the JSON (partial writes). Guard state with a lock and write atomically.

```diff
@@
-class MatrixRunner:
+import threading
+
+class MatrixRunner:
def __init__(self, state_file="matrix_jobs.json"):
self.state_file = state_file
self.jobs = []
self.index = 0
self.paused = False
- self.load()
+ self._lock = threading.RLock()
+ self.load()
@@
- def generate(self, param_dict):
+ def generate(self, param_dict):
"""Create all jobs from parameter ranges and reset state"""
- keys = list(param_dict.keys())
- combos = list(itertools.product(*param_dict.values()))
- self.jobs = [
- {"id": i, **dict(zip(keys, combo)), "status": "pending"}
- for i, combo in enumerate(combos)
- ]
- self.index = 0
- self.paused = False
- self.save()
+ keys = list(param_dict.keys())
+ combos = list(itertools.product(*param_dict.values()))
+ with self._lock:
+ self.jobs = [
+ {"id": i, **dict(zip(keys, combo)), "status": "pending"}
+ for i, combo in enumerate(combos)
+ ]
+ self.index = 0
+ self.paused = False
+ self.save()
@@
- def next_job(self):
+ def next_job(self):
"""Get the next pending job and mark it as running"""
- if self.paused:
- return None
- while self.index < len(self.jobs):
- job = self.jobs[self.index]
- self.index += 1
- if job["status"] == "pending":
- job["status"] = "running"
- self.save()
- return job
+ with self._lock:
+ if self.paused:
+ return None
+ while self.index < len(self.jobs):
+ job = self.jobs[self.index]
+ self.index += 1
+ if job.get("status") == "pending":
+ job["status"] = "running"
+ self.save()
+ return job
return None # No more jobs
@@
- def complete_job(self, job_id):
+ def complete_job(self, job_id) -> bool:
"""Mark job as done"""
- for job in self.jobs:
- if job["id"] == job_id:
- job["status"] = "done"
- break
- self.save()
+ found = False
+ with self._lock:
+ for job in self.jobs:
+ if job.get("id") == job_id:
+ job["status"] = "done"
+ found = True
+ break
+ self.save()
+ return found
@@
- def pause(self):
- self.paused = True
- self.save()
+ def pause(self):
+ with self._lock:
+ self.paused = True
+ self.save()
@@
- def resume(self):
- self.paused = False
- self.save()
+ def resume(self):
+ with self._lock:
+ self.paused = False
+ self.save()
@@
- def save(self):
- with open(self.state_file, "w") as f:
- json.dump(
- {"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
- )
+ def save(self):
+ """Atomic write to avoid partial/corrupt files."""
+ state = {"jobs": self.jobs, "index": self.index, "paused": self.paused}
+ tmp = f"{self.state_file}.tmp"
+ with open(tmp, "w") as f:
+ json.dump(state, f, indent=2)
+ os.replace(tmp, self.state_file)
@@
- def load(self):
- if os.path.exists(self.state_file):
- with open(self.state_file, "r") as f:
- state = json.load(f)
- self.jobs = state.get("jobs", [])
- self.index = state.get("index", 0)
- self.paused = state.get("paused", False)
+ def load(self):
+ if os.path.exists(self.state_file):
+ try:
+ with open(self.state_file, "r") as f:
+ state = json.load(f)
+ self.jobs = state.get("jobs", [])
+ self.index = state.get("index", 0)
+ self.paused = state.get("paused", False)
+ except Exception as e:
+ logging.getLogger(__name__).warning("Failed to load state (%s); starting fresh", e)
+ self.jobs, self.index, self.paused = [], 0, False
```
Optional: convert any lingering "running" jobs back to "pending" on load to recover from worker crashes.
Also applies to: 46-57, 59-66, 68-75, 77-82, 83-89
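A rough sketch of that optional recovery step, assuming the state shape shown above (any job still marked "running" at startup is treated as orphaned by a crash):

```python
    def load(self):
        if not os.path.exists(self.state_file):
            return
        with open(self.state_file, "r") as f:
            state = json.load(f)
        self.jobs = state.get("jobs", [])
        self.paused = state.get("paused", False)
        # Requeue anything that was mid-flight when the previous worker died.
        for job in self.jobs:
            if job.get("status") == "running":
                job["status"] = "pending"
        # Rescan from the start; next_job() skips jobs that are already done.
        self.index = 0
```

Resetting the cursor to 0 trades a little rescanning for simplicity, since next_job() ignores non-pending entries.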
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 25
to 33 and also lines 46-57, 59-66, 68-75, 77-82, and 83-89, the current
implementation lacks thread-safety and atomic file writes, which can cause race
conditions and JSON corruption during concurrent job state updates. To fix this,
add a threading lock to guard all accesses and modifications to the shared job
state and ensure that writes to the state_file are done atomically, for example
by writing to a temporary file and then renaming it. Additionally, modify the
load method to reset any jobs marked as "running" back to "pending" to recover
from worker crashes.
```python
    def generate(self, param_dict):
        """Create all jobs from parameter ranges and reset state"""
        keys = list(param_dict.keys())
        combos = list(itertools.product(*param_dict.values()))
        self.jobs = [
            {"id": i, **dict(zip(keys, combo)), "status": "pending"}
            for i, combo in enumerate(combos)
        ]
        self.index = 0
        self.paused = False
        self.save()
```
🛠️ Refactor suggestion
Guard against explosive cartesian products
A broad grid can easily generate millions of jobs, exhausting memory.
Add a cap (env-configurable) and reject requests exceeding it:
- Pre-compute size via product of lengths, bail out if > MAX_JOBS.
- Or stream generation instead of materializing the full list.
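One possible shape for that precheck at the top of generate(), assuming an env var named MATRIX_MAX_JOBS (the variable name and default are illustrative):

```python
import os
from math import prod  # Python 3.8+

def generate(self, param_dict):
    """Create all jobs from parameter ranges, refusing oversized grids."""
    max_jobs = int(os.environ.get("MATRIX_MAX_JOBS", "10000"))
    total = prod(len(v) for v in param_dict.values())
    if total > max_jobs:
        raise ValueError(f"Grid would expand to {total} jobs (limit {max_jobs})")
    # ...proceed with the itertools.product expansion as before...
```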
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 34
to 45, the generate method creates all job combinations at once, which can cause
memory exhaustion for large parameter grids. To fix this, add an
environment-configurable MAX_JOBS limit, pre-compute the total number of
combinations by multiplying the lengths of each parameter list, and if this
exceeds MAX_JOBS, raise an exception or reject the request before generating
jobs. This prevents creating an excessively large list and guards against memory
issues.
```python
@app.route('/api/matrix-runner/start', methods=['POST'])
def matrix_runner_start():
    try:
        data = request.json or {}
        # accept lists/ranges for seeds, steps, samplers, etc.
        # keep only list-valued keys for the sweep
        param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
        runner.generate(param_dict)  # deterministic expansion
        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
    except Exception as e:
        return jsonify({"status":"error","message": str(e)}), 500
```
🛠️ Refactor suggestion
Validate inputs and cap the sweep size to avoid OOM
Sanity-check list inputs and limit total jobs to a safe maximum; return 400 if exceeded.
```diff
@app.route('/api/matrix-runner/start', methods=['POST'])
def matrix_runner_start():
try:
- data = request.json or {}
+ data = request.json or {}
# accept lists/ranges for seeds, steps, samplers, etc.
# keep only list-valued keys for the sweep
- param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
- runner.generate(param_dict) # deterministic expansion
- return jsonify({"status":"success","total_jobs": len(runner.jobs)})
+ param_dict = {k: v for k, v in data.items() if isinstance(v, list) and len(v) > 0}
+ if not param_dict:
+ return jsonify({"status":"error","message":"Provide at least one non-empty list parameter"}), 400
+ from math import prod
+ max_jobs = int(os.environ.get("MATRIX_MAX_JOBS", "10000"))
+ total = prod(len(v) for v in param_dict.values())
+ if total > max_jobs:
+ return jsonify({"status":"error","message": f"Grid too large ({total} > {max_jobs})"}), 400
+ runner.generate(param_dict) # deterministic expansion
+ return jsonify({"status":"success","total_jobs": len(runner.jobs)})
except Exception as e:
return jsonify({"status":"error","message": str(e)}), 500
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer.py around lines 441 to 452, the
matrix_runner_start endpoint currently accepts list inputs without validation or
limits, which risks excessive job creation and potential OOM errors. Add input
validation to ensure list parameters are sane, calculate the total number of
jobs generated, and if it exceeds a predefined safe maximum, return a 400
response with an appropriate error message. This prevents resource exhaustion by
capping the sweep size before calling runner.generate.
```python
@app.route('/api/matrix-runner/complete', methods=['POST'])
def matrix_runner_complete():
    data = request.json or {}
    job_id = data.get("job_id")
    runner.complete_job(job_id)
    return jsonify({"status":"ok"})
```
🛠️ Refactor suggestion
Validate job_id and return 404 when not found
Currently `None` (or an invalid ID) silently succeeds. Also align with the updated `complete_job` bool return.

```diff
@app.route('/api/matrix-runner/complete', methods=['POST'])
def matrix_runner_complete():
- data = request.json or {}
- job_id = data.get("job_id")
- runner.complete_job(job_id)
- return jsonify({"status":"ok"})
+ data = request.json or {}
+ job_id = data.get("job_id")
+ if job_id is None:
+ return jsonify({"status":"error","message":"job_id is required"}), 400
+ ok = runner.complete_job(job_id)
+ if not ok:
+ return jsonify({"status":"error","message":f"job_id {job_id} not found"}), 404
+ return jsonify({"status":"ok"})
```
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer.py around lines 483 to 489, the
matrix_runner_complete function does not validate the job_id from the request
JSON and always returns a success status even if job_id is None or invalid.
Update the function to check if job_id is provided and call
runner.complete_job(job_id), which now returns a boolean indicating success. If
the job_id is missing or complete_job returns False, return a 404 response;
otherwise, return the success JSON response.
```json
{
  "jobs": [
    {
      "id": 0,
      "seeds": 1,
      "steps": 10,
      "Samplers": "euler",
      "status": "done"
    },
    {
      "id": 1,
      "seeds": 1,
      "steps": 20,
      "Samplers": "euler",
      "status": "running"
    },
    {
      "id": 2,
      "seeds": 2,
      "steps": 10,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 3,
      "seeds": 2,
      "steps": 20,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 4,
      "seeds": 3,
      "steps": 10,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 5,
      "seeds": 3,
      "steps": 20,
      "Samplers": "euler",
      "status": "pending"
    }
  ],
  "index": 2,
  "paused": false
}
```
Do not commit runtime state; remove file and rely on persistence via .gitignore
This JSON is mutable, environment-specific state. Keep it out of VCS to avoid drift, merge conflicts, and accidental resets. Also, key naming is inconsistent (`Samplers` vs likely `samplers`).
Recommended actions:
- Delete from repo and ignore it (see .gitignore fix above).
- Regenerate at runtime; initialize empty state when absent.
- Normalize keys to snake_case if you need a seed file in examples.
🤖 Prompt for AI Agents
In dream_layer_backend/matrix_runner_state.json lines 1 to 48, this file
contains mutable runtime state that should not be committed to version control
to prevent conflicts and accidental resets. Remove this file from the repository
and add it to .gitignore to avoid tracking it. Ensure the application
regenerates this state at runtime and initializes it as empty if the file is
missing. Also, if a seed file is needed for examples, normalize all keys to
snake_case, changing "Samplers" to "samplers" for consistency.
Hey! Thanks for raising the PR :) Can you please include a few DreamLayer UI screenshots? That would make it easier for us to review.
Summary
Implements Task 2: MatrixRunner backend + APIs to support matrix/grid-based image generation workflows with persistent state, pause/resume capability, and job tracking.
Key Features
- `MatrixRunner` class in `dream_layer_backend_utils/task_runner.py`, with job status tracking (`pending`, `running`, `done`)
- New API routes in `dream_layer.py`:
  - POST /api/matrix-runner/start – Initialize job sweep
  - POST /api/matrix-runner/pause – Pause execution
  - POST /api/matrix-runner/resume – Resume execution
  - GET /api/matrix-runner/status – Retrieve current state of all jobs
  - POST /api/matrix-runner/next – Fetch the next pending job
  - POST /api/matrix-runner/complete – Mark a job as completed

Testing
Tested locally using curl; a sketch of equivalent calls follows below.
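The exact curl commands are not included above; here is a rough Python equivalent of that smoke test (the host and port are assumptions, and the /next success payload is not shown in this PR's excerpts):

```python
import requests

BASE = "http://localhost:5000/api/matrix-runner"  # host/port assumed

# Start a sweep: 2 seeds x 2 step counts x 1 sampler -> 4 jobs.
resp = requests.post(f"{BASE}/start",
                     json={"seeds": [1, 2], "steps": [10, 20], "samplers": ["euler"]})
print(resp.json())  # e.g. {"status": "success", "total_jobs": 4}

# Fetch the next pending job; an empty queue returns {"status": "empty"}.
print(requests.post(f"{BASE}/next").json())

# Pause/resume the sweep and inspect overall progress.
requests.post(f"{BASE}/pause")
requests.post(f"{BASE}/resume")
print(requests.get(f"{BASE}/status").json())
```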