Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
da65624
feat: implement backend configuration and lifecycle management
niteshpurohit May 3, 2026
9023ef7
feat: enhance backend configuration and lifecycle management
niteshpurohit May 3, 2026
4960e22
refactor: update validate_backend_instance signature
niteshpurohit May 3, 2026
1d76307
feat: enhance backend configuration validation and lifecycle
niteshpurohit May 3, 2026
26c0757
feat: enhance backend configuration and worker settings
niteshpurohit May 3, 2026
e9863d6
feat: enhance backend configuration validation
niteshpurohit May 3, 2026
e04973a
feat: enhance backend configuration and immutability
niteshpurohit May 4, 2026
20c9b76
feat: enhance backend validation and immutability
niteshpurohit May 4, 2026
ec7b508
feat: enhance backend option validation
niteshpurohit May 4, 2026
b36bacc
feat: enhance backend bootstrapping and error handling
niteshpurohit May 4, 2026
e225d16
feat: validate backend options and improve error handling
niteshpurohit May 4, 2026
50bcc7c
feat: enhance backend configuration and validation
niteshpurohit May 4, 2026
7a82d40
feat: enhance backend option validation and error handling
niteshpurohit May 5, 2026
b401776
feat: enhance queue store factory option validation
niteshpurohit May 5, 2026
de0e5f1
feat: improve backend bootstrapping and error handling
niteshpurohit May 5, 2026
0d1251b
feat: enhance backend option validation and configuration
niteshpurohit May 5, 2026
7df0378
feat: enhance backend configuration and validation
niteshpurohit May 5, 2026
8159947
feat: enhance queue store class validation
niteshpurohit May 5, 2026
418b7d6
feat: enhance backend configuration and class interface
niteshpurohit May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions core/karya/lib/karya/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# LICENSE file in the root directory of this source tree.

require_relative 'internal/null_logger'
require_relative 'primitives/callable'

# Karya module serves as the namespace for all classes and modules related to the Karya gem.
module Karya
Expand Down Expand Up @@ -50,16 +51,209 @@ def configure_queue_store(queue_store)
@queue_store = queue_store
end

def configure_backend(backend_class, **options)
load_backend_support
validate_backend_class(backend_class)
validate_backend_options(options)
@backend_class = backend_class
@backend_options = immutable_backend_option_value(options)
@backend_class
Comment thread
niteshpurohit marked this conversation as resolved.
end
Comment thread
niteshpurohit marked this conversation as resolved.

def logger
return @logger if defined?(@logger) && @logger

Internal::NullLogger.new
end

def backend_class
if defined?(@backend_class) && @backend_class
load_backend_support
return validate_backend_class(@backend_class)
end

nil
end

def backend_options
if defined?(@backend_options) && @backend_options
load_backend_support
validate_backend_options(@backend_options)
return duplicate_backend_option_value(@backend_options)
end

{}
end

def queue_store
return @queue_store if defined?(@queue_store) && @queue_store

raise MissingQueueStoreConfigurationError, 'Karya.queue_store must be configured before starting a worker'
end

private

def load_backend_support
require_relative 'backend'
require_relative 'queue_store/base'
require_relative 'backpressure'
require_relative 'circuit_breaker'
require_relative 'fairness'
end

def validate_backend_class(backend_class)
raise InvalidBackendConfigurationError, 'configured backend class must be a Class' unless backend_class.is_a?(Class)
raise InvalidBackendConfigurationError, 'configured backend class must include Karya::Backend::Base' unless backend_class <= Karya::Backend::Base

Comment thread
niteshpurohit marked this conversation as resolved.
backend_class
end

def validate_backend_options(options)
unless options.is_a?(Hash)
raise InvalidBackendConfigurationError,
"configured backend options must be a Hash: #{options.class}"
end

options.each do |name, value|
validate_backend_option_key(name)
validate_backend_option(name, value)
end

options
end
Comment thread
niteshpurohit marked this conversation as resolved.

Comment thread
niteshpurohit marked this conversation as resolved.
def validate_backend_option(name, value)
return if valid_backend_option_value?(name, value)

raise InvalidBackendConfigurationError,
"configured backend option #{name.inspect} has unsupported value #{value.class}"
end

def valid_backend_option_value?(name, value)
return queue_store_factory_option?(name, value) if name == :queue_store_class

