Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/instructions/core-karya-rbs.instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ RBS in this repository is a correctness contract.
`private`, mirror that visibility change in the same RBS patch.
- Match argument names, keyword names, optionality, return types, and nested
module/class structure.
- Match the full accepted input surface before normalization. If Ruby accepts
aliases, alternative casing, delimiter variants, or `nil` before rejecting or
normalizing, model that accepted input in the RBS instead of only the
normalized canonical value.
- Do not collapse shared contracts to one concrete implementation's keyword
surface. Shared interfaces should model only the common contract they truly
guarantee, not adapter-local boot options copied from the first
implementation.
- Do not use broad keyword maps to dodge exactness. If Ruby requires specific
keys, rejects unknown keys, or distinguishes one optional keyword from
another, encode that explicitly instead of using a generic catch-all keyword
shape.
- Remove stale entries for deleted methods, constants, and modules.
- Do not use `untyped`, `any`, or other generic escape hatches where a concrete
type is knowable.
Expand Down
10 changes: 10 additions & 0 deletions .github/instructions/review.instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ For any Ruby change that has a mirrored file under `sig/`:
- **Required:** Treat spec-layout drift as a review concern when extracted
owner-local files leave all direct behavior buried only in a monolithic owner
spec.
- **Required:** Mirror the accepted input surface, not just the normalized
output surface. If Ruby accepts aliases, mixed casing, delimiter variants, or
broader nilability before normalization, the RBS input type must reflect
that same accepted set.
- **Required:** Flag signatures that overfit one implementation while claiming
to model a shared contract. If a shared base/module type is narrowed to one
concrete adapter's keyword set or return posture, treat that as contract
drift even when the current implementation still passes tests.
- **Required:** Flag generic keyword or catch-all type shapes that hide runtime
rules about required keys, rejected keys, or known option names.

## Architecture Guidelines Review

Expand Down
1 change: 1 addition & 0 deletions core/karya/lib/karya.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require_relative 'karya/outbound_events'
require_relative 'karya/reservation'
require_relative 'karya/queue_store'
require_relative 'karya/backend'
require_relative 'karya/workflow'
require_relative 'karya/constant_resolver'
require_relative 'karya/worker'
Expand Down
17 changes: 17 additions & 0 deletions core/karya/lib/karya/backend.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

module Karya
# Raised when backend configuration is invalid.
class InvalidBackendConfigurationError < Error; end

# Namespace for backend interface and lifecycle contracts.
module Backend
autoload :Base, 'karya/backend/base'
autoload :InMemory, 'karya/backend/in_memory'
Comment thread
niteshpurohit marked this conversation as resolved.
end
end
31 changes: 31 additions & 0 deletions core/karya/lib/karya/backend/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

module Karya
module Backend
# Shared backend contract above the queue-store persistence API.
module Base
def identifier
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

def build_queue_store
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

def before_start(queue_store:)
_queue_store = queue_store
nil
end

def after_stop(queue_store:)
Comment thread
niteshpurohit marked this conversation as resolved.
_queue_store = queue_store
nil
end
end
end
end
56 changes: 56 additions & 0 deletions core/karya/lib/karya/backend/in_memory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

require_relative '../base'
require_relative '../backend'
require_relative '../queue_store/in_memory'

module Karya
module Backend
# Quick-start backend wrapper around the single-process reference queue store.
class InMemory
include Base

UNSET = Object.new.freeze
Comment thread
niteshpurohit marked this conversation as resolved.
private_constant :UNSET

def initialize(queue_store_class: QueueStore::InMemory)
@identifier = 'in_memory'
@queue_store_class = queue_store_class
end

attr_reader :identifier

def build_queue_store(
token_generator: UNSET,
expired_tombstone_limit: UNSET,
completed_batch_retention_limit: UNSET,
max_batch_size: UNSET,
policy_set: UNSET,
circuit_breaker_policy_set: UNSET,
fairness_policy: UNSET
)
queue_store = queue_store_class.new(**{
token_generator:,
expired_tombstone_limit:,
completed_batch_retention_limit:,
max_batch_size:,
policy_set:,
circuit_breaker_policy_set:,
fairness_policy:
}.reject { |_name, value| value.equal?(UNSET) })
return queue_store if queue_store.is_a?(QueueStore::Base)

