Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/yabeda/base_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ def perform_gauge_set!(_metric, _tags, _value)
raise NotImplementedError, "#{self.class} doesn't support setting gauges"
end

# Optional method that adapters can implement for native gauge increment/decrement
def perform_gauge_increment!(_gauge, _tags, _increment)
# used to indicate adapter does not implement this method. return a non-nil value
# in your adapter to indicate it is supported
nil
end

def register_histogram!(_metric)
raise NotImplementedError, "#{self.class} doesn't support histograms as metric type!"
end
Expand Down
8 changes: 2 additions & 6 deletions lib/yabeda/counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@ class Counter < Metric
def increment(*args)
tags, by = self.class.parse_args(*args)
all_tags = ::Yabeda::Tags.build(tags, group)
values[all_tags] += by
value = increment_value(all_tags, by: by)
adapters.each_value do |adapter|
adapter.perform_counter_increment!(self, all_tags, by)
end
values[all_tags]
end

def values
@values ||= Concurrent::Hash.new(0)
value
end

# @api private
Expand Down
21 changes: 18 additions & 3 deletions lib/yabeda/gauge.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ module Yabeda
class Gauge < Metric
def set(tags, value)
all_tags = ::Yabeda::Tags.build(tags, group)
values[all_tags] = value
atomic_value = values[all_tags] ||= Concurrent::Atom.new(0)
atomic_value.swap { |_| value }
adapters.each_value do |adapter|
adapter.perform_gauge_set!(self, all_tags, value)
end
Expand All @@ -18,7 +19,14 @@ def set(tags, value)
# @param by [Integer] increment value
def increment(*args)
tags, by = Counter.parse_args(*args)
set(tags, get(tags).to_i + by)
all_tags = ::Yabeda::Tags.build(tags, group)
next_value = increment_value(all_tags, by: by)
adapters.each_value do |adapter|
if adapter.perform_gauge_increment!(self, all_tags, by).nil?
adapter.perform_gauge_set!(self, all_tags, next_value)
end
end
next_value
end

# @overload decrement(tags = {}, by: 1)
Expand All @@ -27,7 +35,14 @@ def increment(*args)
# @param by [Integer] decrement value
def decrement(*args)
tags, by = Counter.parse_args(*args)
set(tags, get(tags).to_i - by)
all_tags = ::Yabeda::Tags.build(tags, group)
next_value = increment_value(all_tags, by: -by)
adapters.each_value do |adapter|
if adapter.perform_gauge_increment!(self, all_tags, -by).nil?
adapter.perform_gauge_set!(self, all_tags, next_value)
end
end
next_value
end
end
end
14 changes: 9 additions & 5 deletions lib/yabeda/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ class Metric
# rubocop:disable Layout/LineLength
option :adapter, optional: true, comment: "Monitoring system adapter to register metric in and report metric values to (other adapters won't be used)"
# rubocop:enable Layout/LineLength
option :values, optional: true, default: proc { Concurrent::Hash.new }

# Returns the value for the given label set
def get(labels = {})
values[::Yabeda::Tags.build(labels, group)]
end

def values
@values ||= Concurrent::Hash.new
@values[::Yabeda::Tags.build(labels, group)]&.value
end

# Returns allowed tags for metric (with account for global and group-level +default_tags+)
Expand Down Expand Up @@ -62,5 +59,12 @@ def adapter

super
end

# Atomically increment the stored value, assumed to be given all labels, including group labels
# @api private
def increment_value(labels = {}, by: 1)
atomic_value = values[labels] ||= Concurrent::Atom.new(0)
atomic_value.swap { |prev| prev + by }
end
end
end
10 changes: 9 additions & 1 deletion spec/yabeda/counter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@
it { expect(increment_counter).to eq(metric_value) }

it "increments counter with empty tags if tags are not provided" do
expect { counter.increment }.to change { counter.values[{}] }.by(1)
expect { counter.increment }.to change { counter.get({}) || 0 }.by(1)
end

it "execute perform_counter_increment!" do
increment_counter
expect(adapter).to have_received(:perform_counter_increment!).with(counter, tags, metric_value)
end

it "is threadsafe" do
aggregate_failures do
threads = 20.times.map { Thread.new { 1000.times { counter.increment({}, by: 1) && sleep(0.0001) } } }
threads.each(&:join)
expect(counter.get({})).to eq(20_000)
end
end

context "with adapter option" do
let(:another_adapter) { instance_double(Yabeda::BaseAdapter, perform_counter_increment!: true, register!: true) }
let(:counter) { Yabeda.counter_with_adapter }
Expand Down
38 changes: 36 additions & 2 deletions spec/yabeda/gauge_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
let(:metric_value) { 10 }
let(:gauge) { Yabeda.test_gauge }
let(:built_tags) { { built_foo: "built_bar" } }
let(:adapter) { instance_double(Yabeda::BaseAdapter, perform_gauge_set!: true, register!: true) }
let(:implements_increment) { nil }
let(:adapter) { instance_double(Yabeda::BaseAdapter, perform_gauge_set!: true, perform_gauge_increment!: implements_increment, register!: true) }

before do
Yabeda.configure do
Expand All @@ -26,10 +27,28 @@
end

describe "#increment" do
it "is threadsafe" do
aggregate_failures do
threads = 20.times.map { Thread.new { 1000.times { gauge.increment(tags, by: 1) && sleep(0.0001) } } }
threads.each(&:join)
expect(gauge.get(tags)).to eq(20_000)
end
end

context "when adapter implements perform_gauge_increment!" do
let(:implements_increment) { true }

before { gauge.increment(tags) }

it { expect(adapter).not_to have_received(:perform_gauge_set!).with(gauge, built_tags, 1) }
it { expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, 1) }
end

context "when gauge has no initial value" do
before { gauge.increment(tags) }

it { expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, 1) }
it { expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, 1) }
end

context "when gauge has a value already" do
Expand All @@ -38,22 +57,33 @@
gauge.increment(tags)
end

it { expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, 1) }
it { expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, metric_value + 1) }
end

context "when custom step specified" do
it "increases by value of custom step" do
set_gauge
gauge.increment(tags, by: 42)
expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, 42)
expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, metric_value + 42)
end
end
end

describe "#decrement" do
it "is threadsafe" do
aggregate_failures do
threads = 20.times.map { Thread.new { 1000.times { gauge.decrement(tags, by: 1) && sleep(0.0001) } } }
threads.each(&:join)
expect(gauge.get(tags)).to eq(-20_000)
end
end

context "when gauge has no initial value" do
before { gauge.decrement(tags) }

it { expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, -1) }
it { expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, -1) }
end

Expand All @@ -63,14 +93,18 @@
gauge.decrement(tags)
end

it { expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, - 1) }
it { expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, metric_value - 1) }
end

context "when custom step specified" do
it "decreases by value of custom step" do
set_gauge
gauge.decrement(tags, by: 42)
expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, metric_value - 42)
aggregate_failures do
expect(adapter).to have_received(:perform_gauge_set!).with(gauge, built_tags, metric_value - 42)
expect(adapter).to have_received(:perform_gauge_increment!).with(gauge, built_tags, - 42)
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/yabeda/metric_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
let(:tags) { { foo: "1", bar: "2" } }

before do
metric.instance_variable_set(:@values, { tags => 42 })
metric.values[tags] = Concurrent::Atom.new(42)
end

it { is_expected.to eq(42) }
Expand Down