case value
when NilClass, TrueClass, FalseClass, Numeric, String, Symbol,
Class, QueueStore::Base, Backpressure::PolicySet, CircuitBreaker::PolicySet,
Fairness::Policy
true
Comment thread
niteshpurohit marked this conversation as resolved.
Comment thread
niteshpurohit marked this conversation as resolved.
when Array
valid_backend_option_array?(name, value)
when Hash
valid_backend_option_hash?(name, value)
else
generic_backend_option?(name, value)
end
Comment thread
niteshpurohit marked this conversation as resolved.
end

def valid_backend_option_array?(name, value)
value.all? { |item| valid_backend_option_value?(name, item) }
end

def valid_backend_option_hash?(name, value)
value.all? { |key, item| valid_backend_option_key?(key) && valid_backend_option_value?(name, item) }
end

def valid_backend_option_key?(key)
key.is_a?(Symbol) || key.is_a?(String)
end

def validate_backend_option_key(key)
return if key.is_a?(Symbol)

raise InvalidBackendConfigurationError,
"configured backend option keys must be Symbols: #{key.inspect}"
end

def queue_store_factory_option?(name, value)
name == :queue_store_class && value.is_a?(Class) && valid_queue_store_factory_class?(value)
end

def valid_queue_store_factory_class?(value)
value < QueueStore::Base
rescue StandardError
false
end

def generic_callable_option?(name, value)
Primitives::Callable.new(name, value, error_class: InvalidBackendConfigurationError).normalize
true
rescue InvalidBackendConfigurationError
false
end

def generic_backend_option?(name, value)
return false if name == :queue_store_class

generic_callable_option?(name, value)
end

def immutable_backend_option_value(value)
case value
when Array
immutable_backend_option_array(value)
when Hash
immutable_backend_option_hash(value)
when String
immutable_backend_option_string(value)
else
value
end
end
Comment thread
niteshpurohit marked this conversation as resolved.

def duplicate_backend_option_value(value)
case value
when Array
duplicate_backend_option_array(value)
when Hash
duplicate_backend_option_hash(value)
when String
duplicate_backend_option_string(value)
else
value
end
end

def immutable_backend_option_array(value)
value.map { |item| immutable_backend_option_value(item) }.freeze
end

def immutable_backend_option_hash(value)
value.each_with_object({}) do |(key, item), snapshot|
snapshot[immutable_backend_option_key(key)] = immutable_backend_option_value(item)
end.freeze
end

def duplicate_backend_option_array(value)
value.map { |item| duplicate_backend_option_value(item) }
end

def duplicate_backend_option_hash(value)
value.each_with_object({}) do |(key, item), duplicated|
duplicated[duplicate_backend_option_key(key)] = duplicate_backend_option_value(item)
end
end

def immutable_backend_option_key(key)
return immutable_backend_option_string(key) if key.is_a?(String)

key
end

def duplicate_backend_option_key(key)
return duplicate_backend_option_string(key) if key.is_a?(String)

key
end

def immutable_backend_option_string(value)
return value if value.frozen?

value.dup.freeze
end

def duplicate_backend_option_string(value)
value.dup
end
end
end
17 changes: 11 additions & 6 deletions core/karya/lib/karya/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require_relative 'version'
require_relative 'worker'
require_relative 'worker_supervisor'
require_relative 'cli/backend_boot'
require_relative 'cli/config_builder'
require_relative 'cli/integer_option'
require_relative 'cli/env_prefix'
Expand Down Expand Up @@ -75,21 +76,25 @@ def help(command = nil)
method_option :stop_when_idle, type: :boolean, default: false
def worker(*queues)
load_required_files(options.fetch(:require))
supervisor = Karya::WorkerSupervisor.new(**build_worker_configuration(queues))
worker_settings = build_worker_settings(queues)
backend, queue_store = BackendBoot.resolve
worker_configuration = worker_settings.merge(queue_store:)

status = supervisor.run
status = BackendBoot.run_with_lifecycle(backend, queue_store) do
supervisor = Karya::WorkerSupervisor.new(**worker_configuration)
supervisor.run
end
exit(status) if status.positive?
end

desc 'runtime SUBCOMMAND ...ARGS', 'Inspect or control a running supervisor via the local runtime state file'
subcommand 'runtime', RuntimeCommand

