diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9106b2a --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/.bundle/ +/.yardoc +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..d31031d --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,57 @@ +PATH + remote: . + specs: + eventboss (1.0.0) + aws-sdk-sns (>= 1.1.0) + aws-sdk-sqs (>= 1.3.0) + concurrent-ruby (~> 1.0, >= 1.0.5) + dotenv (~> 2.1, >= 2.1.1) + +GEM + remote: https://rubygems.org/ + specs: + aws-eventstream (1.0.3) + aws-partitions (1.161.0) + aws-sdk-core (3.51.0) + aws-eventstream (~> 1.0, >= 1.0.2) + aws-partitions (~> 1.0) + aws-sigv4 (~> 1.1) + jmespath (~> 1.0) + aws-sdk-sns (1.13.0) + aws-sdk-core (~> 3, >= 3.48.2) + aws-sigv4 (~> 1.1) + aws-sdk-sqs (1.13.0) + aws-sdk-core (~> 3, >= 3.48.2) + aws-sigv4 (~> 1.1) + aws-sigv4 (1.1.0) + aws-eventstream (~> 1.0, >= 1.0.2) + concurrent-ruby (1.1.5) + diff-lcs (1.3) + dotenv (2.7.2) + jmespath (1.4.0) + rake (12.3.1) + rspec (3.7.0) + rspec-core (~> 3.7.0) + rspec-expectations (~> 3.7.0) + rspec-mocks (~> 3.7.0) + rspec-core (3.7.0) + rspec-support (~> 3.7.0) + rspec-expectations (3.7.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.7.0) + rspec-mocks (3.7.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.7.0) + rspec-support (3.7.0) + +PLATFORMS + ruby + +DEPENDENCIES + bundler (~> 1.13) + eventboss! + rake (>= 10.0) + rspec (~> 3.0) + +BUNDLED WITH + 1.17.2 diff --git a/Guardfile b/Guardfile new file mode 100644 index 0000000..ca9b1b2 --- /dev/null +++ b/Guardfile @@ -0,0 +1,4 @@ +guard 'rspec', cmd: 'bundle exec rspec --color' do + watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" } + watch(%r|^spec/(.*)_spec\.rb|) +end diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..afaaef2 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019 AirHelp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md index f3ce677..64c7f39 100644 --- a/README.md +++ b/README.md @@ -1 +1,132 @@ -# eventboss \ No newline at end of file +# Eventboss + +Eventboss ruby client. + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem 'eventboss' +``` + +## Usage + +Run the listener by: +``` +bundle exec eventboss +``` +it will read conf values from ENV variables in configuration sections. + +### Broadcasting events: + +```ruby +publisher = Eventboss.publisher(event_name) +publisher.publish(payload) +``` + +### Unicasting events in batches: (via SQS) + +```ruby +sender = Eventboss.sender(event_name, destination_app) +sender.send_batch([payload1, payload2]) +``` + +Receiving events via listeners: + +```ruby +class AnyName + include Eventboss::Listener + eventboss_options source_app: 'src_app_name', event_name: 'my_event' + + def receive(payload) + end +end +``` + + +## Configuration + +By default, no exception will be raised when publisher configuration is missing (`eventboss_region`, +`eventboss_account_id`, `eventboss_app_name`). This can lead to false-positive specs, app not working without exceptions on dev/stg/prod environment. It's strongly advised to set `raise_on_missing_configuration` to true. + +Using `.configure`: + +```ruby +Eventboss.configure do |config| + config.raise_on_missing_configuration = true + config.eventboss_account_id = 1234567 + config.eventboss_app_name = name + config.eventboss_region = aws_region + config.concurrency = 10 + # when using custom clients like localstack + config.sns_client = client # Custom SNS Client can be used, i.e. to use local mock, see: https://github.com/phstc/shoryuken/wiki/Using-a-local-mock-SQS-server + config.sqs_client = Aws::SQS::Client.new(endpoint: 'http://localstack:4576', region: 'us-east-1', credentials: Aws::Credentials.new('fake', 'fake')) +end +``` + +Using ENVs: + +``` +EVENTBUS_ACCOUNT_ID=12345676 +EVENTBUS_APP_NAME=application_name +EVENTBUS_ENV=env_name # production/staging/test +EVENTBUS_REGION=aws_region # i.e. eu-west-1 +EVENTBUS_CONCURRENCY=10 # default is 25 + +AWS_SNS_ENDPOINT=http://localhost:4575 # when using with localstack +AWS_SQS_ENDPOINT=http://localhost:4576 # when using with localstack +``` + +### Logging and error handling +To have more verbose logging, set `log_level` in configuration (default is `info`). + +Logger is used as default error handler. There is Airbrake handler available, to use it ensure you have `airbrake` or `airbrake-ruby` gem and add it to error handlers stack: + +```ruby +Eventboss.configure do |config| + config.error_handlers << Eventboss::ErrorHandlers::Airbrake.new +end +``` + +### Polling strategy + +Default is `Eventboss::Polling::Basic`. See `eventboss/polling/*` for other options. The configuration should be a `lambda` like so: + +```ruby +Eventboss.configure do |config| + config.polling_strategy = lambda { |queues| Eventboss::Polling::TimedRoundRobin.new(queues) } +end +``` + +## Topics & Queues naming convention + +The SNSes should be name in the following pattern: +``` +eventboss-{src_app_name}-{event_name}-{environment} +``` + +i.e. +``` +eventboss-srcapp-transaction_change-staging +``` + +The corresponding SQSes should be name like: +``` +{dest_app_name}-eventboss-{src_app_name}-{event_name}-{environment} +{dest_app_name}-eventboss-{src_app_name}-{event_name}-{environment}-deadletter +``` +i.e. +``` +destapp-eventboss-srcapp-transaction_change-staging +destapp-eventboss-srcapp-transaction_change-staging-deadletter +``` + +## Contributing + +Bug reports and pull requests are welcome on GitHub at https://github.com/AirHelp/eventboss. + + +## License + +The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..b7e9ed5 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +require "bundler/gem_tasks" +require "rspec/core/rake_task" + +RSpec::Core::RakeTask.new(:spec) + +task :default => :spec diff --git a/bin/eventboss b/bin/eventboss new file mode 100755 index 0000000..98961ef --- /dev/null +++ b/bin/eventboss @@ -0,0 +1,47 @@ +#!/usr/bin/env ruby + +require 'rubygems' +require 'dotenv' +require 'eventboss' +require 'optparse' + +Dotenv.load + +STDOUT.sync = true +options = {} + +OptionParser.new do |parser| + parser.on('-r', '--require LIBRARY', 'Require custom app entrypoint') do |lib| + options[:require] = lib + end +end.parse! + +begin + logger = Eventboss::Logger + + require 'rails' + logger.debug('Loading rails...') + if ::Rails::VERSION::MAJOR < 4 + require File.expand_path('config/environment.rb') + else + require File.expand_path('config/application.rb') + require File.expand_path('config/environment.rb') + end + ::Rails.application.eager_load! +rescue LoadError + logger.debug('Seems like not a Rails app') + + if options[:require].nil? + logger.warn('Please use -r to load a custom app entrypoint') + exit(0) + else + logger.debug("Loading #{options[:require]}") + require File.expand_path(options[:require]) + end +end + +logger.info('Starting eventboss...') +logger.info('Active Listeners:') +logger.info(Eventboss::QueueListener.list.to_s) + +Eventboss.launch diff --git a/eventboss.gemspec b/eventboss.gemspec new file mode 100644 index 0000000..e7b2c05 --- /dev/null +++ b/eventboss.gemspec @@ -0,0 +1,40 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'eventboss/version' + +Gem::Specification.new do |spec| + spec.name = "eventboss" + spec.version = Eventboss::VERSION + spec.authors = ["AirHelp"] + spec.email = ["marcin.naglik@airhelp.com"] + + spec.summary = %q{Eventboss Ruby Client.} + spec.description = %q{Eventboss Ruby Client.} + spec.homepage = "https://airhelp.com" + spec.license = "MIT" + + # Prevent pushing this gem to RubyGems.org. To allow pushes either set the 'allowed_push_host' + # to allow pushing to a single host or delete this section to allow pushing to any host. + if spec.respond_to?(:metadata) + spec.metadata['allowed_push_host'] = "TODO: Set to 'http://mygemserver.com'" + else + raise "RubyGems 2.0 or newer is required to protect against " \ + "public gem pushes." + end + + spec.files = `git ls-files -z`.split("\x0").reject do |f| + f.match(%r{^(test|spec|features)/}) + end + spec.executables = ["eventboss"] + spec.require_paths = ["lib"] + + spec.add_dependency "concurrent-ruby", "~> 1.0", ">= 1.0.5" + spec.add_dependency "aws-sdk-sqs", ">= 1.3.0" + spec.add_dependency "aws-sdk-sns", ">= 1.1.0" + spec.add_dependency "dotenv", "~> 2.1", ">= 2.1.1" + + spec.add_development_dependency "bundler", "~> 1.13" + spec.add_development_dependency 'rake', '>= 10.0' + spec.add_development_dependency "rspec", "~> 3.0" +end diff --git a/lib/eventboss.rb b/lib/eventboss.rb new file mode 100644 index 0000000..001cb13 --- /dev/null +++ b/lib/eventboss.rb @@ -0,0 +1,79 @@ +require 'aws-sdk-sqs' +require 'aws-sdk-sns' +require 'concurrent' +require 'securerandom' + +require 'eventboss/version' +require 'eventboss/configuration' +require 'eventboss/instrumentation' +require 'eventboss/sns_client' +require 'eventboss/queue' +require 'eventboss/queue_listener' +require 'eventboss/listener' +require 'eventboss/logging' +require 'eventboss/safe_thread' +require 'eventboss/launcher' +require 'eventboss/long_poller' +require 'eventboss/unit_of_work' +require 'eventboss/worker' +require 'eventboss/fetcher' +require 'eventboss/publisher' +require 'eventboss/sender' +require 'eventboss/manager' +require 'eventboss/runner' +require 'eventboss/logger' +require 'eventboss/polling/basic' +require 'eventboss/polling/timed_round_robin' +require 'eventboss/extensions' + +# For Rails use railtie, for plain Ruby apps use custom scripts loader +if defined?(Rails) + require 'eventboss/railtie' +else + require 'eventboss/scripts' +end + +module Eventboss + Shutdown = Class.new(StandardError) + + class << self + def publisher(event_name, opts = {}) + Publisher.new(event_name, configuration.sns_client, configuration, opts) + end + + def sender(event_name, destination_app, options = {}) + queue_name = Queue.build_name( + destination: destination_app, + source: configuration.eventboss_app_name, + event: event_name, + env: env, + generic: options[:generic] + ) + + Sender.new( + client: configuration.sqs_client, + queue: Queue.new(queue_name) + ) + end + + def listen + Eventboss::Runner.start + end + + def launch + Eventboss::Runner.launch + end + + def env + @env ||= ENV['EVENTBUS_ENV'] || ENV['RAILS_ENV'] || ENV['RACK_ENV'] + end + + def configure + yield configuration if block_given? + end + + def configuration + @_configuration ||= Configuration.new + end + end +end diff --git a/lib/eventboss/configuration.rb b/lib/eventboss/configuration.rb new file mode 100644 index 0000000..e3d621b --- /dev/null +++ b/lib/eventboss/configuration.rb @@ -0,0 +1,104 @@ +module Eventboss + class Configuration + attr_writer :raise_on_missing_configuration, + :error_handlers, + :concurrency, + :log_level, + :sns_client, + :sqs_client, + :eventboss_region, + :eventboss_app_name, + :eventboss_account_id, + :aws_access_key_id, + :aws_secret_access_key, + :polling_strategy, + :aws_sns_endpoint, + :aws_sqs_endpoint, + :sns_sqs_name_infix + + def raise_on_missing_configuration + defined_or_default('raise_on_missing_configuration') { false } + end + + def error_handlers + defined_or_default('error_handlers') { [ErrorHandlers::Logger.new] } + end + + def concurrency + defined_or_default('concurrency') { ENV['EVENTBUS_CONCURRENCY'] ? ENV['EVENTBUS_CONCURRENCY'].to_i : 25 } + end + + def log_level + defined_or_default('log_level') { :info } + end + + def sns_client + defined_or_default('sns_client') { Eventboss::SnsClient.new(self) } + end + + def sqs_client + defined_or_default('sqs_client') do + options = { + region: eventboss_region, + credentials: Aws::Credentials.new( + aws_access_key_id, + aws_secret_access_key + ) + } + if aws_sqs_endpoint + options[:endpoint] = aws_sqs_endpoint + end + + Aws::SQS::Client.new(options) + end + end + + def eventboss_region + defined_or_default('eventboss_region') { ENV['EVENTBUS_REGION'] } + end + + def eventboss_app_name + defined_or_default('eventboss_app_name') { ENV['EVENTBUS_APP_NAME'] } + end + + def eventboss_account_id + defined_or_default('eventboss_account_id') { ENV['EVENTBUS_ACCOUNT_ID'] } + end + + def aws_access_key_id + defined_or_default('aws_access_key_id') { ENV['AWS_ACCESS_KEY_ID'] } + end + + def aws_secret_access_key + defined_or_default('aws_secret_access_key') { ENV['AWS_SECRET_ACCESS_KEY'] } + end + + def aws_sqs_endpoint + defined_or_default('aws_sqs_endpoint') { ENV['AWS_SQS_ENDPOINT'] } + end + + def aws_sns_endpoint + defined_or_default('aws_sns_endpoint') { ENV['AWS_SNS_ENDPOINT'] } + end + + def polling_strategy + defined_or_default('polling_strategy') do + lambda { |queues| Eventboss::Polling::Basic.new(queues) } + end + end + + def sns_sqs_name_infix + defined_or_default('sns_sqs_name_infix') { ENV['EVENTBUS_SQS_SNS_NAME_INFIX'] || '-eventboss-' } + end + + private + + def defined_or_default(variable_name) + if instance_variable_defined?("@#{variable_name}") + instance_variable_get("@#{variable_name}") + else + instance_variable_set("@#{variable_name}", yield) if block_given? + end + end + end +end diff --git a/lib/eventboss/error_handlers/airbrake.rb b/lib/eventboss/error_handlers/airbrake.rb new file mode 100644 index 0000000..4f8fba6 --- /dev/null +++ b/lib/eventboss/error_handlers/airbrake.rb @@ -0,0 +1,13 @@ +module Eventboss + module ErrorHandlers + class Airbrake + def call(exception, context = {}) + ::Airbrake.notify(exception) do |notice| + notice[:context][:component] = 'eventboss' + notice[:context][:action] = context[:processor].class.to_s if context[:processor] + notice[:context].merge!(context) + end + end + end + end +end diff --git a/lib/eventboss/error_handlers/logger.rb b/lib/eventboss/error_handlers/logger.rb new file mode 100644 index 0000000..723cd21 --- /dev/null +++ b/lib/eventboss/error_handlers/logger.rb @@ -0,0 +1,12 @@ +module Eventboss + module ErrorHandlers + class Logger + def call(exception, context = {}) + notice = {}.merge!(context) + notice[:jid] = notice[:processor].jid if notice[:processor] + notice[:processor] = notice[:processor].class.to_s if notice[:processor] + Eventboss::Logger.error("Failure processing request #{exception.message}", notice) + end + end + end +end diff --git a/lib/eventboss/extensions.rb b/lib/eventboss/extensions.rb new file mode 100644 index 0000000..06ed38d --- /dev/null +++ b/lib/eventboss/extensions.rb @@ -0,0 +1,2 @@ +require 'eventboss/error_handlers/logger' +require 'eventboss/error_handlers/airbrake' diff --git a/lib/eventboss/fetcher.rb b/lib/eventboss/fetcher.rb new file mode 100644 index 0000000..fb2cef7 --- /dev/null +++ b/lib/eventboss/fetcher.rb @@ -0,0 +1,33 @@ +module Eventboss + class Fetcher + FETCH_LIMIT = 10 # maximum possible for SQS + + attr_reader :client + + def initialize(configuration) + @client = configuration.sqs_client + end + + def fetch(queue, limit) + @client.receive_message(queue_url: queue.url, max_number_of_messages: max_no_of_messages(limit)).messages + end + + def delete(queue, message) + @client.delete_message(queue_url: queue.url, receipt_handle: message.receipt_handle) + end + + def change_message_visibility(queue, message, visibility_timeout) + @client.change_message_visibility( + queue_url: queue.url, + receipt_handle: message.receipt_handle, + visibility_timeout: visibility_timeout + ) + end + + private + + def max_no_of_messages(limit) + [limit, FETCH_LIMIT].min + end + end +end diff --git a/lib/eventboss/instrumentation.rb b/lib/eventboss/instrumentation.rb new file mode 100644 index 0000000..41d659b --- /dev/null +++ b/lib/eventboss/instrumentation.rb @@ -0,0 +1,26 @@ +module Eventboss + # :nodoc: + module Instrumentation + def self.add(queue_listeners) + return unless defined?(::NewRelic::Agent::Instrumentation::ControllerInstrumentation) + Eventboss::Instrumentation::NewRelic.install(queue_listeners) + end + + # :nodoc: + module NewRelic + def self.install(queue_listeners) + Eventboss::Logger.logger.info('Loaded NewRelic instrumentation') + queue_listeners.each_value do |listener_class| + listener_class.include(::NewRelic::Agent::Instrumentation::ControllerInstrumentation) + listener_class.add_transaction_tracer(:receive, category: 'OtherTransaction/EventbossJob') + end + + Eventboss::Sender.include(::NewRelic::Agent::MethodTracer) + Eventboss::Sender.add_method_tracer(:send_batch, 'Eventboss/sender_send_batch') + + Eventboss::Publisher.include(::NewRelic::Agent::MethodTracer) + Eventboss::Publisher.add_method_tracer(:publish, 'Eventboss/publisher_publish') + end + end + end +end diff --git a/lib/eventboss/launcher.rb b/lib/eventboss/launcher.rb new file mode 100644 index 0000000..ce220ee --- /dev/null +++ b/lib/eventboss/launcher.rb @@ -0,0 +1,96 @@ +module Eventboss + # Launcher manages lifecycle of queues and pollers threads + class Launcher + include Logging + + DEFAULT_SHUTDOWN_ATTEMPTS = 5 + DEFAULT_SHUTDOWN_DELAY = 5 + + def initialize(queues, client, options = {}) + @options = options + @queues = queues + @client = client + + @lock = Mutex.new + @bus = SizedQueue.new(@queues.size * 10) + + @pollers = Set.new + @queues.each { |q, listener| @pollers << new_poller(q, listener) } + + @workers = Set.new + worker_count.times { |id| @workers << new_worker(id) } + end + + def start + logger.info("Starting #{@workers.size} workers, #{@pollers.size} pollers", 'launcher') + @pollers.each(&:start) + @workers.each(&:start) + end + + def stop + logger.info('Gracefully shutdown', 'launcher') + + @bus.clear + @pollers.each(&:terminate) + @workers.each(&:terminate) + + wait_for_shutdown + hard_shutdown + end + + def hard_shutdown + return if @pollers.empty? && @workers.empty? + + logger.info("Killing remaining #{@pollers.size} pollers, #{@workers.size} workers", 'launcher') + @pollers.each(&:kill) + @workers.each(&:kill) + end + + def worker_stopped(worker, restart: false) + @lock.synchronize do + @workers.delete(worker) + @workers << new_worker(worker.id).tap(&:start) if restart + end + logger.debug("Worker #{worker.id} stopped, restart: #{restart}", 'launcher') + end + + def poller_stopped(poller, restart: false) + @lock.synchronize do + @pollers.delete(poller) + @pollers << new_poller(poller.queue, poller.listener).tap(&:start) if restart + end + logger.debug("Poller #{poller.id} stopped, restart: #{restart}", 'launcher') + end + + private + + def worker_count + @options.fetch(:worker_count, [2, Concurrent.processor_count].max) + end + + def new_worker(id) + Worker.new(self, id, @client, @bus) + end + + def new_poller(queue, listener) + LongPoller.new(self, @bus, @client, queue, listener) + end + + def wait_for_shutdown + attempts = 0 + while @pollers.any? || @workers.any? + break if (attempts += 1) > shutdown_attempts + sleep shutdown_delay + logger.info("Waiting for #{@pollers.size} pollers, #{@workers.size} workers", 'launcher') + end + end + + def shutdown_attempts + Integer(@options[:shutdown_attempts] || DEFAULT_SHUTDOWN_ATTEMPTS) + end + + def shutdown_delay + Integer(@options[:shutdown_delay] || DEFAULT_SHUTDOWN_DELAY) + end + end +end diff --git a/lib/eventboss/listener.rb b/lib/eventboss/listener.rb new file mode 100644 index 0000000..d3aac4a --- /dev/null +++ b/lib/eventboss/listener.rb @@ -0,0 +1,28 @@ +module Eventboss + module Listener + ACTIVE_LISTENERS = {} + + def self.included(base) + base.extend ClassMethods + end + + def jid + @jid ||= SecureRandom.uuid + end + + attr_reader :postponed_by + + def postpone_by(time_in_secs) + @postponed_by = time_in_secs.to_i + end + + module ClassMethods + def eventboss_options(opts) + source_app = opts[:source_app] ? "#{opts[:source_app]}-" : "" + event_name = opts[:event_name] + + ACTIVE_LISTENERS["#{source_app}#{event_name}"] = self + end + end + end +end diff --git a/lib/eventboss/logger.rb b/lib/eventboss/logger.rb new file mode 100644 index 0000000..88cbccd --- /dev/null +++ b/lib/eventboss/logger.rb @@ -0,0 +1,34 @@ +module Eventboss + class Logger + class << self + def logger + Thread.current[:ah_eventboss_logger] ||= ::Logger.new( + STDOUT, + level: Eventboss.configuration.log_level + ) + end + + def info(msg, tag = nil) + return unless logger + logger.info(tagged(msg, tag)) + end + + def debug(msg, tag = nil) + return unless logger + logger.debug(tagged(msg, tag)) + end + + def error(msg, tag = nil) + return unless logger + logger.error(tagged(msg, tag)) + end + + private + + def tagged(msg, tag) + return msg if tag.nil? + msg.prepend("[#{tag}] ") + end + end + end +end diff --git a/lib/eventboss/logging.rb b/lib/eventboss/logging.rb new file mode 100644 index 0000000..9336e8d --- /dev/null +++ b/lib/eventboss/logging.rb @@ -0,0 +1,8 @@ +module Eventboss + # Logging include logging helpers + module Logging + def logger + Eventboss::Logger + end + end +end diff --git a/lib/eventboss/long_poller.rb b/lib/eventboss/long_poller.rb new file mode 100644 index 0000000..236b1b2 --- /dev/null +++ b/lib/eventboss/long_poller.rb @@ -0,0 +1,75 @@ +module Eventboss + # LongPoller fetches messages from SQS using Long Polling + # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html + # It starts one thread per queue (handled by Launcher) + class LongPoller + include Logging + include SafeThread + + TIME_WAIT = 10 + + attr_reader :id, :queue, :listener + + def initialize(launcher, bus, client, queue, listener) + @id = "poller-#{queue.name}" + @launcher = launcher + @bus = bus + @client = client + @queue = queue + @listener = listener + @thread = nil + @stop = false + end + + def start + @thread = safe_thread(id, &method(:run)) + end + + def terminate(wait = false) + @stop = true + return unless @thread + @thread.value if wait + end + + def kill(wait = false) + @stop = true + return unless @thread + @thread.value if wait + + # Force shutdown of poller, in case the loop is stuck + @thread.raise Eventboss::Shutdown + @thread.value if wait + end + + def fetch_and_dispatch + fetch_messages.each do |message| + logger.debug("enqueueing message #{message.message_id}", id) + @bus << UnitOfWork.new(queue, listener, message) + end + end + + def run + fetch_and_dispatch until @stop + @launcher.poller_stopped(self) + rescue Eventboss::Shutdown + @launcher.poller_stopped(self) + rescue StandardError => exception + handle_exception(exception, poller_id: id) + # Give a chance for temporary AWS errors to be resolved + # Sleep guarantees against repeating fast failure errors + sleep TIME_WAIT + @launcher.poller_stopped(self, restart: @stop == false) + end + + private + + def fetch_messages + logger.debug('fetching messages', id) + @client.receive_message( + queue_url: queue.url, + max_number_of_messages: 10, + wait_time_seconds: TIME_WAIT + ).messages + end + end +end diff --git a/lib/eventboss/manager.rb b/lib/eventboss/manager.rb new file mode 100644 index 0000000..1eed0ac --- /dev/null +++ b/lib/eventboss/manager.rb @@ -0,0 +1,116 @@ +module Eventboss + class Manager + MIN_DISPATCH_INTERVAL = 0.1 + + def initialize(fetcher, polling_strategy, executor, queue_listeners, concurrency, error_handlers) + @fetcher = fetcher + @polling_strategy = polling_strategy + @max_processors = concurrency + @busy_processors = Concurrent::AtomicFixnum.new(0) + @executor = executor + @queue_listeners = queue_listeners + @error_handlers = Array(error_handlers) + end + + def start + Eventboss::Logger.debug('Starting dispatch loop...') + + dispatch_loop + end + + private + + def running? + @executor.running? + end + + def dispatch_loop + return unless running? + + Eventboss::Logger.debug('Posting task to executor') + + @executor.post { dispatch } + end + + def dispatch + return unless running? + + if ready <= 0 || (queue = @polling_strategy.next_queue).nil? + return sleep(MIN_DISPATCH_INTERVAL) + end + dispatch_single_messages(queue) + rescue => ex + handle_dispatch_error(ex) + ensure + Eventboss::Logger.debug('Ensuring dispatch loop') + dispatch_loop + end + + def busy + @busy_processors.value + end + + def ready + @max_processors - busy + end + + def processor_done(processor) + Eventboss::Logger.info("Success", processor.jid) + @busy_processors.decrement + end + + def processor_error(processor, exception) + @error_handlers.each { |handler| handler.call(exception, processor) } + @busy_processors.decrement + end + + def assign(queue, sqs_msg) + return unless running? + + @busy_processors.increment + processor = @queue_listeners[queue].new + + Concurrent::Promise.execute(executor: @executor) do + body = JSON.parse(sqs_msg.body) rescue sqs_msg.body + Eventboss::Logger.info("Started", processor.jid) + processor.receive(body) + end.then do + cleanup(processor) + postpone_if_needed(queue, sqs_msg, processor) || delete_from_queue(queue, sqs_msg) + processor_done(processor) + end.rescue do |e| + cleanup(processor) + postpone_if_needed(queue, sqs_msg, processor) + processor_error(processor, e) + end + end + + def cleanup(_processor) + if defined?(ActiveRecord) + ::ActiveRecord::Base.clear_active_connections! + end + end + + def delete_from_queue(queue, sqs_msg) + @fetcher.delete(queue, sqs_msg) + end + + def postpone_if_needed(queue, sqs_msg, processor) + return false unless processor.postponed_by + @fetcher.change_message_visibility(queue, sqs_msg, processor.postponed_by) + rescue => error + Eventboss::Logger.info("Could not postpone message #{error.message}", processor.jid) + end + + def dispatch_single_messages(queue) + messages = @fetcher.fetch(queue, ready) + @polling_strategy.messages_found(queue, messages.size) + messages.each { |message| assign(queue, message) } + end + + def handle_dispatch_error(ex) + Eventboss::Logger.error("Error dispatching #{ex.message}") + Process.kill('USR1', Process.pid) + end + end +end diff --git a/lib/eventboss/polling/basic.rb b/lib/eventboss/polling/basic.rb new file mode 100644 index 0000000..5f3009f --- /dev/null +++ b/lib/eventboss/polling/basic.rb @@ -0,0 +1,68 @@ +module Eventboss + module Polling + class Basic + PAUSE_AFTER_EMPTY = 2 # seconds + + def initialize(queues, timer = Time) + @queues = queues.to_a + @timer = timer + @paused_until = @queues.each_with_object(Hash.new) do |queue, hash| + hash[queue] = @timer.at(0) + end + + reset_next_queue + end + + def next_queue + next_active_queue + end + + def messages_found(queue, messages_count) + if messages_count == 0 + pause(queue) + else + reset_next_queue + end + end + + def active_queues + @queues.reject { |q, _| queue_paused?(q) } + end + + private + + def next_active_queue + reset_next_queue if queues_unpaused_since? + + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) + end + + nil + end + + def queues_unpaused_since? + last = @last_unpause_check + now = @last_unpause_check = @timer.now + + last && @paused_until.values.any? { |t| t > last && t <= now } + end + + def reset_next_queue + @next_queue_index = 0 + end + + def queue_paused?(queue) + @paused_until[queue] > @timer.now + end + + def pause(queue) + return unless PAUSE_AFTER_EMPTY > 0 + @paused_until[queue] = @timer.now + PAUSE_AFTER_EMPTY + end + end + end +end diff --git a/lib/eventboss/polling/timed_round_robin.rb b/lib/eventboss/polling/timed_round_robin.rb new file mode 100644 index 0000000..bf3136e --- /dev/null +++ b/lib/eventboss/polling/timed_round_robin.rb @@ -0,0 +1,42 @@ +module Eventboss + module Polling + class TimedRoundRobin + PAUSE_AFTER_EMPTY = 2 # seconds + + def initialize(queues, timer = Time) + @queues = queues.to_a + @timer = timer + @next_queue_index = 0 + @paused_until = @queues.each_with_object(Hash.new) do |queue, hash| + hash[queue] = @timer.at(0) + end + end + + def next_queue + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) + end + + nil + end + + def messages_found(queue, messages_count) + pause(queue) if messages_count == 0 + end + + private + + def queue_paused?(queue) + @paused_until[queue] > @timer.now + end + + def pause(queue) + return unless PAUSE_AFTER_EMPTY > 0 + @paused_until[queue] = @timer.now + PAUSE_AFTER_EMPTY + end + end + end +end diff --git a/lib/eventboss/publisher.rb b/lib/eventboss/publisher.rb new file mode 100644 index 0000000..fd4baad --- /dev/null +++ b/lib/eventboss/publisher.rb @@ -0,0 +1,32 @@ +module Eventboss + class Publisher + def initialize(event_name, sns_client, configuration, opts = {}) + @event_name = event_name + @sns_client = sns_client + @configuration = configuration + @generic = opts[:generic] + end + + def publish(payload) + sns_client.publish({ + topic_arn: topic_arn, + message: json_payload(payload) + }) + end + + private + + attr_reader :event_name, :sns_client, :configuration + + def json_payload(payload) + payload.is_a?(String) ? payload : payload.to_json + end + + def topic_arn + src_selector = @generic ? "" : "-#{configuration.eventboss_app_name}" + + "arn:aws:sns:#{configuration.eventboss_region}:#{configuration.eventboss_account_id}:\ +eventboss#{src_selector}-#{event_name}-#{Eventboss.env}" + end + end +end diff --git a/lib/eventboss/queue.rb b/lib/eventboss/queue.rb new file mode 100644 index 0000000..34e62b3 --- /dev/null +++ b/lib/eventboss/queue.rb @@ -0,0 +1,42 @@ +module Eventboss + class Queue + include Comparable + attr_reader :name + + def self.build_name(source:, destination:, event:, env:, generic:) + source = + if generic + '' + else + "-#{source}" + end + + "#{destination}-eventboss#{source}-#{event}-#{env}" + end + + def initialize(name, configuration = Eventboss.configuration) + @client = configuration.sqs_client + @name = name + end + + def url + @url ||= client.get_queue_url(queue_name: name).queue_url + end + + def <=>(another_queue) + name <=> another_queue&.name + end + + def eql?(another_queue) + name == another_queue&.name + end + + def hash + name.hash + end + + private + + attr_reader :client + end +end diff --git a/lib/eventboss/queue_listener.rb b/lib/eventboss/queue_listener.rb new file mode 100644 index 0000000..f8a48f1 --- /dev/null +++ b/lib/eventboss/queue_listener.rb @@ -0,0 +1,11 @@ +module Eventboss + class QueueListener + class << self + def list + Hash[Eventboss::Listener::ACTIVE_LISTENERS.map do |src_app_event, listener| + [Eventboss::Queue.new("#{Eventboss.configuration.eventboss_app_name}#{Eventboss.configuration.sns_sqs_name_infix}#{src_app_event}-#{Eventboss.env}"), listener] + end] + end + end + end +end diff --git a/lib/eventboss/railtie.rb b/lib/eventboss/railtie.rb new file mode 100644 index 0000000..a8a4a0b --- /dev/null +++ b/lib/eventboss/railtie.rb @@ -0,0 +1,5 @@ +class Eventboss::Railtie < Rails::Railtie + rake_tasks do + load 'tasks/eventboss.rake' + end +end diff --git a/lib/eventboss/runner.rb b/lib/eventboss/runner.rb new file mode 100644 index 0000000..407c3ca --- /dev/null +++ b/lib/eventboss/runner.rb @@ -0,0 +1,60 @@ +module Eventboss + class Runner + class << self + def launch + queues = Eventboss::QueueListener.list + client = Eventboss.configuration.sqs_client + config = Eventboss.configuration + + Eventboss::Instrumentation.add(queues) + + launcher = Launcher.new(queues, client, worker_count: config.concurrency) + + self_read, _self_write = IO.pipe + begin + launcher.start + while (_readable_io = IO.select([self_read])) + # handle_signal(readable_io.first[0].gets.strip) + end + rescue Interrupt + launcher.stop + exit 0 + end + end + + def start + configuration = Eventboss.configuration + + queue_listeners = Eventboss::QueueListener.list + Eventboss::Instrumentation.add(queue_listeners) + polling_strategy = configuration.polling_strategy.call(queue_listeners.keys) + + fetcher = Eventboss::Fetcher.new(configuration) + executor = Concurrent.global_io_executor + + manager = Eventboss::Manager.new( + fetcher, + polling_strategy, + executor, + queue_listeners, + configuration.concurrency, + configuration.error_handlers + ) + + manager.start + + self_read, self_write = IO.pipe + begin + while (readable_io = IO.select([self_read])) + signal = readable_io.first[0].gets.strip + # handle_signal(signal) + end + rescue Interrupt + executor.shutdown + executor.wait_for_termination + exit 0 + end + end + end + end +end diff --git a/lib/eventboss/safe_thread.rb b/lib/eventboss/safe_thread.rb new file mode 100644 index 0000000..267919c --- /dev/null +++ b/lib/eventboss/safe_thread.rb @@ -0,0 +1,23 @@ +module Eventboss + # SafeThread includes thread handling with automatic error reporting + module SafeThread + def safe_thread(name) + Thread.new do + begin + Thread.current[:ah_eventboss_label] = name + yield + rescue Exception => exception + handle_exception(exception, name: name) + raise exception + end + end + end + + def handle_exception(exception, context) + context.freeze + Eventboss.configuration.error_handlers.each do |handler| + handler.call(exception, context) + end + end + end +end diff --git a/lib/eventboss/scripts.rb b/lib/eventboss/scripts.rb new file mode 100644 index 0000000..36fa32d --- /dev/null +++ b/lib/eventboss/scripts.rb @@ -0,0 +1 @@ +load 'tasks/eventboss.rake' diff --git a/lib/eventboss/sender.rb b/lib/eventboss/sender.rb new file mode 100644 index 0000000..2caa259 --- /dev/null +++ b/lib/eventboss/sender.rb @@ -0,0 +1,25 @@ +module Eventboss + class Sender + def initialize(client:, queue:) + @client = client + @queue = queue + end + + def send_batch(payload) + client.send_message_batch( + queue_url: queue.url, + entries: Array(build_entries(payload)) + ) + end + + private + + attr_reader :queue, :client + + def build_entries(messages) + messages.map do |message| + { id: SecureRandom.hex, message_body: message.to_json } + end + end + end +end diff --git a/lib/eventboss/sns_client.rb b/lib/eventboss/sns_client.rb new file mode 100644 index 0000000..51b2ac7 --- /dev/null +++ b/lib/eventboss/sns_client.rb @@ -0,0 +1,55 @@ +module Eventboss + class NotConfigured < StandardError; end + + class SnsClient + def initialize(configuration) + @configuration = configuration + end + + def publish(payload) + backend.publish(payload) + end + + private + + attr_reader :configuration + + def backend + if configured? + options = { + region: configuration.eventboss_region, + credentials: ::Aws::Credentials.new( + configuration.aws_access_key_id, + configuration.aws_secret_access_key + ) + } + if configuration.aws_sns_endpoint + options[:endpoint] = configuration.aws_sns_endpoint + end + Aws::SNS::Client.new( + options + ) + elsif configuration.raise_on_missing_configuration + raise NotConfigured, 'Eventboss is not configured' + else + Mock.new + end + end + + def configured? + !!( + configuration.eventboss_region && + configuration.eventboss_account_id && + configuration.eventboss_app_name + ) + end + + class Mock + def publish(_) + Eventboss::Logger.info('Eventboss is not configured. Skipping message publishing!') + return + end + end + end + +end diff --git a/lib/eventboss/unit_of_work.rb b/lib/eventboss/unit_of_work.rb new file mode 100644 index 0000000..0c52322 --- /dev/null +++ b/lib/eventboss/unit_of_work.rb @@ -0,0 +1,33 @@ +module Eventboss + # UnitOfWork handles calls a listener for each message and deletes on success + class UnitOfWork + include Logging + include SafeThread + + attr_accessor :queue, :listener, :message + + def initialize(queue, listener, message) + @queue = queue + @listener = listener + @message = message + end + + def run(client) + logger.debug('Started', @message.message_id) + processor = @listener.new + processor.receive(JSON.parse(@message.body)) + logger.info('Finished', @message.message_id) + rescue StandardError => exception + handle_exception(exception, processor: processor, message_id: @message.message_id) + else + cleanup(client) + end + + def cleanup(client) + client.delete_message( + queue_url: @queue.url, receipt_handle: @message.receipt_handle + ) + logger.debug('Deleting', @message.message_id) + end + end +end diff --git a/lib/eventboss/version.rb b/lib/eventboss/version.rb new file mode 100644 index 0000000..ef0521a --- /dev/null +++ b/lib/eventboss/version.rb @@ -0,0 +1,3 @@ +module Eventboss + VERSION = "1.0.0" +end diff --git a/lib/eventboss/worker.rb b/lib/eventboss/worker.rb new file mode 100644 index 0000000..c0a14c8 --- /dev/null +++ b/lib/eventboss/worker.rb @@ -0,0 +1,55 @@ +module Eventboss + # Worker is part of a pool of workers, handles UnitOfWork lifecycle + class Worker + include Logging + include SafeThread + + attr_reader :id + + def initialize(launcher, id, client, bus) + @id = "worker-#{id}" + @launcher = launcher + @client = client + @bus = bus + @thread = nil + end + + def start + @thread = safe_thread(id, &method(:run)) + end + + def run + while (work = @bus.pop) + work.run(@client) + end + @launcher.worker_stopped(self) + rescue Eventboss::Shutdown + @launcher.worker_stopped(self) + rescue Exception => exception + handle_exception(exception, worker_id: id) + # Restart the worker in case of hard exception + # Message won't be delete from SQS and will be visible later + @launcher.worker_stopped(self, restart: true) + end + + def terminate(wait = false) + stop_token + return unless @thread + @thread.value if wait + end + + def kill(wait = false) + stop_token + return unless @thread + @thread.raise Eventboss::Shutdown + @thread.value if wait + end + + private + + # stops the loop, by enqueuing falsey value + def stop_token + @bus << nil + end + end +end diff --git a/lib/tasks/eventboss.rake b/lib/tasks/eventboss.rake new file mode 100644 index 0000000..d2e3e84 --- /dev/null +++ b/lib/tasks/eventboss.rake @@ -0,0 +1,47 @@ +require 'rake' + +namespace :eventboss do + namespace :deadletter do + desc 'Reload deadletter queue' + task :reload, [:event_name, :source_app, :max_messages] do |_, args| + source_app = args[:source_app] ? "#{args[:source_app]}-" : '' + event_name = args[:event_name] + + # Zero means, fetch all messages + max_messages = args[:max_messages].to_i + + # Ensure we don't fetch more than 10 messages from SQS + batch_size = max_messages == 0 ? 10 : [10, max_messages].min + + abort 'At least event name should be passed as argument' unless event_name + + queue_name = "#{Eventboss.configuration.eventboss_app_name}#{Eventboss.configuration.sns_sqs_name_infix}#{source_app}#{event_name}-#{Eventboss.env}" + queue = Eventboss::Queue.new("#{queue_name}-deadletter") + send_queue = Eventboss::Queue.new(queue_name) + + puts "Reloading deadletter (max: #{ max_messages }, batch: #{ batch_size })" + puts " #{queue.url}" + puts ' to' + puts " #{send_queue.url}" + + fetcher = Eventboss::Fetcher.new(Eventboss.configuration) + client = fetcher.client + total = 0 + loop do + messages = fetcher.fetch(queue, batch_size) + break if messages.count.zero? + + messages.each do |message| + puts "Publishing message: #{message.body}" + client.send_message(queue_url: send_queue.url, message_body: message.body) + fetcher.delete(queue, message) + + total += 1 + break if max_messages > 0 && total >= max_messages + end + + break if max_messages > 0 && total >= max_messages + end + end + end +end diff --git a/spec/eventboss/configuration_spec.rb b/spec/eventboss/configuration_spec.rb new file mode 100644 index 0000000..be76b64 --- /dev/null +++ b/spec/eventboss/configuration_spec.rb @@ -0,0 +1,37 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Configuration do + let(:configuration) { described_class.new } + + describe 'accessors with lazy evaluated defaults' do + subject { configuration } + it 'always returns explicitly set value' do + expect(subject.raise_on_missing_configuration).to eq(false) + subject.raise_on_missing_configuration = nil + expect(subject.raise_on_missing_configuration).to eq(nil) + subject.raise_on_missing_configuration = true + expect(subject.raise_on_missing_configuration).to eq(true) + end + + it 'caches evaluated default' do + expect(subject.sns_client.object_id).to eq(subject.sns_client.object_id) + end + end + + describe '#concurrency' do + subject { configuration.concurrency } + context 'when not set' do + it 'returns default concurrency of 25' do + expect(subject).to eq(25) + end + end + + context 'when in ENV' do + before { ENV['EVENTBUS_CONCURRENCY'] = '10' } + + it 'is taken from ENV' do + expect(subject).to eq(10) + end + end + end +end diff --git a/spec/eventboss/fetcher_spec.rb b/spec/eventboss/fetcher_spec.rb new file mode 100644 index 0000000..a88826b --- /dev/null +++ b/spec/eventboss/fetcher_spec.rb @@ -0,0 +1,43 @@ +require "spec_helper" + +describe Eventboss::Fetcher do + let(:queue) { Eventboss::Queue.new('test', configuration) } + let(:client_mock) { double('client_mock', get_queue_url: double(queue_url: 'url'))} + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.sqs_client = client_mock + end + end + + subject { described_class.new(configuration) } + + context '#FETCH_LIMIT' do + it 's set to 10' do + expect(Eventboss::Fetcher::FETCH_LIMIT).to eq(10) + end + end + + context '#fetch' do + context 'when limit higher that 10' do + it 'calls receive client with max no msg eq 10' do + expect(client_mock).to receive(:receive_message).with(queue_url: queue.url, max_number_of_messages: 10).and_return(double(messages: [1, 2, 3])) + expect(subject.fetch(queue, 20)).to eq([1, 2, 3]) + end + end + + context 'when limit smaller than 10' do + it 'calls receive client limit' do + expect(client_mock).to receive(:receive_message).with(queue_url: queue.url, max_number_of_messages: 5).and_return(double(messages: [1, 2, 3])) + expect(subject.fetch(queue, 5)).to eq([1, 2, 3]) + end + end + end + + context '#delete' do + it 'calls delete message' do + message = double(receipt_handle: 'abc') + expect(client_mock).to receive(:delete_message).with(queue_url: queue.url, receipt_handle: 'abc') + subject.delete(queue, message) + end + end +end diff --git a/spec/eventboss/listener_spec.rb b/spec/eventboss/listener_spec.rb new file mode 100644 index 0000000..68ce466 --- /dev/null +++ b/spec/eventboss/listener_spec.rb @@ -0,0 +1,41 @@ +require "spec_helper" + +describe Eventboss::Listener do + class Listener1 + include Eventboss::Listener + eventboss_options source_app: 'app1', event_name: 'transaction_created' + end + + class GenericListener1 + include Eventboss::Listener + eventboss_options event_name: 'transaction_created' + end + + context '#jid' do + it 'creates unique jid for the job' do + expect(Listener1.new.jid).not_to be_nil + expect(Listener1.new.jid).not_to eq(Listener1.new.jid) + end + end + + context '#ACTIVE_LISTENERS' do + it 'adds the class to active listeners hash' do + expect(Eventboss::Listener::ACTIVE_LISTENERS).to eq( + "transaction_created" => GenericListener1, + "app1-transaction_created" => Listener1 + ) + end + end + + context '#postponed_by' do + it 's nil when not called' do + expect(Listener1.new.postponed_by).to be_nil + end + + it 's set to the value passed to postpone_by' do + listener = Listener1.new + listener.postpone_by(60) + expect(listener.postponed_by).to eq(60) + end + end +end diff --git a/spec/eventboss/long_poller_spec.rb b/spec/eventboss/long_poller_spec.rb new file mode 100644 index 0000000..d5c8af9 --- /dev/null +++ b/spec/eventboss/long_poller_spec.rb @@ -0,0 +1,39 @@ +require 'spec_helper' + +describe Eventboss::LongPoller do + subject do + described_class.new(launcher, bus, client, queue, listener) + end + + let(:launcher) { instance_double('Eventboss::Launcher', worker_stopped: true) } + let(:bus) { [] } + let(:client) { double('client') } + let(:queue) { double('queue', name: 'name', url: 'url') } + let(:listener) { double('listener') } + let(:message) { double('message', message_id: 1) } + + before do + allow(client).to receive(:receive_message) do + OpenStruct.new(messages: [message]) + end + end + + describe '#fetch_and_dispatch' do + it 'adds to the bus' do + subject.fetch_and_dispatch + expect(bus.size).to be 1 + end + + it 'bus contains UnitOfWork' do + subject.fetch_and_dispatch + expect(bus).to include(an_instance_of(Eventboss::UnitOfWork)) + end + + it 'calls client with proper attributes' do + expect(client).to receive(:receive_message) + .with(queue_url: 'url', max_number_of_messages: 10, wait_time_seconds: 10) + + subject.fetch_and_dispatch + end + end +end diff --git a/spec/eventboss/manager_spec.rb b/spec/eventboss/manager_spec.rb new file mode 100644 index 0000000..7a26fd8 --- /dev/null +++ b/spec/eventboss/manager_spec.rb @@ -0,0 +1,71 @@ +require "spec_helper" + +describe Eventboss::Manager do + let(:fetcher) { double('fetcher') } + let(:queue) { double('queue') } + let(:sqs_msg) { double('sqs_msg') } + let(:processor) { double('processor', jid: 123, postponed_by: postponed_by) } + subject(:manager) { described_class.new(fetcher, nil, nil, nil, nil, nil) } + + before do + allow(Eventboss::Logger).to receive(:info) + allow(Eventboss::Logger).to receive(:error) + end + + describe 'automated exception handling' do + context 'with airbrake handler' do + before do + class_double('Airbrake', notify: true).as_stubbed_const + end + + let(:processor) { double('processor', jid: 123, postponed_by: nil) } + let(:fetcher) { double('fetcher') } + + subject do + queue_listeners = { queue: double('processor', new: processor) } + executor = Concurrent.global_immediate_executor + concurrency = 1 + + described_class.new(fetcher, nil, executor, queue_listeners, concurrency, [Eventboss::ErrorHandlers::Airbrake.new]).send(:assign, :queue, double('message', body: '{}')) + end + + it 'notifies airbrake' do + err = StandardError.new + expect(processor).to receive(:receive).and_raise(err) + expect(Airbrake).to receive(:notify).with(err) + expect(fetcher).to_not receive(:delete) + + subject + end + end + end + + describe '#postpone_if_needed' do + context 'when postponed' do + let(:postponed_by) { 10 } + + it 'changes message visibility' do + expect(fetcher).to receive(:change_message_visibility).with(queue, sqs_msg, 10) + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + + context 'it works even if postponing raise error' do + let(:postponed_by) { 10 } + + it 'changes message visibility' do + expect(fetcher).to receive(:change_message_visibility).and_raise('Could not postpone') + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + + context 'when not postponed' do + let(:postponed_by) { nil } + + it 'does not change message visibility' do + expect(fetcher).not_to receive(:change_message_visibility) + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + end +end diff --git a/spec/eventboss/polling/basic_spec.rb b/spec/eventboss/polling/basic_spec.rb new file mode 100644 index 0000000..eece2e8 --- /dev/null +++ b/spec/eventboss/polling/basic_spec.rb @@ -0,0 +1,97 @@ +require "spec_helper" + +describe Eventboss::Polling::Basic do + let(:queues) { ['a', 'b', 'c'] } + let(:timer) { FixedTimer.new } + + subject(:basic) { described_class.new(queues) } + + it 'returns next queues one by one' do + expect(basic.next_queue).to eq('a') + expect(basic.next_queue).to eq('b') + expect(basic.next_queue).to eq('c') + expect(basic.next_queue).to eq('a') + end + + it 'skips the queue when paused' do + basic.messages_found('b', 0) + basic.messages_found('c', 1) + expect(basic.next_queue).to eq('a') + expect(basic.next_queue).to eq('c') + expect(basic.next_queue).to eq('a') + end + + it 'returns nil when all skipped' do + basic.messages_found('a', 0) + basic.messages_found('b', 0) + basic.messages_found('c', 0) + expect(basic.next_queue).to be_nil + end + + it 'consuming the same queue if still has messages' do + basic.messages_found('a', 1) + basic.messages_found('b', 1) + basic.messages_found('c', 1) + + expect(basic.next_queue).to eq('a') + basic.messages_found('a', 0) + + expect(basic.next_queue).to eq('b') + basic.messages_found('b', 1) + + expect(basic.next_queue).to eq('b') + end + + it 'pauses for two seconds' do + pool = described_class.new(%w(a b), timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('a') + end + + # This ensures that when there is any paused queues when + # the time has expired, the cycle will start from scratch + it 'restarts work from scratch ' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # enough time passed, will reset to the beginning + # whereas in TimedRoundRobin, it will go to `c` + timer.at(timer.now + 4) + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + expect(pool.next_queue).to be_nil + end +end diff --git a/spec/eventboss/polling/timed_round_robin_spec.rb b/spec/eventboss/polling/timed_round_robin_spec.rb new file mode 100644 index 0000000..d99b3e5 --- /dev/null +++ b/spec/eventboss/polling/timed_round_robin_spec.rb @@ -0,0 +1,118 @@ +require "spec_helper" + +describe Eventboss::Polling::TimedRoundRobin do + let(:queues) { ['a', 'b', 'c'] } + let(:timer) { FixedTimer.new } + + subject(:pool) { described_class.new(queues) } + + it 'returns next queues one by one' do + expect(pool.next_queue).to eq('a') + expect(pool.next_queue).to eq('b') + expect(pool.next_queue).to eq('c') + expect(pool.next_queue).to eq('a') + end + + it 'skips the queue when paused' do + pool.messages_found('b', 0) + pool.messages_found('c', 1) + expect(pool.next_queue).to eq('a') + expect(pool.next_queue).to eq('c') + expect(pool.next_queue).to eq('a') + end + + it 'returns nil when all skipped' do + pool.messages_found('a', 0) + pool.messages_found('b', 0) + pool.messages_found('c', 0) + expect(pool.next_queue).to be_nil + end + + it 'always goes to the next queue' do + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('c') + end + + it 'skips paused queues' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 1) + + # `a` was paused for 2 seconds, next to pick up is `b` + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + # all queues paused, dispatcher will sleep + expect(pool.next_queue).to be_nil + end + + it 'pauses for two seconds' do + pool = described_class.new(%w(a b), timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('a') + end + + # This ensures that next queue is picked up, even if + # queue `a` was paused for enough time to be next + it 'continues to the next available queue' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 1) + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # enough time passed, should continue on the next queue + timer.at(timer.now + 4) + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + expect(pool.next_queue).to eq('a') + end +end diff --git a/spec/eventboss/publisher_spec.rb b/spec/eventboss/publisher_spec.rb new file mode 100644 index 0000000..dec3aeb --- /dev/null +++ b/spec/eventboss/publisher_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Publisher do + describe '#publish' do + subject { described_class.new('event_name', sns_client, configuration, opts).publish(payload) } + let(:payload) { '{}' } + let(:sns_client) { instance_double(Eventboss::SnsClient, publish: sns_response) } + let(:sns_response) { double } + let(:opts) { {} } + let(:configuration) do + Eventboss::Configuration.new.tap do |c| + c.eventboss_app_name = 'app_name1' + end + end + + it 'publishes to sns with source app name by default' do + expect(sns_client).to receive(:publish).with( + topic_arn: "arn:aws:sns:::eventboss-app_name1-event_name-#{Eventboss.env}", + message: "{}" + ) + expect(subject).to eq(sns_response) + end + + context 'when generic event' do + let(:opts) { { generic: true } } + + it 'publishes to sns without app name' do + expect(sns_client).to receive(:publish).with( + topic_arn: "arn:aws:sns:::eventboss-event_name-#{Eventboss.env}", + message: "{}" + ) + expect(subject).to eq(sns_response) + end + end + end +end diff --git a/spec/eventboss/queue_listener_spec.rb b/spec/eventboss/queue_listener_spec.rb new file mode 100644 index 0000000..5394e9a --- /dev/null +++ b/spec/eventboss/queue_listener_spec.rb @@ -0,0 +1,26 @@ +require "spec_helper" + +describe Eventboss::QueueListener do + before do + Eventboss.configure do |config| + config.sqs_client = double('client') + config.eventboss_app_name = 'app1' + end + ENV['EVENTBUS_ENV'] = 'staging' + end + + context '#list' do + it 'builds a map of queue to listener' do + class Listener1 + include Eventboss::Listener + eventboss_options source_app: 'destapp1', event_name: 'transaction_created' + end + class Listener2 + include Eventboss::Listener + eventboss_options source_app: 'destapp2', event_name: 'file_uploaded' + end + expect(Eventboss::QueueListener.list[Eventboss::Queue.new("app1-eventboss-destapp1-transaction_created-#{Eventboss.env}")]).to eq(Listener1) + expect(Eventboss::QueueListener.list[Eventboss::Queue.new("app1-eventboss-destapp2-file_uploaded-#{Eventboss.env}")]).to eq(Listener2) + end + end +end diff --git a/spec/eventboss/queue_spec.rb b/spec/eventboss/queue_spec.rb new file mode 100644 index 0000000..d61398b --- /dev/null +++ b/spec/eventboss/queue_spec.rb @@ -0,0 +1,75 @@ +require "spec_helper" + +describe Eventboss::Queue do + let(:name) { 'sample_queue_name' } + let(:client_mock) { double('client_mock', get_queue_url: double(queue_url: queue_url))} + let(:queue_url) { double } + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.sqs_client = client_mock + end + end + + subject(:queue) { described_class.new(name, configuration) } + + context '#name' do + it 'returns name set in initializer' do + expect(queue.name).to eq(name) + end + end + + context '#url' do + before do + Eventboss.configure do |config| + config.eventboss_region = 'us-east-1' + config.eventboss_account_id = '12345' + end + end + + it 'returns url for the queue' do + expect(queue.url).to eq(queue_url) + end + end + + context '#comparable' do + context 'when equal' do + it 'returns true' do + expect(Eventboss::Queue.new('123').eql?(Eventboss::Queue.new('123'))).to be_truthy + end + end + end + + describe '.build_name' do + let(:source) { 'src' } + let(:destination) { 'dst' } + let(:event) { 'process_resource' } + let(:env) { 'test' } + let(:generic) { false } + + subject do + described_class.build_name( + source: source, + destination: destination, + event: event, + env: env, + generic: generic + ) + end + + it 'returns queue name' do + url = "#{destination}-eventboss-#{source}-#{event}-#{env}" + + expect(subject).to eql(url) + end + + context 'when generic is true' do + let(:generic) { true } + + it 'returns queue name' do + url = "#{destination}-eventboss-#{event}-#{env}" + + expect(subject).to eql(url) + end + end + end +end diff --git a/spec/eventboss/sender_spec.rb b/spec/eventboss/sender_spec.rb new file mode 100644 index 0000000..83ce639 --- /dev/null +++ b/spec/eventboss/sender_spec.rb @@ -0,0 +1,49 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Sender do + describe '#send_batch' do + let(:sender) { described_class.new(queue: queue, client: sqs_client) } + let(:sqs_client) do + instance_double(Aws::SQS::Client, send_message_batch: double) + end + let(:queue_url) { 'sample_queue_url' } + let(:queue) { instance_double(Eventboss::Queue, url: queue_url) } + let(:payload) do + [ + { key: 'val' } + ] + end + + subject { sender.send_batch(payload) } + + it 'sends messages to given queue' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + queue_url: queue_url + ) + ) + + subject + end + + it 'sets id for each message' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + entries: array_including(hash_including(:id)) + ) + ) + + subject + end + + it 'sets message data under message_body key' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + entries: array_including(hash_including(message_body: payload.first.to_json)) + ) + ) + + subject + end + end +end diff --git a/spec/eventboss/sns_client_spec.rb b/spec/eventboss/sns_client_spec.rb new file mode 100644 index 0000000..c9ef106 --- /dev/null +++ b/spec/eventboss/sns_client_spec.rb @@ -0,0 +1,57 @@ +require 'spec_helper' + +RSpec.describe Eventboss::SnsClient do + describe '#publish' do + subject { described_class.new(configuration).publish(payload) } + let(:payload) { '{}' } + + context 'for not configured sns' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.eventboss_region = nil + config.eventboss_account_id = nil + config.eventboss_app_name = nil + end + end + + it 'logs info' do + expect(Eventboss::Logger).to receive(:info).and_call_original + subject + end + + context 'with raise_on_missing_configuration config' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.raise_on_missing_configuration = true + config.eventboss_region = nil + config.eventboss_account_id = nil + config.eventboss_app_name = nil + end + end + + it 'raises error' do + expect { subject }.to raise_error(Eventboss::NotConfigured) + end + end + end + + context 'for configured sns' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.eventboss_region = 'test' + config.eventboss_account_id = 'test' + config.eventboss_app_name = 'test' + end + end + + before do + expect(Aws::SNS::Client).to receive(:new).and_return(instance_double(Aws::SNS::Client, publish: sns_response)) + end + let(:sns_response) { double } + + it 'publishes to sns' do + expect(subject).to eq(sns_response) + end + end + end +end diff --git a/spec/eventboss/unit_of_work_spec.rb b/spec/eventboss/unit_of_work_spec.rb new file mode 100644 index 0000000..ac162f9 --- /dev/null +++ b/spec/eventboss/unit_of_work_spec.rb @@ -0,0 +1,51 @@ +require 'spec_helper' + +describe Eventboss::UnitOfWork do + class Listener + def jid; end + + def receive; end + end + + subject do + described_class.new(queue, Listener, message) + end + + let(:queue) { double('queue', url: 'url') } + let(:client) { double('client') } + let(:message) do + double('message', message_id: 'id', body: '{}', receipt_handle: 'handle') + end + + context 'with sucessful job' do + it 'runs the job' do + expect_any_instance_of(Listener).to receive(:receive).and_return(true) + expect(client) + .to receive(:delete_message).with(queue_url: 'url', receipt_handle: 'handle') + + subject.run(client) + end + end + + context 'with failed job' do + it 'does not cleanup message' do + expect_any_instance_of(Listener).to receive(:receive).and_raise(RuntimeError) + expect(client).not_to receive(:delete_message) + + subject.run(client) + end + end + + context 'with invalid JSON payload' do + let(:message) do + double('message', message_id: 'id', body: '.') + end + + it 'does not run the job' do + expect_any_instance_of(Listener).not_to receive(:receive) + expect(client).not_to receive(:delete_message) + + subject.run(client) + end + end +end diff --git a/spec/eventboss/worker_spec.rb b/spec/eventboss/worker_spec.rb new file mode 100644 index 0000000..b1a06b6 --- /dev/null +++ b/spec/eventboss/worker_spec.rb @@ -0,0 +1,72 @@ +require 'spec_helper' + +describe Eventboss::Worker do + subject do + described_class.new(launcher, queue, client, bus) + end + + Work = Struct.new(:finished) do + def run(*) + self.finished = true + end + end + + FailedWork = Struct.new(:exception) do + def run(*) + raise exception + end + end + + let(:launcher) { instance_double('Eventboss::Launcher', worker_stopped: true) } + let(:queue) { double('queue', url: 'url') } + let(:client) { double('client') } + let(:bus) { [] } + + context 'when work has no errors' do + let(:work) { Work.new } + + before { bus << work } + + it 'runs the job' do + subject.run + expect(work.finished).to be true + end + + it 'stops the launcher' do + expect(launcher).to receive(:worker_stopped).with(subject) + subject.run + end + end + + context 'when work has errors' do + let(:work) { FailedWork.new(error) } + + before { bus << work } + + context 'with exception' do + let(:error) { Exception } + + it 'handles the error' do + expect { subject.run }.not_to raise_error + end + + it 'restarts the worker' do + expect(launcher).to receive(:worker_stopped).with(subject, restart: true) + subject.run + end + end + + context 'on shutdown' do + let(:error) { Eventboss::Shutdown } + + it 'handles the error' do + expect { subject.run }.not_to raise_error + end + + it 'stops the worker' do + expect(launcher).to receive(:worker_stopped).with(subject) + subject.run + end + end + end +end diff --git a/spec/eventboss_spec.rb b/spec/eventboss_spec.rb new file mode 100644 index 0000000..256ef16 --- /dev/null +++ b/spec/eventboss_spec.rb @@ -0,0 +1,37 @@ +require "spec_helper" + +describe Eventboss do + it "has a version number" do + expect(Eventboss::VERSION).not_to be nil + end + + context '#start' do + it 'runs start on runner' do + expect(Eventboss::Runner).to receive(:start) + Eventboss.listen + end + end + + describe '#sender' do + let(:event_name) { 'fake_event' } + let(:destination_app) { 'fake_app' } + let(:configuration) do + Eventboss::Configuration.new.tap do |c| + c.eventboss_app_name = 'app_name1' + c.eventboss_region = 'dummy' + end + end + + subject { described_class.sender(event_name, destination_app) } + + before do + allow(described_class).to receive(:configuration) { configuration } + end + + it 'calls Eventboss::Sender' do + expect(Eventboss::Sender).to receive(:new).with(hash_including(:queue, :client)) + + subject + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..290ebeb --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__) +require "eventboss" + +class FixedTimer + def at(time) + @time = Time.at(time) + end + + def now + @time + end +end