diff --git a/README.md b/README.md index 9bb5195..97f49fb 100644 --- a/README.md +++ b/README.md @@ -100,8 +100,8 @@ EVENTBOSS_ENV=env_name # production/staging/test EVENTBOSS_REGION=aws_region # i.e. eu-west-1 EVENTBOSS_CONCURRENCY=10 # default is 25 -AWS_SNS_ENDPOINT=http://localhost:4575 # when using with localstack -AWS_SQS_ENDPOINT=http://localhost:4576 # when using with localstack +AWS_SNS_ENDPOINT=http://localhost:4566 # when using with localstack +AWS_SQS_ENDPOINT=http://localhost:4566 # when using with localstack ``` Use fixed account ID for localstack setup: ``` diff --git a/lib/eventboss/development_mode.rb b/lib/eventboss/development_mode.rb index e9c5d5d..00e76e7 100644 --- a/lib/eventboss/development_mode.rb +++ b/lib/eventboss/development_mode.rb @@ -17,6 +17,9 @@ def setup_infrastructure(queues) logger.info('development-mode') { "Creating queue #{queue.name}..." } sqs_client.create_queue(queue_name: queue.name) + logger.info('development-mode') { "Creating deadletter queue #{queue.name}-deadletter..." } + sqs_client.create_queue(queue_name: "#{queue.name}-deadletter") + logger.info('development-mode') { "Setting up queue #{queue.name} policy..." } policy = queue_policy(queue.arn, topic.topic_arn) sqs_client.set_queue_attributes(queue_url: queue.url, attributes: { Policy: policy.to_json }) diff --git a/lib/tasks/eventboss.rake b/lib/tasks/eventboss.rake index b9d7d54..2f593a7 100644 --- a/lib/tasks/eventboss.rake +++ b/lib/tasks/eventboss.rake @@ -6,6 +6,9 @@ namespace :eventboss do task :reload, [:event_name, :source_app, :max_messages] do |task, args| source_app = args[:source_app] event_name = args[:event_name] + start_time = Time.now + + Eventboss.logger.info "[#{task.name}] Start task, time: #{start_time}" # Zero means: fetch all messages max_messages = args[:max_messages].to_i @@ -17,13 +20,11 @@ namespace :eventboss do queue_name = compose_queue_name(source_app, event_name) - puts "[#{task.name}] Reloading #{queue_name}-deadletter (max: #{ max_messages }, batch: #{ batch_size })" + Eventboss.logger.info "[#{task.name}] Reloading #{queue_name}-deadletter (max: #{ max_messages }, batch: #{ batch_size })" queue = Eventboss::Queue.new("#{queue_name}-deadletter") send_queue = Eventboss::Queue.new(queue_name) - puts "[#{task.name}] #{queue.url}" - puts "[#{task.name}] to" - puts "[#{task.name}] #{send_queue.url}" + Eventboss.logger.info "[#{task.name}] #{queue.url} to #{send_queue.url}" fetcher = Eventboss::Fetcher.new(Eventboss.configuration) client = fetcher.client @@ -42,16 +43,24 @@ namespace :eventboss do break if max_messages > 0 && total >= max_messages end + Eventboss.logger.info <<~HEREDOC + [#{task.name}] Task done + total messages: #{total} + total time: #{Time.now - start_time}s + HEREDOC end desc 'Purge deadletter queue' task :purge, [:event_name, :source_app, :max_messages] do |task, args| source_app = args[:source_app] event_name = args[:event_name] + start_time = Time.now # Zero means: fetch all messages max_messages = args[:max_messages].to_i + Eventboss.logger.info "[#{task.name}] Start task, time: #{start_time}" + # Ensure we don't fetch more than 10 messages from SQS batch_size = max_messages == 0 ? 10 : [10, max_messages].min @@ -59,9 +68,9 @@ namespace :eventboss do queue_name = compose_queue_name(source_app, event_name) - puts "[#{task.name}] Purging #{queue_name}-deadletter (max: #{ max_messages }, batch: #{ batch_size })" + Eventboss.logger.info "[#{task.name}] Purging #{queue_name}-deadletter (max: #{ max_messages }, batch: #{ batch_size })" queue = Eventboss::Queue.new("#{queue_name}-deadletter") - puts "[#{task.name}] #{queue.url}" + Eventboss.logger.info "[#{task.name}] #{queue.url}" fetcher = Eventboss::Fetcher.new(Eventboss.configuration) total = 0 @@ -78,6 +87,12 @@ namespace :eventboss do break if max_messages > 0 && total >= max_messages end + + Eventboss.logger.info <<~HEREDOC + [#{task.name}] Task done + total messages: #{total} + total time: #{Time.now - start_time}s + HEREDOC end def compose_queue_name(source_app, event_name)