From 2a8409989c62597b0bf13e0a015aee6149b09e2f Mon Sep 17 00:00:00 2001 From: Mike Rotondo Date: Thu, 25 Sep 2025 14:57:19 -0700 Subject: [PATCH] use number of workers to control concurrency of mef jobs, and number of threads to control concurrency of webhook callback jobs --- app/jobs/mef/mef_job.rb | 1 + app/jobs/webhook_callback_job.rb | 2 ++ config/database.yml | 2 +- config/queue.yml | 14 ++++++++++++-- db/queue_schema.rb | 17 ++++++++++++++++- 5 files changed, 32 insertions(+), 4 deletions(-) diff --git a/app/jobs/mef/mef_job.rb b/app/jobs/mef/mef_job.rb index 7162928..4de8df3 100644 --- a/app/jobs/mef/mef_job.rb +++ b/app/jobs/mef/mef_job.rb @@ -2,6 +2,7 @@ module Mef class MefJob < ApplicationJob attr_accessor :webhook_url, :api_request_id, :mef_credentials + queue_as :mef retry_on MefService::RetryableError after_discard do |job, exception| diff --git a/app/jobs/webhook_callback_job.rb b/app/jobs/webhook_callback_job.rb index 7d6e4de..54982b3 100644 --- a/app/jobs/webhook_callback_job.rb +++ b/app/jobs/webhook_callback_job.rb @@ -1,4 +1,6 @@ class WebhookCallbackJob < ApplicationJob + queue_as :webhook_callback + def perform(api_request_id, webhook_url, payload) payload_with_api_request_id = payload.merge({api_request_id:}) uri = URI.parse(webhook_url) diff --git a/config/database.yml b/config/database.yml index 491bc27..000f310 100644 --- a/config/database.yml +++ b/config/database.yml @@ -17,7 +17,7 @@ default: &default encoding: unicode # For details on connection pooling, see Rails configuration guide # https://guides.rubyonrails.org/configuring.html#database-pooling - pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %> + pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 15 } %> development: diff --git a/config/queue.yml b/config/queue.yml index 658805a..0d31ec0 100644 --- a/config/queue.yml +++ b/config/queue.yml @@ -3,9 +3,16 @@ default: &default - polling_interval: .1 batch_size: 500 workers: - - queues: "*" + - queues: mef + # We are constrained by MeF to 5 concurrent connections, so we use a limit of 5 single-threaded workers threads: 1 - processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %> + processes: 5 + polling_interval: 0.1 + - queues: webhook_callbacks + # We want our callbacks to not be blocked by each other, but we know they are threadsafe so we use a multi-threaded + # worker with the same number of threads as there are mef queue workers (in case they all trigger webhooks simultaneously) + threads: 5 + processes: 1 polling_interval: 0.1 development: @@ -14,5 +21,8 @@ development: test: <<: *default +demo: + <<: *default + production: <<: *default diff --git a/db/queue_schema.rb b/db/queue_schema.rb index 4b2cfdb..089e938 100644 --- a/db/queue_schema.rb +++ b/db/queue_schema.rb @@ -1,4 +1,19 @@ -ActiveRecord::Schema[7.1].define(version: 1) do +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[8.0].define(version: 1) do + # These are extensions that must be enabled in order to support this database + enable_extension "pg_catalog.plpgsql" + create_table "solid_queue_blocked_executions", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false