Skip to content

Commit

Permalink
Merge pull request #30 from AirHelp/refactor-worker
Browse files Browse the repository at this point in the history
Refactor worker
  • Loading branch information
nglx authored Jan 21, 2020
2 parents 2dbcb19 + 835cdbb commit d615225
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 39 deletions.
2 changes: 1 addition & 1 deletion lib/eventboss/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def worker_count
end

def new_worker(id)
Worker.new(self, id, @client, @bus)
Worker.new(self, id, @bus)
end

def new_poller(queue, listener)
Expand Down
2 changes: 1 addition & 1 deletion lib/eventboss/long_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def kill(wait = false)
def fetch_and_dispatch
fetch_messages.each do |message|
logger.debug(id) { "enqueueing message #{message.message_id}" }
@bus << UnitOfWork.new(queue, listener, message)
@bus << UnitOfWork.new(@client, queue, listener, message)
end
end

Expand Down
17 changes: 9 additions & 8 deletions lib/eventboss/unit_of_work.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,37 @@ class UnitOfWork

attr_accessor :queue, :listener, :message

def initialize(queue, listener, message)
def initialize(client, queue, listener, message)
@client = client
@queue = queue
@listener = listener
@message = message
@logger = logger
end

def run(client)
def run
logger.debug(@message.message_id) { 'Started' }
processor = @listener.new
processor.receive(JSON.parse(@message.body))
logger.debug(@message.message_id) { 'Finished' }
rescue StandardError => exception
handle_exception(exception, processor: processor, message_id: @message.message_id)
else
cleanup(client) unless processor.postponed_by
cleanup unless processor.postponed_by
ensure
change_message_visibility(client, processor.postponed_by) if processor.postponed_by
change_message_visibility(processor.postponed_by) if processor.postponed_by
end

def change_message_visibility(client, postponed_by)
client.change_message_visibility(
def change_message_visibility(postponed_by)
@client.change_message_visibility(
queue_url: @queue.url,
receipt_handle: @message.receipt_handle,
visibility_timeout: postponed_by
)
end

def cleanup(client)
client.delete_message(
def cleanup
@client.delete_message(
queue_url: @queue.url, receipt_handle: @message.receipt_handle
)
logger.debug(@message.message_id) { 'Deleting' }
Expand Down
8 changes: 4 additions & 4 deletions lib/eventboss/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ class Worker

attr_reader :id

def initialize(launcher, id, client, bus)
def initialize(launcher, id, bus, restart_on: [Exception])
@id = "worker-#{id}"
@launcher = launcher
@client = client
@bus = bus
@thread = nil
@restart_on = restart_on
end

def start
Expand All @@ -20,12 +20,12 @@ def start

def run
while (work = @bus.pop)
work.run(@client)
work.run
end
@launcher.worker_stopped(self)
rescue Eventboss::Shutdown
@launcher.worker_stopped(self)
rescue Exception => exception
rescue *@restart_on => exception
handle_exception(exception, worker_id: id)
# Restart the worker in case of hard exception
# Message won't be delete from SQS and will be visible later
Expand Down
12 changes: 6 additions & 6 deletions spec/eventboss/unit_of_work_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

describe Eventboss::UnitOfWork do
subject do
described_class.new(queue, listener_class, message)
described_class.new(client, queue, listener_class, message)
end

let(:queue) { double('queue', url: 'url') }
Expand All @@ -26,7 +26,7 @@ def receive(payload)
expect(client)
.to receive(:delete_message).with(queue_url: 'url', receipt_handle: 'handle')

subject.run(client)
subject.run
end
end

Expand All @@ -52,7 +52,7 @@ def receive(payload)
visibility_timeout: 100
)

subject.run(client)
subject.run
end
end

Expand All @@ -71,7 +71,7 @@ def receive(payload)
it 'does not cleanup message' do
expect(client).not_to receive(:delete_message)

subject.run(client)
subject.run
end
end

Expand All @@ -98,7 +98,7 @@ def receive(payload)
visibility_timeout: 100
)

subject.run(client)
subject.run
end
end

Expand All @@ -119,7 +119,7 @@ def receive(payload)
it 'does not run the job' do
expect(client).not_to receive(:delete_message)

subject.run(client)
subject.run
end
end
end
34 changes: 15 additions & 19 deletions spec/eventboss/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,49 @@

describe Eventboss::Worker do
subject do
described_class.new(launcher, queue, client, bus)
end

Work = Struct.new(:finished) do
def run(*)
self.finished = true
end
end

FailedWork = Struct.new(:exception) do
def run(*)
raise exception
end
described_class.new(launcher, queue, bus, restart_on: [StandardError])
end

let(:launcher) { instance_double('Eventboss::Launcher', worker_stopped: true) }
let(:queue) { double('queue', url: 'url') }
let(:client) { double('client') }
let(:bus) { [] }

context 'when work has no errors' do
let(:work) { Work.new }
let(:work) do
instance_double('UnitOfWork', queue: nil, listener: nil, message: nil)
end

context 'when work has no errors' do
before { bus << work }

it 'runs the job' do
expect(work).to receive(:run)
subject.run
expect(work.finished).to be true
end

it 'stops the launcher' do
expect(work).to receive(:run)
expect(launcher).to receive(:worker_stopped).with(subject)
subject.run
end
end

context 'when work has errors' do
let(:work) { FailedWork.new(error) }

before { bus << work }

context 'with exception' do
subject do
described_class.new(launcher, queue, bus, restart_on: [Exception])
end

let(:error) { Exception }

it 'handles the error' do
expect(work).to receive(:run) { raise error }
expect { subject.run }.not_to raise_error
end

it 'restarts the worker' do
expect(work).to receive(:run) { raise error }
expect(launcher).to receive(:worker_stopped).with(subject, restart: true)
subject.run
end
Expand All @@ -60,10 +54,12 @@ def run(*)
let(:error) { Eventboss::Shutdown }

it 'handles the error' do
expect(work).to receive(:run) { raise error }
expect { subject.run }.not_to raise_error
end

it 'stops the worker' do
expect(work).to receive(:run) { raise error }
expect(launcher).to receive(:worker_stopped).with(subject)
subject.run
end
Expand Down

0 comments on commit d615225

Please sign in to comment.