feat: New fetch job function. #1241
Conversation
New fetch job function:
- handles the on-update conflict
- preserves task order for same-lock, different-priority jobs
- the inner select uses the index properly
Fetch job with retry: after max retries, fetch any doable job (one with no lock) instead.
The code is not ready to be merged!
You can mark the PR as a draft to make sure it doesn't get accidentally merged :)
Thanks for this PR, @TheNeedForSleep ! I have just come up against this issue in a project I'm working on. I have temporarily "fixed" it for myself with pretty much the same SQL change:

CREATE OR REPLACE FUNCTION procrastinate_fetch_job(
target_queue_names character varying[]
)
RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
(
jobs.lock IS NULL OR
-- reject the job if its lock has earlier jobs
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
WHERE
earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing', 'aborting')
AND earlier_jobs.id < jobs.id)
)
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;
RETURN found_jobs;
END;
$$;

Question: why is it necessary to make the change in manager.py? It looks to me that the existing query will already fall back to a job where
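The selection rule in the NOT EXISTS subquery above (a lock admits only its earliest still-pending job) can be modelled in a few lines of pure Python. This is just an illustration of the SQL's logic, with made-up job data:

```python
# Pure-Python model of the candidate-selection rule in the SQL above:
# a job is eligible if it has no lock, or if no earlier job with status
# 'todo', 'doing', or 'aborting' holds the same lock. Among eligible
# 'todo' jobs, pick highest priority first, then lowest id.

def pick_candidate(jobs):
    """jobs: list of dicts with keys id, lock, status, priority."""
    def blocked(job):
        if job["lock"] is None:
            return False
        return any(
            other["lock"] == job["lock"]
            and other["status"] in ("todo", "doing", "aborting")
            and other["id"] < job["id"]
            for other in jobs
        )

    eligible = [j for j in jobs if j["status"] == "todo" and not blocked(j)]
    if not eligible:
        return None
    return min(eligible, key=lambda j: (-j["priority"], j["id"]))


jobs = [
    {"id": 1, "lock": "A", "status": "doing", "priority": 1},
    {"id": 2, "lock": "A", "status": "todo", "priority": 9},  # blocked by id 1
    {"id": 3, "lock": None, "status": "todo", "priority": 5},
    {"id": 4, "lock": "B", "status": "todo", "priority": 5},
]
print(pick_candidate(jobs)["id"])  # → 3: id 2 is blocked despite its high
# priority; ids 3 and 4 tie on priority, so the lower id wins
```

Note how a high-priority job (id 2) stays blocked behind a lower-priority one on the same lock; that is the ordering guarantee the subquery exists to enforce.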
Add the 'aborting' status back in to keep the old behaviour. Note that adding 'aborting' will slow down the query, as the lock index does not include the aborting status.
If multiple workers try to fetch tasks with the same lock, they will keep running into the same conflicts. To de-escalate, a worker that fails to fetch a task just grabs the next task with no lock, instead of returning None, going to sleep, and then retrying, which keeps escalating the problem. I don't know if it is actually necessary to include this code for everyone, but in some situations this problem will occur.
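The de-escalation idea can be sketched in a few lines of Python. This is not the actual manager.py change; `fetch_one_job`, `fetch_any_unlocked_job`, and `LockConflict` are illustrative stand-ins, not procrastinate APIs:

```python
# Hypothetical sketch of the de-escalation described above: after
# max_retries failed fetches on lock conflicts, fall back to any doable
# job with no lock, so the worker stays busy instead of sleeping and
# re-contending for the same lock.

class LockConflict(Exception):
    """Stand-in for the update conflict raised on the lock index."""


def fetch_job_with_fallback(fetch_one_job, fetch_any_unlocked_job,
                            max_retries=3):
    for _attempt in range(max_retries):
        try:
            return fetch_one_job()
        except LockConflict:
            continue  # another worker won the race on this lock
    # De-escalate: take a lock-free job rather than returning None.
    return fetch_any_unlocked_job()
```

The point of the fallback is that contended workers spread out over unlocked work instead of all hammering the same lock again after a sleep.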
I will let you guys decide if you actually want that change in manager.py. If any of my ideas are hard to follow, please let me know :)
Motivation for this change: the worker will crash when trying to add a job whose lock is already in the lock index. Test setup: the following jobs, where fast_job_go_brr is called once.

import asyncio
import logging
import time
from random import choice, random
from pydantic import validate_call
from worker import PRIORITY_QUEUE, IntegrityErrorRetryStrategy, worker
logger = logging.getLogger(__name__)
@worker.task(
queue=PRIORITY_QUEUE,
retry=IntegrityErrorRetryStrategy(),
)
@validate_call
async def fast_job_go_brr(
n_jobs: int = 10_000,
locks: list | None = None,
):
if locks is None:
locks = [None, "A", "B", "C"]
async with asyncio.TaskGroup() as group:
for _i in range(n_jobs):
group.create_task(
fast_job.configure(
lock=choice(locks), # noqa: S311
priority=choice([1, 2, 3]), # noqa: S311
).defer_async()
)
@worker.task(
queue=PRIORITY_QUEUE,
retry=IntegrityErrorRetryStrategy(),
)
@validate_call
async def fast_job(min_wait: float = 0.001, max_wait: float = 0.01, sync_wait=False):
sleep_time = random() * (max_wait - min_wait) + min_wait # noqa: S311
logger.info("Waiting %s", sleep_time)
if sync_wait:
time.sleep(sleep_time)
else:
await asyncio.sleep(sleep_time)
    logger.info("Done")
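The IntegrityErrorRetryStrategy imported in the snippet above is not part of this PR. As a rough, hypothetical sketch: the class body, the `IntegrityError` stand-in, and the `get_schedule_in` hook (returning a delay in seconds, or None to give up) are all assumptions, loosely modelled on how a custom retry strategy tends to look, not procrastinate's actual interface:

```python
# Hypothetical sketch of an IntegrityErrorRetryStrategy: retry only
# unique-violation errors from the lock index, with a linear backoff,
# and give up after max_attempts. All names here are illustrative.

class IntegrityError(Exception):
    """Stand-in for the DB driver's unique-violation error."""


class IntegrityErrorRetryStrategy:
    def __init__(self, max_attempts=10, wait=0.1):
        self.max_attempts = max_attempts
        self.wait = wait

    def get_schedule_in(self, *, exception, attempts):
        # Retry only integrity errors; anything else fails immediately.
        if not isinstance(exception, IntegrityError):
            return None
        if attempts >= self.max_attempts:
            return None
        # Linear backoff: wait, 2*wait, 3*wait, ...
        return self.wait * (attempts + 1)
```

Scoping the retry to integrity errors matters here: the whole test setup exists to provoke lock-index conflicts, and unrelated failures should still surface immediately.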
fixes #1242
New fetch job function:
Does not support the aborting status!