Skip to content

Commit da6b33d

Browse files
fix: add merge logic for exponential histogram when the temporality cumulative (#1869)
* include scale check and add more test case * update * fix some of the test case * fix all test case except cumulative merge * lint * update merge and test case except some test case with check on bucket * test case passed * fix test case after the scale fix * Update exponential_bucket_histogram.rb * revision --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 8c02da8 commit da6b33d

File tree

8 files changed

+1461
-144
lines changed

8 files changed

+1461
-144
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true

require 'bundler/inline'

gemfile(true) do
  source 'https://rubygems.org'
  gem 'opentelemetry-api'
  gem 'opentelemetry-common'
  gem 'opentelemetry-sdk'

  gem 'opentelemetry-metrics-api', path: '../../metrics_api'
  gem 'opentelemetry-metrics-sdk', path: '../../metrics_sdk'
end

require 'opentelemetry/sdk'
require 'opentelemetry-metrics-sdk'

# This example wires up the exporter by hand, so switch off automatic configuration.
ENV['OTEL_METRICS_EXPORTER'] = 'none'

OpenTelemetry::SDK.configure

console_metric_exporter = OpenTelemetry::SDK::Metrics::Export::ConsoleMetricPullExporter.new
OpenTelemetry.meter_provider.add_metric_reader(console_metric_exporter)

# Route every instrument matching *exponential* through a cumulative
# exponential bucket histogram aggregation.
OpenTelemetry.meter_provider.add_view('*exponential*', aggregation: OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(aggregation_temporality: :cumulative, max_scale: 20), type: :histogram, unit: 'smidgen')

meter = OpenTelemetry.meter_provider.meter('SAMPLE_METER_NAME')

exponential_histogram = meter.create_histogram('test_exponential_histogram', unit: 'smidgen', description: 'a small amount of something')

# Record the squares 1, 4, 9, ... 100 under a single attribute set.
(1..10).each do |v|
  exponential_histogram.record(v**2, attributes: { 'lox' => 'xol' })
end

OpenTelemetry.meter_provider.metric_readers.each(&:pull)
OpenTelemetry.meter_provider.shutdown

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb

Lines changed: 206 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require_relative 'exponential_histogram/ieee_754'
1010
require_relative 'exponential_histogram/logarithm_mapping'
1111
require_relative 'exponential_histogram/exponent_mapping'
12+
require_relative 'exponential_histogram_data_point'
1213

1314
module OpenTelemetry
1415
module SDK
@@ -44,8 +45,20 @@ def initialize(
4445
@scale = validate_scale(max_scale)
4546

4647
@mapping = new_mapping(@scale)
48+
49+
# Previous state for cumulative aggregation
50+
@previous_positive = {} # nil
51+
@previous_negative = {} # nil
52+
@previous_min = {} # Float::INFINITY
53+
@previous_max = {} # -Float::INFINITY
54+
@previous_sum = {} # 0
55+
@previous_count = {} # 0
56+
@previous_zero_count = {} # 0
57+
@previous_scale = {} # nil
4758
end
4859

60+
# when aggregation temporality is cumulative, merge and downscale will happen.
61+
# rubocop:disable Metrics/MethodLength
4962
def collect(start_time, end_time, data_points)
5063
if @aggregation_temporality.delta?
5164
# Set timestamps and 'move' data point values to result.
@@ -57,18 +70,133 @@ def collect(start_time, end_time, data_points)
5770
data_points.clear
5871
hdps
5972
else
60-
# Update timestamps and take a snapshot.
61-
data_points.values.map! do |hdp|
62-
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
63-
hdp.time_unix_nano = end_time
64-
hdp = hdp.dup
65-
hdp.positive = hdp.positive.dup
66-
hdp.negative = hdp.negative.dup
67-
hdp
73+
# CUMULATIVE temporality - merge current data_points to previous data_points
74+
# and only keep the merged data_points in @previous_*
75+
76+
merged_data_points = {}
77+
78+
# this will slow down the operation especially if large amount of data_points present
79+
# but it should be fine since with cumulative, the data_points are merged into previous_* and not kept in data_points
80+
# rubocop:disable Metrics/BlockLength
81+
data_points.each do |attributes, hdp|
82+
# Store current values
83+
current_positive = hdp.positive
84+
current_negative = hdp.negative
85+
current_sum = hdp.sum
86+
current_min = hdp.min
87+
current_max = hdp.max
88+
current_count = hdp.count
89+
current_zero_count = hdp.zero_count
90+
current_scale = hdp.scale
91+
92+
# Setup previous positive, negative bucket and scale based on three different cases
93+
@previous_positive[attributes] = current_positive.copy_empty if @previous_positive[attributes].nil?
94+
@previous_negative[attributes] = current_negative.copy_empty if @previous_negative[attributes].nil?
95+
@previous_scale[attributes] = current_scale if @previous_scale[attributes].nil?
96+
97+
# Determine minimum scale for merging
98+
min_scale = [@previous_scale[attributes], current_scale].min
99+
100+
# Calculate ranges for positive and negative buckets
101+
low_positive, high_positive = get_low_high_previous_current(
102+
@previous_positive[attributes],
103+
current_positive,
104+
@previous_scale[attributes],
105+
current_scale,
106+
min_scale
107+
)
108+
low_negative, high_negative = get_low_high_previous_current(
109+
@previous_negative[attributes],
110+
current_negative,
111+
@previous_scale[attributes],
112+
current_scale,
113+
min_scale
114+
)
115+
116+
# Adjust min_scale based on bucket size constraints
117+
min_scale = [
118+
min_scale - get_scale_change(low_positive, high_positive),
119+
min_scale - get_scale_change(low_negative, high_negative)
120+
].min
121+
122+
# Downscale previous buckets if necessary
123+
downscale_change = @previous_scale[attributes] - min_scale
124+
downscale(downscale_change, @previous_positive[attributes], @previous_negative[attributes])
125+
126+
# Merge the current buckets into the previous buckets (similar to an update step); the temporality here is always :cumulative
127+
merge_buckets(@previous_positive[attributes], current_positive, current_scale, min_scale, @aggregation_temporality)
128+
merge_buckets(@previous_negative[attributes], current_negative, current_scale, min_scale, @aggregation_temporality)
129+
130+
# initialize min, max, sum, count, zero_count for first time
131+
@previous_min[attributes] = Float::INFINITY if @previous_min[attributes].nil?
132+
@previous_max[attributes] = -Float::INFINITY if @previous_max[attributes].nil?
133+
@previous_sum[attributes] = 0 if @previous_sum[attributes].nil?
134+
@previous_count[attributes] = 0 if @previous_count[attributes].nil?
135+
@previous_zero_count[attributes] = 0 if @previous_zero_count[attributes].nil?
136+
137+
# Update aggregated values
138+
@previous_min[attributes] = [@previous_min[attributes], current_min].min
139+
@previous_max[attributes] = [@previous_max[attributes], current_max].max
140+
@previous_sum[attributes] += current_sum
141+
@previous_count[attributes] += current_count
142+
@previous_zero_count[attributes] += current_zero_count
143+
@previous_scale[attributes] = min_scale
144+
145+
# Create merged data point
146+
merged_hdp = ExponentialHistogramDataPoint.new(
147+
attributes,
148+
start_time,
149+
end_time,
150+
@previous_count[attributes],
151+
@previous_sum[attributes],
152+
@previous_scale[attributes],
153+
@previous_zero_count[attributes],
154+
@previous_positive[attributes].dup,
155+
@previous_negative[attributes].dup,
156+
0, # flags
157+
nil, # exemplars
158+
@previous_min[attributes],
159+
@previous_max[attributes],
160+
@zero_threshold
161+
)
162+
163+
merged_data_points[attributes] = merged_hdp
164+
end
165+
# rubocop:enable Metrics/BlockLength
166+
167+
# when you have no local_data_points, the loop from cumulative aggregation will not run
168+
# so return last merged data points if exists
169+
if data_points.empty? && !@previous_positive.empty?
170+
@previous_positive.each_key do |attributes|
171+
merged_hdp = ExponentialHistogramDataPoint.new(
172+
attributes,
173+
start_time,
174+
end_time,
175+
@previous_count[attributes],
176+
@previous_sum[attributes],
177+
@previous_scale[attributes],
178+
@previous_zero_count[attributes],
179+
@previous_positive[attributes].dup,
180+
@previous_negative[attributes].dup,
181+
0, # flags
182+
nil, # exemplars
183+
@previous_min[attributes],
184+
@previous_max[attributes],
185+
@zero_threshold
186+
)
187+
merged_data_points[attributes] = merged_hdp
188+
end
68189
end
190+
191+
# clear data_points since the data is merged into previous_* already;
192+
# otherwise we will have duplicated data_points in the next collect
193+
data_points.clear
194+
merged_data_points.values # return array
69195
end
70196
end
197+
# rubocop:enable Metrics/MethodLength
71198

199+
# This corresponds to `aggregate` in the Python implementation; `aggregate` performs no merge, but rescaling does happen here.
72200
# rubocop:disable Metrics/MethodLength
73201
def update(amount, attributes, data_points)
74202
# fetch or initialize the ExponentialHistogramDataPoint
@@ -78,6 +206,7 @@ def update(amount, attributes, data_points)
78206
max = -Float::INFINITY
79207
end
80208

209+
# this code block will only be executed if no data_points was found with the attributes
81210
data_points[attributes] = ExponentialHistogramDataPoint.new(
82211
attributes,
83212
nil, # :start_time_unix_nano
@@ -203,7 +332,8 @@ def get_scale_change(low, high)
203332
end
204333

205334
def downscale(change, positive, negative)
206-
return if change <= 0
335+
return if change == 0
336+
raise ArgumentError, 'Invalid change of scale' if change < 0
207337

208338
positive.downscale(change)
209339
negative.downscale(change)
@@ -217,11 +347,76 @@ def validate_scale(scale)
217347
end
218348

219349
# Validates that the requested bucket count lies within the supported
# range [MIN_MAX_SIZE, MAX_MAX_SIZE].
#
# Returns the size unchanged when valid; raises ArgumentError otherwise.
def validate_size(size)
  if size < MIN_MAX_SIZE
    raise ArgumentError, "Buckets min size #{size} is smaller than minimum min size #{MIN_MAX_SIZE}"
  elsif size > MAX_MAX_SIZE
    raise ArgumentError, "Buckets max size #{size} is larger than maximum max size #{MAX_MAX_SIZE}"
  end

  size
end
355+
356+
# NOTE(review): the only known issue is that get_low_high may raise an error when @previous_scale is nil
357+
# Computes the combined [low, high] bucket-index range of the previous and
# current buckets, with both ranges downscaled to min_scale.
# An inverted range (low > high) marks an empty bucket set; if either side
# is empty, the other side's range is returned as-is.
def get_low_high_previous_current(previous_buckets, current_buckets, previous_scale, current_scale, min_scale)
  prev_low, prev_high = get_low_high(previous_buckets, previous_scale, min_scale)
  cur_low, cur_high = get_low_high(current_buckets, current_scale, min_scale)

  return [prev_low, prev_high] if cur_low > cur_high
  return [cur_low, cur_high] if prev_low > prev_high

  [[prev_low, cur_low].min, [prev_high, cur_high].max]
end
369+
370+
# Returns the [low, high] logical bucket-index range of `buckets`,
# shifted down from `scale` to `min_scale`.
# The inverted range [0, -1] is returned for a nil or empty bucket set.
def get_low_high(buckets, scale, min_scale)
  empty = buckets.nil? || buckets.counts.empty? || buckets.counts == [0]
  return [0, -1] if empty

  # Each step down in scale halves the resolution: one right-shift per step.
  shift = scale - min_scale
  [buckets.index_start >> shift, buckets.index_end >> shift]
end
376+
377+
# Folds the non-empty counts of `current_buckets` into `previous_buckets`,
# rebasing each current bucket index from `current_scale` down to `min_scale`.
# Mutates `previous_buckets` in place, growing it (bounded by @size) as needed.
#
# previous_buckets        - accumulated Buckets state (mutated)
# current_buckets         - Buckets collected this interval (read only)
# current_scale           - scale the current buckets were recorded at
# min_scale               - target scale after merging (assumed <= current_scale)
# aggregation_temporality - :delta subtracts counts; anything else adds them
def merge_buckets(previous_buckets, current_buckets, current_scale, min_scale, aggregation_temporality)
  return unless current_buckets && !current_buckets.counts.empty?

  # Each step down in scale shifts a bucket index right by one bit.
  current_change = current_scale - min_scale

  # when we iterate counts, we don't use offset counts
  current_buckets.instance_variable_get(:@counts).each_with_index do |current_bucket, current_bucket_index|
    next if current_bucket == 0

    # Recover the logical index from the circular backing array: positions
    # past index_end wrap around to the negative side.
    current_index = current_buckets.index_base + current_bucket_index
    current_index -= current_buckets.counts.size if current_index > current_buckets.index_end

    # Downscale the logical index to the merge target scale.
    inds = current_index >> current_change

    # Grow previous buckets if needed to accommodate the new index
    if inds < previous_buckets.index_start
      span = previous_buckets.index_end - inds

      raise StandardError, 'Incorrect merge scale' if span >= @size

      previous_buckets.grow(span + 1, @size) if span >= previous_buckets.counts.size

      previous_buckets.index_start = inds
    end

    if inds > previous_buckets.index_end
      span = inds - previous_buckets.index_start

      raise StandardError, 'Incorrect merge scale' if span >= @size

      previous_buckets.grow(span + 1, @size) if span >= previous_buckets.counts.size

      previous_buckets.index_end = inds
    end

    # Translate the logical index back into a position in the previous
    # buckets' circular backing array.
    bucket_index = inds - previous_buckets.index_base
    bucket_index += previous_buckets.counts.size if bucket_index < 0

    # For delta temporality in merge, we subtract (this shouldn't normally happen in our use case)
    increment = aggregation_temporality == :delta ? -current_bucket : current_bucket
    previous_buckets.increment_bucket(bucket_index, increment)
  end
end
225420
end
226421
end
227422
end

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_histogram/buckets.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ def grow(needed, max_size)
2727
old_positive_limit = size - bias
2828

2929
new_size = [2**Math.log2(needed).ceil, max_size].min
30-
3130
new_positive_limit = new_size - bias
3231

3332
tmp = Array.new(new_size, 0)
@@ -105,6 +104,15 @@ def downscale(amount)
105104
# Adds `increment` (default 1, i.e. a single observation) to the counter
# stored at `bucket_index` in the backing array.
def increment_bucket(bucket_index, increment = 1)
  @counts[bucket_index] = @counts[bucket_index] + increment
end
107+
108+
# Builds a new instance with the same index_base/index_start/index_end as
# the receiver but with every count reset to zero — a same-shape, empty copy.
def copy_empty
  empty = self.class.new
  empty.instance_variable_set(:@counts, [0] * @counts.size)
  %i[@index_base @index_start @index_end].each do |ivar|
    empty.instance_variable_set(ivar, instance_variable_get(ivar))
  end
  empty
end
108116
end
109117
end
110118
end

metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,12 +294,12 @@
294294
error = assert_raises(ArgumentError) do
295295
OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 10_000_000)
296296
end
297-
assert_equal('Max size 10000000 is larger than maximum size 16384', error.message)
297+
assert_equal('Buckets max size 10000000 is larger than maximum max size 16384', error.message)
298298

299299
error = assert_raises(ArgumentError) do
300300
OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 0)
301301
end
302-
assert_equal('Max size 0 is smaller than minimum size 2', error.message)
302+
assert_equal('Buckets min size 0 is smaller than minimum min size 2', error.message)
303303
end
304304
end
305305
end

0 commit comments

Comments
 (0)