From 8a9c7597b1053538ff63a7b66edfa44f6765ed7a Mon Sep 17 00:00:00 2001 From: AirHelp <> Date: Mon, 13 May 2019 16:50:31 +0200 Subject: [PATCH] Initial version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Marcin Naglik Co-authored-by: Przemysław Wróblewski Co-authored-by: Mateusz Baltyn Co-authored-by: Felipe Philipp Co-authored-by: Wojciech Homa Co-authored-by: Paweł Przeniczny Co-authored-by: Jakub Tokaj Co-authored-by: Maciek Dubiński Co-authored-by: Sebastian Nowak --- .gitignore | 8 ++ .rspec | 2 + Gemfile | 3 + Gemfile.lock | 57 ++++++++ Guardfile | 4 + LICENSE.txt | 21 +++ README.md | 133 +++++++++++++++++- Rakefile | 6 + bin/eventboss | 47 +++++++ eventboss.gemspec | 40 ++++++ lib/eventboss.rb | 79 +++++++++++ lib/eventboss/configuration.rb | 104 ++++++++++++++ lib/eventboss/error_handlers/airbrake.rb | 13 ++ lib/eventboss/error_handlers/logger.rb | 12 ++ lib/eventboss/extensions.rb | 2 + lib/eventboss/fetcher.rb | 33 +++++ lib/eventboss/instrumentation.rb | 26 ++++ lib/eventboss/launcher.rb | 96 +++++++++++++ lib/eventboss/listener.rb | 28 ++++ lib/eventboss/logger.rb | 34 +++++ lib/eventboss/logging.rb | 8 ++ lib/eventboss/long_poller.rb | 75 ++++++++++ lib/eventboss/manager.rb | 116 +++++++++++++++ lib/eventboss/polling/basic.rb | 68 +++++++++ lib/eventboss/polling/timed_round_robin.rb | 42 ++++++ lib/eventboss/publisher.rb | 32 +++++ lib/eventboss/queue.rb | 42 ++++++ lib/eventboss/queue_listener.rb | 11 ++ lib/eventboss/railtie.rb | 5 + lib/eventboss/runner.rb | 60 ++++++++ lib/eventboss/safe_thread.rb | 23 +++ lib/eventboss/scripts.rb | 1 + lib/eventboss/sender.rb | 25 ++++ lib/eventboss/sns_client.rb | 55 ++++++++ lib/eventboss/unit_of_work.rb | 33 +++++ lib/eventboss/version.rb | 3 + lib/eventboss/worker.rb | 55 ++++++++ lib/tasks/eventboss.rake | 47 +++++++ spec/eventboss/configuration_spec.rb | 37 +++++ spec/eventboss/fetcher_spec.rb | 43 ++++++ spec/eventboss/listener_spec.rb | 41 ++++++ spec/eventboss/long_poller_spec.rb | 39 +++++ spec/eventboss/manager_spec.rb | 71 ++++++++++ spec/eventboss/polling/basic_spec.rb | 97 +++++++++++++ .../polling/timed_round_robin_spec.rb | 118 ++++++++++++++++ spec/eventboss/publisher_spec.rb | 36 +++++ spec/eventboss/queue_listener_spec.rb | 26 ++++ spec/eventboss/queue_spec.rb | 75 ++++++++++ spec/eventboss/sender_spec.rb | 49 +++++++ spec/eventboss/sns_client_spec.rb | 57 ++++++++ spec/eventboss/unit_of_work_spec.rb | 51 +++++++ spec/eventboss/worker_spec.rb | 72 ++++++++++ spec/eventboss_spec.rb | 37 +++++ spec/spec_helper.rb | 12 ++ 54 files changed, 2309 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 .rspec create mode 100644 Gemfile create mode 100644 Gemfile.lock create mode 100644 Guardfile create mode 100644 LICENSE.txt create mode 100644 Rakefile create mode 100755 bin/eventboss create mode 100644 eventboss.gemspec create mode 100644 lib/eventboss.rb create mode 100644 lib/eventboss/configuration.rb create mode 100644 lib/eventboss/error_handlers/airbrake.rb create mode 100644 lib/eventboss/error_handlers/logger.rb create mode 100644 lib/eventboss/extensions.rb create mode 100644 lib/eventboss/fetcher.rb create mode 100644 lib/eventboss/instrumentation.rb create mode 100644 lib/eventboss/launcher.rb create mode 100644 lib/eventboss/listener.rb create mode 100644 lib/eventboss/logger.rb create mode 100644 lib/eventboss/logging.rb create mode 100644 lib/eventboss/long_poller.rb create mode 100644 lib/eventboss/manager.rb create mode 100644 lib/eventboss/polling/basic.rb create mode 100644 lib/eventboss/polling/timed_round_robin.rb create mode 100644 lib/eventboss/publisher.rb create mode 100644 lib/eventboss/queue.rb create mode 100644 lib/eventboss/queue_listener.rb create mode 100644 lib/eventboss/railtie.rb create mode 100644 lib/eventboss/runner.rb create mode 100644 lib/eventboss/safe_thread.rb create mode 100644 lib/eventboss/scripts.rb create mode 100644 lib/eventboss/sender.rb create mode 100644 lib/eventboss/sns_client.rb create mode 100644 lib/eventboss/unit_of_work.rb create mode 100644 lib/eventboss/version.rb create mode 100644 lib/eventboss/worker.rb create mode 100644 lib/tasks/eventboss.rake create mode 100644 spec/eventboss/configuration_spec.rb create mode 100644 spec/eventboss/fetcher_spec.rb create mode 100644 spec/eventboss/listener_spec.rb create mode 100644 spec/eventboss/long_poller_spec.rb create mode 100644 spec/eventboss/manager_spec.rb create mode 100644 spec/eventboss/polling/basic_spec.rb create mode 100644 spec/eventboss/polling/timed_round_robin_spec.rb create mode 100644 spec/eventboss/publisher_spec.rb create mode 100644 spec/eventboss/queue_listener_spec.rb create mode 100644 spec/eventboss/queue_spec.rb create mode 100644 spec/eventboss/sender_spec.rb create mode 100644 spec/eventboss/sns_client_spec.rb create mode 100644 spec/eventboss/unit_of_work_spec.rb create mode 100644 spec/eventboss/worker_spec.rb create mode 100644 spec/eventboss_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9106b2a --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/.bundle/ +/.yardoc +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..d31031d --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,57 @@ +PATH + remote: . + specs: + eventboss (1.0.0) + aws-sdk-sns (>= 1.1.0) + aws-sdk-sqs (>= 1.3.0) + concurrent-ruby (~> 1.0, >= 1.0.5) + dotenv (~> 2.1, >= 2.1.1) + +GEM + remote: https://rubygems.org/ + specs: + aws-eventstream (1.0.3) + aws-partitions (1.161.0) + aws-sdk-core (3.51.0) + aws-eventstream (~> 1.0, >= 1.0.2) + aws-partitions (~> 1.0) + aws-sigv4 (~> 1.1) + jmespath (~> 1.0) + aws-sdk-sns (1.13.0) + aws-sdk-core (~> 3, >= 3.48.2) + aws-sigv4 (~> 1.1) + aws-sdk-sqs (1.13.0) + aws-sdk-core (~> 3, >= 3.48.2) + aws-sigv4 (~> 1.1) + aws-sigv4 (1.1.0) + aws-eventstream (~> 1.0, >= 1.0.2) + concurrent-ruby (1.1.5) + diff-lcs (1.3) + dotenv (2.7.2) + jmespath (1.4.0) + rake (12.3.1) + rspec (3.7.0) + rspec-core (~> 3.7.0) + rspec-expectations (~> 3.7.0) + rspec-mocks (~> 3.7.0) + rspec-core (3.7.0) + rspec-support (~> 3.7.0) + rspec-expectations (3.7.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.7.0) + rspec-mocks (3.7.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.7.0) + rspec-support (3.7.0) + +PLATFORMS + ruby + +DEPENDENCIES + bundler (~> 1.13) + eventboss! + rake (>= 10.0) + rspec (~> 3.0) + +BUNDLED WITH + 1.17.2 diff --git a/Guardfile b/Guardfile new file mode 100644 index 0000000..ca9b1b2 --- /dev/null +++ b/Guardfile @@ -0,0 +1,4 @@ +guard 'rspec', cmd: 'bundle exec rspec --color' do + watch(%r{^lib/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" } + watch(%r|^spec/(.*)_spec\.rb|) +end diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..afaaef2 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019 AirHelp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md index f3ce677..64c7f39 100644 --- a/README.md +++ b/README.md @@ -1 +1,132 @@ -# eventboss \ No newline at end of file +# Eventboss + +Eventboss ruby client. + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem 'eventboss' +``` + +## Usage + +Run the listener by: +``` +bundle exec eventboss +``` +it will read conf values from ENV variables in configuration sections. + +### Broadcasting events: + +```ruby +publisher = Eventboss.publisher(event_name) +publisher.publish(payload) +``` + +### Unicasting events in batches: (via SQS) + +```ruby +sender = Eventboss.sender(event_name, destination_app) +sender.send_batch([payload1, payload2]) +``` + +Receiving events via listeners: + +```ruby +class AnyName + include Eventboss::Listener + eventboss_options source_app: 'src_app_name', event_name: 'my_event' + + def receive(payload) + end +end +``` + + +## Configuration + +By default, no exception will be raised when publisher configuration is missing (`eventboss_region`, +`eventboss_account_id`, `eventboss_app_name`). This can lead to false-positive specs, app not working without exceptions on dev/stg/prod environment. It's strongly advised to set `raise_on_missing_configuration` to true. + +Using `.configure`: + +```ruby +Eventboss.configure do |config| + config.raise_on_missing_configuration = true + config.eventboss_account_id = 1234567 + config.eventboss_app_name = name + config.eventboss_region = aws_region + config.concurrency = 10 + # when using custom clients like localstack + config.sns_client = client # Custom SNS Client can be used, i.e. to use local mock, see: https://github.com/phstc/shoryuken/wiki/Using-a-local-mock-SQS-server + config.sqs_client = Aws::SQS::Client.new(endpoint: 'http://localstack:4576', region: 'us-east-1', credentials: Aws::Credentials.new('fake', 'fake')) +end +``` + +Using ENVs: + +``` +EVENTBUS_ACCOUNT_ID=12345676 +EVENTBUS_APP_NAME=application_name +EVENTBUS_ENV=env_name # production/staging/test +EVENTBUS_REGION=aws_region # i.e. eu-west-1 +EVENTBUS_CONCURRENCY=10 # default is 25 + +AWS_SNS_ENDPOINT=http://localhost:4575 # when using with localstack +AWS_SQS_ENDPOINT=http://localhost:4576 # when using with localstack +``` + +### Logging and error handling +To have more verbose logging, set `log_level` in configuration (default is `info`). + +Logger is used as default error handler. There is Airbrake handler available, to use it ensure you have `airbrake` or `airbrake-ruby` gem and add it to error handlers stack: + +```ruby +Eventboss.configure do |config| + config.error_handlers << Eventboss::ErrorHandlers::Airbrake.new +end +``` + +### Polling strategy + +Default is `Eventboss::Polling::Basic`. See `eventboss/polling/*` for other options. The configuration should be a `lambda` like so: + +```ruby +Eventboss.configure do |config| + config.polling_strategy = lambda { |queues| Eventboss::Polling::TimedRoundRobin.new(queues) } +end +``` + +## Topics & Queues naming convention + +The SNSes should be name in the following pattern: +``` +eventboss-{src_app_name}-{event_name}-{environment} +``` + +i.e. +``` +eventboss-srcapp-transaction_change-staging +``` + +The corresponding SQSes should be name like: +``` +{dest_app_name}-eventboss-{src_app_name}-{event_name}-{environment} +{dest_app_name}-eventboss-{src_app_name}-{event_name}-{environment}-deadletter +``` +i.e. +``` +destapp-eventboss-srcapp-transaction_change-staging +destapp-eventboss-srcapp-transaction_change-staging-deadletter +``` + +## Contributing + +Bug reports and pull requests are welcome on GitHub at https://github.com/AirHelp/eventboss. + + +## License + +The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..b7e9ed5 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +require "bundler/gem_tasks" +require "rspec/core/rake_task" + +RSpec::Core::RakeTask.new(:spec) + +task :default => :spec diff --git a/bin/eventboss b/bin/eventboss new file mode 100755 index 0000000..98961ef --- /dev/null +++ b/bin/eventboss @@ -0,0 +1,47 @@ +#!/usr/bin/env ruby + +require 'rubygems' +require 'dotenv' +require 'eventboss' +require 'optparse' + +Dotenv.load + +STDOUT.sync = true +options = {} + +OptionParser.new do |parser| + parser.on('-r', '--require LIBRARY', 'Require custom app entrypoint') do |lib| + options[:require] = lib + end +end.parse! + +begin + logger = Eventboss::Logger + + require 'rails' + logger.debug('Loading rails...') + if ::Rails::VERSION::MAJOR < 4 + require File.expand_path('config/environment.rb') + else + require File.expand_path('config/application.rb') + require File.expand_path('config/environment.rb') + end + ::Rails.application.eager_load! +rescue LoadError + logger.debug('Seems like not a Rails app') + + if options[:require].nil? + logger.warn('Please use -r to load a custom app entrypoint') + exit(0) + else + logger.debug("Loading #{options[:require]}") + require File.expand_path(options[:require]) + end +end + +logger.info('Starting eventboss...') +logger.info('Active Listeners:') +logger.info(Eventboss::QueueListener.list.to_s) + +Eventboss.launch diff --git a/eventboss.gemspec b/eventboss.gemspec new file mode 100644 index 0000000..e7b2c05 --- /dev/null +++ b/eventboss.gemspec @@ -0,0 +1,40 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'eventboss/version' + +Gem::Specification.new do |spec| + spec.name = "eventboss" + spec.version = Eventboss::VERSION + spec.authors = ["AirHelp"] + spec.email = ["marcin.naglik@airhelp.com"] + + spec.summary = %q{Eventboss Ruby Client.} + spec.description = %q{Eventboss Ruby Client.} + spec.homepage = "https://airhelp.com" + spec.license = "MIT" + + # Prevent pushing this gem to RubyGems.org. To allow pushes either set the 'allowed_push_host' + # to allow pushing to a single host or delete this section to allow pushing to any host. + if spec.respond_to?(:metadata) + spec.metadata['allowed_push_host'] = "TODO: Set to 'http://mygemserver.com'" + else + raise "RubyGems 2.0 or newer is required to protect against " \ + "public gem pushes." + end + + spec.files = `git ls-files -z`.split("\x0").reject do |f| + f.match(%r{^(test|spec|features)/}) + end + spec.executables = ["eventboss"] + spec.require_paths = ["lib"] + + spec.add_dependency "concurrent-ruby", "~> 1.0", ">= 1.0.5" + spec.add_dependency "aws-sdk-sqs", ">= 1.3.0" + spec.add_dependency "aws-sdk-sns", ">= 1.1.0" + spec.add_dependency "dotenv", "~> 2.1", ">= 2.1.1" + + spec.add_development_dependency "bundler", "~> 1.13" + spec.add_development_dependency 'rake', '>= 10.0' + spec.add_development_dependency "rspec", "~> 3.0" +end diff --git a/lib/eventboss.rb b/lib/eventboss.rb new file mode 100644 index 0000000..001cb13 --- /dev/null +++ b/lib/eventboss.rb @@ -0,0 +1,79 @@ +require 'aws-sdk-sqs' +require 'aws-sdk-sns' +require 'concurrent' +require 'securerandom' + +require 'eventboss/version' +require 'eventboss/configuration' +require 'eventboss/instrumentation' +require 'eventboss/sns_client' +require 'eventboss/queue' +require 'eventboss/queue_listener' +require 'eventboss/listener' +require 'eventboss/logging' +require 'eventboss/safe_thread' +require 'eventboss/launcher' +require 'eventboss/long_poller' +require 'eventboss/unit_of_work' +require 'eventboss/worker' +require 'eventboss/fetcher' +require 'eventboss/publisher' +require 'eventboss/sender' +require 'eventboss/manager' +require 'eventboss/runner' +require 'eventboss/logger' +require 'eventboss/polling/basic' +require 'eventboss/polling/timed_round_robin' +require 'eventboss/extensions' + +# For Rails use railtie, for plain Ruby apps use custom scripts loader +if defined?(Rails) + require 'eventboss/railtie' +else + require 'eventboss/scripts' +end + +module Eventboss + Shutdown = Class.new(StandardError) + + class << self + def publisher(event_name, opts = {}) + Publisher.new(event_name, configuration.sns_client, configuration, opts) + end + + def sender(event_name, destination_app, options = {}) + queue_name = Queue.build_name( + destination: destination_app, + source: configuration.eventboss_app_name, + event: event_name, + env: env, + generic: options[:generic] + ) + + Sender.new( + client: configuration.sqs_client, + queue: Queue.new(queue_name) + ) + end + + def listen + Eventboss::Runner.start + end + + def launch + Eventboss::Runner.launch + end + + def env + @env ||= ENV['EVENTBUS_ENV'] || ENV['RAILS_ENV'] || ENV['RACK_ENV'] + end + + def configure + yield configuration if block_given? + end + + def configuration + @_configuration ||= Configuration.new + end + end +end diff --git a/lib/eventboss/configuration.rb b/lib/eventboss/configuration.rb new file mode 100644 index 0000000..e3d621b --- /dev/null +++ b/lib/eventboss/configuration.rb @@ -0,0 +1,104 @@ +module Eventboss + class Configuration + attr_writer :raise_on_missing_configuration, + :error_handlers, + :concurrency, + :log_level, + :sns_client, + :sqs_client, + :eventboss_region, + :eventboss_app_name, + :eventboss_account_id, + :aws_access_key_id, + :aws_secret_access_key, + :polling_strategy, + :aws_sns_endpoint, + :aws_sqs_endpoint, + :sns_sqs_name_infix + + def raise_on_missing_configuration + defined_or_default('raise_on_missing_configuration') { false } + end + + def error_handlers + defined_or_default('error_handlers') { [ErrorHandlers::Logger.new] } + end + + def concurrency + defined_or_default('concurrency') { ENV['EVENTBUS_CONCURRENCY'] ? ENV['EVENTBUS_CONCURRENCY'].to_i : 25 } + end + + def log_level + defined_or_default('log_level') { :info } + end + + def sns_client + defined_or_default('sns_client') { Eventboss::SnsClient.new(self) } + end + + def sqs_client + defined_or_default('sqs_client') do + options = { + region: eventboss_region, + credentials: Aws::Credentials.new( + aws_access_key_id, + aws_secret_access_key + ) + } + if aws_sqs_endpoint + options[:endpoint] = aws_sqs_endpoint + end + + Aws::SQS::Client.new(options) + end + end + + def eventboss_region + defined_or_default('eventboss_region') { ENV['EVENTBUS_REGION'] } + end + + def eventboss_app_name + defined_or_default('eventboss_app_name') { ENV['EVENTBUS_APP_NAME'] } + end + + def eventboss_account_id + defined_or_default('eventboss_account_id') { ENV['EVENTBUS_ACCOUNT_ID'] } + end + + def aws_access_key_id + defined_or_default('aws_access_key_id') { ENV['AWS_ACCESS_KEY_ID'] } + end + + def aws_secret_access_key + defined_or_default('aws_secret_access_key') { ENV['AWS_SECRET_ACCESS_KEY'] } + end + + def aws_sqs_endpoint + defined_or_default('aws_sqs_endpoint') { ENV['AWS_SQS_ENDPOINT'] } + end + + def aws_sns_endpoint + defined_or_default('aws_sns_endpoint') { ENV['AWS_SNS_ENDPOINT'] } + end + + def polling_strategy + defined_or_default('polling_strategy') do + lambda { |queues| Eventboss::Polling::Basic.new(queues) } + end + end + + def sns_sqs_name_infix + defined_or_default('sns_sqs_name_infix') { ENV['EVENTBUS_SQS_SNS_NAME_INFIX'] || '-eventboss-' } + end + + private + + def defined_or_default(variable_name) + if instance_variable_defined?("@#{variable_name}") + instance_variable_get("@#{variable_name}") + else + instance_variable_set("@#{variable_name}", yield) if block_given? + end + end + end +end diff --git a/lib/eventboss/error_handlers/airbrake.rb b/lib/eventboss/error_handlers/airbrake.rb new file mode 100644 index 0000000..4f8fba6 --- /dev/null +++ b/lib/eventboss/error_handlers/airbrake.rb @@ -0,0 +1,13 @@ +module Eventboss + module ErrorHandlers + class Airbrake + def call(exception, context = {}) + ::Airbrake.notify(exception) do |notice| + notice[:context][:component] = 'eventboss' + notice[:context][:action] = context[:processor].class.to_s if context[:processor] + notice[:context].merge!(context) + end + end + end + end +end diff --git a/lib/eventboss/error_handlers/logger.rb b/lib/eventboss/error_handlers/logger.rb new file mode 100644 index 0000000..723cd21 --- /dev/null +++ b/lib/eventboss/error_handlers/logger.rb @@ -0,0 +1,12 @@ +module Eventboss + module ErrorHandlers + class Logger + def call(exception, context = {}) + notice = {}.merge!(context) + notice[:jid] = notice[:processor].jid if notice[:processor] + notice[:processor] = notice[:processor].class.to_s if notice[:processor] + Eventboss::Logger.error("Failure processing request #{exception.message}", notice) + end + end + end +end diff --git a/lib/eventboss/extensions.rb b/lib/eventboss/extensions.rb new file mode 100644 index 0000000..06ed38d --- /dev/null +++ b/lib/eventboss/extensions.rb @@ -0,0 +1,2 @@ +require 'eventboss/error_handlers/logger' +require 'eventboss/error_handlers/airbrake' diff --git a/lib/eventboss/fetcher.rb b/lib/eventboss/fetcher.rb new file mode 100644 index 0000000..fb2cef7 --- /dev/null +++ b/lib/eventboss/fetcher.rb @@ -0,0 +1,33 @@ +module Eventboss + class Fetcher + FETCH_LIMIT = 10 # maximum possible for SQS + + attr_reader :client + + def initialize(configuration) + @client = configuration.sqs_client + end + + def fetch(queue, limit) + @client.receive_message(queue_url: queue.url, max_number_of_messages: max_no_of_messages(limit)).messages + end + + def delete(queue, message) + @client.delete_message(queue_url: queue.url, receipt_handle: message.receipt_handle) + end + + def change_message_visibility(queue, message, visibility_timeout) + @client.change_message_visibility( + queue_url: queue.url, + receipt_handle: message.receipt_handle, + visibility_timeout: visibility_timeout + ) + end + + private + + def max_no_of_messages(limit) + [limit, FETCH_LIMIT].min + end + end +end diff --git a/lib/eventboss/instrumentation.rb b/lib/eventboss/instrumentation.rb new file mode 100644 index 0000000..41d659b --- /dev/null +++ b/lib/eventboss/instrumentation.rb @@ -0,0 +1,26 @@ +module Eventboss + # :nodoc: + module Instrumentation + def self.add(queue_listeners) + return unless defined?(::NewRelic::Agent::Instrumentation::ControllerInstrumentation) + Eventboss::Instrumentation::NewRelic.install(queue_listeners) + end + + # :nodoc: + module NewRelic + def self.install(queue_listeners) + Eventboss::Logger.logger.info('Loaded NewRelic instrumentation') + queue_listeners.each_value do |listener_class| + listener_class.include(::NewRelic::Agent::Instrumentation::ControllerInstrumentation) + listener_class.add_transaction_tracer(:receive, category: 'OtherTransaction/EventbossJob') + end + + Eventboss::Sender.include(::NewRelic::Agent::MethodTracer) + Eventboss::Sender.add_method_tracer(:send_batch, 'Eventboss/sender_send_batch') + + Eventboss::Publisher.include(::NewRelic::Agent::MethodTracer) + Eventboss::Publisher.add_method_tracer(:publish, 'Eventboss/publisher_publish') + end + end + end +end diff --git a/lib/eventboss/launcher.rb b/lib/eventboss/launcher.rb new file mode 100644 index 0000000..ce220ee --- /dev/null +++ b/lib/eventboss/launcher.rb @@ -0,0 +1,96 @@ +module Eventboss + # Launcher manages lifecycle of queues and pollers threads + class Launcher + include Logging + + DEFAULT_SHUTDOWN_ATTEMPTS = 5 + DEFAULT_SHUTDOWN_DELAY = 5 + + def initialize(queues, client, options = {}) + @options = options + @queues = queues + @client = client + + @lock = Mutex.new + @bus = SizedQueue.new(@queues.size * 10) + + @pollers = Set.new + @queues.each { |q, listener| @pollers << new_poller(q, listener) } + + @workers = Set.new + worker_count.times { |id| @workers << new_worker(id) } + end + + def start + logger.info("Starting #{@workers.size} workers, #{@pollers.size} pollers", 'launcher') + @pollers.each(&:start) + @workers.each(&:start) + end + + def stop + logger.info('Gracefully shutdown', 'launcher') + + @bus.clear + @pollers.each(&:terminate) + @workers.each(&:terminate) + + wait_for_shutdown + hard_shutdown + end + + def hard_shutdown + return if @pollers.empty? && @workers.empty? + + logger.info("Killing remaining #{@pollers.size} pollers, #{@workers.size} workers", 'launcher') + @pollers.each(&:kill) + @workers.each(&:kill) + end + + def worker_stopped(worker, restart: false) + @lock.synchronize do + @workers.delete(worker) + @workers << new_worker(worker.id).tap(&:start) if restart + end + logger.debug("Worker #{worker.id} stopped, restart: #{restart}", 'launcher') + end + + def poller_stopped(poller, restart: false) + @lock.synchronize do + @pollers.delete(poller) + @pollers << new_poller(poller.queue, poller.listener).tap(&:start) if restart + end + logger.debug("Poller #{poller.id} stopped, restart: #{restart}", 'launcher') + end + + private + + def worker_count + @options.fetch(:worker_count, [2, Concurrent.processor_count].max) + end + + def new_worker(id) + Worker.new(self, id, @client, @bus) + end + + def new_poller(queue, listener) + LongPoller.new(self, @bus, @client, queue, listener) + end + + def wait_for_shutdown + attempts = 0 + while @pollers.any? || @workers.any? + break if (attempts += 1) > shutdown_attempts + sleep shutdown_delay + logger.info("Waiting for #{@pollers.size} pollers, #{@workers.size} workers", 'launcher') + end + end + + def shutdown_attempts + Integer(@options[:shutdown_attempts] || DEFAULT_SHUTDOWN_ATTEMPTS) + end + + def shutdown_delay + Integer(@options[:shutdown_delay] || DEFAULT_SHUTDOWN_DELAY) + end + end +end diff --git a/lib/eventboss/listener.rb b/lib/eventboss/listener.rb new file mode 100644 index 0000000..d3aac4a --- /dev/null +++ b/lib/eventboss/listener.rb @@ -0,0 +1,28 @@ +module Eventboss + module Listener + ACTIVE_LISTENERS = {} + + def self.included(base) + base.extend ClassMethods + end + + def jid + @jid ||= SecureRandom.uuid + end + + attr_reader :postponed_by + + def postpone_by(time_in_secs) + @postponed_by = time_in_secs.to_i + end + + module ClassMethods + def eventboss_options(opts) + source_app = opts[:source_app] ? "#{opts[:source_app]}-" : "" + event_name = opts[:event_name] + + ACTIVE_LISTENERS["#{source_app}#{event_name}"] = self + end + end + end +end diff --git a/lib/eventboss/logger.rb b/lib/eventboss/logger.rb new file mode 100644 index 0000000..88cbccd --- /dev/null +++ b/lib/eventboss/logger.rb @@ -0,0 +1,34 @@ +module Eventboss + class Logger + class << self + def logger + Thread.current[:ah_eventboss_logger] ||= ::Logger.new( + STDOUT, + level: Eventboss.configuration.log_level + ) + end + + def info(msg, tag = nil) + return unless logger + logger.info(tagged(msg, tag)) + end + + def debug(msg, tag = nil) + return unless logger + logger.debug(tagged(msg, tag)) + end + + def error(msg, tag = nil) + return unless logger + logger.error(tagged(msg, tag)) + end + + private + + def tagged(msg, tag) + return msg if tag.nil? + msg.prepend("[#{tag}] ") + end + end + end +end diff --git a/lib/eventboss/logging.rb b/lib/eventboss/logging.rb new file mode 100644 index 0000000..9336e8d --- /dev/null +++ b/lib/eventboss/logging.rb @@ -0,0 +1,8 @@ +module Eventboss + # Logging include logging helpers + module Logging + def logger + Eventboss::Logger + end + end +end diff --git a/lib/eventboss/long_poller.rb b/lib/eventboss/long_poller.rb new file mode 100644 index 0000000..236b1b2 --- /dev/null +++ b/lib/eventboss/long_poller.rb @@ -0,0 +1,75 @@ +module Eventboss + # LongPoller fetches messages from SQS using Long Polling + # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html + # It starts one thread per queue (handled by Launcher) + class LongPoller + include Logging + include SafeThread + + TIME_WAIT = 10 + + attr_reader :id, :queue, :listener + + def initialize(launcher, bus, client, queue, listener) + @id = "poller-#{queue.name}" + @launcher = launcher + @bus = bus + @client = client + @queue = queue + @listener = listener + @thread = nil + @stop = false + end + + def start + @thread = safe_thread(id, &method(:run)) + end + + def terminate(wait = false) + @stop = true + return unless @thread + @thread.value if wait + end + + def kill(wait = false) + @stop = true + return unless @thread + @thread.value if wait + + # Force shutdown of poller, in case the loop is stuck + @thread.raise Eventboss::Shutdown + @thread.value if wait + end + + def fetch_and_dispatch + fetch_messages.each do |message| + logger.debug("enqueueing message #{message.message_id}", id) + @bus << UnitOfWork.new(queue, listener, message) + end + end + + def run + fetch_and_dispatch until @stop + @launcher.poller_stopped(self) + rescue Eventboss::Shutdown + @launcher.poller_stopped(self) + rescue StandardError => exception + handle_exception(exception, poller_id: id) + # Give a chance for temporary AWS errors to be resolved + # Sleep guarantees against repeating fast failure errors + sleep TIME_WAIT + @launcher.poller_stopped(self, restart: @stop == false) + end + + private + + def fetch_messages + logger.debug('fetching messages', id) + @client.receive_message( + queue_url: queue.url, + max_number_of_messages: 10, + wait_time_seconds: TIME_WAIT + ).messages + end + end +end diff --git a/lib/eventboss/manager.rb b/lib/eventboss/manager.rb new file mode 100644 index 0000000..1eed0ac --- /dev/null +++ b/lib/eventboss/manager.rb @@ -0,0 +1,116 @@ +module Eventboss + class Manager + MIN_DISPATCH_INTERVAL = 0.1 + + def initialize(fetcher, polling_strategy, executor, queue_listeners, concurrency, error_handlers) + @fetcher = fetcher + @polling_strategy = polling_strategy + @max_processors = concurrency + @busy_processors = Concurrent::AtomicFixnum.new(0) + @executor = executor + @queue_listeners = queue_listeners + @error_handlers = Array(error_handlers) + end + + def start + Eventboss::Logger.debug('Starting dispatch loop...') + + dispatch_loop + end + + private + + def running? + @executor.running? + end + + def dispatch_loop + return unless running? + + Eventboss::Logger.debug('Posting task to executor') + + @executor.post { dispatch } + end + + def dispatch + return unless running? + + if ready <= 0 || (queue = @polling_strategy.next_queue).nil? + return sleep(MIN_DISPATCH_INTERVAL) + end + dispatch_single_messages(queue) + rescue => ex + handle_dispatch_error(ex) + ensure + Eventboss::Logger.debug('Ensuring dispatch loop') + dispatch_loop + end + + def busy + @busy_processors.value + end + + def ready + @max_processors - busy + end + + def processor_done(processor) + Eventboss::Logger.info("Success", processor.jid) + @busy_processors.decrement + end + + def processor_error(processor, exception) + @error_handlers.each { |handler| handler.call(exception, processor) } + @busy_processors.decrement + end + + def assign(queue, sqs_msg) + return unless running? + + @busy_processors.increment + processor = @queue_listeners[queue].new + + Concurrent::Promise.execute(executor: @executor) do + body = JSON.parse(sqs_msg.body) rescue sqs_msg.body + Eventboss::Logger.info("Started", processor.jid) + processor.receive(body) + end.then do + cleanup(processor) + postpone_if_needed(queue, sqs_msg, processor) || delete_from_queue(queue, sqs_msg) + processor_done(processor) + end.rescue do |e| + cleanup(processor) + postpone_if_needed(queue, sqs_msg, processor) + processor_error(processor, e) + end + end + + def cleanup(_processor) + if defined?(ActiveRecord) + ::ActiveRecord::Base.clear_active_connections! + end + end + + def delete_from_queue(queue, sqs_msg) + @fetcher.delete(queue, sqs_msg) + end + + def postpone_if_needed(queue, sqs_msg, processor) + return false unless processor.postponed_by + @fetcher.change_message_visibility(queue, sqs_msg, processor.postponed_by) + rescue => error + Eventboss::Logger.info("Could not postpone message #{error.message}", processor.jid) + end + + def dispatch_single_messages(queue) + messages = @fetcher.fetch(queue, ready) + @polling_strategy.messages_found(queue, messages.size) + messages.each { |message| assign(queue, message) } + end + + def handle_dispatch_error(ex) + Eventboss::Logger.error("Error dispatching #{ex.message}") + Process.kill('USR1', Process.pid) + end + end +end diff --git a/lib/eventboss/polling/basic.rb b/lib/eventboss/polling/basic.rb new file mode 100644 index 0000000..5f3009f --- /dev/null +++ b/lib/eventboss/polling/basic.rb @@ -0,0 +1,68 @@ +module Eventboss + module Polling + class Basic + PAUSE_AFTER_EMPTY = 2 # seconds + + def initialize(queues, timer = Time) + @queues = queues.to_a + @timer = timer + @paused_until = @queues.each_with_object(Hash.new) do |queue, hash| + hash[queue] = @timer.at(0) + end + + reset_next_queue + end + + def next_queue + next_active_queue + end + + def messages_found(queue, messages_count) + if messages_count == 0 + pause(queue) + else + reset_next_queue + end + end + + def active_queues + @queues.reject { |q, _| queue_paused?(q) } + end + + private + + def next_active_queue + reset_next_queue if queues_unpaused_since? + + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) + end + + nil + end + + def queues_unpaused_since? + last = @last_unpause_check + now = @last_unpause_check = @timer.now + + last && @paused_until.values.any? { |t| t > last && t <= now } + end + + def reset_next_queue + @next_queue_index = 0 + end + + def queue_paused?(queue) + @paused_until[queue] > @timer.now + end + + def pause(queue) + return unless PAUSE_AFTER_EMPTY > 0 + @paused_until[queue] = @timer.now + PAUSE_AFTER_EMPTY + end + end + end +end diff --git a/lib/eventboss/polling/timed_round_robin.rb b/lib/eventboss/polling/timed_round_robin.rb new file mode 100644 index 0000000..bf3136e --- /dev/null +++ b/lib/eventboss/polling/timed_round_robin.rb @@ -0,0 +1,42 @@ +module Eventboss + module Polling + class TimedRoundRobin + PAUSE_AFTER_EMPTY = 2 # seconds + + def initialize(queues, timer = Time) + @queues = queues.to_a + @timer = timer + @next_queue_index = 0 + @paused_until = @queues.each_with_object(Hash.new) do |queue, hash| + hash[queue] = @timer.at(0) + end + end + + def next_queue + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) + end + + nil + end + + def messages_found(queue, messages_count) + pause(queue) if messages_count == 0 + end + + private + + def queue_paused?(queue) + @paused_until[queue] > @timer.now + end + + def pause(queue) + return unless PAUSE_AFTER_EMPTY > 0 + @paused_until[queue] = @timer.now + PAUSE_AFTER_EMPTY + end + end + end +end diff --git a/lib/eventboss/publisher.rb b/lib/eventboss/publisher.rb new file mode 100644 index 0000000..fd4baad --- /dev/null +++ b/lib/eventboss/publisher.rb @@ -0,0 +1,32 @@ +module Eventboss + class Publisher + def initialize(event_name, sns_client, configuration, opts = {}) + @event_name = event_name + @sns_client = sns_client + @configuration = configuration + @generic = opts[:generic] + end + + def publish(payload) + sns_client.publish({ + topic_arn: topic_arn, + message: json_payload(payload) + }) + end + + private + + attr_reader :event_name, :sns_client, :configuration + + def json_payload(payload) + payload.is_a?(String) ? payload : payload.to_json + end + + def topic_arn + src_selector = @generic ? "" : "-#{configuration.eventboss_app_name}" + + "arn:aws:sns:#{configuration.eventboss_region}:#{configuration.eventboss_account_id}:\ +eventboss#{src_selector}-#{event_name}-#{Eventboss.env}" + end + end +end diff --git a/lib/eventboss/queue.rb b/lib/eventboss/queue.rb new file mode 100644 index 0000000..34e62b3 --- /dev/null +++ b/lib/eventboss/queue.rb @@ -0,0 +1,42 @@ +module Eventboss + class Queue + include Comparable + attr_reader :name + + def self.build_name(source:, destination:, event:, env:, generic:) + source = + if generic + '' + else + "-#{source}" + end + + "#{destination}-eventboss#{source}-#{event}-#{env}" + end + + def initialize(name, configuration = Eventboss.configuration) + @client = configuration.sqs_client + @name = name + end + + def url + @url ||= client.get_queue_url(queue_name: name).queue_url + end + + def <=>(another_queue) + name <=> another_queue&.name + end + + def eql?(another_queue) + name == another_queue&.name + end + + def hash + name.hash + end + + private + + attr_reader :client + end +end diff --git a/lib/eventboss/queue_listener.rb b/lib/eventboss/queue_listener.rb new file mode 100644 index 0000000..f8a48f1 --- /dev/null +++ b/lib/eventboss/queue_listener.rb @@ -0,0 +1,11 @@ +module Eventboss + class QueueListener + class << self + def list + Hash[Eventboss::Listener::ACTIVE_LISTENERS.map do |src_app_event, listener| + [Eventboss::Queue.new("#{Eventboss.configuration.eventboss_app_name}#{Eventboss.configuration.sns_sqs_name_infix}#{src_app_event}-#{Eventboss.env}"), listener] + end] + end + end + end +end diff --git a/lib/eventboss/railtie.rb b/lib/eventboss/railtie.rb new file mode 100644 index 0000000..a8a4a0b --- /dev/null +++ b/lib/eventboss/railtie.rb @@ -0,0 +1,5 @@ +class Eventboss::Railtie < Rails::Railtie + rake_tasks do + load 'tasks/eventboss.rake' + end +end diff --git a/lib/eventboss/runner.rb b/lib/eventboss/runner.rb new file mode 100644 index 0000000..407c3ca --- /dev/null +++ b/lib/eventboss/runner.rb @@ -0,0 +1,60 @@ +module Eventboss + class Runner + class << self + def launch + queues = Eventboss::QueueListener.list + client = Eventboss.configuration.sqs_client + config = Eventboss.configuration + + Eventboss::Instrumentation.add(queues) + + launcher = Launcher.new(queues, client, worker_count: config.concurrency) + + self_read, _self_write = IO.pipe + begin + launcher.start + while (_readable_io = IO.select([self_read])) + # handle_signal(readable_io.first[0].gets.strip) + end + rescue Interrupt + launcher.stop + exit 0 + end + end + + def start + configuration = Eventboss.configuration + + queue_listeners = Eventboss::QueueListener.list + Eventboss::Instrumentation.add(queue_listeners) + polling_strategy = configuration.polling_strategy.call(queue_listeners.keys) + + fetcher = Eventboss::Fetcher.new(configuration) + executor = Concurrent.global_io_executor + + manager = Eventboss::Manager.new( + fetcher, + polling_strategy, + executor, + queue_listeners, + configuration.concurrency, + configuration.error_handlers + ) + + manager.start + + self_read, self_write = IO.pipe + begin + while (readable_io = IO.select([self_read])) + signal = readable_io.first[0].gets.strip + # handle_signal(signal) + end + rescue Interrupt + executor.shutdown + executor.wait_for_termination + exit 0 + end + end + end + end +end diff --git a/lib/eventboss/safe_thread.rb b/lib/eventboss/safe_thread.rb new file mode 100644 index 0000000..267919c --- /dev/null +++ b/lib/eventboss/safe_thread.rb @@ -0,0 +1,23 @@ +module Eventboss + # SafeThread includes thread handling with automatic error reporting + module SafeThread + def safe_thread(name) + Thread.new do + begin + Thread.current[:ah_eventboss_label] = name + yield + rescue Exception => exception + handle_exception(exception, name: name) + raise exception + end + end + end + + def handle_exception(exception, context) + context.freeze + Eventboss.configuration.error_handlers.each do |handler| + handler.call(exception, context) + end + end + end +end diff --git a/lib/eventboss/scripts.rb b/lib/eventboss/scripts.rb new file mode 100644 index 0000000..36fa32d --- /dev/null +++ b/lib/eventboss/scripts.rb @@ -0,0 +1 @@ +load 'tasks/eventboss.rake' diff --git a/lib/eventboss/sender.rb b/lib/eventboss/sender.rb new file mode 100644 index 0000000..2caa259 --- /dev/null +++ b/lib/eventboss/sender.rb @@ -0,0 +1,25 @@ +module Eventboss + class Sender + def initialize(client:, queue:) + @client = client + @queue = queue + end + + def send_batch(payload) + client.send_message_batch( + queue_url: queue.url, + entries: Array(build_entries(payload)) + ) + end + + private + + attr_reader :queue, :client + + def build_entries(messages) + messages.map do |message| + { id: SecureRandom.hex, message_body: message.to_json } + end + end + end +end diff --git a/lib/eventboss/sns_client.rb b/lib/eventboss/sns_client.rb new file mode 100644 index 0000000..51b2ac7 --- /dev/null +++ b/lib/eventboss/sns_client.rb @@ -0,0 +1,55 @@ +module Eventboss + class NotConfigured < StandardError; end + + class SnsClient + def initialize(configuration) + @configuration = configuration + end + + def publish(payload) + backend.publish(payload) + end + + private + + attr_reader :configuration + + def backend + if configured? + options = { + region: configuration.eventboss_region, + credentials: ::Aws::Credentials.new( + configuration.aws_access_key_id, + configuration.aws_secret_access_key + ) + } + if configuration.aws_sns_endpoint + options[:endpoint] = configuration.aws_sns_endpoint + end + Aws::SNS::Client.new( + options + ) + elsif configuration.raise_on_missing_configuration + raise NotConfigured, 'Eventboss is not configured' + else + Mock.new + end + end + + def configured? + !!( + configuration.eventboss_region && + configuration.eventboss_account_id && + configuration.eventboss_app_name + ) + end + + class Mock + def publish(_) + Eventboss::Logger.info('Eventboss is not configured. Skipping message publishing!') + return + end + end + end + +end diff --git a/lib/eventboss/unit_of_work.rb b/lib/eventboss/unit_of_work.rb new file mode 100644 index 0000000..0c52322 --- /dev/null +++ b/lib/eventboss/unit_of_work.rb @@ -0,0 +1,33 @@ +module Eventboss + # UnitOfWork handles calls a listener for each message and deletes on success + class UnitOfWork + include Logging + include SafeThread + + attr_accessor :queue, :listener, :message + + def initialize(queue, listener, message) + @queue = queue + @listener = listener + @message = message + end + + def run(client) + logger.debug('Started', @message.message_id) + processor = @listener.new + processor.receive(JSON.parse(@message.body)) + logger.info('Finished', @message.message_id) + rescue StandardError => exception + handle_exception(exception, processor: processor, message_id: @message.message_id) + else + cleanup(client) + end + + def cleanup(client) + client.delete_message( + queue_url: @queue.url, receipt_handle: @message.receipt_handle + ) + logger.debug('Deleting', @message.message_id) + end + end +end diff --git a/lib/eventboss/version.rb b/lib/eventboss/version.rb new file mode 100644 index 0000000..ef0521a --- /dev/null +++ b/lib/eventboss/version.rb @@ -0,0 +1,3 @@ +module Eventboss + VERSION = "1.0.0" +end diff --git a/lib/eventboss/worker.rb b/lib/eventboss/worker.rb new file mode 100644 index 0000000..c0a14c8 --- /dev/null +++ b/lib/eventboss/worker.rb @@ -0,0 +1,55 @@ +module Eventboss + # Worker is part of a pool of workers, handles UnitOfWork lifecycle + class Worker + include Logging + include SafeThread + + attr_reader :id + + def initialize(launcher, id, client, bus) + @id = "worker-#{id}" + @launcher = launcher + @client = client + @bus = bus + @thread = nil + end + + def start + @thread = safe_thread(id, &method(:run)) + end + + def run + while (work = @bus.pop) + work.run(@client) + end + @launcher.worker_stopped(self) + rescue Eventboss::Shutdown + @launcher.worker_stopped(self) + rescue Exception => exception + handle_exception(exception, worker_id: id) + # Restart the worker in case of hard exception + # Message won't be delete from SQS and will be visible later + @launcher.worker_stopped(self, restart: true) + end + + def terminate(wait = false) + stop_token + return unless @thread + @thread.value if wait + end + + def kill(wait = false) + stop_token + return unless @thread + @thread.raise Eventboss::Shutdown + @thread.value if wait + end + + private + + # stops the loop, by enqueuing falsey value + def stop_token + @bus << nil + end + end +end diff --git a/lib/tasks/eventboss.rake b/lib/tasks/eventboss.rake new file mode 100644 index 0000000..d2e3e84 --- /dev/null +++ b/lib/tasks/eventboss.rake @@ -0,0 +1,47 @@ +require 'rake' + +namespace :eventboss do + namespace :deadletter do + desc 'Reload deadletter queue' + task :reload, [:event_name, :source_app, :max_messages] do |_, args| + source_app = args[:source_app] ? "#{args[:source_app]}-" : '' + event_name = args[:event_name] + + # Zero means, fetch all messages + max_messages = args[:max_messages].to_i + + # Ensure we don't fetch more than 10 messages from SQS + batch_size = max_messages == 0 ? 10 : [10, max_messages].min + + abort 'At least event name should be passed as argument' unless event_name + + queue_name = "#{Eventboss.configuration.eventboss_app_name}#{Eventboss.configuration.sns_sqs_name_infix}#{source_app}#{event_name}-#{Eventboss.env}" + queue = Eventboss::Queue.new("#{queue_name}-deadletter") + send_queue = Eventboss::Queue.new(queue_name) + + puts "Reloading deadletter (max: #{ max_messages }, batch: #{ batch_size })" + puts " #{queue.url}" + puts ' to' + puts " #{send_queue.url}" + + fetcher = Eventboss::Fetcher.new(Eventboss.configuration) + client = fetcher.client + total = 0 + loop do + messages = fetcher.fetch(queue, batch_size) + break if messages.count.zero? + + messages.each do |message| + puts "Publishing message: #{message.body}" + client.send_message(queue_url: send_queue.url, message_body: message.body) + fetcher.delete(queue, message) + + total += 1 + break if max_messages > 0 && total >= max_messages + end + + break if max_messages > 0 && total >= max_messages + end + end + end +end diff --git a/spec/eventboss/configuration_spec.rb b/spec/eventboss/configuration_spec.rb new file mode 100644 index 0000000..be76b64 --- /dev/null +++ b/spec/eventboss/configuration_spec.rb @@ -0,0 +1,37 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Configuration do + let(:configuration) { described_class.new } + + describe 'accessors with lazy evaluated defaults' do + subject { configuration } + it 'always returns explicitly set value' do + expect(subject.raise_on_missing_configuration).to eq(false) + subject.raise_on_missing_configuration = nil + expect(subject.raise_on_missing_configuration).to eq(nil) + subject.raise_on_missing_configuration = true + expect(subject.raise_on_missing_configuration).to eq(true) + end + + it 'caches evaluated default' do + expect(subject.sns_client.object_id).to eq(subject.sns_client.object_id) + end + end + + describe '#concurrency' do + subject { configuration.concurrency } + context 'when not set' do + it 'returns default concurrency of 25' do + expect(subject).to eq(25) + end + end + + context 'when in ENV' do + before { ENV['EVENTBUS_CONCURRENCY'] = '10' } + + it 'is taken from ENV' do + expect(subject).to eq(10) + end + end + end +end diff --git a/spec/eventboss/fetcher_spec.rb b/spec/eventboss/fetcher_spec.rb new file mode 100644 index 0000000..a88826b --- /dev/null +++ b/spec/eventboss/fetcher_spec.rb @@ -0,0 +1,43 @@ +require "spec_helper" + +describe Eventboss::Fetcher do + let(:queue) { Eventboss::Queue.new('test', configuration) } + let(:client_mock) { double('client_mock', get_queue_url: double(queue_url: 'url'))} + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.sqs_client = client_mock + end + end + + subject { described_class.new(configuration) } + + context '#FETCH_LIMIT' do + it 's set to 10' do + expect(Eventboss::Fetcher::FETCH_LIMIT).to eq(10) + end + end + + context '#fetch' do + context 'when limit higher that 10' do + it 'calls receive client with max no msg eq 10' do + expect(client_mock).to receive(:receive_message).with(queue_url: queue.url, max_number_of_messages: 10).and_return(double(messages: [1, 2, 3])) + expect(subject.fetch(queue, 20)).to eq([1, 2, 3]) + end + end + + context 'when limit smaller than 10' do + it 'calls receive client limit' do + expect(client_mock).to receive(:receive_message).with(queue_url: queue.url, max_number_of_messages: 5).and_return(double(messages: [1, 2, 3])) + expect(subject.fetch(queue, 5)).to eq([1, 2, 3]) + end + end + end + + context '#delete' do + it 'calls delete message' do + message = double(receipt_handle: 'abc') + expect(client_mock).to receive(:delete_message).with(queue_url: queue.url, receipt_handle: 'abc') + subject.delete(queue, message) + end + end +end diff --git a/spec/eventboss/listener_spec.rb b/spec/eventboss/listener_spec.rb new file mode 100644 index 0000000..68ce466 --- /dev/null +++ b/spec/eventboss/listener_spec.rb @@ -0,0 +1,41 @@ +require "spec_helper" + +describe Eventboss::Listener do + class Listener1 + include Eventboss::Listener + eventboss_options source_app: 'app1', event_name: 'transaction_created' + end + + class GenericListener1 + include Eventboss::Listener + eventboss_options event_name: 'transaction_created' + end + + context '#jid' do + it 'creates unique jid for the job' do + expect(Listener1.new.jid).not_to be_nil + expect(Listener1.new.jid).not_to eq(Listener1.new.jid) + end + end + + context '#ACTIVE_LISTENERS' do + it 'adds the class to active listeners hash' do + expect(Eventboss::Listener::ACTIVE_LISTENERS).to eq( + "transaction_created" => GenericListener1, + "app1-transaction_created" => Listener1 + ) + end + end + + context '#postponed_by' do + it 's nil when not called' do + expect(Listener1.new.postponed_by).to be_nil + end + + it 's set to the value passed to postpone_by' do + listener = Listener1.new + listener.postpone_by(60) + expect(listener.postponed_by).to eq(60) + end + end +end diff --git a/spec/eventboss/long_poller_spec.rb b/spec/eventboss/long_poller_spec.rb new file mode 100644 index 0000000..d5c8af9 --- /dev/null +++ b/spec/eventboss/long_poller_spec.rb @@ -0,0 +1,39 @@ +require 'spec_helper' + +describe Eventboss::LongPoller do + subject do + described_class.new(launcher, bus, client, queue, listener) + end + + let(:launcher) { instance_double('Eventboss::Launcher', worker_stopped: true) } + let(:bus) { [] } + let(:client) { double('client') } + let(:queue) { double('queue', name: 'name', url: 'url') } + let(:listener) { double('listener') } + let(:message) { double('message', message_id: 1) } + + before do + allow(client).to receive(:receive_message) do + OpenStruct.new(messages: [message]) + end + end + + describe '#fetch_and_dispatch' do + it 'adds to the bus' do + subject.fetch_and_dispatch + expect(bus.size).to be 1 + end + + it 'bus contains UnitOfWork' do + subject.fetch_and_dispatch + expect(bus).to include(an_instance_of(Eventboss::UnitOfWork)) + end + + it 'calls client with proper attributes' do + expect(client).to receive(:receive_message) + .with(queue_url: 'url', max_number_of_messages: 10, wait_time_seconds: 10) + + subject.fetch_and_dispatch + end + end +end diff --git a/spec/eventboss/manager_spec.rb b/spec/eventboss/manager_spec.rb new file mode 100644 index 0000000..7a26fd8 --- /dev/null +++ b/spec/eventboss/manager_spec.rb @@ -0,0 +1,71 @@ +require "spec_helper" + +describe Eventboss::Manager do + let(:fetcher) { double('fetcher') } + let(:queue) { double('queue') } + let(:sqs_msg) { double('sqs_msg') } + let(:processor) { double('processor', jid: 123, postponed_by: postponed_by) } + subject(:manager) { described_class.new(fetcher, nil, nil, nil, nil, nil) } + + before do + allow(Eventboss::Logger).to receive(:info) + allow(Eventboss::Logger).to receive(:error) + end + + describe 'automated exception handling' do + context 'with airbrake handler' do + before do + class_double('Airbrake', notify: true).as_stubbed_const + end + + let(:processor) { double('processor', jid: 123, postponed_by: nil) } + let(:fetcher) { double('fetcher') } + + subject do + queue_listeners = { queue: double('processor', new: processor) } + executor = Concurrent.global_immediate_executor + concurrency = 1 + + described_class.new(fetcher, nil, executor, queue_listeners, concurrency, [Eventboss::ErrorHandlers::Airbrake.new]).send(:assign, :queue, double('message', body: '{}')) + end + + it 'notifies airbrake' do + err = StandardError.new + expect(processor).to receive(:receive).and_raise(err) + expect(Airbrake).to receive(:notify).with(err) + expect(fetcher).to_not receive(:delete) + + subject + end + end + end + + describe '#postpone_if_needed' do + context 'when postponed' do + let(:postponed_by) { 10 } + + it 'changes message visibility' do + expect(fetcher).to receive(:change_message_visibility).with(queue, sqs_msg, 10) + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + + context 'it works even if postponing raise error' do + let(:postponed_by) { 10 } + + it 'changes message visibility' do + expect(fetcher).to receive(:change_message_visibility).and_raise('Could not postpone') + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + + context 'when not postponed' do + let(:postponed_by) { nil } + + it 'does not change message visibility' do + expect(fetcher).not_to receive(:change_message_visibility) + manager.send(:postpone_if_needed, queue, sqs_msg, processor) + end + end + end +end diff --git a/spec/eventboss/polling/basic_spec.rb b/spec/eventboss/polling/basic_spec.rb new file mode 100644 index 0000000..eece2e8 --- /dev/null +++ b/spec/eventboss/polling/basic_spec.rb @@ -0,0 +1,97 @@ +require "spec_helper" + +describe Eventboss::Polling::Basic do + let(:queues) { ['a', 'b', 'c'] } + let(:timer) { FixedTimer.new } + + subject(:basic) { described_class.new(queues) } + + it 'returns next queues one by one' do + expect(basic.next_queue).to eq('a') + expect(basic.next_queue).to eq('b') + expect(basic.next_queue).to eq('c') + expect(basic.next_queue).to eq('a') + end + + it 'skips the queue when paused' do + basic.messages_found('b', 0) + basic.messages_found('c', 1) + expect(basic.next_queue).to eq('a') + expect(basic.next_queue).to eq('c') + expect(basic.next_queue).to eq('a') + end + + it 'returns nil when all skipped' do + basic.messages_found('a', 0) + basic.messages_found('b', 0) + basic.messages_found('c', 0) + expect(basic.next_queue).to be_nil + end + + it 'consuming the same queue if still has messages' do + basic.messages_found('a', 1) + basic.messages_found('b', 1) + basic.messages_found('c', 1) + + expect(basic.next_queue).to eq('a') + basic.messages_found('a', 0) + + expect(basic.next_queue).to eq('b') + basic.messages_found('b', 1) + + expect(basic.next_queue).to eq('b') + end + + it 'pauses for two seconds' do + pool = described_class.new(%w(a b), timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('a') + end + + # This ensures that when there is any paused queues when + # the time has expired, the cycle will start from scratch + it 'restarts work from scratch ' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # enough time passed, will reset to the beginning + # whereas in TimedRoundRobin, it will go to `c` + timer.at(timer.now + 4) + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + expect(pool.next_queue).to be_nil + end +end diff --git a/spec/eventboss/polling/timed_round_robin_spec.rb b/spec/eventboss/polling/timed_round_robin_spec.rb new file mode 100644 index 0000000..d99b3e5 --- /dev/null +++ b/spec/eventboss/polling/timed_round_robin_spec.rb @@ -0,0 +1,118 @@ +require "spec_helper" + +describe Eventboss::Polling::TimedRoundRobin do + let(:queues) { ['a', 'b', 'c'] } + let(:timer) { FixedTimer.new } + + subject(:pool) { described_class.new(queues) } + + it 'returns next queues one by one' do + expect(pool.next_queue).to eq('a') + expect(pool.next_queue).to eq('b') + expect(pool.next_queue).to eq('c') + expect(pool.next_queue).to eq('a') + end + + it 'skips the queue when paused' do + pool.messages_found('b', 0) + pool.messages_found('c', 1) + expect(pool.next_queue).to eq('a') + expect(pool.next_queue).to eq('c') + expect(pool.next_queue).to eq('a') + end + + it 'returns nil when all skipped' do + pool.messages_found('a', 0) + pool.messages_found('b', 0) + pool.messages_found('c', 0) + expect(pool.next_queue).to be_nil + end + + it 'always goes to the next queue' do + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('c') + end + + it 'skips paused queues' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 1) + + # `a` was paused for 2 seconds, next to pick up is `b` + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + # all queues paused, dispatcher will sleep + expect(pool.next_queue).to be_nil + end + + it 'pauses for two seconds' do + pool = described_class.new(%w(a b), timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 0) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + # will pause the dispatcher + expect(pool.next_queue).to be_nil + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('a') + end + + # This ensures that next queue is picked up, even if + # queue `a` was paused for enough time to be next + it 'continues to the next available queue' do + pool = described_class.new(queues, timer) + + pool.messages_found('a', 1) + pool.messages_found('b', 1) + pool.messages_found('c', 1) + + expect(pool.next_queue).to eq('a') + pool.messages_found('a', 1) + timer.at(timer.now + 1) + + expect(pool.next_queue).to eq('b') + pool.messages_found('b', 0) + + # enough time passed, should continue on the next queue + timer.at(timer.now + 4) + expect(pool.next_queue).to eq('c') + pool.messages_found('c', 0) + + expect(pool.next_queue).to eq('a') + end +end diff --git a/spec/eventboss/publisher_spec.rb b/spec/eventboss/publisher_spec.rb new file mode 100644 index 0000000..dec3aeb --- /dev/null +++ b/spec/eventboss/publisher_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Publisher do + describe '#publish' do + subject { described_class.new('event_name', sns_client, configuration, opts).publish(payload) } + let(:payload) { '{}' } + let(:sns_client) { instance_double(Eventboss::SnsClient, publish: sns_response) } + let(:sns_response) { double } + let(:opts) { {} } + let(:configuration) do + Eventboss::Configuration.new.tap do |c| + c.eventboss_app_name = 'app_name1' + end + end + + it 'publishes to sns with source app name by default' do + expect(sns_client).to receive(:publish).with( + topic_arn: "arn:aws:sns:::eventboss-app_name1-event_name-#{Eventboss.env}", + message: "{}" + ) + expect(subject).to eq(sns_response) + end + + context 'when generic event' do + let(:opts) { { generic: true } } + + it 'publishes to sns without app name' do + expect(sns_client).to receive(:publish).with( + topic_arn: "arn:aws:sns:::eventboss-event_name-#{Eventboss.env}", + message: "{}" + ) + expect(subject).to eq(sns_response) + end + end + end +end diff --git a/spec/eventboss/queue_listener_spec.rb b/spec/eventboss/queue_listener_spec.rb new file mode 100644 index 0000000..5394e9a --- /dev/null +++ b/spec/eventboss/queue_listener_spec.rb @@ -0,0 +1,26 @@ +require "spec_helper" + +describe Eventboss::QueueListener do + before do + Eventboss.configure do |config| + config.sqs_client = double('client') + config.eventboss_app_name = 'app1' + end + ENV['EVENTBUS_ENV'] = 'staging' + end + + context '#list' do + it 'builds a map of queue to listener' do + class Listener1 + include Eventboss::Listener + eventboss_options source_app: 'destapp1', event_name: 'transaction_created' + end + class Listener2 + include Eventboss::Listener + eventboss_options source_app: 'destapp2', event_name: 'file_uploaded' + end + expect(Eventboss::QueueListener.list[Eventboss::Queue.new("app1-eventboss-destapp1-transaction_created-#{Eventboss.env}")]).to eq(Listener1) + expect(Eventboss::QueueListener.list[Eventboss::Queue.new("app1-eventboss-destapp2-file_uploaded-#{Eventboss.env}")]).to eq(Listener2) + end + end +end diff --git a/spec/eventboss/queue_spec.rb b/spec/eventboss/queue_spec.rb new file mode 100644 index 0000000..d61398b --- /dev/null +++ b/spec/eventboss/queue_spec.rb @@ -0,0 +1,75 @@ +require "spec_helper" + +describe Eventboss::Queue do + let(:name) { 'sample_queue_name' } + let(:client_mock) { double('client_mock', get_queue_url: double(queue_url: queue_url))} + let(:queue_url) { double } + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.sqs_client = client_mock + end + end + + subject(:queue) { described_class.new(name, configuration) } + + context '#name' do + it 'returns name set in initializer' do + expect(queue.name).to eq(name) + end + end + + context '#url' do + before do + Eventboss.configure do |config| + config.eventboss_region = 'us-east-1' + config.eventboss_account_id = '12345' + end + end + + it 'returns url for the queue' do + expect(queue.url).to eq(queue_url) + end + end + + context '#comparable' do + context 'when equal' do + it 'returns true' do + expect(Eventboss::Queue.new('123').eql?(Eventboss::Queue.new('123'))).to be_truthy + end + end + end + + describe '.build_name' do + let(:source) { 'src' } + let(:destination) { 'dst' } + let(:event) { 'process_resource' } + let(:env) { 'test' } + let(:generic) { false } + + subject do + described_class.build_name( + source: source, + destination: destination, + event: event, + env: env, + generic: generic + ) + end + + it 'returns queue name' do + url = "#{destination}-eventboss-#{source}-#{event}-#{env}" + + expect(subject).to eql(url) + end + + context 'when generic is true' do + let(:generic) { true } + + it 'returns queue name' do + url = "#{destination}-eventboss-#{event}-#{env}" + + expect(subject).to eql(url) + end + end + end +end diff --git a/spec/eventboss/sender_spec.rb b/spec/eventboss/sender_spec.rb new file mode 100644 index 0000000..83ce639 --- /dev/null +++ b/spec/eventboss/sender_spec.rb @@ -0,0 +1,49 @@ +require 'spec_helper' + +RSpec.describe Eventboss::Sender do + describe '#send_batch' do + let(:sender) { described_class.new(queue: queue, client: sqs_client) } + let(:sqs_client) do + instance_double(Aws::SQS::Client, send_message_batch: double) + end + let(:queue_url) { 'sample_queue_url' } + let(:queue) { instance_double(Eventboss::Queue, url: queue_url) } + let(:payload) do + [ + { key: 'val' } + ] + end + + subject { sender.send_batch(payload) } + + it 'sends messages to given queue' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + queue_url: queue_url + ) + ) + + subject + end + + it 'sets id for each message' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + entries: array_including(hash_including(:id)) + ) + ) + + subject + end + + it 'sets message data under message_body key' do + expect(sqs_client).to receive(:send_message_batch).with( + hash_including( + entries: array_including(hash_including(message_body: payload.first.to_json)) + ) + ) + + subject + end + end +end diff --git a/spec/eventboss/sns_client_spec.rb b/spec/eventboss/sns_client_spec.rb new file mode 100644 index 0000000..c9ef106 --- /dev/null +++ b/spec/eventboss/sns_client_spec.rb @@ -0,0 +1,57 @@ +require 'spec_helper' + +RSpec.describe Eventboss::SnsClient do + describe '#publish' do + subject { described_class.new(configuration).publish(payload) } + let(:payload) { '{}' } + + context 'for not configured sns' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.eventboss_region = nil + config.eventboss_account_id = nil + config.eventboss_app_name = nil + end + end + + it 'logs info' do + expect(Eventboss::Logger).to receive(:info).and_call_original + subject + end + + context 'with raise_on_missing_configuration config' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.raise_on_missing_configuration = true + config.eventboss_region = nil + config.eventboss_account_id = nil + config.eventboss_app_name = nil + end + end + + it 'raises error' do + expect { subject }.to raise_error(Eventboss::NotConfigured) + end + end + end + + context 'for configured sns' do + let(:configuration) do + Eventboss::Configuration.new.tap do |config| + config.eventboss_region = 'test' + config.eventboss_account_id = 'test' + config.eventboss_app_name = 'test' + end + end + + before do + expect(Aws::SNS::Client).to receive(:new).and_return(instance_double(Aws::SNS::Client, publish: sns_response)) + end + let(:sns_response) { double } + + it 'publishes to sns' do + expect(subject).to eq(sns_response) + end + end + end +end diff --git a/spec/eventboss/unit_of_work_spec.rb b/spec/eventboss/unit_of_work_spec.rb new file mode 100644 index 0000000..ac162f9 --- /dev/null +++ b/spec/eventboss/unit_of_work_spec.rb @@ -0,0 +1,51 @@ +require 'spec_helper' + +describe Eventboss::UnitOfWork do + class Listener + def jid; end + + def receive; end + end + + subject do + described_class.new(queue, Listener, message) + end + + let(:queue) { double('queue', url: 'url') } + let(:client) { double('client') } + let(:message) do + double('message', message_id: 'id', body: '{}', receipt_handle: 'handle') + end + + context 'with sucessful job' do + it 'runs the job' do + expect_any_instance_of(Listener).to receive(:receive).and_return(true) + expect(client) + .to receive(:delete_message).with(queue_url: 'url', receipt_handle: 'handle') + + subject.run(client) + end + end + + context 'with failed job' do + it 'does not cleanup message' do + expect_any_instance_of(Listener).to receive(:receive).and_raise(RuntimeError) + expect(client).not_to receive(:delete_message) + + subject.run(client) + end + end + + context 'with invalid JSON payload' do + let(:message) do + double('message', message_id: 'id', body: '.') + end + + it 'does not run the job' do + expect_any_instance_of(Listener).not_to receive(:receive) + expect(client).not_to receive(:delete_message) + + subject.run(client) + end + end +end diff --git a/spec/eventboss/worker_spec.rb b/spec/eventboss/worker_spec.rb new file mode 100644 index 0000000..b1a06b6 --- /dev/null +++ b/spec/eventboss/worker_spec.rb @@ -0,0 +1,72 @@ +require 'spec_helper' + +describe Eventboss::Worker do + subject do + described_class.new(launcher, queue, client, bus) + end + + Work = Struct.new(:finished) do + def run(*) + self.finished = true + end + end + + FailedWork = Struct.new(:exception) do + def run(*) + raise exception + end + end + + let(:launcher) { instance_double('Eventboss::Launcher', worker_stopped: true) } + let(:queue) { double('queue', url: 'url') } + let(:client) { double('client') } + let(:bus) { [] } + + context 'when work has no errors' do + let(:work) { Work.new } + + before { bus << work } + + it 'runs the job' do + subject.run + expect(work.finished).to be true + end + + it 'stops the launcher' do + expect(launcher).to receive(:worker_stopped).with(subject) + subject.run + end + end + + context 'when work has errors' do + let(:work) { FailedWork.new(error) } + + before { bus << work } + + context 'with exception' do + let(:error) { Exception } + + it 'handles the error' do + expect { subject.run }.not_to raise_error + end + + it 'restarts the worker' do + expect(launcher).to receive(:worker_stopped).with(subject, restart: true) + subject.run + end + end + + context 'on shutdown' do + let(:error) { Eventboss::Shutdown } + + it 'handles the error' do + expect { subject.run }.not_to raise_error + end + + it 'stops the worker' do + expect(launcher).to receive(:worker_stopped).with(subject) + subject.run + end + end + end +end diff --git a/spec/eventboss_spec.rb b/spec/eventboss_spec.rb new file mode 100644 index 0000000..256ef16 --- /dev/null +++ b/spec/eventboss_spec.rb @@ -0,0 +1,37 @@ +require "spec_helper" + +describe Eventboss do + it "has a version number" do + expect(Eventboss::VERSION).not_to be nil + end + + context '#start' do + it 'runs start on runner' do + expect(Eventboss::Runner).to receive(:start) + Eventboss.listen + end + end + + describe '#sender' do + let(:event_name) { 'fake_event' } + let(:destination_app) { 'fake_app' } + let(:configuration) do + Eventboss::Configuration.new.tap do |c| + c.eventboss_app_name = 'app_name1' + c.eventboss_region = 'dummy' + end + end + + subject { described_class.sender(event_name, destination_app) } + + before do + allow(described_class).to receive(:configuration) { configuration } + end + + it 'calls Eventboss::Sender' do + expect(Eventboss::Sender).to receive(:new).with(hash_including(:queue, :client)) + + subject + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..290ebeb --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__) +require "eventboss" + +class FixedTimer + def at(time) + @time = Time.at(time) + end + + def now + @time + end +end