From 1c0c2f74f2218ff63b646eae7855b6bbccc1ff0d Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Wed, 20 Nov 2024 15:18:52 +0100 Subject: [PATCH 1/6] feat: New fetch job function. New fetch job function: - handles on update conflict - works with same lock different priority task order - inner select uses index properly --- .../02.15.00_01_new_fetch_job_function.sql | 41 +++++++++++++++++ procrastinate/sql/schema.sql | 46 ++++++++++--------- 2 files changed, 66 insertions(+), 21 deletions(-) create mode 100644 procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql diff --git a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql new file mode 100644 index 000000000..22fa7e6a9 --- /dev/null +++ b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql @@ -0,0 +1,41 @@ +-- new fetch job. Only checks for doing. On update conflict return NULL +CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) + RETURNS procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + (jobs.lock IS NULL OR + NOT EXISTS ( -- reject the job if its lock has current jobs + SELECT 1 + FROM procrastinate_jobs AS jobs_with_locks + WHERE + jobs.lock IS NOT NULL + AND jobs_with_locks.lock = jobs.lock + AND jobs_with_locks.status = 'doing' + LIMIT 1 + )) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + RETURN found_jobs; + EXCEPTION + WHEN unique_violation THEN + RETURN NULL; -- Return empty result on conflict + END; +END; +$$; diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 4dd7bd59a..0e455c6a2 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -157,44 +157,48 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_fetch_job( - target_queue_names character varying[] -) - RETURNS procrastinate_jobs - LANGUAGE plpgsql +CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) + RETURNS procrastinate_jobs + LANGUAGE plpgsql AS $$ DECLARE - found_jobs procrastinate_jobs; + found_jobs procrastinate_jobs; BEGIN - WITH candidate AS ( - SELECT jobs.* + BEGIN + WITH candidate AS ( + SELECT jobs.* FROM procrastinate_jobs AS jobs WHERE - -- reject the job if its lock has earlier jobs - NOT EXISTS ( + (jobs.lock IS NULL OR + NOT EXISTS ( -- reject the job if its lock has current jobs SELECT 1 - FROM procrastinate_jobs AS earlier_jobs - WHERE - jobs.lock IS NOT NULL - AND earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN ('todo', 'doing', 'aborting') - AND earlier_jobs.id < jobs.id) + FROM procrastinate_jobs AS jobs_with_locks + WHERE + jobs.lock IS NOT NULL + AND jobs_with_locks.lock = jobs.lock + AND jobs_with_locks.status = 'doing' + LIMIT 1 + )) AND jobs.status = 'todo' - AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 FOR UPDATE OF jobs SKIP LOCKED - ) - UPDATE procrastinate_jobs + ) + UPDATE procrastinate_jobs SET status = 'doing' FROM candidate WHERE procrastinate_jobs.id = candidate.id RETURNING procrastinate_jobs.* INTO found_jobs; - - RETURN found_jobs; + RETURN found_jobs; + EXCEPTION + WHEN unique_violation THEN + RETURN NULL; -- Return empty result on conflict + END; END; $$; + -- procrastinate_finish_job -- the next_scheduled_at argument is kept for compatibility reasons, it is to be -- removed after 1.0.0 is released From af6f84d2083270b628c8d72f9a05938eb2806dd9 Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Tue, 3 Dec 2024 14:49:10 +0100 Subject: [PATCH 2/6] feat: fetch job with retry Fetch job with retry after max retries get any doable job instead --- procrastinate/manager.py | 14 ++- .../02.15.00_01_new_fetch_job_function.sql | 103 ++++++++++++------ procrastinate/sql/queries.sql | 5 + procrastinate/sql/schema.sql | 102 +++++++++++------ 4 files changed, 154 insertions(+), 70 deletions(-) diff --git a/procrastinate/manager.py b/procrastinate/manager.py index d77261f17..016495df0 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -134,10 +134,16 @@ async def fetch_job(self, queues: Iterable[str] | None) -> jobs.Job | None: : None if no suitable job was found. The job otherwise. """ - - row = await self.connector.execute_query_one_async( - query=sql.queries["fetch_job"], queues=queues - ) + try: + row = await self.connector.execute_query_one_async( + query=sql.queries["fetch_job"], queues=queues + ) + except exceptions.UniqueViolation as _exc: + # getting a job with lock lead to conflicts + # get any doable job instead + row = await self.connector.execute_query_one_async( + query=sql.queries["fetch_job_without_lock"], queues=queues + ) # fetch_tasks will always return a row, but is there's no relevant # value, it will all be None diff --git a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql index 22fa7e6a9..5e0230e15 100644 --- a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql +++ b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql @@ -1,41 +1,78 @@ -- new fetch job. Only checks for doing. On update conflict return NULL -CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) - RETURNS procrastinate_jobs +CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job(target_queue_names character varying[]) + RETURNS procrastinate.procrastinate_jobs + LANGUAGE plpgsql +AS $function$ +DECLARE + found_jobs procrastinate_jobs; + retry_count INT := 0; +BEGIN + LOOP + BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + (jobs.lock IS NULL OR + NOT EXISTS ( -- reject the job if its lock has current jobs + SELECT 1 + FROM procrastinate_jobs AS jobs_with_locks + WHERE + jobs.lock IS NOT NULL + AND jobs_with_locks.lock = jobs.lock + AND jobs_with_locks.status = 'doing' + LIMIT 1 + )) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; + EXCEPTION + WHEN unique_violation THEN + PERFORM pg_sleep(random() * 0.01); -- fuzzy retry with upto 10ms sleep + retry_count := retry_count + 1; + IF retry_count >= 3 THEN + --RAISE NOTICE 'Maximum retry attempts reached, returning empty result.'; + RETURN unique_violation; + END IF; + END; + END LOOP; +END; +$function$ +; + +CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[]) + RETURNS procrastinate.procrastinate_jobs LANGUAGE plpgsql AS $$ DECLARE found_jobs procrastinate_jobs; BEGIN - BEGIN - WITH candidate AS ( - SELECT jobs.* - FROM procrastinate_jobs AS jobs - WHERE - (jobs.lock IS NULL OR - NOT EXISTS ( -- reject the job if its lock has current jobs - SELECT 1 - FROM procrastinate_jobs AS jobs_with_locks - WHERE - jobs.lock IS NOT NULL - AND jobs_with_locks.lock = jobs.lock - AND jobs_with_locks.status = 'doing' - LIMIT 1 - )) - AND jobs.status = 'todo' - AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) - AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) - ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 - FOR UPDATE OF jobs SKIP LOCKED - ) - UPDATE procrastinate_jobs - SET status = 'doing' - FROM candidate - WHERE procrastinate_jobs.id = candidate.id - RETURNING procrastinate_jobs.* INTO found_jobs; - RETURN found_jobs; - EXCEPTION - WHEN unique_violation THEN - RETURN NULL; -- Return empty result on conflict - END; + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + jobs.lock IS NULL + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + RETURN found_jobs; END; $$; diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index ad91db9d9..59f65b35c 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -17,6 +17,11 @@ SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s, SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts FROM procrastinate_fetch_job(%(queues)s); +-- fetch_job_without_lock -- +-- Get the first awaiting job +SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts + FROM procrastinate_fetch_job(%(queues)s); + -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 0e455c6a2..d1227ad29 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -157,44 +157,80 @@ BEGIN END; $$; -CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) - RETURNS procrastinate_jobs +CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job(target_queue_names character varying[]) + RETURNS procrastinate.procrastinate_jobs LANGUAGE plpgsql AS $$ DECLARE found_jobs procrastinate_jobs; + retry_count INT := 0; BEGIN - BEGIN - WITH candidate AS ( - SELECT jobs.* - FROM procrastinate_jobs AS jobs - WHERE - (jobs.lock IS NULL OR - NOT EXISTS ( -- reject the job if its lock has current jobs - SELECT 1 - FROM procrastinate_jobs AS jobs_with_locks - WHERE - jobs.lock IS NOT NULL - AND jobs_with_locks.lock = jobs.lock - AND jobs_with_locks.status = 'doing' - LIMIT 1 - )) - AND jobs.status = 'todo' - AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) - AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) - ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 - FOR UPDATE OF jobs SKIP LOCKED - ) - UPDATE procrastinate_jobs - SET status = 'doing' - FROM candidate - WHERE procrastinate_jobs.id = candidate.id - RETURNING procrastinate_jobs.* INTO found_jobs; - RETURN found_jobs; - EXCEPTION - WHEN unique_violation THEN - RETURN NULL; -- Return empty result on conflict - END; + LOOP + BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + (jobs.lock IS NULL OR + NOT EXISTS ( -- reject the job if its lock has current jobs + SELECT 1 + FROM procrastinate_jobs AS jobs_with_locks + WHERE + jobs.lock IS NOT NULL + AND jobs_with_locks.lock = jobs.lock + AND jobs_with_locks.status = 'doing' + LIMIT 1 + )) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; + EXCEPTION + WHEN unique_violation THEN + PERFORM pg_sleep(random() * 0.01); -- fuzzy retry with upto 10ms sleep + retry_count := retry_count + 1; + IF retry_count >= 3 THEN + --RAISE NOTICE 'Maximum retry attempts reached, returning empty result.'; + RETURN unique_violation; + END IF; + END; + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[]) + RETURNS procrastinate.procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + jobs.lock IS NULL + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names)) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + RETURN found_jobs; END; $$; From 30ae552a929ad9d83dce3162974dd2e3dd8364f5 Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Tue, 3 Dec 2024 15:24:01 +0100 Subject: [PATCH 3/6] fix: raise instead of return --- .../sql/migrations/02.15.00_01_new_fetch_job_function.sql | 3 +-- procrastinate/sql/schema.sql | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql index 5e0230e15..cbdb5473a 100644 --- a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql +++ b/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql @@ -41,8 +41,7 @@ BEGIN PERFORM pg_sleep(random() * 0.01); -- fuzzy retry with upto 10ms sleep retry_count := retry_count + 1; IF retry_count >= 3 THEN - --RAISE NOTICE 'Maximum retry attempts reached, returning empty result.'; - RETURN unique_violation; + RAISE; --reraise END IF; END; END LOOP; diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index d1227ad29..5be457ab9 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -199,8 +199,7 @@ BEGIN PERFORM pg_sleep(random() * 0.01); -- fuzzy retry with upto 10ms sleep retry_count := retry_count + 1; IF retry_count >= 3 THEN - --RAISE NOTICE 'Maximum retry attempts reached, returning empty result.'; - RETURN unique_violation; + RAISE; --reraise END IF; END; END LOOP; From 6626a23f4c71be701742abf3d0d2597cf147ea06 Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Thu, 16 Jan 2025 17:33:13 +0100 Subject: [PATCH 4/6] Use correct sql func --- procrastinate/sql/queries.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 646422471..31b3efff5 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -20,7 +20,7 @@ SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, -- fetch_job_without_lock -- -- Get the first awaiting job SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts - FROM procrastinate_fetch_job(%(queues)s); + FROM procrastinate_fetch_job_without_lock(%(queues)s); -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago From ae19fa1c3e235d22d816db2cb42a1c7da86587f2 Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Thu, 16 Jan 2025 17:41:15 +0100 Subject: [PATCH 5/6] Add 'aborting' to fetch job Add 'aborting' status back in to keep the old behaviour. Adding 'aborting' will slow down the query as the lock index does not include the aborting status. --- procrastinate/sql/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index e7b64d3f2..c4385a6d8 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -180,7 +180,7 @@ BEGIN WHERE jobs.lock IS NOT NULL AND jobs_with_locks.lock = jobs.lock - AND jobs_with_locks.status = 'doing' + AND jobs_with_locks.status in ('doing', 'aborting') LIMIT 1 )) AND jobs.status = 'todo' From 7e1bcac99e39eb368da158598295d3f2730f6aae Mon Sep 17 00:00:00 2001 From: Thorsten Rademaker Date: Fri, 17 Jan 2025 11:39:53 +0100 Subject: [PATCH 6/6] fix migrations --- ...unction.sql => 03.00.00_02_pre_new_fetch_job_function.sql} | 4 ++-- procrastinate/sql/queries.sql | 2 +- procrastinate/sql/schema.sql | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename procrastinate/sql/migrations/{02.15.00_01_new_fetch_job_function.sql => 03.00.00_02_pre_new_fetch_job_function.sql} (91%) diff --git a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql b/procrastinate/sql/migrations/03.00.00_02_pre_new_fetch_job_function.sql similarity index 91% rename from procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql rename to procrastinate/sql/migrations/03.00.00_02_pre_new_fetch_job_function.sql index cbdb5473a..a8b55c5f6 100644 --- a/procrastinate/sql/migrations/02.15.00_01_new_fetch_job_function.sql +++ b/procrastinate/sql/migrations/03.00.00_02_pre_new_fetch_job_function.sql @@ -1,5 +1,5 @@ -- new fetch job. Only checks for doing. On update conflict return NULL -CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job(target_queue_names character varying[]) +CREATE OR REPLACE FUNCTION procrastinate_fetch_job_v1(target_queue_names character varying[]) RETURNS procrastinate.procrastinate_jobs LANGUAGE plpgsql AS $function$ @@ -49,7 +49,7 @@ END; $function$ ; -CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[]) +CREATE OR REPLACE FUNCTION procrastinate_fetch_job_without_lock_v1(target_queue_names character varying[]) RETURNS procrastinate.procrastinate_jobs LANGUAGE plpgsql AS $$ diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 31b3efff5..9c35d0826 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -20,7 +20,7 @@ SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, -- fetch_job_without_lock -- -- Get the first awaiting job SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts - FROM procrastinate_fetch_job_without_lock(%(queues)s); + FROM procrastinate_fetch_job_without_lock_v1(%(queues)s); -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index c4385a6d8..11eb90f34 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -208,7 +208,7 @@ BEGIN END; $$; -CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[]) +CREATE FUNCTION procrastinate_fetch_job_without_lock_v1(target_queue_names character varying[]) RETURNS procrastinate.procrastinate_jobs LANGUAGE plpgsql AS $$