Skip to content

Commit

Permalink
Merge pull request #4787 from Countly/SER-731-create-secure-endpoints…
Browse files Browse the repository at this point in the history
…-for-process-profiling

[SER-731] [system-utility] Endpoints for profiler and inspector
  • Loading branch information
coskunaydinoglu authored Dec 25, 2023
2 parents 9147f47 + 3eed22f commit 644f8b6
Show file tree
Hide file tree
Showing 6 changed files with 544 additions and 15 deletions.
7 changes: 5 additions & 2 deletions api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ plugins.connectToAllDatabases().then(function() {
* Handle exit events for gracefull close
*/
['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM'
'SIGBUS', 'SIGFPE', 'SIGSEGV', 'SIGTERM',
].forEach(function(sig) {
process.on(sig, async function() {
storeBatchedData(sig);
Expand Down Expand Up @@ -278,7 +278,10 @@ plugins.connectToAllDatabases().then(function() {
: os.cpus().length;

for (let i = 0; i < workerCount; i++) {
const worker = cluster.fork();
// there's no way to define inspector port of a worker in the code. So if we don't
// pick a unique port for each worker, they conflict with each other.
const inspectorPort = i + 1 + (common?.config?.masterInspectorPort || 9229);
const worker = cluster.fork({ NODE_OPTIONS: "--inspect-port=" + inspectorPort });
workers.push(worker);
}

Expand Down
42 changes: 42 additions & 0 deletions api/utils/countlyFs.js
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,24 @@ countlyFs.gridfs = {};
});
});
};
/**
* List files inside the category (collection/directory)
* @param {string} category - collection to list files in
* @param {function} callback - function called when files found or query errored, providing error object as first param and a list of filename, creation date and size as secondas second
*/
ob.listFiles = function(category, callback) {
const bucket = new GridFSBucket(db, { bucketName: category });
bucket.find().toArray()
.then((records) => callback(
null,
records.map(({ filename, uploadDate, length }) => ({
filename,
createdOn: uploadDate,
size: length
}))
))
.catch((error) => callback(error, null));
};

/**
* Get handler for filesystem, which in case of GridFS is database connection
Expand Down Expand Up @@ -941,6 +959,30 @@ countlyFs.fs = {};
});
};

/**
* List files inside the category (directory)
* @param {string} category - directory to list files in
* @param {function} callback - function called when files found, providing error object as first param and a list of filename, creation date and size as second
*/
ob.listFiles = function(category, callback) {
fs.readdir(category, function(err, files) {
if (err) {
return callback(err);
}
callback(
null,
files.map(filename => {
const stats = fs.statSync(category + '/' + filename);
return {
filename,
createdOn: stats.mtime,
size: stats.size
};
})
);
});
};

/**
* Get handler for filesystem, which in case of GridFS is database connection
* @returns {object} databse connection
Expand Down
233 changes: 226 additions & 7 deletions plugins/system-utility/api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,234 @@ var plugin = {},
common = require('../../../api/utils/common.js'),
tracker = require('../../../api/parts/mgmt/tracker.js'),
plugins = require('../../pluginManager.js'),
systemUtility = require('./system.utility');
systemUtility = require('./system.utility'),
log = common.log('system-utility:api'),
cluster = require("cluster");

const processName = (cluster.isMaster ? "master" : "worker") + "-" + process.pid;
const profilerCmds = ["startProfiler", "stopProfiler", "startInspector", "stopInspector"];
let numberOfWorkers;

/**
* Checks if the message is sent from profiler/inspector endpoints
* @param {object} msg IPC message object contains a "cmd" key
* @returns {boolean} true if an inspector/profiler message
*/
function isInspectorMessage(msg) {
return typeof msg === "object" && profilerCmds.includes(msg.cmd);
}

/**
* Handles IPC messages sent by profiler and inspector endpoints.
* @param {object} msg should contain at least "cmd" and "msgId" key
*/
function handleMessage(msg) {
if (isInspectorMessage(msg)) {
let args = msg.args || [];
// each process will have their own processName. So we can't pass
// that from the main process:
if (msg.cmd === "stopProfiler" || msg.cmd === "startProfiler") {
args = [processName];
}

systemUtility[msg.cmd](...args).catch(err => {
log.e(err);
console.error(err);
});
}
else if (typeof msg === "object" && msg.cmd === "setNumberOfWorkers") {
numberOfWorkers = msg.params.numberOfWorkers;
}
}

// Handle messages broadcasted from master to worker.
process.on("message", msg => handleMessage(msg));

// Handle messages sent from worker to master.
plugins.register("/master", () => {
const workers = Object.values(cluster.workers);

workers.forEach(worker => {
// set the numberOfWorkers variable on each worker
worker.on("listening", () => {
worker.send({
cmd: "setNumberOfWorkers",
params: { numberOfWorkers: workers.length }
});
});

// listen workers for inspector/profiler messages
worker.on("message", msg => {
if (isInspectorMessage(msg)) {
// handle on master
handleMessage(msg);

// broadcast to all workers except for "startInspector".
// running startInspector on master also starts worker's inspectors.
if (!["startInspector"].includes(msg.cmd)) {
workers.forEach(_worker => _worker.send(msg));
}
}
});
});
});

// helper functions to start/stop with a timeout.
let timeouts = { Profiler: null, Inspector: null };
/**
* Sends a "startInspector" or "startProfiler" message to main process.
* Sets a timeout callback to stop the same operation.
* @param {string} type Inspector|Profiler
*/
function startWithTimeout(type) {
if (timeouts[type]) {
throw new Error("Already started");
}
process.send({ cmd: "start" + type });
timeouts[type] = setTimeout(() => stopWithTimeout(type, true), 2 * 60 * 60 * 1000);
}
/**
* Sends a "stopInspector" or "stopProfiler" message to main process.
* @param {string} type Inspector|Profiler
* @param {boolean} fromTimeout true if its being stoped because of the timeout
*/
function stopWithTimeout(type, fromTimeout = false) {
if (!timeouts[type]) {
throw new Error(type + " needs to be started");
}
process.send({ cmd: "stop" + type });
if (!fromTimeout) {
clearTimeout(timeouts[type]);
}
timeouts[type] = null;
}

(function() {
//write api call
/*
plugins.register("/i", function(ob){
});
*/
plugins.register("/i/inspector", function(ob) {
var params = ob.params,
path = ob.paths[3].toLowerCase(),
validate = ob.validateUserForGlobalAdmin;

switch (path) {
case "start":
validate(params, () => {
const masterPort = common.config?.api?.masterInspectorPort ?? 9229;
try {
startWithTimeout("Inspector");
common.returnMessage(params, 200, {
workers: numberOfWorkers,
ports: [masterPort, masterPort + numberOfWorkers]
});
}
catch (err) {
log.e(err);
common.returnMessage(params, 500, err.toString());
}
});
return true;

case "stop":
validate(params, () => {
try {
stopWithTimeout("Inspector");
common.returnMessage(params, 200, "Stoping inspector for all processes");
}
catch (err) {
log.e(err);
common.returnMessage(params, 500, err.toString());
}
});
return true;

default:
return false;
}
});

plugins.register("/i/profiler", function(ob) {
var params = ob.params,
path = ob.paths[3].toLowerCase(),
validate = ob.validateUserForGlobalAdmin;

switch (path) {
case 'start':
validate(params, () => {
try {
startWithTimeout("Profiler");
common.returnMessage(params, 200, "Starting profiler for all processes");
}
catch (err) {
log.e(err);
common.returnMessage(params, 500, err.toString());
}
});
return true;

case 'stop':
validate(params, () => {
try {
stopWithTimeout("Profiler");
common.returnMessage(params, 200, "Stoping profiler for all processes");
}
catch (err) {
log.e(err);
common.returnMessage(params, 500, err.toString());
}
});
return true;

case 'list-files':
validate(params, () => {
systemUtility.listProfilerFiles()
.then(res => common.returnMessage(params, 200, res))
.catch(err => {
log.e(err);
common.returnMessage(params, 404, "Profiler files not found");
});
});
return true;

case 'download':
validate(params, () => {
systemUtility.downloadProfilerFile(params.qstring.filename)
.then(({ data, filename }) => {
common.returnRaw(params, 200, data, {
'Content-Type': 'plain/text; charset=utf-8',
'Content-disposition': 'attachment; filename=' + filename
});
})
.catch(err => {
log.e(err);
common.returnMessage(params, 404, "File not found");
});
});
return true;

case 'download-all':
validate(params, async() => {
try {
const tarStream = await systemUtility.profilerFilesTarStream();
params.res.writeHead(200, {
"Content-Type": "plain/text; charset=utf-8",
"Content-Disposition": "attachment; filename=profiler.tar"
});
tarStream.on("end", () => params.res.end());
tarStream.pipe(params.res);
}
catch (err) {
log.e(err);
console.error(err);
common.returnMessage(params, 500, "Server error");
}
});
return true;

default:
return false;
}
});



plugins.register("/o/system", function(ob) {

Expand Down
Loading

0 comments on commit 644f8b6

Please sign in to comment.