From cfa7c137bf4597abd9f8c414e63e5496c6d93b0c Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 11 Jul 2025 18:35:15 -0400 Subject: [PATCH 01/11] Add a deprecator to SolidQueue module And include it in the host application's deprecators. --- lib/solid_queue.rb | 4 ++++ lib/solid_queue/engine.rb | 4 ++++ lib/solid_queue/version.rb | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..2c49a689 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -69,6 +69,10 @@ def preserve_finished_jobs? preserve_finished_jobs end + def deprecator + @deprecator ||= ActiveSupport::Deprecation.new(next_major_version, "SolidQueue") + end + def instrument(channel, **options, &block) ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block) end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..7debf229 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -37,5 +37,9 @@ class Engine < ::Rails::Engine include ActiveJob::ConcurrencyControls end end + + initializer "solid_queue.deprecator" do |app| + app.deprecators[:solid_queue] = SolidQueue.deprecator + end end end diff --git a/lib/solid_queue/version.rb b/lib/solid_queue/version.rb index 3de06d85..13917578 100644 --- a/lib/solid_queue/version.rb +++ b/lib/solid_queue/version.rb @@ -1,3 +1,7 @@ module SolidQueue VERSION = "1.2.0" + + def self.next_major_version + Gem::Version.new(VERSION).segments.first + 1 + end end From b8efb15d7220f0675a6bd654448527a430cc4310 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sat, 19 Jul 2025 13:51:17 +0200 Subject: [PATCH 02/11] Remove post-install message It's been a long time already since version 1.0 went out. --- solid_queue.gemspec | 5 ----- 1 file changed, 5 deletions(-) diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 231a211b..d7e2570e 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -10,11 +10,6 @@ Gem::Specification.new do |spec| spec.description = "Database-backed Active Job backend." spec.license = "MIT" - spec.post_install_message = <<~MESSAGE - Upgrading from Solid Queue < 1.0? Check details on breaking changes and upgrade instructions - --> https://github.com/rails/solid_queue/blob/main/UPGRADING.md - MESSAGE - spec.metadata["homepage_uri"] = spec.homepage spec.metadata["source_code_uri"] = "https://github.com/rails/solid_queue" From 46286cd1a3aca17d4e5ef3fb53ba82177cac3fc8 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sat, 19 Jul 2025 14:28:04 +0200 Subject: [PATCH 03/11] Add `update` generator and new migration To be used as `bin/rails solid_queue:update` to copy new migrations. The new migration links claimed executions to processes with `process_name`, and replaces the unique index on processes on `supervisor_id, name` with just `name`, as we'll want the name to uniquely identify a process because we'll rely on that to release claimed executions, independently from the supervisor. Even though it was really unlikely to have a collision on name because these are set randomly with `SecureRandom.hex(10)`, in this way we guarantee it. fix --- ..._executions_with_processes_through_name.rb | 30 +++++++++++++++++++ .../solid_queue/update/update_generator.rb | 20 +++++++++++++ lib/solid_queue/tasks.rb | 7 ++++- 3 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb create mode 100644 lib/generators/solid_queue/update/update_generator.rb diff --git a/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..439b9504 --- /dev/null +++ b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[<%= ActiveRecord::VERSION::STRING.to_f %>] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/lib/generators/solid_queue/update/update_generator.rb b/lib/generators/solid_queue/update/update_generator.rb new file mode 100644 index 00000000..272b0e3c --- /dev/null +++ b/lib/generators/solid_queue/update/update_generator.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require "rails/generators/active_record" + +class SolidQueue::UpdateGenerator < Rails::Generators::Base + include ActiveRecord::Generators::Migration + + source_root File.expand_path("templates", __dir__) + + class_option :database, type: :string, aliases: %i[ --db ], default: "queue", + desc: "The database that Solid Queue uses. Defaults to `queue`" + + def copy_new_migrations + template_dir = Dir.new(File.join(self.class.source_root, "db")) + + template_dir.each_child do |migration_file| + migration_template File.join("db", migration_file), File.join(db_migrate_path, migration_file), skip: true + end + end +end diff --git a/lib/solid_queue/tasks.rb b/lib/solid_queue/tasks.rb index 91cd778b..f8d22fc2 100644 --- a/lib/solid_queue/tasks.rb +++ b/lib/solid_queue/tasks.rb @@ -4,7 +4,12 @@ Rails::Command.invoke :generate, [ "solid_queue:install" ] end - desc "start solid_queue supervisor to dispatch and process jobs" + desc "Update Solid Queue" + task :update do + Rails::Command.invoke :generate, [ "solid_queue:update" ] + end + + desc "Start Solid Queue supervisor to dispatch and process jobs" task start: :environment do SolidQueue::Supervisor.start end From 535182830297504a8a2c665927e124c9f88d169a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 20 Jul 2025 19:25:57 +0200 Subject: [PATCH 04/11] Apply update and new migration to Dummy app --- ..._executions_with_processes_through_name.rb | 30 +++++++++++++++++++ test/dummy/db/queue_schema.rb | 6 ++-- 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb diff --git a/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..7d6f93e7 --- /dev/null +++ b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[7.1] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..20b83084 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 1) do +ActiveRecord::Schema[7.1].define(version: 2025_07_20_172253) do create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false @@ -27,8 +27,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index ["process_name"], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| @@ -72,7 +74,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["name"], name: "index_solid_queue_processes_on_name", unique: true t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" end From 6bdf56f2cd74d1ced6c0ec43c2d77bd7e9e427cf Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 20 Jul 2025 20:00:40 +0200 Subject: [PATCH 05/11] Reflect new migration on initial schema People installing Solid Queue for the first time will get the final schema. Migrations are idempotent so they can later update and run migrations without any problems. --- .../solid_queue/install/templates/db/queue_schema.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 85194b6a..760ee615 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -15,8 +15,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index [ "process_name" ], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", force: :cascade do |t| @@ -60,7 +62,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index [ "name" ], name: "index_solid_queue_processes_on_name", unique: true t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id" end From 698695a0f3a0b9d74b7c829a15bddd4eb18204a7 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 20 Jul 2025 21:26:51 +0200 Subject: [PATCH 06/11] Pass `process.name` all the way to claimed executions when claiming jobs --- app/models/solid_queue/claimed_execution.rb | 14 +++++----- app/models/solid_queue/process/executor.rb | 2 +- app/models/solid_queue/ready_execution.rb | 12 ++++----- lib/solid_queue/worker.rb | 2 +- test/integration/instrumentation_test.rb | 2 +- .../solid_queue/claimed_execution_test.rb | 2 +- test/models/solid_queue/process_test.rb | 4 +-- .../solid_queue/ready_execution_test.rb | 27 ++++++++++--------- test/unit/supervisor_test.rb | 6 ++--- 9 files changed, 37 insertions(+), 34 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 8840505b..c52590b3 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true class SolidQueue::ClaimedExecution < SolidQueue::Execution - belongs_to :process + belongs_to :process, primary_key: :name, foreign_key: :process_name scope :orphaned, -> { where.missing(:process) } @@ -12,12 +12,13 @@ def success? end class << self - def claiming(job_ids, process_id, &block) - job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } + def claiming(job_ids, process, &block) + process_data = { process_id: process.id, process_name: process.name } + job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) } - SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload| + SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload| insert_all!(job_data) - where(job_id: job_ids, process_id: process_id).load.tap do |claimed| + where(job_id: job_ids, process_id: process.id).load.tap do |claimed| block.call(claimed) payload[:size] = claimed.size @@ -45,6 +46,7 @@ def fail_all_with(error) end payload[:process_ids] = executions.map(&:process_id).uniq + payload[:process_names] = executions.map(&:process_name).uniq payload[:job_ids] = executions.map(&:job_id).uniq payload[:size] = executions.size end @@ -74,7 +76,7 @@ def perform end def release - SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do + SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id, process_name: process_name) do transaction do job.dispatch_bypassing_concurrency_limits destroy! diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 8dcd12aa..6b212564 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -6,7 +6,7 @@ module Executor extend ActiveSupport::Concern included do - has_many :claimed_executions + has_many :claimed_executions, primary_key: :name, foreign_key: :process_name after_destroy :release_all_claimed_executions end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 35a11292..4c207487 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -7,9 +7,9 @@ class ReadyExecution < Execution assumes_attributes_from_job class << self - def claim(queue_list, limit, process_id) + def claim(queue_list, limit, process) QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation| - select_and_lock(queue_relation, process_id, limit).tap do |locked| + select_and_lock(queue_relation, process, limit).tap do |locked| limit -= locked.size end end @@ -20,12 +20,12 @@ def aggregated_count_across(queue_list) end private - def select_and_lock(queue_relation, process_id, limit) + def select_and_lock(queue_relation, process, limit) return [] if limit <= 0 transaction do candidates = select_candidates(queue_relation, limit) - lock_candidates(candidates, process_id) + lock_candidates(candidates, process) end end @@ -33,10 +33,10 @@ def select_candidates(queue_relation, limit) queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id) end - def lock_candidates(executions, process_id) + def lock_candidates(executions, process) return [] if executions.none? - SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed| + SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process) do |claimed| ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id) where(id: ids_to_delete).delete_all end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd..d426688c 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -38,7 +38,7 @@ def poll def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process) end end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 95dadb19..4133ec87 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -150,7 +150,7 @@ class InstrumentationTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) events = subscribed(/fail.*_claimed\.solid_queue/) do SolidQueue::Process.prune diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 98513c94..5dfe69b3 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -87,7 +87,7 @@ def prepare_and_claim_job(active_job, process: @process) job.prepare_for_execution assert_difference -> { SolidQueue::ClaimedExecution.count } => +1 do - SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id) + SolidQueue::ReadyExecution.claim(job.queue_name, 1, process) end SolidQueue::ClaimedExecution.last diff --git a/test/models/solid_queue/process_test.rb b/test/models/solid_queue/process_test.rb index 489b2aca..e81a848e 100644 --- a/test/models/solid_queue/process_test.rb +++ b/test/models/solid_queue/process_test.rb @@ -20,7 +20,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now @@ -40,7 +40,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index dd9269ca..6c712598 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -6,12 +6,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i) end + @process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123", metadata: { queue: "background" }) @jobs = SolidQueue::Job.where(queue_name: "backend").order(:priority) end test "claim all jobs for existing queue" do assert_claimed_jobs(@jobs.count) do - SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42) + SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, @process) end @jobs.each(&:reload) @@ -21,13 +22,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase test "claim jobs for queue without jobs at the moment" do assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do - SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42) + SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, @process) end end test "claim some jobs for existing queue" do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim("backend", 2, 42) + SolidQueue::ReadyExecution.claim("backend", 2, @process) end @jobs.first(2).each do |job| @@ -45,7 +46,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, @process) end end @@ -53,7 +54,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end end @@ -61,7 +62,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, @process) end assert @jobs.none?(&:claimed?) @@ -73,7 +74,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase SolidQueue::Queue.find_by_name("backend").pause assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end @jobs.each(&:reload) @@ -84,7 +85,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, @process) end end @@ -92,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(0) do - SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, @process) end end @@ -101,7 +102,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase job = SolidQueue::Job.last assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim("*", 3, 42) + SolidQueue::ReadyExecution.claim("*", 3, @process) end assert job.reload.claimed? @@ -117,7 +118,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase assert_equal "background", job.queue_name assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42) + SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, @process) end assert job.reload.claimed? @@ -136,7 +137,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 4.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) @@ -157,7 +158,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 5.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 7a531ad2..7ef568b4 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -111,7 +111,7 @@ class SupervisorTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -138,7 +138,7 @@ class SupervisorTest < ActiveSupport::TestCase 4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -193,7 +193,7 @@ class SupervisorTest < ActiveSupport::TestCase worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name) job = StoreResultJob.perform_later(42) - claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first + claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process).first terminated_fork = Struct.new(:name).new(worker_name) From 4b1af3a1be9be4fa4d33a54d81185c7f41c111fc Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 20 Jul 2025 21:40:30 +0200 Subject: [PATCH 07/11] Instruct Rubocop to ignore DB templates --- .rubocop.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.rubocop.yml b/.rubocop.yml index 75df1173..7299253b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,3 +7,4 @@ AllCops: TargetRubyVersion: 3.3 Exclude: - "**/*_schema.rb" + - "lib/generators/solid_queue/update/templates/db/*" From 0f3ab74b85ac847f1f033a30a6c1a01776d99282 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 21 Jul 2025 14:57:12 +0200 Subject: [PATCH 08/11] Guard usage of new `process_name` to link claimed executions We can only use this if new migrations have been run. If not, we just emit a deprecation warning and continue as before. --- app/models/solid_queue/claimed_execution.rb | 22 ++++++++++++++++---- app/models/solid_queue/process/executor.rb | 8 ++++++- app/models/solid_queue/record.rb | 22 +++++++++++++++----- test/integration/processes_lifecycle_test.rb | 2 +- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c52590b3..11540e77 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -1,7 +1,18 @@ # frozen_string_literal: true class SolidQueue::ClaimedExecution < SolidQueue::Execution - belongs_to :process, primary_key: :name, foreign_key: :process_name + def self.process_name_column_exists? + column_names.include?("process_name") + end + + if process_name_column_exists? + belongs_to :process, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations + + belongs_to :process + attr_accessor :process_name + end scope :orphaned, -> { where.missing(:process) } @@ -13,7 +24,10 @@ def success? class << self def claiming(job_ids, process, &block) - process_data = { process_id: process.id, process_name: process.name } + process_data = { process_id: process.id }.tap do |hsh| + hsh[:process_name] = process.name if process_name_column_exists? + end + job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) } SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload| @@ -45,8 +59,8 @@ def fail_all_with(error) execution.unblock_next_job end - payload[:process_ids] = executions.map(&:process_id).uniq - payload[:process_names] = executions.map(&:process_name).uniq + payload[:process_ids] = executions.map(&:process_id).uniq.presence + payload[:process_names] = executions.map(&:process_name).uniq.presence payload[:job_ids] = executions.map(&:job_id).uniq payload[:size] = executions.size end diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 6b212564..9bf2db42 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -6,7 +6,13 @@ module Executor extend ActiveSupport::Concern included do - has_many :claimed_executions, primary_key: :name, foreign_key: :process_name + if ClaimedExecution.process_name_column_exists? + has_many :claimed_executions, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations + + has_many :claimed_executions + end after_destroy :release_all_claimed_executions end diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..c99f1f71 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -6,11 +6,23 @@ class Record < ActiveRecord::Base connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to - def self.non_blocking_lock - if SolidQueue.use_skip_locked - lock(Arel.sql("FOR UPDATE SKIP LOCKED")) - else - lock + class << self + def non_blocking_lock + if SolidQueue.use_skip_locked + lock(Arel.sql("FOR UPDATE SKIP LOCKED")) + else + lock + end + end + + def warn_about_pending_migrations + SolidQueue.deprecator.warn(<<~DEPRECATION) + Solid Queue has pending database migrations. To get the new migration files, run: + rails solid_queue:update + And then: + rails db:migrate + These migrations will be required after version 2.0 + DEPRECATION end end end diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index ebb1100c..f7383cfd 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -271,7 +271,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase private def assert_clean_termination - wait_for_registered_processes 0, timeout: 0.2.second + wait_for_registered_processes 0, timeout: 0.5.second assert_no_registered_processes assert_no_claimed_jobs assert_not process_exists?(@pid) From aaea3cd232bcb8199d60192b287c9f538ba1a6f5 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 21 Jul 2025 15:05:02 +0200 Subject: [PATCH 09/11] Add upgrade instructions to `UPGRADING` --- UPGRADING.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index 51ab06a8..63e35e92 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,4 +1,18 @@ -# Upgrading to version 1.x +# Upgrading to version 1.3 +There's a new migration in this version that can be installed via +```bash +bin/rails solid_queue:update +``` +which is a new generator to facilitate updates. + +Then, the migration needs to be run with +```bash +bin/rails db:migrate +``` + +The migration affects the tables `solid_queue_claimed_executions` and `solid_queue_processes` tables. It's not mandatory: everything will continue working as before without it, only a deprecation warning will be emitted. The migration will be mandatory in the next major version (2.0). + +# Upgrading to version >=1.0, < 1.3 The value returned for `enqueue_after_transaction_commit?` has changed to `true`, and it's no longer configurable. If you want to change this, you need to use Active Job's configuration options. # Upgrading to version 0.9.x From dfae79bad32fc5ebec2b3a0f5e6845fdcbe6cc07 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 21 Jul 2025 15:38:24 +0200 Subject: [PATCH 10/11] Include instructions to set a different DB for the migrations In the deprecation warning and upgrade instructions. --- UPGRADING.md | 9 ++++++--- app/models/solid_queue/record.rb | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index 63e35e92..2fc0d8e8 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,11 +1,14 @@ # Upgrading to version 1.3 -There's a new migration in this version that can be installed via +There's a new migration in this version that can be installed via: ```bash bin/rails solid_queue:update ``` -which is a new generator to facilitate updates. +which is a new generator to facilitate updates. This will use the `queue` database by default, but if you're using a different database name for Solid Queue, you can install the new migrations in the right place with: +```bash +DATABASE=your-solid-queue-db-name bin/rails solid_queue:update +``` -Then, the migration needs to be run with +Finally, the migration needs to be run with: ```bash bin/rails db:migrate ``` diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index c99f1f71..c2549620 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -19,7 +19,9 @@ def warn_about_pending_migrations SolidQueue.deprecator.warn(<<~DEPRECATION) Solid Queue has pending database migrations. To get the new migration files, run: rails solid_queue:update - And then: + which will install the migration under `db/queue_migrate`. To change the database, run + DATABASE=your-solid-queue-db rails solid_queue:update + Then, apply the migrations with: rails db:migrate These migrations will be required after version 2.0 DEPRECATION From a06ef6b2926395bbcdb5fac6f7581cb9c6a82fa2 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 22 Jul 2025 13:58:06 +0200 Subject: [PATCH 11/11] Include `process_name` in relevant instrumentation events --- lib/solid_queue/log_subscriber.rb | 6 +++--- test/integration/instrumentation_test.rb | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 96fb19bf..aeb9d038 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -8,7 +8,7 @@ def dispatch_scheduled(event) end def claim(event) - debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size)) + debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :process_name, :job_ids, :claimed_job_ids, :size)) end def release_many_claimed(event) @@ -16,11 +16,11 @@ def release_many_claimed(event) end def fail_many_claimed(event) - warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids)) + warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids, :process_names)) end def release_claimed(event) - info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id)) + info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id, :process_name)) end def retry_all(event) diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 4133ec87..8aae8a3e 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -34,7 +34,7 @@ class InstrumentationTest < ActiveSupport::TestCase end assert_equal 1, events.size - assert_event events.first, "claim", process_id: process.id, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "claim", process_id: process.id, process_name: process.name, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 end test "polling emits events" do @@ -68,7 +68,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert_equal 2, events.size release_one_event, release_many_event = events - assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id + assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id, process_name: process.name assert_event release_many_event, "release_many_claimed", size: 1 end @@ -157,7 +157,7 @@ class InstrumentationTest < ActiveSupport::TestCase end assert_equal 1, events.count - assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], process_names: [ process.name ], job_ids: jobs.map(&:id), size: 3 end test "errors when deregistering processes are included in deregister_process events" do