Skip to content

Commit

Permalink
Merge pull request jestjs#540 from facebook/worker-farm
Browse files Browse the repository at this point in the history
Replace node-worker-pool with node-worker-farm
  • Loading branch information
amasad committed Oct 7, 2015
2 parents a07a5a7 + 78ba6e6 commit 50fe34e
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 225 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
"lodash.template": "^3.6.2",
"mkdirp": "^0.5.1",
"node-haste": "^1.2.8",
"node-worker-pool": "^3.0.2",
"object-assign": "^4.0.1",
"optimist": "^0.6.1",
"resolve": "^1.1.6",
"sane": "^1.2.0",
"through": "^2.3.8",
"which": "^1.1.1"
"which": "^1.1.1",
"worker-farm": "^1.3.1"
},
"devDependencies": {
"jshint": "^2.8.0",
Expand Down
110 changes: 14 additions & 96 deletions src/Console.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,106 +27,24 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

/*jshint strict:false*/
'use strict';

var util = require('util');

function Console(messageQueue) {
if (!(this instanceof Console)) {
return new Console(messageQueue);
var Console = require('console').Console;
var colors = require('./lib/colors');

class CustomConsole extends Console {
warn() {
return super.warn(
colors.colorize(util.format.apply(this, arguments), colors.YELLOW)
);
}

Object.defineProperty(this, '_messageQueue', {
value: messageQueue,
writable: true,
enumerable: false,
configurable: true
});

Object.defineProperty(this, '_times', {
value: {},
writable: true,
enumerable: false,
configurable: true
});

// bind the prototype functions to this Console instance
var keys = Object.keys(Console.prototype);
for (var v = 0; v < keys.length; v++) {
var k = keys[v];
this[k] = this[k].bind(this);
error() {
return super.error(
colors.colorize(util.format.apply(this, arguments), colors.RED)
);
}
}

Console.prototype.log = function() {
this._messageQueue.push({
type: 'log',
data: util.format.apply(this, arguments) + '\n'
});
};


Console.prototype.info = Console.prototype.log;


Console.prototype.warn = function() {
this._messageQueue.push({
type: 'warn',
data: util.format.apply(this, arguments) + '\n'
});
};


Console.prototype.error = function() {
this._messageQueue.push({
type: 'error',
data: util.format.apply(this, arguments) + '\n'
});
};


Console.prototype.dir = function(object, options) {
this._messageQueue.push({
type: 'dir',
data: util.inspect(object, util._extend({
customInspect: false
}, options)) + '\n'
});
};


Console.prototype.time = function(label) {
this._times[label] = Date.now();
};


Console.prototype.timeEnd = function(label) {
var time = this._times[label];
if (!time) {
throw new Error('No such label: ' + label);
}
var duration = Date.now() - time;
this.log('%s: %dms', label, duration);
};


Console.prototype.trace = function() {
// TODO probably can to do this better with V8's debug object once that is
// exposed.
var err = new Error();
err.name = 'Trace';
err.message = util.format.apply(this, arguments);
/*jshint noarg:false*/
Error.captureStackTrace(err, arguments.callee);
this.error(err.stack);
};


Console.prototype.assert = function(expression) {
if (!expression) {
var arr = Array.prototype.slice.call(arguments, 1);
require('assert').ok(false, util.format.apply(this, arr));
}
};

module.exports = Console;
module.exports = CustomConsole;
23 changes: 0 additions & 23 deletions src/DefaultTestReporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ function(config, testResult, aggregatedResults) {
this.verboseLog(testResult.testResults, resultHeader);
}

testResult.logMessages.forEach(this._printConsoleMessage.bind(this));

if (!allTestsPassed) {
var failureMessage = formatFailureMessage(testResult, {
rootPath: config.rootDir,
Expand Down Expand Up @@ -139,27 +137,6 @@ function (config, aggregatedResults) {
this.log('Run time: ' + runTime + 's');
};

DefaultTestReporter.prototype._printConsoleMessage = function(msg) {
switch (msg.type) {
case 'dir':
case 'log':
this._process.stdout.write(msg.data);
break;
case 'warn':
this._process.stderr.write(
this._formatMsg(msg.data, colors.YELLOW)
);
break;
case 'error':
this._process.stderr.write(
this._formatMsg(msg.data, colors.RED)
);
break;
default:
throw new Error('Unknown console message type!: ' + msg.type);
}
};

DefaultTestReporter.prototype._clearWaitingOn = function() {
// Don't write special chars in noHighlight mode
// to get clean output for logs.
Expand Down
86 changes: 31 additions & 55 deletions src/TestRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ var assign = require('object-assign');
var promiseDone = require('./lib/promiseDone');
var through = require('through');
var utils = require('./lib/utils');
var WorkerPool = require('node-worker-pool');
var workerFarm = require('worker-farm');
var Console = require('./Console');
var promisify = require('./lib/promisify');

var TEST_WORKER_PATH = require.resolve('./TestWorker');

Expand Down Expand Up @@ -322,10 +323,8 @@ TestRunner.prototype.runTest = function(testFilePath) {
var env = new configDeps.testEnvironment(config);
var testRunner = configDeps.testRunner;

// Capture and serialize console.{log|warning|error}s so they can be passed
// around (such as through some channel back to a parent process)
var consoleMessages = [];
env.global.console = new Console(consoleMessages);
// Intercept console logs to colorize.
env.global.console = new Console(process.stdout, process.stderr);

// Pass the testFilePath into the runner, so it can be used to e.g.
// configure test reporter output.
Expand Down Expand Up @@ -380,7 +379,6 @@ TestRunner.prototype.runTest = function(testFilePath) {
.then(function(results) {
testExecStats.end = Date.now();

results.logMessages = consoleMessages;
results.perfStats = testExecStats;
results.testFilePath = testFilePath;
results.coverage =
Expand Down Expand Up @@ -523,7 +521,6 @@ TestRunner.prototype.runTests = function(testPaths, reporter) {
suites: {},
tests: {},
testResults: {},
logMessages: []
};
aggregatedResults.testResults.push(testResult);
aggregatedResults.numFailedTests++;
Expand Down Expand Up @@ -575,56 +572,35 @@ TestRunner.prototype._createInBandTestRun = function(
TestRunner.prototype._createParallelTestRun = function(
testPaths, onTestResult, onRunFailure
) {
var workerPool = new WorkerPool(
this._opts.maxWorkers,
this._opts.nodePath,
this._opts.nodeArgv.concat([
TEST_WORKER_PATH,
'--config=' + JSON.stringify(this._config)
])
);
var farm = workerFarm({
maxConcurretCallsPerWorker: 1,

// We allow for a couple of transient errors. Say something to do
// with loading/serialization of the resourcemap (which I've seen
// happen).
maxRetries: 2,
maxConcurrentWorkers: this._opts.maxWorkers
}, TEST_WORKER_PATH);

var runTest = promisify(farm);

return this._getModuleLoaderResourceMap()
.then(function() {
return Promise.all(testPaths.map(function(testPath) {
return workerPool.sendMessage({testFilePath: testPath})
.then(function(testResult) {
onTestResult(testPath, testResult);
})
.catch(function(err) {
onRunFailure(testPath, err);

// Jest uses regular worker messages to initialize workers, so
// there's no way for node-worker-pool to understand how to
// recover/re-initialize a child worker that needs to be restarted.
// (node-worker-pool can't distinguish between initialization
// messages and ephemeral "normal" messages in order to replay the
// initialization message upon booting the new, replacement worker
// process).
//
// This is mostly a limitation of node-worker-pool's initialization
// features, and ideally it would be possible to recover from a
// test that causes a worker process to exit unexpectedly. However,
// for now Jest will just fail hard if any child process exits
// unexpectedly.
//
// This will likely bite me in the ass as an unbreak now if we hit
// this issue again -- but I guess that's a faster failure than
// having Jest just hang forever without any indication as to why.
if (err.message
&& /Worker process exited before /.test(err.message)) {
console.error(
'A worker process has quit unexpectedly! This is bad news, ' +
'shutting down now!'
);
process.exit(1);
}
});
}));
})
.then(function() {
return workerPool.destroy();
});
.then(() => Promise.all(testPaths.map(
testPath => runTest({ config: this._config, testFilePath: testPath })
.then(testResult => onTestResult(testPath, testResult))
.catch(err => {
onRunFailure(testPath, err);

if (err.type === 'ProcessTerminatedError') {
// Initialization error or some other uncaught error
console.error(
'A worker process has quit unexpectedly! ' +
'Most likely this an initialization error.'
);
process.exit(1);
}
})
))).then(() => workerFarm.end(farm));
};

function _pathStreamToPromise(stream) {
Expand Down
90 changes: 45 additions & 45 deletions src/TestWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,52 @@
*/
'use strict';

var optimist = require('optimist');
var TestRunner = require('./TestRunner');
var workerUtils = require('node-worker-pool/nodeWorkerUtils');

if (require.main === module) {
try {
process.on('uncaughtException', workerUtils.respondWithError);

var argv = optimist.demand(['config']).argv;
var config = JSON.parse(argv.config);

var testRunner = null;
var onMessage = function(message) {
if (testRunner === null) {
testRunner = new TestRunner(config, {
useCachedModuleLoaderResourceMap: true,
});
// Make sure uncaught errors are logged before we exit.
// Could be transient errors to do with loading and serializing the resouce
// map.
process.on('uncaughtException', (err) => {
console.error(err.stack);
process.exit(1);
});

// Start require()ing config dependencies now.
//
// Config dependencies are entries in the config that are require()d (in
// order to be pluggable) such as 'moduleLoader' or
// 'testEnvironment'.
testRunner.preloadConfigDependencies();

// Start deserializing the resource map to get a potential head-start on
// that work before the first "run-test" message comes in.
//
// This is just a perf optimization -- and it is only an optimization
// some of the time (when the there is any significant idle time between
// this first initialization message and the first "run-rest" message).
//
// It is also only an optimization so long as deserialization of the
// resource map is a bottleneck (which is the case at the time of this
// writing).
testRunner.preloadResourceMap();
}
var TestRunner = require('./TestRunner');

return testRunner.runTest(message.testFilePath)
.catch(function(err) {
throw (err.stack || err.message || err);
});
};
var testRunner;

module.exports = function(data, callback) {
if (!testRunner) {
testRunner = new TestRunner(data.config, {
useCachedModuleLoaderResourceMap: true
});

// Start require()ing config dependencies now.
//
// Config dependencies are entries in the config that are require()d (in
// order to be pluggable) such as 'moduleLoader' or
// 'testEnvironment'.
testRunner.preloadConfigDependencies();

// Start deserializing the resource map to get a potential head-start on
// that work before the first "run-test" message comes in.
//
// This is just a perf optimization -- and it is only an optimization
// some of the time (when the there is any significant idle time between
// this first initialization message and the first "run-rest" message).
//
// It is also only an optimization so long as deserialization of the
// resource map is a bottleneck (which is the case at the time of this
// writing).
testRunner.preloadResourceMap();
}

workerUtils.startWorker(null, onMessage);
} catch (e) {
workerUtils.respondWithError(e);
try {
testRunner.runTest(data.testFilePath)
.then(
result => callback(null, result),
// TODO: move to error object passing (why limit to strings?).
err => callback(err.stack || err.message || err)
);
} catch (err) {
callback(err.stack || err.message || err);
}
}
};
Loading

0 comments on commit 50fe34e

Please sign in to comment.