no_commands do
def build_worker_configuration(queues)
ConfigBuilder.build(
def build_worker_settings(queues)
ConfigBuilder.build_settings(
options:,
queues:,
queue_store: Karya.queue_store,
defaults: {
processes: Karya::WorkerSupervisor::DEFAULT_PROCESSES,
threads: Karya::WorkerSupervisor::DEFAULT_THREADS
Expand Down Expand Up @@ -130,6 +135,6 @@ def normalize_env_prefix_option(option_name)

# Logger and instrumenter globals are process-wide defaults.
# Pass explicit runtime collaborators when multiple isolated runtimes share a process.
private_constant :ConfigBuilder, :EnvPrefix, :HandlerParser, :IntegerOption, :MappingEntry, :SignalSubscription
private_constant :BackendBoot, :ConfigBuilder, :EnvPrefix, :HandlerParser, :IntegerOption, :MappingEntry, :SignalSubscription
end
end
85 changes: 85 additions & 0 deletions core/karya/lib/karya/cli/backend_boot.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# frozen_string_literal: true

# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

require 'English'

module Karya
class CLI < Thor
# Resolves configured backend settings into a queue store and wraps backend
# lifecycle hooks around worker boot.
class BackendBoot
class << self
def resolve
return [nil, Karya.queue_store] unless Karya.backend_class

backend = instantiate
[backend, build_queue_store(backend)]
end

def run_with_lifecycle(backend, queue_store, &)
return yield unless backend

execute_with_lifecycle(backend, queue_store, &)
end
Comment thread
niteshpurohit marked this conversation as resolved.

private

def instantiate
backend_class = Karya.backend_class
backend = backend_class.new(**Karya.backend_options)
return backend if backend.is_a?(Karya::Backend::Base)

raise Karya::InvalidBackendConfigurationError,
"#{backend_class} must instantiate a backend including Karya::Backend::Base"
rescue ArgumentError, TypeError => e
raise Karya::InvalidBackendConfigurationError,
"configured backend class #{backend_class} could not be initialized: #{e.message}"
end

def build_queue_store(backend)
queue_store = backend.build_queue_store
return queue_store if queue_store.is_a?(Karya::QueueStore::Base)

raise Karya::InvalidBackendConfigurationError,
"#{backend.class} must build a Karya::QueueStore::Base"
end

def execute_with_lifecycle(backend, queue_store)
started = false
suppress_cleanup_error = false
result = nil

begin
backend.before_start(queue_store:)
started = true
result = yield
rescue StandardError, SignalException, SystemExit
suppress_cleanup_error = true
raise
ensure
suppress_cleanup_error ||= $ERROR_INFO.is_a?(Exception)
finish_lifecycle(backend, queue_store, started:, suppress_cleanup_error:, result:)
Comment thread
niteshpurohit marked this conversation as resolved.
end
Comment thread
niteshpurohit marked this conversation as resolved.

result
end

def finish_lifecycle(backend, queue_store, started:, suppress_cleanup_error:, result:)
return unless started

backend.after_stop(queue_store:)
rescue StandardError
raise unless suppress_cleanup_error || positive_status_result?(result)
end

def positive_status_result?(result)
result.is_a?(Numeric) && result.positive?
end
end
end
end
end
12 changes: 7 additions & 5 deletions core/karya/lib/karya/cli/config_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ class CLI < Thor
# Normalizes CLI worker boot options into a supervisor configuration hash.
class ConfigBuilder
def self.build(options:, queues:, queue_store:, defaults:, helpers:)
new(options:, queues:, queue_store:, defaults:, helpers:).build
build_settings(options:, queues:, defaults:, helpers:).merge(queue_store:)
end

def initialize(options:, queues:, queue_store:, defaults:, helpers:)
def self.build_settings(options:, queues:, defaults:, helpers:)
new(options:, queues:, defaults:, helpers:).build
end

def initialize(options:, queues:, defaults:, helpers:)
@options = options
@queues = queues
@queue_store = queue_store
@defaults = defaults
@helpers = helpers
end

def build
env_prefix = helpers.fetch(:normalize_env_prefix_option).call(:env_prefix)
{
queue_store:,
**resolved_process_settings(env_prefix),
state_file: options.fetch(:state_file, nil),
worker_id: options.fetch(:worker_id),
Expand All @@ -41,7 +43,7 @@ def build

private

attr_reader :defaults, :helpers, :options, :queue_store, :queues
attr_reader :defaults, :helpers, :options, :queues

def resolved_process_settings(env_prefix)
resolve_positive_integer_option = helpers.fetch(:resolve_positive_integer_option)
Expand Down
Loading
Loading