diff --git a/api/api.js b/api/api.js index 99fa2ee1581..44213d452c9 100644 --- a/api/api.js +++ b/api/api.js @@ -171,7 +171,7 @@ plugins.connectToAllDatabases().then(function() { * Handle exit events for gracefull close */ ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT', - 'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM' + 'SIGBUS', 'SIGFPE', 'SIGSEGV', 'SIGTERM', ].forEach(function(sig) { process.on(sig, async function() { storeBatchedData(sig); @@ -278,7 +278,10 @@ plugins.connectToAllDatabases().then(function() { : os.cpus().length; for (let i = 0; i < workerCount; i++) { - const worker = cluster.fork(); + // there's no way to define inspector port of a worker in the code. So if we don't + // pick a unique port for each worker, they conflict with each other. + const inspectorPort = i + 1 + (common?.config?.masterInspectorPort || 9229); + const worker = cluster.fork({ NODE_OPTIONS: "--inspect-port=" + inspectorPort }); workers.push(worker); } diff --git a/api/utils/countlyFs.js b/api/utils/countlyFs.js index fbf0769235c..099905ae2fa 100644 --- a/api/utils/countlyFs.js +++ b/api/utils/countlyFs.js @@ -631,6 +631,24 @@ countlyFs.gridfs = {}; }); }); }; + /** + * List files inside the category (collection/directory) + * @param {string} category - collection to list files in + * @param {function} callback - function called when files found or query errored, providing error object as first param and a list of filename, creation date and size as secondas second + */ + ob.listFiles = function(category, callback) { + const bucket = new GridFSBucket(db, { bucketName: category }); + bucket.find().toArray() + .then((records) => callback( + null, + records.map(({ filename, uploadDate, length }) => ({ + filename, + createdOn: uploadDate, + size: length + })) + )) + .catch((error) => callback(error, null)); + }; /** * Get handler for filesystem, which in case of GridFS is database connection @@ -941,6 +959,30 @@ countlyFs.fs = {}; }); }; + /** + * List files inside the category (directory) + * @param {string} category - directory to list files in + * @param {function} callback - function called when files found, providing error object as first param and a list of filename, creation date and size as second + */ + ob.listFiles = function(category, callback) { + fs.readdir(category, function(err, files) { + if (err) { + return callback(err); + } + callback( + null, + files.map(filename => { + const stats = fs.statSync(category + '/' + filename); + return { + filename, + createdOn: stats.mtime, + size: stats.size + }; + }) + ); + }); + }; + /** * Get handler for filesystem, which in case of GridFS is database connection * @returns {object} databse connection diff --git a/plugins/system-utility/api/api.js b/plugins/system-utility/api/api.js index b57fa7dc6e0..f5e956ea63c 100644 --- a/plugins/system-utility/api/api.js +++ b/plugins/system-utility/api/api.js @@ -2,15 +2,234 @@ var plugin = {}, common = require('../../../api/utils/common.js'), tracker = require('../../../api/parts/mgmt/tracker.js'), plugins = require('../../pluginManager.js'), - systemUtility = require('./system.utility'); + systemUtility = require('./system.utility'), + log = common.log('system-utility:api'), + cluster = require("cluster"); + +const processName = (cluster.isMaster ? "master" : "worker") + "-" + process.pid; +const profilerCmds = ["startProfiler", "stopProfiler", "startInspector", "stopInspector"]; +let numberOfWorkers; + +/** + * Checks if the message is sent from profiler/inspector endpoints + * @param {object} msg IPC message object contains a "cmd" key + * @returns {boolean} true if an inspector/profiler message + */ +function isInspectorMessage(msg) { + return typeof msg === "object" && profilerCmds.includes(msg.cmd); +} + +/** + * Handles IPC messages sent by profiler and inspector endpoints. + * @param {object} msg should contain at least "cmd" and "msgId" key + */ +function handleMessage(msg) { + if (isInspectorMessage(msg)) { + let args = msg.args || []; + // each process will have their own processName. So we can't pass + // that from the main process: + if (msg.cmd === "stopProfiler" || msg.cmd === "startProfiler") { + args = [processName]; + } + + systemUtility[msg.cmd](...args).catch(err => { + log.e(err); + console.error(err); + }); + } + else if (typeof msg === "object" && msg.cmd === "setNumberOfWorkers") { + numberOfWorkers = msg.params.numberOfWorkers; + } +} + +// Handle messages broadcasted from master to worker. +process.on("message", msg => handleMessage(msg)); + +// Handle messages sent from worker to master. +plugins.register("/master", () => { + const workers = Object.values(cluster.workers); + + workers.forEach(worker => { + // set the numberOfWorkers variable on each worker + worker.on("listening", () => { + worker.send({ + cmd: "setNumberOfWorkers", + params: { numberOfWorkers: workers.length } + }); + }); + + // listen workers for inspector/profiler messages + worker.on("message", msg => { + if (isInspectorMessage(msg)) { + // handle on master + handleMessage(msg); + + // broadcast to all workers except for "startInspector". + // running startInspector on master also starts worker's inspectors. + if (!["startInspector"].includes(msg.cmd)) { + workers.forEach(_worker => _worker.send(msg)); + } + } + }); + }); +}); + +// helper functions to start/stop with a timeout. +let timeouts = { Profiler: null, Inspector: null }; +/** + * Sends a "startInspector" or "startProfiler" message to main process. + * Sets a timeout callback to stop the same operation. + * @param {string} type Inspector|Profiler + */ +function startWithTimeout(type) { + if (timeouts[type]) { + throw new Error("Already started"); + } + process.send({ cmd: "start" + type }); + timeouts[type] = setTimeout(() => stopWithTimeout(type, true), 2 * 60 * 60 * 1000); +} +/** + * Sends a "stopInspector" or "stopProfiler" message to main process. + * @param {string} type Inspector|Profiler + * @param {boolean} fromTimeout true if its being stoped because of the timeout + */ +function stopWithTimeout(type, fromTimeout = false) { + if (!timeouts[type]) { + throw new Error(type + " needs to be started"); + } + process.send({ cmd: "stop" + type }); + if (!fromTimeout) { + clearTimeout(timeouts[type]); + } + timeouts[type] = null; +} (function() { - //write api call - /* - plugins.register("/i", function(ob){ - - }); - */ + plugins.register("/i/inspector", function(ob) { + var params = ob.params, + path = ob.paths[3].toLowerCase(), + validate = ob.validateUserForGlobalAdmin; + + switch (path) { + case "start": + validate(params, () => { + const masterPort = common.config?.api?.masterInspectorPort ?? 9229; + try { + startWithTimeout("Inspector"); + common.returnMessage(params, 200, { + workers: numberOfWorkers, + ports: [masterPort, masterPort + numberOfWorkers] + }); + } + catch (err) { + log.e(err); + common.returnMessage(params, 500, err.toString()); + } + }); + return true; + + case "stop": + validate(params, () => { + try { + stopWithTimeout("Inspector"); + common.returnMessage(params, 200, "Stoping inspector for all processes"); + } + catch (err) { + log.e(err); + common.returnMessage(params, 500, err.toString()); + } + }); + return true; + + default: + return false; + } + }); + + plugins.register("/i/profiler", function(ob) { + var params = ob.params, + path = ob.paths[3].toLowerCase(), + validate = ob.validateUserForGlobalAdmin; + + switch (path) { + case 'start': + validate(params, () => { + try { + startWithTimeout("Profiler"); + common.returnMessage(params, 200, "Starting profiler for all processes"); + } + catch (err) { + log.e(err); + common.returnMessage(params, 500, err.toString()); + } + }); + return true; + + case 'stop': + validate(params, () => { + try { + stopWithTimeout("Profiler"); + common.returnMessage(params, 200, "Stoping profiler for all processes"); + } + catch (err) { + log.e(err); + common.returnMessage(params, 500, err.toString()); + } + }); + return true; + + case 'list-files': + validate(params, () => { + systemUtility.listProfilerFiles() + .then(res => common.returnMessage(params, 200, res)) + .catch(err => { + log.e(err); + common.returnMessage(params, 404, "Profiler files not found"); + }); + }); + return true; + + case 'download': + validate(params, () => { + systemUtility.downloadProfilerFile(params.qstring.filename) + .then(({ data, filename }) => { + common.returnRaw(params, 200, data, { + 'Content-Type': 'plain/text; charset=utf-8', + 'Content-disposition': 'attachment; filename=' + filename + }); + }) + .catch(err => { + log.e(err); + common.returnMessage(params, 404, "File not found"); + }); + }); + return true; + + case 'download-all': + validate(params, async() => { + try { + const tarStream = await systemUtility.profilerFilesTarStream(); + params.res.writeHead(200, { + "Content-Type": "plain/text; charset=utf-8", + "Content-Disposition": "attachment; filename=profiler.tar" + }); + tarStream.on("end", () => params.res.end()); + tarStream.pipe(params.res); + } + catch (err) { + log.e(err); + console.error(err); + common.returnMessage(params, 500, "Server error"); + } + }); + return true; + + default: + return false; + } + }); + + plugins.register("/o/system", function(ob) { diff --git a/plugins/system-utility/api/system.utility.js b/plugins/system-utility/api/system.utility.js index 54bd3e8a05f..44e0f298932 100644 --- a/plugins/system-utility/api/system.utility.js +++ b/plugins/system-utility/api/system.utility.js @@ -1,5 +1,11 @@ var common = require('../../../api/utils/common.js'); +const inspector = require('inspector'); +const countlyFs = require('../../../api/utils/countlyFs.js'); var exec = require('child_process').exec; +const tar = require("tar-stream"); +const session = new inspector.Session(); + +const PROFILER_DIR = "nodeprofile"; var _id = null; @@ -395,5 +401,221 @@ function mongodbConnectionCheck() { }); } + exports.healthcheck = healthCheck; -exports.dbcheck = mongodbConnectionCheck; \ No newline at end of file +exports.dbcheck = mongodbConnectionCheck; + +// PROFILER + +/** + * Promise abstraction for session.post + * @param {string} cmd session command to run + * @returns {Promise} the value command returns + */ +function sessionPost(cmd) { + return new Promise((res, rej) => { + session.post(cmd, (err, arg) => { + if (err) { + return rej(err); + } + return res(arg); + }); + }); +} + +/** + * Saves the result to gridfs + * @param {string} filename file name with extension + * @param {object} result result object returned by the profiler + * @returns {Promise} filename + */ +function saveProfilerResult(filename, result) { + return new Promise((res, rej) => { + countlyFs.gridfs.saveData( + PROFILER_DIR, filename, JSON.stringify(result), + { writeMode: "overwrite" }, + function(err) { + if (err) { + return rej(err); + } + res(filename); + } + ); + }); +} + +/** + * Connects to inspector session and starts profilers + * There're 3 types of profiler: cpu, heap, coverage + */ +async function startProfiler() { + session.connect(); + + await sessionPost("Profiler.enable"); + await sessionPost("Profiler.start"); + await sessionPost("Profiler.startPreciseCoverage"); + + await sessionPost("HeapProfiler.enable"); + await sessionPost("HeapProfiler.startSampling"); +} + +/** + * Stops profiler and disconnects from the inspector session + * Files to be created: + * - process-name.cpuprofile + * - process-name.heapprofile + * - process-name.coverage + * @param {string} processName process or worker and process id + */ +async function stopProfiler(processName) { + const errors = []; + + // clear old files + await new Promise( + (res, rej) => countlyFs.gridfs.deleteAll( + PROFILER_DIR, + null, + err => err ? rej(err) : res() + ) + ); + + // coverage + try { + const coverage = await sessionPost("Profiler.takePreciseCoverage"); + await saveProfilerResult(processName + ".coverage", coverage?.result); + await sessionPost("Profiler.stopPreciseCoverage"); + } + catch (err) { + errors.push(err); + } + + // cpu profiler + try { + const cpuProfile = await sessionPost("Profiler.stop"); + await saveProfilerResult(processName + ".cpuprofile", cpuProfile?.profile); + await sessionPost("Profiler.disable"); + } + catch (err) { + errors.push(err); + } + + // heap profiler + try { + const heapProfile = await sessionPost("HeapProfiler.stopSampling"); + await saveProfilerResult(processName + ".heapprofile", heapProfile?.profile); + await sessionPost("HeapProfiler.disable"); + } + catch (err) { + errors.push(err); + } + + session.disconnect(); + + if (errors.length) { + throw errors; + } +} + +/** + * Returns the data of a file in PROFILER_DIR collection + * @param {string} filename file name with extension + * @returns {Promise<{data:string, filename:string}>} file object with name and content + */ +function downloadProfilerFile(filename) { + return new Promise((resolve, reject) => { + countlyFs.gridfs.getData(PROFILER_DIR, filename, {}, (err, data) => { + if (err) { + return reject("File not found"); + } + resolve({ data, filename }); + }); + }); +} +/** + * Returns the names, creation dates and size of all files in the PROFILER_DIR collection + * @returns {Promise>} file info + */ +function listProfilerFiles() { + return new Promise((resolve, reject) => { + countlyFs.gridfs.listFiles(PROFILER_DIR, (err, files) => { + if (err) { + return reject(err); + } + resolve(files); + }); + }); +} + +/** + * Returns the tarball read stream for all profiler files + * @returns {tar.Pack} tar stream + */ +async function profilerFilesTarStream() { + const files = await listProfilerFiles(); + let fileStreamFinished = 0; + if (!files.length) { + return null; + } + const pack = tar.pack(); + for (let i = 0; i < files.length; i++) { + const entry = pack.entry({ name: files[i].filename, size: files[i].size }); + const stream = await new Promise((res, rej) => { + countlyFs.gridfs.getStream( + PROFILER_DIR, + files[i].filename, + {}, + (err, fileStream) => err ? rej(err) : res(fileStream) + ); + }); + stream.pipe(entry); + stream.on("end", () => { + entry.end(); + fileStreamFinished++; + if (fileStreamFinished === files.length) { + pack.finalize(); + } + }); + } + return pack; +} + +/** + * Opens inspector. Running inspector.open on master process triggers workers to + * open their inspector also. But this is not the case for closing the inspectors. + * Each worker needs to be closed manually. + * @returns {void} + */ +function startInspector() { + return new Promise((res, rej) => { + try { + res(inspector.open()); + } + catch (err) { + rej(err); + } + }); +} + +/** + * Closes inspector. Running inspector.close on master doesn't trigger workers to + * close their inspector. Each worker needs to be closed manually. + * @returns {void} + */ +function stopInspector() { + return new Promise((res, rej) => { + try { + res(inspector.close()); + } + catch (err) { + rej(err); + } + }); +} + +exports.startProfiler = startProfiler; +exports.stopProfiler = stopProfiler; +exports.downloadProfilerFile = downloadProfilerFile; +exports.listProfilerFiles = listProfilerFiles; +exports.startInspector = startInspector; +exports.stopInspector = stopInspector; +exports.profilerFilesTarStream = profilerFilesTarStream; diff --git a/plugins/system-utility/package-lock.json b/plugins/system-utility/package-lock.json index 532f5c7d5da..97a55595fca 100644 --- a/plugins/system-utility/package-lock.json +++ b/plugins/system-utility/package-lock.json @@ -6,7 +6,44 @@ "packages": { "": { "name": "system-utility", - "version": "0.0.0" + "version": "0.0.0", + "dependencies": { + "tar-stream": "^3.1.6" + } + }, + "node_modules/b4a": { + "version": "1.6.4", + "resolved": "https://registry.npmjs.org/b4a/-/b4a-1.6.4.tgz", + "integrity": "sha512-fpWrvyVHEKyeEvbKZTVOeZF3VSKKWtJxFIxX/jaVPf+cLbGUSitjb49pHLqPV2BUNNZ0LcoeEGfE/YCpyDYHIw==" + }, + "node_modules/fast-fifo": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/fast-fifo/-/fast-fifo-1.3.2.tgz", + "integrity": "sha512-/d9sfos4yxzpwkDkuN7k2SqFKtYNmCTzgfEpz82x34IM9/zc8KGxQoXg1liNC/izpRM/MBdt44Nmx41ZWqk+FQ==" + }, + "node_modules/queue-tick": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/queue-tick/-/queue-tick-1.0.1.tgz", + "integrity": "sha512-kJt5qhMxoszgU/62PLP1CJytzd2NKetjSRnyuj31fDd3Rlcz3fzlFdFLD1SItunPwyqEOkca6GbV612BWfaBag==" + }, + "node_modules/streamx": { + "version": "2.15.6", + "resolved": "https://registry.npmjs.org/streamx/-/streamx-2.15.6.tgz", + "integrity": "sha512-q+vQL4AAz+FdfT137VF69Cc/APqUbxy+MDOImRrMvchJpigHj9GksgDU2LYbO9rx7RX6osWgxJB2WxhYv4SZAw==", + "dependencies": { + "fast-fifo": "^1.1.0", + "queue-tick": "^1.0.1" + } + }, + "node_modules/tar-stream": { + "version": "3.1.6", + "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-3.1.6.tgz", + "integrity": "sha512-B/UyjYwPpMBv+PaFSWAmtYjwdrlEaZQEhMIBFNC5oEG8lpiW8XjcSdmEaClj28ArfKScKHs2nshz3k2le6crsg==", + "dependencies": { + "b4a": "^1.6.4", + "fast-fifo": "^1.2.0", + "streamx": "^2.15.0" + } } } } diff --git a/plugins/system-utility/package.json b/plugins/system-utility/package.json index dd551e9f5de..130017e714d 100644 --- a/plugins/system-utility/package.json +++ b/plugins/system-utility/package.json @@ -5,16 +5,22 @@ "description": "Plugin for getting current system information.", "author": "Count.ly", "homepage": "https://count.ly/plugins/", - "repository" :{ "type" : "git", "url" : "http://github.com/Countly/countly-server.git"}, - "bugs":{ "url" : "http://github.com/Countly/countly-server/issues"}, + "repository": { + "type": "git", + "url": "http://github.com/Countly/countly-server.git" + }, + "bugs": { + "url": "http://github.com/Countly/countly-server/issues" + }, "keywords": [ "countly", "analytics", "mobile", "plugins", - "template" + "template" ], "dependencies": { + "tar-stream": "^3.1.6" }, "private": true -} \ No newline at end of file +}