raise InvalidBackendConfigurationError, 'queue_store_class must build a Karya::QueueStore::Base'
end

private

attr_reader :queue_store_class
end
end
end
9 changes: 7 additions & 2 deletions core/karya/lib/karya/worker_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,19 @@ def with_signal_handlers(shutdown_controller)
end

def collect_signal_restorers(restorers, shutdown_controller)
register_signal_restorers(restorers, shutdown_controller)
restorers << WakeupSignal.register_restorer(WAKEUP_SIGNAL)
register_signal_restorers(restorers, shutdown_controller)
end

def register_signal_restorers(restorers, shutdown_controller)
SIGNALS.each do |signal|
restorers << runtime.subscribe_signal(signal, -> { shutdown_controller.advance })
restorers << runtime.subscribe_signal(signal, proc do
shutdown_controller.advance
ensure
WakeupSignal.interrupt(WAKEUP_SIGNAL)
end)
Comment thread
niteshpurohit marked this conversation as resolved.
end
restorers
end

def cleanup_tracked_children(child_pids)
Expand Down
30 changes: 22 additions & 8 deletions core/karya/lib/karya/worker_supervisor/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ def self.default_killer
->(signal, pid) { Process.kill(signal, pid) }
end

def self.default_signal_subscriber
lambda do |signal, handler|
previous_handler = Signal.trap(signal) { handler.call }
lambda do
Signal.trap(signal, previous_handler)
nil
end
end
end

def self.normalize_callable(name, value)
Primitives::Callable.new(name, value, error_class: InvalidWorkerSupervisorConfigurationError).normalize
end
Expand Down Expand Up @@ -66,34 +76,34 @@ def initialize(**attributes)
runtime_class = self.class
@forker = runtime_class.normalize_forker(
:forker,
runtime_class.resolve_option(attributes, :forker, default: method(:default_forker))
resolve_runtime_option(attributes, :forker, default: method(:default_forker))
)
@instrumenter = runtime_class.normalize_optional_callable(
:instrumenter,
runtime_class.resolve_option(attributes, :instrumenter, default: Karya.instrumenter)
resolve_runtime_option(attributes, :instrumenter, default: Karya.instrumenter)
)
@killer = runtime_class.normalize_callable(
:killer,
runtime_class.resolve_option(attributes, :killer, default: runtime_class.default_killer)
resolve_runtime_option(attributes, :killer, default: runtime_class.default_killer)
)
@logger = validate_logger(
runtime_class.resolve_option(attributes, :logger, default: Karya.logger)
resolve_runtime_option(attributes, :logger, default: Karya.logger)
)
@outbound_event_dispatcher = runtime_class.normalize_optional_outbound_event_dispatcher(
:outbound_event_dispatcher,
runtime_class.resolve_option(attributes, :outbound_event_dispatcher, default: Karya.outbound_event_dispatcher)
resolve_runtime_option(attributes, :outbound_event_dispatcher, default: Karya.outbound_event_dispatcher)
)
@poll_waiter = runtime_class.normalize_callable(
:poll_waiter,
runtime_class.resolve_option(attributes, :poll_waiter, default: default_poll_waiter)
resolve_runtime_option(attributes, :poll_waiter, default: default_poll_waiter)
)
@signal_subscriber = runtime_class.normalize_optional_callable(
:signal_subscriber,
runtime_class.resolve_option(attributes, :signal_subscriber, default: nil)
resolve_runtime_option(attributes, :signal_subscriber, default: runtime_class.default_signal_subscriber)
)
@waiter = runtime_class.normalize_callable(
:waiter,
runtime_class.resolve_option(attributes, :waiter, default: default_waiter)
resolve_runtime_option(attributes, :waiter, default: default_waiter)
)
end

Expand Down Expand Up @@ -179,6 +189,10 @@ def default_waiter

private

