feat: add basic exponential histogram #1736

Open · wants to merge 12 commits into main
2 changes: 2 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb
@@ -21,3 +21,5 @@ module Aggregation
require 'opentelemetry/sdk/metrics/aggregation/sum'
require 'opentelemetry/sdk/metrics/aggregation/last_value'
require 'opentelemetry/sdk/metrics/aggregation/drop'
require 'opentelemetry/sdk/metrics/aggregation/exponential_histogram_data_point'
require 'opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram'
222 changes: 222 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb
@@ -0,0 +1,222 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'exponential_histogram/buckets'
require_relative 'exponential_histogram/log2e_scale_factor'
require_relative 'exponential_histogram/ieee_754'
require_relative 'exponential_histogram/logarithm_mapping'
require_relative 'exponential_histogram/exponent_mapping'

module OpenTelemetry
module SDK
module Metrics
module Aggregation
# Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
attr_reader :aggregation_temporality

# Relates to the min/max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
MAX_SCALE = 20
MIN_SCALE = -10
MAX_SIZE = 160

# The default boundaries are calculated based on default max_size and max_scale values
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
max_size: MAX_SIZE,
max_scale: MAX_SCALE,
record_min_max: true,
zero_threshold: 0
)
@aggregation_temporality = aggregation_temporality
@record_min_max = record_min_max
@min = Float::INFINITY
@max = -Float::INFINITY
@sum = 0
@count = 0
@zero_threshold = zero_threshold
@zero_count = 0
@size = validate_size(max_size)
@scale = validate_scale(max_scale)

@mapping = new_mapping(@scale)
end

def collect(start_time, end_time, data_points)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
hdps = data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
data_points.clear
hdps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
hdp.positive = hdp.positive.dup
hdp.negative = hdp.negative.dup
hdp
end
end
end

# rubocop:disable Metrics/MethodLength
def update(amount, attributes, data_points)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end

# Start to populate the data point (esp. the buckets)
if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
end

hdp.sum += amount
hdp.count += 1

if amount.abs <= @zero_threshold
hdp.zero_count += 1
hdp.scale = 0 if hdp.count == hdp.zero_count # if every recorded value so far is zero, there is no point in keeping a non-zero scale
return
end

# rescale, map to index, update the buckets here
buckets = amount.positive? ? hdp.positive : hdp.negative
amount = -amount if amount.negative?

bucket_index = @mapping.map_to_index(amount)

rescaling_needed = false
low = high = 0

if buckets.counts == [0] # special case of empty
buckets.index_start = bucket_index
buckets.index_end = bucket_index
buckets.index_base = bucket_index

elsif bucket_index < buckets.index_start && (buckets.index_end - bucket_index) >= @size
rescaling_needed = true
low = bucket_index
high = buckets.index_end

elsif bucket_index > buckets.index_end && (bucket_index - buckets.index_start) >= @size
rescaling_needed = true
low = buckets.index_start
high = bucket_index
end

if rescaling_needed
scale_change = get_scale_change(low, high)
downscale(scale_change, hdp.positive, hdp.negative)
new_scale = @mapping.scale - scale_change
hdp.scale = new_scale
@mapping = new_mapping(new_scale)
bucket_index = @mapping.map_to_index(amount)

OpenTelemetry.logger.debug "Rescaled with new scale #{new_scale} from #{low} and #{high}; bucket_index is updated to #{bucket_index}"
end

# adjust buckets based on the bucket_index
if bucket_index < buckets.index_start
span = buckets.index_end - bucket_index
grow_buckets(span, buckets)
buckets.index_start = bucket_index
elsif bucket_index > buckets.index_end
span = bucket_index - buckets.index_start
grow_buckets(span, buckets)
buckets.index_end = bucket_index
end

bucket_index -= buckets.index_base
bucket_index += buckets.counts.size if bucket_index.negative?

buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength

private

def grow_buckets(span, buckets)
return if span < buckets.counts.size

OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
buckets.grow(span + 1, @size)
end

def new_mapping(scale)
scale <= 0 ? ExponentialHistogram::ExponentMapping.new(scale) : ExponentialHistogram::LogarithmMapping.new(scale)
end

def empty_counts
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil
end

def get_scale_change(low, high)
# The Python implementation also produces a change of 18 for low = 0, high = 1048575;
# the high value looks slightly off there, but the mapping was checked and also produces 1048575.
change = 0
while high - low >= @size
high >>= 1
low >>= 1
change += 1
end
change
end

def downscale(change, positive, negative)
return if change <= 0

positive.downscale(change)
negative.downscale(change)
end

def validate_scale(scale)
return scale unless scale > MAX_SCALE || scale < MIN_SCALE
Contributor

I'm not sure how performant it is, but another way to implement this could be:

(MIN_SCALE..MAX_SCALE).cover?(scale)

Contributor Author

I think using the operators will be faster since they're direct comparisons with short-circuiting behavior, but the range comparison does look nicer. Let me know if I should change it.

       user     system      total        real
Direct Comparison:  0.047998   0.000000   0.047998 (  0.047786)
Range Cover:  0.172396   0.000963   0.173359 (  0.173389)
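
(For context, a minimal sketch of the kind of micro-benchmark that could produce numbers like the above, assuming Ruby's standard Benchmark library; the iteration count, labels, and sample values are illustrative, not taken from this PR:)

require 'benchmark'

MIN_SCALE = -10
MAX_SCALE = 20
N = 10_000_000

Benchmark.bm(20) do |x|
  x.report('Direct Comparison:') do
    N.times { |i| scale = (i % 41) - 10; scale > MAX_SCALE || scale < MIN_SCALE }
  end
  x.report('Range Cover:') do
    N.times { |i| scale = (i % 41) - 10; (MIN_SCALE..MAX_SCALE).cover?(scale) }
  end
end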


OpenTelemetry.logger.warn "Scale #{scale} is invalid, using default max scale #{MAX_SCALE}"
MAX_SCALE
end

def validate_size(size)
return size unless size > MAX_SIZE || size < 0

OpenTelemetry.logger.warn "Size #{size} is invalid, using default max size #{MAX_SIZE}"
MAX_SIZE
end
end
end
end
end
end
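
A rough usage sketch of the new aggregation, exercised directly with a plain Hash standing in for the SDK's data_points storage; the attribute hash, recorded values, and timestamps are illustrative only:

histogram = OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(aggregation_temporality: :delta)
data_points = {}

[1.5, 2.0, 4.0, 1024.0].each do |value|
  histogram.update(value, { 'route' => '/demo' }, data_points)
end

start_time = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
end_time = start_time + 1_000_000_000 # pretend one second elapsed

hdps = histogram.collect(start_time, end_time, data_points)
hdps.first.count # => 4
hdps.first.sum   # => 1031.5
hdps.first.scale # current scale after any rescaling
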
113 changes: 113 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_histogram/buckets.rb
@@ -0,0 +1,113 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
module ExponentialHistogram
# Buckets is the fundamental building block of the exponential histogram; it stores the bucket/boundary values
class Buckets
Contributor

Does this class have test coverage?

Contributor Author

Currently no; it's exercised extensively through the test cases for exponential_bucket_histogram. If the buckets are not correct, the exponential_bucket_histogram tests will fail.

I will come up with some test cases for the classes inside /exponential_histogram/.
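
As a starting point, a minimal sketch of what direct coverage could look like, assuming the gem's usual Minitest/spec conventions; the test file location and expectations are illustrative only:

# hypothetical test file for Buckets
require 'test_helper'

describe OpenTelemetry::SDK::Metrics::Aggregation::ExponentialHistogram::Buckets do
  let(:buckets) { OpenTelemetry::SDK::Metrics::Aggregation::ExponentialHistogram::Buckets.new }

  it 'starts with a single empty bucket' do
    _(buckets.counts).must_equal([0])
    _(buckets.offset).must_equal(0)
  end

  it 'increments the requested bucket' do
    buckets.increment_bucket(0, 2)
    _(buckets.counts).must_equal([2])
  end

  it 'grows to the next power of two, capped by max_size' do
    buckets.grow(3, 160)
    _(buckets.counts.size).must_equal(4)
  end
end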

attr_accessor :index_start, :index_end, :index_base

def initialize
@counts = [0]
@index_base = 0
@index_start = 0
@index_end = 0
end

# grow simply expands the size of @counts
def grow(needed, max_size)
size = @counts.size
bias = @index_base - @index_start
old_positive_limit = size - bias

new_size = [2**Math.log2(needed).ceil, max_size].min

new_positive_limit = new_size - bias

tmp = Array.new(new_size, 0)
tmp[new_positive_limit..-1] = @counts[old_positive_limit..]
tmp[0...old_positive_limit] = @counts[0...old_positive_limit]
@counts = tmp
end

def offset
@index_start
end

def offset_counts
bias = @index_base - @index_start
@counts[-bias..] + @counts[0...-bias]
end
alias counts offset_counts

def length
return 0 if @counts.empty?
return 0 if @index_end == @index_start && counts[0] == 0

@index_end - @index_start + 1
end

def get_bucket(key)
bias = @index_base - @index_start

key += @counts.size if key < bias
key -= bias

@counts[key]
end

def downscale(amount)
bias = @index_base - @index_start

if bias != 0
@index_base = @index_start
@counts.reverse!
@counts = @counts[0...bias].reverse + @counts[bias..].reverse
end

size = 1 + @index_end - @index_start
each = 1 << amount
inpos = 0
outpos = 0
pos = @index_start

while pos <= @index_end
mod = pos % each
mod += each if mod < 0

inds = mod

while inds < each && inpos < size
if outpos != inpos
@counts[outpos] += @counts[inpos]
@counts[inpos] = 0
end

inpos += 1
pos += 1
inds += 1
end

outpos += 1
end

@index_start >>= amount
@index_end >>= amount
@index_base = @index_start
end

def increment_bucket(bucket_index, increment = 1)
@counts[bucket_index] += increment
end
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_histogram/exponent_mapping.rb
@@ -0,0 +1,47 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
module ExponentialHistogram
# ExponentMapping is used for mapping when scale <= 0
class ExponentMapping
attr_reader :scale

def initialize(scale)
@scale = scale
@min_normal_lower_boundary_index = calculate_min_normal_lower_boundary_index(scale)
@max_normal_lower_boundary_index = IEEE754::MAX_NORMAL_EXPONENT >> -@scale
end

def map_to_index(value)
return @min_normal_lower_boundary_index if value < IEEE754::MIN_NORMAL_VALUE

exponent = IEEE754.get_ieee_754_exponent(value)
correction = (IEEE754.get_ieee_754_mantissa(value) - 1) >> IEEE754::MANTISSA_WIDTH
(exponent + correction) >> -@scale
end

def calculate_min_normal_lower_boundary_index(scale)
inds = IEEE754::MIN_NORMAL_EXPONENT >> -scale
inds -= 1 if -scale < 2
inds
end

# for testing
def get_lower_boundary(inds)
raise StandardError, 'mapping underflow' if inds < @min_normal_lower_boundary_index || inds > @max_normal_lower_boundary_index

Math.ldexp(1, inds << -@scale)
end
end
end
end
end
end
end
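
For orientation, a small sketch of the expected scale-0 behavior (indexes follow the spec's lower-exclusive, upper-inclusive base-2 buckets); the specific values are worked examples, not taken from the PR's tests:

mapping = OpenTelemetry::SDK::Metrics::Aggregation::ExponentialHistogram::ExponentMapping.new(0)

mapping.map_to_index(4.0)      # => 1, since 4.0 is the upper boundary of bucket (2, 4]
mapping.map_to_index(5.0)      # => 2, since 5.0 falls in bucket (4, 8]
mapping.get_lower_boundary(2)  # => 4.0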