-
Notifications
You must be signed in to change notification settings - Fork 1
/
collectorworker.js
126 lines (113 loc) · 4.36 KB
/
collectorworker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* Created by mox on 29/03/16.
* Fatus worker for collect failed process
*
*/
'use strict';
const MODULE_NAME = 'FatusCollectorWorker';
const FatusWorker = require('./worker');
const MessageJob = require('./messagejob');
const moment = require('moment');
const async = require('async');
const assert = require('assert');
const util = require('util');
const retry = require('retry');
/** the worker for the fatus */
class FatusCollectorWorker extends FatusWorker {
/**
* get a new job from the fail queue
* @param th
* @param wfcallback
* @override
*/
fetchNewJob(th, wfcallback) {
th.fetchIteration = th.fetchIteration +1;
if(th.fetchIteration<(th.STACK_PROTECTION_THRSD*2)) {
let NOW = moment();
th.fatus.getQueueTop(function onGet(err, msg) {
if (!err && msg && msg[0] && msg[0].messageText) {
let reservedCondition = msg[0].messageText.reserved && moment(msg[0].messageText.dtReserve).diff(NOW) < th.MAX_RESERVATION_TIME && msg[0].messageText.reserver != th.name;
let failCondition = msg[0].messageText.fail < th.MAX_FAIL_ALLOWED;
if ( failCondition || reservedCondition) {
return th.fetchNewJob(th, wfcallback);
}else{
wfcallback(err, msg);
}
}else{
wfcallback(null,null);
}
});
}else{
wfcallback(null,null);
}
}
/**
* execute a single peek-execute-pop cycle
* @override
*/
single(){
let th = this;
this.iteration = this.iteration+1;
let msgObj,jobObj;
try {
async.waterfall([
// get work from queue
function top(wfcallback) {
console.log(MODULE_NAME + '%s: fetch from queue ', th.name);
//th.fetchNewJob(th, wfcallback);
th.fetchIteration = 0;
th.fetchNewJob(th,wfcallback);
},
// reserve the job
function reserve(msg, wfcallback){
if(msg && msg[0] && msg[0].messageId){
msgObj = msg[0];
console.log( MODULE_NAME + '%s: msg found, reserving %s',th.name,msgObj.messageId);
jobObj = new MessageJob(msgObj);
jobObj.reserve(th, wfcallback);
th.processing = jobObj;
th.processingId = msgObj.messageId;
}else {
console.log( MODULE_NAME + '%s: all queue elements are not processable -retry later- %s',th.name,util.inspect(msgObj));
wfcallback(new Error('queue is empty'),null);
}
},
// pop message to insert into the fail queue
function postExecute(res, wfcallback){
th.popMessageAndInsert(msgObj,th,wfcallback);
}
],
// update if error, else ok
function _onFinish(err,val){
th.processing = null;
th.processingId = null;
th.emit('runcomplete');
// repeat only if stack is not full
if(th.iteration<th.STACK_PROTECTION_THRSD && th.fetchIteration<(th.STACK_PROTECTION_THRSD*2)){
th.run();
}else{
console.log(MODULE_NAME + '%s: stack protection threshold, KILLING WORKER',th.name);
th.fatus.removeWorker(th.name);
}
});
}catch(err){
// sync error, update message
if(msgObj && jobObj) {
jobObj.fails(err);
th.updateMsgOnError(jobObj, msgObj, err, th);
}
}
}
/**
* move message to fail queue
* @param msg to move
* @param th ref to this
* @param callback to call in the end
*/
popMessageAndInsert(msg,th,callback){
msg.reserved = false;
th.fatus.insertInFailQueue(msg,callback);
}
}
/** Exports */
module.exports = FatusCollectorWorker;