def resolve_runtime_option(attributes, key, default:)
self.class.resolve_option(attributes, key, default:)
end

def validate_logger(value)
%i[debug info warn error].each do |level|
value.public_method(level)
Expand Down
12 changes: 12 additions & 0 deletions core/karya/sig/karya/backend.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

module Karya
class InvalidBackendConfigurationError < Error
end

module Backend
end
end
10 changes: 10 additions & 0 deletions core/karya/sig/karya/backend/base.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Karya
module Backend
module Base
def identifier: () -> String
def build_queue_store: () -> QueueStore::Base
def before_start: (queue_store: QueueStore::Base) -> nil
def after_stop: (queue_store: QueueStore::Base) -> nil
end
end
end
31 changes: 31 additions & 0 deletions core/karya/sig/karya/backend/in_memory.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module Karya
module Backend
interface _QueueStoreFactoryInput
def new: (
?token_generator: Karya::callable_value?,
?expired_tombstone_limit: Integer?,
?completed_batch_retention_limit: Integer?,
?max_batch_size: Integer?,
?policy_set: Backpressure::PolicySet?,
?circuit_breaker_policy_set: CircuitBreaker::PolicySet?,
?fairness_policy: Fairness::Policy?
) -> QueueStore::Base
Comment thread
niteshpurohit marked this conversation as resolved.
end

class InMemory
include Base

def initialize: (?queue_store_class: _QueueStoreFactoryInput) -> void
def identifier: () -> "in_memory"
def build_queue_store: (
?token_generator: Karya::callable_value?,
?expired_tombstone_limit: Integer?,
?completed_batch_retention_limit: Integer?,
?max_batch_size: Integer?,
?policy_set: Backpressure::PolicySet?,
?circuit_breaker_policy_set: CircuitBreaker::PolicySet?,
?fairness_policy: Fairness::Policy?
) -> QueueStore::Base
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Karya

def initialize: () -> void

def advance: () -> void
def advance: () -> (:draining | :force_stop | nil)

def begin_drain: () -> bool

Expand Down
2 changes: 2 additions & 0 deletions core/karya/sig/karya/worker_supervisor.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ module Karya

def with_signal_handlers: (WorkerSupervisor::ShutdownController shutdown_controller) { () -> Integer } -> Integer

def collect_signal_restorers: (Array[^() -> nil] restorers, WorkerSupervisor::ShutdownController shutdown_controller) -> Array[^() -> nil]

def register_signal_restorers: (Array[^() -> nil] restorers, WorkerSupervisor::ShutdownController shutdown_controller) -> Array[^() -> nil]
Comment thread
niteshpurohit marked this conversation as resolved.

def cleanup_tracked_children: (::Hash[Integer, bool] child_pids) -> nil
Expand Down
2 changes: 2 additions & 0 deletions core/karya/sig/karya/worker_supervisor/runtime.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ module Karya

def self.default_killer: () -> ^(String, Integer) -> Integer

def self.default_signal_subscriber: () -> ^(String, Karya::signal_handler) -> Karya::signal_restorer
Comment thread
niteshpurohit marked this conversation as resolved.

def self.normalize_callable: (Symbol name, Karya::callable_value value) -> Karya::callable_value

def self.normalize_forker: (Symbol name, Karya::forker value) -> Karya::forker
Expand Down
26 changes: 26 additions & 0 deletions core/karya/spec/karya/backend/base_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

RSpec.describe Karya::Backend::Base do
subject(:backend) { implementation.new }

let(:implementation) do
Class.new do
include Karya::Backend::Base
end
end

it 'requires identifier to be implemented' do
expect { backend.identifier }.to raise_error(NotImplementedError, /implement #identifier/)
end

it 'requires build_queue_store to be implemented' do
expect { backend.build_queue_store }.to raise_error(NotImplementedError, /implement #build_queue_store/)
end

it 'provides no-op lifecycle hooks by default' do
queue_store = instance_double(Karya::QueueStore::Base)

expect(backend.before_start(queue_store:)).to be_nil
expect(backend.after_stop(queue_store:)).to be_nil
end
end
Loading
Loading