Skip to content

Commit

Permalink
add callbacks for async profile processing
Browse files Browse the repository at this point in the history
  • Loading branch information
bmansoob committed Nov 1, 2023
1 parent fbf4c3e commit a031315
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
27 changes: 27 additions & 0 deletions lib/app_profiler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ module Viewer
mattr_accessor :upload_queue_max_length, default: 10
mattr_accessor :upload_queue_interval_secs, default: 5
mattr_accessor :profile_file_prefix, default: DefaultProfilePrefix
mattr_reader :profile_enqueue_success, default: nil
mattr_reader :profile_enqueue_failure, default: nil
mattr_reader :after_process_queue, default: nil

class << self
def run(*args, &block)
Expand Down Expand Up @@ -88,6 +91,30 @@ def profile_url_formatter=(block)
@@profile_url_formatter = block # rubocop:disable Style/ClassVars
end

def profile_enqueue_success=(handler)
unless handler.nil? || (handler.is_a?(Proc) && handler.lambda? && handler.arity == 0)
raise ArgumentError, "profile_enqueue_success must be a lambda that accepts no argument"
end

@@profile_enqueue_success = handler # rubocop:disable Style/ClassVars
end

def profile_enqueue_failure=(handler)
unless handler.nil? || (handler.is_a?(Proc) && handler.lambda? && handler.arity == 1)
raise ArgumentError, "profile_enqueue_failure must be a lambda that accepts one argument"
end

@@profile_enqueue_failure = handler # rubocop:disable Style/ClassVars
end

def after_process_queue=(handler)
unless handler.nil? || (handler.is_a?(Proc) && handler.lambda? && handler.arity == 2)
raise ArgumentError, "after_process_queue must be a lambda that accepts two arguments"
end

@@after_process_queue = handler # rubocop:disable Style/ClassVars
end

def profile_url(upload)
return unless AppProfiler.profile_url_formatter

Expand Down
3 changes: 3 additions & 0 deletions lib/app_profiler/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class Railtie < Rails::Railtie
AppProfiler.upload_queue_max_length = app.config.app_profiler.upload_queue_max_length || 10
AppProfiler.upload_queue_interval_secs = app.config.app_profiler.upload_queue_interval_secs || 5
AppProfiler.profile_file_prefix = app.config.app_profiler.profile_file_prefix || DefaultProfilePrefix
AppProfiler.profile_enqueue_success = app.config.app_profiler.profile_enqueue_success
AppProfiler.profile_enqueue_failure = app.config.app_profiler.profile_enqueue_failure
AppProfiler.after_process_queue = app.config.app_profiler.after_process_queue
end

initializer "app_profiler.add_middleware" do |app|
Expand Down
9 changes: 8 additions & 1 deletion lib/app_profiler/storage/google_cloud_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ def enqueue_upload(profile)
@queue ||= init_queue
begin
@queue.push(profile, true) # non-blocking push, raises ThreadError if queue is full
AppProfiler.profile_enqueue_success&.call
rescue ThreadError
AppProfiler.logger.info("[AppProfiler] upload queue is full, profile discarded")
AppProfiler.profile_enqueue_failure&.call(profile)
end
end
end
Expand Down Expand Up @@ -80,7 +82,12 @@ def process_queue

return unless queue

queue.size.times { queue.pop(false).upload }
num_success = 0
num_failures = 0
queue.size.times do
queue.pop(false).upload ? num_success+=1 : num_failures+=1
end
AppProfiler.after_process_queue&.call(num_success, num_failures)
end

def gcs_filename(profile)
Expand Down
49 changes: 48 additions & 1 deletion test/app_profiler/storage/google_cloud_storage_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,25 @@ def teardown
test ".process_queue uploads" do
with_stubbed_process_queue_thread do
profile = profile_from_stackprof
@called = false
AppProfiler.profile_enqueue_success = -> { @called = true }
GoogleCloudStorage.enqueue_upload(profile)
profile.expects(:upload).once
assert(@called)
# profile.expects(:upload).once

@num_success = 0
@num_failures = 0

AppProfiler.after_process_queue = ->(num_success, num_failures) do
@num_success = num_success
@num_failures = num_failures
end
GoogleCloudStorage.send(:process_queue)
assert_equal(1, @num_success)
assert_equal(0, @num_failures)

ensure
AppProfiler.profile_enqueue_success = nil
end
end

Expand All @@ -74,7 +90,18 @@ def teardown
end
dropped_profile = Profile.from_stackprof(profile_from_stackprof)
AppProfiler.logger.expects(:info).with { |value| value =~ /upload queue is full/ }

@called = false
AppProfiler.profile_enqueue_success = -> { @called = true }
refute(@called)

@profile = nil
AppProfiler.profile_enqueue_failure = ->(profile) { @profile = profile }
GoogleCloudStorage.enqueue_upload(dropped_profile)
assert_equal(dropped_profile, @profile)
ensure
AppProfiler.profile_enqueue_success = nil
AppProfiler.profile_enqueue_failure = nil
end
end

Expand All @@ -87,6 +114,26 @@ def teardown
assert(th.alive?)
end

test "unexpected handler profile_enqueue_success raises ArgumentError " do
assert_raises(ArgumentError) do
AppProfiler.profile_enqueue_success = proc { return }
end

assert_raises(ArgumentError) do
AppProfiler.profile_enqueue_success = ->(_success) { return }
end
end

test "unexpected handler after_process_queue raises ArgumentError " do
assert_raises(ArgumentError) do
AppProfiler.after_process_queue = proc { return }
end

assert_raises(ArgumentError) do
AppProfiler.after_process_queue = lambda { return }
end
end

private

def with_stubbed_process_queue_thread
Expand Down

0 comments on commit a031315

Please sign in to comment.