-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc.rb
49 lines (40 loc) · 1.18 KB
/
rpc.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
require 'amqp'
require 'json'
# Small RPC client helper on top of an AMQP exchange and a reply queue.
#
# Requests are published as JSON to the configured exchange with routing key
# 'rpc'. Each request carries the name of the reply queue and a sequence
# number; the block passed to +fire+ is stored under that sequence number in
# +callbacks+ so that whoever consumes the reply queue can look it up.
module Rpc
  # Registry of pending reply handlers.
  #
  # @return [Hash] sequence number => callback (a Proc)
  def self.callbacks
    @callbacks ||= {}
  end

  # Advances the request counter and returns the new value.
  # The first call returns 1.
  #
  # @return [Integer]
  def self.next_seq!
    @seq = (@seq || 0).succ
  end

  # Sets the exchange that requests are published to and that the reply
  # queue gets bound to.
  def self.exchange=(exchange)
    @exchange = exchange
  end

  # Binds +queue+ to the current exchange, using the queue's own name as the
  # routing key, and remembers it as the reply queue.
  #
  # The exchange must already have been assigned via +exchange=+, since the
  # binding is made against whatever @exchange holds at this moment.
  def self.queue=(queue)
    queue.bind(@exchange, :routing_key => queue.name)
    @queue = queue
  end

  # Publishes an RPC request.
  #
  # @param method [Object] value sent under the :method key of the request
  # @param params [Hash] value sent under the :params key of the request
  # @param callback [Proc] optional handler stored in +callbacks+ under the
  #   request's sequence number
  # @return whatever the exchange's +publish+ returns
  # @raise [NoMethodError] if +queue=+ / +exchange=+ have not been called yet
  #   (the nil @queue / @exchange is dereferenced)
  def self.fire(method, params = {}, &callback)
    seq = next_seq!
    # Only register real handlers: an entry holding nil would stay in the
    # registry forever, because nothing removes an entry that has no handler.
    callbacks[seq] = callback if callback
    data = {:queue => @queue.name, :method => method, :params => params, :seq => seq}
    @exchange.publish data.to_json, :routing_key => 'rpc'
  end
end
# Wires up the Rpc module: on the next reactor tick, opens a channel, declares
# the 'app-manager' topic exchange and an exclusive queue with an empty name,
# hands both to Rpc, and dispatches every message arriving on that queue to
# the callback registered under the message's 'seq'.
EM.next_tick do
  AMQP::Channel.new do |channel, _open_ok|
    AMQP::Exchange.new(channel, :topic, 'app-manager') do |exchange|
      Rpc.exchange = exchange
      AMQP::Queue.new(channel, '', :exclusive => true) do |queue, _declare_ok|
        # Rpc.queue= binds the queue to the exchange under the queue's own name.
        Rpc.queue = queue
        queue.subscribe do |payload|
          begin
            reply = JSON.parse payload
          rescue JSON::ParserError => e
            # A single malformed message must not raise out of the handler
            # into the reactor; log it and drop it.
            warn ">> Rpc: dropping unparseable reply (#{e.message})"
            next
          end

          seq = reply['seq']
          handler = Rpc.callbacks[seq]
          # Replies without a registered handler are ignored.
          next unless handler

          handler.call reply['error'], reply['result']
          # The handler stays registered when error is 'partial', so later
          # messages with the same seq reach it again (presumably a streamed
          # response -- confirm with the server side).
          Rpc.callbacks.delete seq unless reply['error'] == 'partial'
        end
        puts '>> RabbitMQ connection ready'
      end
    end
  end
end