Skip to content

Link claimed_executions to processes via process_name #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ AllCops:
TargetRubyVersion: 3.3
Exclude:
- "**/*_schema.rb"
- "lib/generators/solid_queue/update/templates/db/*"
19 changes: 18 additions & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
# Upgrading to version 1.x
# Upgrading to version 1.3
There's a new migration in this version that can be installed via:
```bash
bin/rails solid_queue:update
```
which is a new generator to facilitate updates. This will use the `queue` database by default, but if you're using a different database name for Solid Queue, you can install the new migrations in the right place with:
```bash
DATABASE=your-solid-queue-db-name bin/rails solid_queue:update
```

Finally, the migration needs to be run with:
```bash
bin/rails db:migrate
```

The migration affects the tables `solid_queue_claimed_executions` and `solid_queue_processes` tables. It's not mandatory: everything will continue working as before without it, only a deprecation warning will be emitted. The migration will be mandatory in the next major version (2.0).

# Upgrading to version >=1.0, < 1.3
The value returned for `enqueue_after_transaction_commit?` has changed to `true`, and it's no longer configurable. If you want to change this, you need to use Active Job's configuration options.

# Upgrading to version 0.9.x
Expand Down
30 changes: 23 additions & 7 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# frozen_string_literal: true

class SolidQueue::ClaimedExecution < SolidQueue::Execution
belongs_to :process
def self.process_name_column_exists?
column_names.include?("process_name")
end

if process_name_column_exists?
belongs_to :process, primary_key: :name, foreign_key: :process_name
else
warn_about_pending_migrations

belongs_to :process
attr_accessor :process_name
end

scope :orphaned, -> { where.missing(:process) }

Expand All @@ -12,12 +23,16 @@ def success?
end

class << self
def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
def claiming(job_ids, process, &block)
process_data = { process_id: process.id }.tap do |hsh|
hsh[:process_name] = process.name if process_name_column_exists?
end

job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) }

SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload|
insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
where(job_id: job_ids, process_id: process.id).load.tap do |claimed|
block.call(claimed)

payload[:size] = claimed.size
Expand All @@ -44,7 +59,8 @@ def fail_all_with(error)
execution.unblock_next_job
end

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:process_ids] = executions.map(&:process_id).uniq.presence
payload[:process_names] = executions.map(&:process_name).uniq.presence
payload[:job_ids] = executions.map(&:job_id).uniq
payload[:size] = executions.size
end
Expand Down Expand Up @@ -74,7 +90,7 @@ def perform
end

def release
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id, process_name: process_name) do
transaction do
job.dispatch_bypassing_concurrency_limits
destroy!
Expand Down
8 changes: 7 additions & 1 deletion app/models/solid_queue/process/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ module Executor
extend ActiveSupport::Concern

included do
has_many :claimed_executions
if ClaimedExecution.process_name_column_exists?
has_many :claimed_executions, primary_key: :name, foreign_key: :process_name
else
warn_about_pending_migrations

has_many :claimed_executions
end

after_destroy :release_all_claimed_executions
end
Expand Down
12 changes: 6 additions & 6 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ class ReadyExecution < Execution
assumes_attributes_from_job

class << self
def claim(queue_list, limit, process_id)
def claim(queue_list, limit, process)
QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation|
select_and_lock(queue_relation, process_id, limit).tap do |locked|
select_and_lock(queue_relation, process, limit).tap do |locked|
limit -= locked.size
end
end
Expand All @@ -20,23 +20,23 @@ def aggregated_count_across(queue_list)
end

private
def select_and_lock(queue_relation, process_id, limit)
def select_and_lock(queue_relation, process, limit)
return [] if limit <= 0

transaction do
candidates = select_candidates(queue_relation, limit)
lock_candidates(candidates, process_id)
lock_candidates(candidates, process)
end
end

def select_candidates(queue_relation, limit)
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
end

def lock_candidates(executions, process_id)
def lock_candidates(executions, process)
return [] if executions.none?

SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed|
SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process) do |claimed|
ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id)
where(id: ids_to_delete).delete_all
end
Expand Down
24 changes: 19 additions & 5 deletions app/models/solid_queue/record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,25 @@ class Record < ActiveRecord::Base

connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to

def self.non_blocking_lock
if SolidQueue.use_skip_locked
lock(Arel.sql("FOR UPDATE SKIP LOCKED"))
else
lock
class << self
def non_blocking_lock
if SolidQueue.use_skip_locked
lock(Arel.sql("FOR UPDATE SKIP LOCKED"))
else
lock
end
end

def warn_about_pending_migrations
SolidQueue.deprecator.warn(<<~DEPRECATION)
Solid Queue has pending database migrations. To get the new migration files, run:
rails solid_queue:update
which will install the migration under `db/queue_migrate`. To change the database, run
DATABASE=your-solid-queue-db rails solid_queue:update
Then, apply the migrations with:
rails db:migrate
These migrations will be required after version 2.0
DEPRECATION
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.string "process_name"
t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
t.index [ "process_name" ], name: "index_solid_queue_claimed_executions_on_process_name"
end

