diff --git a/.rubocop.yml b/.rubocop.yml index 75df1173..7299253b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,3 +7,4 @@ AllCops: TargetRubyVersion: 3.3 Exclude: - "**/*_schema.rb" + - "lib/generators/solid_queue/update/templates/db/*" diff --git a/UPGRADING.md b/UPGRADING.md index 51ab06a8..2fc0d8e8 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,4 +1,21 @@ -# Upgrading to version 1.x +# Upgrading to version 1.3 +There's a new migration in this version that can be installed via: +```bash +bin/rails solid_queue:update +``` +which is a new generator to facilitate updates. This will use the `queue` database by default, but if you're using a different database name for Solid Queue, you can install the new migrations in the right place with: +```bash +DATABASE=your-solid-queue-db-name bin/rails solid_queue:update +``` + +Finally, the migration needs to be run with: +```bash +bin/rails db:migrate +``` + +The migration affects the tables `solid_queue_claimed_executions` and `solid_queue_processes` tables. It's not mandatory: everything will continue working as before without it, only a deprecation warning will be emitted. The migration will be mandatory in the next major version (2.0). + +# Upgrading to version >=1.0, < 1.3 The value returned for `enqueue_after_transaction_commit?` has changed to `true`, and it's no longer configurable. If you want to change this, you need to use Active Job's configuration options. # Upgrading to version 0.9.x diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 8840505b..11540e77 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -1,7 +1,18 @@ # frozen_string_literal: true class SolidQueue::ClaimedExecution < SolidQueue::Execution - belongs_to :process + def self.process_name_column_exists? + column_names.include?("process_name") + end + + if process_name_column_exists? + belongs_to :process, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations + + belongs_to :process + attr_accessor :process_name + end scope :orphaned, -> { where.missing(:process) } @@ -12,12 +23,16 @@ def success? end class << self - def claiming(job_ids, process_id, &block) - job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } + def claiming(job_ids, process, &block) + process_data = { process_id: process.id }.tap do |hsh| + hsh[:process_name] = process.name if process_name_column_exists? + end + + job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) } - SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload| + SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload| insert_all!(job_data) - where(job_id: job_ids, process_id: process_id).load.tap do |claimed| + where(job_id: job_ids, process_id: process.id).load.tap do |claimed| block.call(claimed) payload[:size] = claimed.size @@ -44,7 +59,8 @@ def fail_all_with(error) execution.unblock_next_job end - payload[:process_ids] = executions.map(&:process_id).uniq + payload[:process_ids] = executions.map(&:process_id).uniq.presence + payload[:process_names] = executions.map(&:process_name).uniq.presence payload[:job_ids] = executions.map(&:job_id).uniq payload[:size] = executions.size end @@ -74,7 +90,7 @@ def perform end def release - SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do + SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id, process_name: process_name) do transaction do job.dispatch_bypassing_concurrency_limits destroy! diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 8dcd12aa..9bf2db42 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -6,7 +6,13 @@ module Executor extend ActiveSupport::Concern included do - has_many :claimed_executions + if ClaimedExecution.process_name_column_exists? + has_many :claimed_executions, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations + + has_many :claimed_executions + end after_destroy :release_all_claimed_executions end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 35a11292..4c207487 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -7,9 +7,9 @@ class ReadyExecution < Execution assumes_attributes_from_job class << self - def claim(queue_list, limit, process_id) + def claim(queue_list, limit, process) QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation| - select_and_lock(queue_relation, process_id, limit).tap do |locked| + select_and_lock(queue_relation, process, limit).tap do |locked| limit -= locked.size end end @@ -20,12 +20,12 @@ def aggregated_count_across(queue_list) end private - def select_and_lock(queue_relation, process_id, limit) + def select_and_lock(queue_relation, process, limit) return [] if limit <= 0 transaction do candidates = select_candidates(queue_relation, limit) - lock_candidates(candidates, process_id) + lock_candidates(candidates, process) end end @@ -33,10 +33,10 @@ def select_candidates(queue_relation, limit) queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id) end - def lock_candidates(executions, process_id) + def lock_candidates(executions, process) return [] if executions.none? - SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed| + SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process) do |claimed| ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id) where(id: ids_to_delete).delete_all end diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..c2549620 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -6,11 +6,25 @@ class Record < ActiveRecord::Base connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to - def self.non_blocking_lock - if SolidQueue.use_skip_locked - lock(Arel.sql("FOR UPDATE SKIP LOCKED")) - else - lock + class << self + def non_blocking_lock + if SolidQueue.use_skip_locked + lock(Arel.sql("FOR UPDATE SKIP LOCKED")) + else + lock + end + end + + def warn_about_pending_migrations + SolidQueue.deprecator.warn(<<~DEPRECATION) + Solid Queue has pending database migrations. To get the new migration files, run: + rails solid_queue:update + which will install the migration under `db/queue_migrate`. To change the database, run + DATABASE=your-solid-queue-db rails solid_queue:update + Then, apply the migrations with: + rails db:migrate + These migrations will be required after version 2.0 + DEPRECATION end end end diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 85194b6a..760ee615 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -15,8 +15,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index [ "process_name" ], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", force: :cascade do |t| @@ -60,7 +62,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index [ "name" ], name: "index_solid_queue_processes_on_name", unique: true t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id" end diff --git a/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..439b9504 --- /dev/null +++ b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[<%= ActiveRecord::VERSION::STRING.to_f %>] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/lib/generators/solid_queue/update/update_generator.rb b/lib/generators/solid_queue/update/update_generator.rb new file mode 100644 index 00000000..272b0e3c --- /dev/null +++ b/lib/generators/solid_queue/update/update_generator.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require "rails/generators/active_record" + +class SolidQueue::UpdateGenerator < Rails::Generators::Base + include ActiveRecord::Generators::Migration + + source_root File.expand_path("templates", __dir__) + + class_option :database, type: :string, aliases: %i[ --db ], default: "queue", + desc: "The database that Solid Queue uses. Defaults to `queue`" + + def copy_new_migrations + template_dir = Dir.new(File.join(self.class.source_root, "db")) + + template_dir.each_child do |migration_file| + migration_template File.join("db", migration_file), File.join(db_migrate_path, migration_file), skip: true + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..2c49a689 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -69,6 +69,10 @@ def preserve_finished_jobs? preserve_finished_jobs end + def deprecator + @deprecator ||= ActiveSupport::Deprecation.new(next_major_version, "SolidQueue") + end + def instrument(channel, **options, &block) ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block) end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..7debf229 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -37,5 +37,9 @@ class Engine < ::Rails::Engine include ActiveJob::ConcurrencyControls end end + + initializer "solid_queue.deprecator" do |app| + app.deprecators[:solid_queue] = SolidQueue.deprecator + end end end diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 96fb19bf..aeb9d038 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -8,7 +8,7 @@ def dispatch_scheduled(event) end def claim(event) - debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size)) + debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :process_name, :job_ids, :claimed_job_ids, :size)) end def release_many_claimed(event) @@ -16,11 +16,11 @@ def release_many_claimed(event) end def fail_many_claimed(event) - warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids)) + warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids, :process_names)) end def release_claimed(event) - info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id)) + info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id, :process_name)) end def retry_all(event) diff --git a/lib/solid_queue/tasks.rb b/lib/solid_queue/tasks.rb index 91cd778b..f8d22fc2 100644 --- a/lib/solid_queue/tasks.rb +++ b/lib/solid_queue/tasks.rb @@ -4,7 +4,12 @@ Rails::Command.invoke :generate, [ "solid_queue:install" ] end - desc "start solid_queue supervisor to dispatch and process jobs" + desc "Update Solid Queue" + task :update do + Rails::Command.invoke :generate, [ "solid_queue:update" ] + end + + desc "Start Solid Queue supervisor to dispatch and process jobs" task start: :environment do SolidQueue::Supervisor.start end diff --git a/lib/solid_queue/version.rb b/lib/solid_queue/version.rb index 3de06d85..13917578 100644 --- a/lib/solid_queue/version.rb +++ b/lib/solid_queue/version.rb @@ -1,3 +1,7 @@ module SolidQueue VERSION = "1.2.0" + + def self.next_major_version + Gem::Version.new(VERSION).segments.first + 1 + end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd..d426688c 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -38,7 +38,7 @@ def poll def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process) end end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 231a211b..d7e2570e 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -10,11 +10,6 @@ Gem::Specification.new do |spec| spec.description = "Database-backed Active Job backend." spec.license = "MIT" - spec.post_install_message = <<~MESSAGE - Upgrading from Solid Queue < 1.0? Check details on breaking changes and upgrade instructions - --> https://github.com/rails/solid_queue/blob/main/UPGRADING.md - MESSAGE - spec.metadata["homepage_uri"] = spec.homepage spec.metadata["source_code_uri"] = "https://github.com/rails/solid_queue" diff --git a/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..7d6f93e7 --- /dev/null +++ b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[7.1] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..20b83084 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 1) do +ActiveRecord::Schema[7.1].define(version: 2025_07_20_172253) do create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false @@ -27,8 +27,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index ["process_name"], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| @@ -72,7 +74,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["name"], name: "index_solid_queue_processes_on_name", unique: true t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 95dadb19..8aae8a3e 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -34,7 +34,7 @@ class InstrumentationTest < ActiveSupport::TestCase end assert_equal 1, events.size - assert_event events.first, "claim", process_id: process.id, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "claim", process_id: process.id, process_name: process.name, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 end test "polling emits events" do @@ -68,7 +68,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert_equal 2, events.size release_one_event, release_many_event = events - assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id + assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id, process_name: process.name assert_event release_many_event, "release_many_claimed", size: 1 end @@ -150,14 +150,14 @@ class InstrumentationTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) events = subscribed(/fail.*_claimed\.solid_queue/) do SolidQueue::Process.prune end assert_equal 1, events.count - assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], process_names: [ process.name ], job_ids: jobs.map(&:id), size: 3 end test "errors when deregistering processes are included in deregister_process events" do diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index ebb1100c..f7383cfd 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -271,7 +271,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase private def assert_clean_termination - wait_for_registered_processes 0, timeout: 0.2.second + wait_for_registered_processes 0, timeout: 0.5.second assert_no_registered_processes assert_no_claimed_jobs assert_not process_exists?(@pid) diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 98513c94..5dfe69b3 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -87,7 +87,7 @@ def prepare_and_claim_job(active_job, process: @process) job.prepare_for_execution assert_difference -> { SolidQueue::ClaimedExecution.count } => +1 do - SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id) + SolidQueue::ReadyExecution.claim(job.queue_name, 1, process) end SolidQueue::ClaimedExecution.last diff --git a/test/models/solid_queue/process_test.rb b/test/models/solid_queue/process_test.rb index 489b2aca..e81a848e 100644 --- a/test/models/solid_queue/process_test.rb +++ b/test/models/solid_queue/process_test.rb @@ -20,7 +20,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now @@ -40,7 +40,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index dd9269ca..6c712598 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -6,12 +6,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i) end + @process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123", metadata: { queue: "background" }) @jobs = SolidQueue::Job.where(queue_name: "backend").order(:priority) end test "claim all jobs for existing queue" do assert_claimed_jobs(@jobs.count) do - SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42) + SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, @process) end @jobs.each(&:reload) @@ -21,13 +22,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase test "claim jobs for queue without jobs at the moment" do assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do - SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42) + SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, @process) end end test "claim some jobs for existing queue" do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim("backend", 2, 42) + SolidQueue::ReadyExecution.claim("backend", 2, @process) end @jobs.first(2).each do |job| @@ -45,7 +46,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, @process) end end @@ -53,7 +54,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end end @@ -61,7 +62,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, @process) end assert @jobs.none?(&:claimed?) @@ -73,7 +74,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase SolidQueue::Queue.find_by_name("backend").pause assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end @jobs.each(&:reload) @@ -84,7 +85,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, @process) end end @@ -92,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(0) do - SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, @process) end end @@ -101,7 +102,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase job = SolidQueue::Job.last assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim("*", 3, 42) + SolidQueue::ReadyExecution.claim("*", 3, @process) end assert job.reload.claimed? @@ -117,7 +118,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase assert_equal "background", job.queue_name assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42) + SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, @process) end assert job.reload.claimed? @@ -136,7 +137,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 4.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) @@ -157,7 +158,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 5.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 7a531ad2..7ef568b4 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -111,7 +111,7 @@ class SupervisorTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -138,7 +138,7 @@ class SupervisorTest < ActiveSupport::TestCase 4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -193,7 +193,7 @@ class SupervisorTest < ActiveSupport::TestCase worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name) job = StoreResultJob.perform_later(42) - claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first + claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process).first terminated_fork = Struct.new(:name).new(worker_name)