-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.rb
52 lines (43 loc) · 1.08 KB
/
consumer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
require 'celluloid'
require 'celluloid/io'
require 'celluloid/autostart'
require 'bunny'
class Worker
include Celluloid::IO
finalizer :shutdown
def initialize(conn)
@ch = conn.create_channel
q = @ch.queue('task_queue', :durable => true)
q.subscribe manual_ack: true, &method(:on_event)
p 'worker accepting connections'
end
def on_event(delivery_info, properties, body)
p "#{Time.now.strftime('%FT %T.%L')} started work: #{body}"
sleep(10)
p "#{Time.now.strftime('%FT %T.%L')} completed work: #{body}"
@ch.ack(delivery_info.delivery_tag)
end
def shutdown
p 'trying to shutdown...'
@ch.close
p 'hey shutdown'
end
end
class Consumer
def initialize(options = {})
@size = (ENV['POOL'] || 2).to_i
@connection = Bunny.new(options)
@connection.start
@workers = @size.times.map{ Worker.new(@connection) }
end
def close_connection
futures = @workers.map { |w| w.future(:finalize) }
@connection.close if futures.all?
end
end
begin
@consumer = Consumer.new
rescue Interrupt => _
@consumer.close_connection
end
sleep