-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsender_rpc.js
58 lines (48 loc) · 1.79 KB
/
sender_rpc.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
const amqplib = require("amqplib");
const uuid = require("uuid");
const {AMQP_URL, RPC_EXCHANGE} = require("./config");
/**
 * Publishes an RPC request for `service`.`rpc` to RPC_EXCHANGE and logs the
 * reply when it arrives.
 *
 * A fresh connection and channel are opened for every call. An exclusive,
 * auto-delete reply queue is declared and bound to the exchange under a
 * random routing key; that key is sent as `replyTo` so the responder can
 * route its answer back. The reply is only logged, not returned. The
 * connection is closed roughly one second after the reply has been acked.
 *
 * @param {string} service - First segment of the request routing key.
 * @param {string} rpc - Second segment of the request routing key.
 * @param {Array} [args=[]] - Sent under the `args` key of the JSON payload.
 * @param {Object} [kwargs={}] - Sent under the `kwargs` key of the JSON payload.
 * @returns {Promise<void>} Resolves once the request has been handed to the
 *     channel, NOT when the reply arrives.
 * @throws Rethrows any setup or publish error after closing the connection.
 */
const callServiceRpc = async (service, rpc, args=[], kwargs={}) => {
    const connection = await amqplib.connect(AMQP_URL);

    try {
        const channel = await connection.createChannel();
        await channel.assertExchange(RPC_EXCHANGE, "topic", {durable: true});

        // create response queue
        const routingKey = uuid.v4();
        const responseQueue = `rpc.reply-node_rpc_proxy-${routingKey}`;
        await channel.assertQueue(responseQueue, {
            durable: false,
            autoDelete: true,
            exclusive: true,
        });
        await channel.bindQueue(responseQueue, RPC_EXCHANGE, routingKey);

        const consumer = await channel.consume(responseQueue, (msg) => {
            // A null delivery is not a reply (amqplib documents it as the
            // consumer being cancelled), so there is nothing to log or ack.
            if (msg === null) {
                return;
            }

            console.log("Result Message.properties :", msg.properties);
            console.log("Result Message.fields :", msg.fields);

            // A malformed reply must not escape this callback: report it and
            // still ack and close below so the connection is not left open.
            try {
                const params = JSON.parse(msg.content.toString());
                console.log("Result Message.content :", params);
            } catch (err) {
                console.error("Result Message.content is not valid JSON:", err);
            }

            channel.ack(msg);
            // NOTE(review): the delay presumably gives the ack time to be
            // written before the socket is torn down; confirm before removing.
            setTimeout(() => {
                connection.close().catch((err) => {
                    console.error("Failed to close AMQP connection:", err);
                });
            }, 1000);
        });
        console.log("consumer", consumer);

        // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
        const content = Buffer.from(JSON.stringify({
            args,
            kwargs,
        }));
        const correlationId = uuid.v4();
        channel.publish(RPC_EXCHANGE, `${service}.${rpc}`, content, {
            contentType: 'application/json',
            contentEncoding: 'utf-8',
            replyTo: routingKey,
            headers: {
                'nameko.call_id_stack': [`standalone_rpc_proxy.call.${service}.${rpc}`]
            },
            correlationId,
        });
        console.log("Message sent");
    } catch (err) {
        // Setup or publish failed: no reply will ever trigger the delayed
        // close above, so release the connection here before rethrowing.
        try {
            await connection.close();
        } catch (closeErr) {
            console.error("Failed to close AMQP connection:", closeErr);
        }
        throw err;
    }
};
// Script entry point: issue one sample RPC call. The promise returned by the
// async IIFE gets an explicit rejection handler so that a failed call is
// reported and reflected in the exit code instead of becoming an unhandled
// rejection.
(async function() {
    await callServiceRpc("service_x", "do_something", ["sausage"], {});
})().catch((err) => {
    console.error("RPC call failed:", err);
    process.exitCode = 1;
});