create_table "solid_queue_failed_executions", force: :cascade do |t|
Expand Down Expand Up @@ -60,7 +62,7 @@
t.datetime "created_at", null: false
t.string "name", null: false
t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index [ "name" ], name: "index_solid_queue_processes_on_name", unique: true
t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id"
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[<%= ActiveRecord::VERSION::STRING.to_f %>]
def up
unless connection.column_exists?(:solid_queue_claimed_executions, :process_name)
add_column :solid_queue_claimed_executions, :process_name, :string
add_index :solid_queue_claimed_executions, :process_name
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration is idempotent because it is already included in the setup schema, for people who are installing Solid Queue for the first time. If there's a new update in the future with a new migration, people should be able to run all of them without any issues.


unless connection.index_exists?(:solid_queue_processes, :name)
add_index :solid_queue_processes, :name, unique: true
end

if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ])
remove_index :solid_queue_processes, [ :name, :supervisor_id ]
end
end

def down
if connection.column_exists?(:solid_queue_claimed_executions, :process_name)
remove_column :solid_queue_claimed_executions, :process_name
end

if connection.index_exists?(:solid_queue_processes, :name)
remove_index :solid_queue_processes, :name
end

unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ])
add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true
end
end
end
20 changes: 20 additions & 0 deletions lib/generators/solid_queue/update/update_generator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

require "rails/generators/active_record"

class SolidQueue::UpdateGenerator < Rails::Generators::Base
include ActiveRecord::Generators::Migration

source_root File.expand_path("templates", __dir__)

class_option :database, type: :string, aliases: %i[ --db ], default: "queue",
desc: "The database that Solid Queue uses. Defaults to `queue`"

def copy_new_migrations
template_dir = Dir.new(File.join(self.class.source_root, "db"))

template_dir.each_child do |migration_file|
migration_template File.join("db", migration_file), File.join(db_migrate_path, migration_file), skip: true
end
end
end
4 changes: 4 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ def preserve_finished_jobs?
preserve_finished_jobs
end

def deprecator
@deprecator ||= ActiveSupport::Deprecation.new(next_major_version, "SolidQueue")
end

def instrument(channel, **options, &block)
ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block)
end
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ class Engine < ::Rails::Engine
include ActiveJob::ConcurrencyControls
end
end

initializer "solid_queue.deprecator" do |app|
app.deprecators[:solid_queue] = SolidQueue.deprecator
end
end
end
6 changes: 3 additions & 3 deletions lib/solid_queue/log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ def dispatch_scheduled(event)
end

def claim(event)
debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size))
debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :process_name, :job_ids, :claimed_job_ids, :size))
end

def release_many_claimed(event)
info formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
end

def fail_many_claimed(event)
warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids, :process_names))
end

def release_claimed(event)
info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id, :process_name))
end

def retry_all(event)
Expand Down
7 changes: 6 additions & 1 deletion lib/solid_queue/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
Rails::Command.invoke :generate, [ "solid_queue:install" ]
end

desc "start solid_queue supervisor to dispatch and process jobs"
desc "Update Solid Queue"
task :update do
Rails::Command.invoke :generate, [ "solid_queue:update" ]
end

desc "Start Solid Queue supervisor to dispatch and process jobs"
task start: :environment do
SolidQueue::Supervisor.start
end
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
module SolidQueue
VERSION = "1.2.0"

def self.next_major_version
Gem::Version.new(VERSION).segments.first + 1
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def poll

def claim_executions
with_polling_volume do
SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id)
SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process)
end
end

Expand Down
5 changes: 0 additions & 5 deletions solid_queue.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ Gem::Specification.new do |spec|
spec.description = "Database-backed Active Job backend."
spec.license = "MIT"

spec.post_install_message = <<~MESSAGE
Upgrading from Solid Queue < 1.0? Check details on breaking changes and upgrade instructions
--> https://github.com/rails/solid_queue/blob/main/UPGRADING.md
MESSAGE

spec.metadata["homepage_uri"] = spec.homepage
spec.metadata["source_code_uri"] = "https://github.com/rails/solid_queue"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[7.1]
def up
unless connection.column_exists?(:solid_queue_claimed_executions, :process_name)
add_column :solid_queue_claimed_executions, :process_name, :string
add_index :solid_queue_claimed_executions, :process_name
end

unless connection.index_exists?(:solid_queue_processes, :name)
add_index :solid_queue_processes, :name, unique: true
end

if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ])
remove_index :solid_queue_processes, [ :name, :supervisor_id ]
end
end

def down
if connection.column_exists?(:solid_queue_claimed_executions, :process_name)
remove_column :solid_queue_claimed_executions, :process_name
end

if connection.index_exists?(:solid_queue_processes, :name)
remove_index :solid_queue_processes, :name
end

unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ])
add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true
end
end
end
6 changes: 4 additions & 2 deletions test/dummy/db/queue_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 1) do
ActiveRecord::Schema[7.1].define(version: 2025_07_20_172253) do
create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
Expand All @@ -27,8 +27,10 @@
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.string "process_name"
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
t.index ["process_name"], name: "index_solid_queue_claimed_executions_on_process_name"
end

create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
Expand Down Expand Up @@ -72,7 +74,7 @@
t.datetime "created_at", null: false
t.string "name", null: false
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index ["name"], name: "index_solid_queue_processes_on_name", unique: true
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

Expand Down
Loading
Loading