forked from OriginTrail/edge-node-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.js
50 lines (44 loc) · 1.44 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
const { Queue, Worker, QueueScheduler } = require('bullmq');
const redis = require('ioredis');
const publishService = require('./services/publishService.js');
require('./queue-metrics');
// Create the Redis connection shared by every queue and worker in this module.
// Only `maxRetriesPerRequest` is overridden; no host/port is passed, so the
// ioredis defaults apply. BullMQ's documentation requires
// `maxRetriesPerRequest: null` on connections that are handed to a Worker.
const connection = new redis({
maxRetriesPerRequest: null
});
// Cache of the BullMQ Queue instances created by createAssetJob, so that a
// wallet's queue (and its worker) is set up once and reused on later calls.
const queues = {};
/**
 * Enqueues an asset-creation job on the queue dedicated to `wallet`.
 *
 * The first call for a wallet creates a queue named `wallet-jobs-<wallet>`
 * and a single worker with concurrency 1, so jobs for one wallet are
 * processed one at a time. Later calls reuse that queue.
 *
 * The asset content is stored in the job payload and read back by the worker
 * from `job.data`. The worker is created only once per wallet, so it must not
 * capture `assetContent` from the call that happened to create it; doing so
 * would make every later job publish the first call's content.
 *
 * @param {string} wallet - Wallet identifier; interpolated into the queue and
 *   job names.
 * @param {*} assetContent - Content to publish. It becomes part of the job
 *   data, so it has to survive BullMQ's JSON serialization.
 * @returns {Promise<void>} Resolves once the job has been added to the queue
 *   (not when it has been processed).
 */
async function createAssetJob(wallet, assetContent) {
  const queueName = `wallet-jobs-${wallet}`;

  // Key the cache by the prefixed queue name rather than the raw wallet
  // string: a wallet such as "constructor" would otherwise resolve to an
  // inherited Object.prototype member and be mistaken for an existing queue.
  if (!queues[queueName]) {
    queues[queueName] = new Queue(queueName, { connection });

    const worker = new Worker(
      queueName,
      async (job) => {
        // Everything the job needs comes from its own payload.
        const { wallet: jobWallet, assetContent: jobAssetContent } = job.data;
        await createKnowledgeAsset(jobWallet, jobAssetContent);
      },
      { connection, concurrency: 1 } // Ensure one job per wallet at a time
    );

    // BullMQ recommends always attaching an 'error' listener to workers so
    // that emitted errors are reported instead of going unhandled.
    worker.on('error', (err) => {
      console.error(`Worker error on queue ${queueName}:`, err);
    });
  }

  await queues[queueName].add(
    `wallet-job-${wallet}`, // Job name per wallet
    { wallet, assetContent },
    {
      removeOnComplete: true, // Clean up after job is processed
    }
  );
}
/**
 * Publishes one asset for a wallet through the publish service, logging a
 * line before the call and another after it completes.
 *
 * @param {string} wallet - Wallet identifier, forwarded as the last argument
 *   to `publishService.createAsset` and included in both log lines.
 * @param {*} assetContent - Content forwarded unchanged to the publish service.
 * @returns {Promise<void>} Resolves after the publish call settles
 *   successfully; a rejection from the publish service propagates to the
 *   caller and the "Finished" line is not logged.
 */
async function createKnowledgeAsset(wallet, assetContent) {
  console.log(`Processing Asset create job for wallet ${wallet}:`);

  // The service's return value is not needed here; only completion matters.
  await publishService.createAsset('internal', assetContent, wallet);

  console.log(`Finished processing Asset create job for wallet ${wallet}`);
}
// Public surface of this module: only the enqueue entry point is exported.
module.exports = {
createAssetJob
};