-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathnode-parent-thread-rxjs.js
54 lines (45 loc) · 1.83 KB
/
node-parent-thread-rxjs.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
const Rxjs = require('rxjs');
const RxjsOperators = require('rxjs/operators');
const { Worker } = require('worker_threads');
// Startup banner printed once when the script is loaded.
console.log("\nNode multi-threading demo using worker_threads module in Node 11.7.0\n");
// Sentinel value that runTask() emits as its last item, right before completing,
// when the worker exits with code 0. main() uses it to stop consuming messages.
const COMPLETE_SIGNAL = 'COMPLETE';
/**
 * Runs `./node-worker-thread-rxjs.js` in a worker thread and exposes its
 * lifecycle as an Observable. A new worker is started for every subscription.
 *
 * Emissions:
 *  - every message posted by the worker is forwarded via `next`;
 *  - a worker 'error' event is forwarded via `error`;
 *  - a non-zero exit code is reported via `error` as an `Error` instance;
 *  - on exit code 0, `completedOnTime` is invoked, then `COMPLETE_SIGNAL` is
 *    emitted, then the Observable completes.
 *
 * @param {*} workerData - Value handed to the worker through the `workerData` option.
 * @param {Function} completedOnTime - Called with no arguments when the worker exits with code 0.
 * @returns {Observable} Observable of worker messages followed by `COMPLETE_SIGNAL`.
 */
function runTask(workerData, completedOnTime) {
  // `new Observable(...)` replaces the deprecated `Observable.create(...)`.
  return new Rxjs.Observable(observer => {
    const worker = new Worker('./node-worker-thread-rxjs.js', { workerData });

    worker.on('message', message => observer.next(message));
    worker.on('error', error => observer.error(error));
    worker.on('exit', code => {
      if (code !== 0) {
        // Emit a real Error (not a bare string) so consumers get a stack trace.
        observer.error(new Error(`Worker stopped with exit code ${code}`));
        return;
      }
      // Flag success before emitting the sentinel: downstream operators may
      // complete synchronously as soon as they see COMPLETE_SIGNAL.
      completedOnTime();
      observer.next(COMPLETE_SIGNAL);
      observer.complete();
    });

    // Teardown: stop the worker thread when the subscription ends (unsubscribe,
    // error or completion) so an abandoned worker does not keep running.
    // Terminating a worker that has already exited is harmless.
    return () => {
      worker.terminate();
    };
  });
}
// Maximum time, in seconds, that main() keeps listening to the worker
// (converted to milliseconds for Rxjs.timer).
const MAX_WAIT_TIME = 3;
// Value passed to the worker as its workerData; presumably the duration of the
// worker's job in seconds — confirm against node-worker-thread-rxjs.js.
const WORKER_TIME = 10;
/**
 * Starts the worker task and prints every message it sends, waiting at most
 * MAX_WAIT_TIME seconds. If the worker has not exited cleanly by then, the
 * Node process is ended; otherwise a success message is logged.
 */
function main() {
  // Declared locally: the bare assignment used previously created an implicit
  // global (and is a ReferenceError in strict mode / ES modules).
  let completedOnTime = false;

  console.log(`[Main] Starting worker from process ${process.pid}`);
  const worker$ = runTask(WORKER_TIME, () => {
    completedOnTime = true;
  });

  // Receive messages from the worker until it signals completion, but only
  // wait for MAX_WAIT_TIME seconds.
  worker$.pipe(
    RxjsOperators.takeWhile(message => message !== COMPLETE_SIGNAL),
    RxjsOperators.takeUntil(Rxjs.timer(MAX_WAIT_TIME * 1000))
  ).subscribe({
    next: result => console.log(`[Main] worker says: ${result}`),
    error: error => console.error(`[Main] worker error: ${error}`),
    complete: () => {
      if (!completedOnTime) {
        console.log(`[Main] worker could not complete its work in the allowed ${MAX_WAIT_TIME}s, exiting Node process`);
        // Force the process to end; a still-running worker thread would
        // otherwise keep it alive.
        process.exit(0);
      } else {
        // The allowed window is MAX_WAIT_TIME, not the worker's own WORKER_TIME.
        console.log(`[Main] worker completed its work in the allowed ${MAX_WAIT_TIME}s`);
      }
    }
  });
}
// Script entry point: run the demo as soon as the file is executed.
main();