forked from OptimalBits/bull
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimer-manager.js
142 lines (119 loc) · 3.38 KB
/
timer-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
'use strict';
const _ = require('lodash');
const uuid = require('uuid');
/**
Timer Manager
Keep track of timers to ensure that disconnect() is
only called (via close()) at a time when it's safe
to do so.
Queues currently use two timers:
- The first one is used for delayed jobs and is
preemptible i.e. it is possible to close a queue
while delayed jobs are still pending (they will
be processed when the queue is resumed). This timer
is cleared by close() and is not managed here.
- The second one is used to lock Redis while
processing jobs. These timers are short-lived,
and there can be more than one active at a
time.
The lock timer executes Redis commands, which
means we can't close queues while it's active i.e.
this won't work:
queue.process(function (job, jobDone) {
handle(job);
queue.disconnect().then(jobDone);
})
The disconnect() call closes the Redis connections; then, when
a queue tries to perform the scheduled Redis commands,
they block until a Redis connection becomes available...
The solution is to close the Redis connections when there are no
active timers i.e. when the queue is idle. This helper class keeps
track of the active timers and executes any queued listeners
whenever that count goes to zero.
Since disconnect() simply can't work if there are active handles,
its close() wrapper postpones closing the Redis connections
until the next idle state. This means that close() can safely
be called from anywhere at any time, even from within a job
handler:
queue.process(function (job, jobDone) {
handle(job);
queue.close();
jobDone();
})
*/
function TimerManager() {
this.idle = true;
this.listeners = [];
this.timers = {};
}
/**
Create a new timer (setTimeout).
Expired timers are automatically cleared
@param {String} name - Name of a timer key. Used only for debugging.
@param {Number} delay - delay of timeout
@param {Function} fn - Function to execute after delay
@returns {Number} id - The timer id. Used to clear the timer
*/
TimerManager.prototype.set = function(name, delay, fn) {
const id = uuid.v4();
const timer = setTimeout(
(timerInstance, timeoutId) => {
timerInstance.clear(timeoutId);
try {
fn();
} catch (err) {
console.error(err);
}
},
delay,
this,
id
);
// XXX only the timer is used, but the
// other fields are useful for
// troubleshooting/debugging
this.timers[id] = {
name,
timer
};
this.idle = false;
return id;
};
/**
Clear a timer (clearTimeout).
Queued listeners are executed if there are no
remaining timers
*/
TimerManager.prototype.clear = function(id) {
const timers = this.timers;
const timer = timers[id];
if (!timer) {
return;
}
clearTimeout(timer.timer);
delete timers[id];
if (!this.idle && _.size(timers) === 0) {
while (this.listeners.length) {
this.listeners.pop()();
}
this.idle = true;
}
};
TimerManager.prototype.clearAll = function() {
_.each(this.timers, (timer, id) => {
this.clear(id);
});
};
/**
* Returns a promise that resolves when there are no active timers.
*/
TimerManager.prototype.whenIdle = function() {
return new Promise(resolve => {
if (this.idle) {
resolve();
} else {
this.listeners.unshift(resolve);
}
});
};
module.exports = TimerManager;