Skip to content

Commit

Permalink
Worker-based multitenancy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tshemsedinov committed May 26, 2022
1 parent 01070f5 commit 4fdfde1
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [Unreleased][unreleased]

- Fix shutdown while initialization
- Worker-based multitenancy implementation

## [2.6.10][] - 2022-05-09

Expand Down
260 changes: 154 additions & 106 deletions impress.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,167 +2,215 @@

process.title = 'impress';

const fsp = require('fs').promises;
const { Worker } = require('worker_threads');
const path = require('path');
const { Config } = require('metaconfiguration');
const metavm = require('metavm');
const metautil = require('metautil');
const { loadSchema } = require('metaschema');
const { Logger } = require('metalog');
let logger = null;

let finalization = false;
let initialization = true;

const CONFIG_SECTIONS = ['log', 'scale', 'server', 'sessions'];
const PATH = process.cwd();
const WORKER_PATH = path.join(__dirname, 'lib/worker.js');
const CFG_PATH = path.join(PATH, 'application/config');
const LOG_PATH = path.join(PATH, 'log');
const CTRL_C = 3;
const LOG_OPTIONS = { path: LOG_PATH, home: PATH, workerId: 0 };
const CONTEXT = metavm.createContext({ process });
const CFG_OPTIONS = { mode: process.env.MODE, context: CONTEXT };

const impress = {
logger: null,
config: null,
finalized: () => {},
finalization: false,
initialization: true,
console,
applications: new Map(),
lastWorkerId: 0,
};

const exit = async (message = 'Can not start Application server') => {
console.error(metautil.replace(message, PATH, ''));
if (logger) await logger.close();
const exit = async (message) => {
impress.console.info(message);
if (impress.logger && impress.logger.active) await this.logger.close();
process.exit(1);
};

const logError = (type) => (err) => {
const msg = err.stack || err.message || 'no stack trace';
console.error(`${type} error: ${msg}`);
if (finalization) return;
if (initialization) exit();
impress.console.error(`${type}: ${msg}`);
if (impress.finalization) return;
if (impress.initialization) exit('Can not start Application server');
};

process.on('uncaughtException', logError('uncaughtException'));
process.on('warning', logError('warning'));
process.on('unhandledRejection', logError('unhandledRejection'));
process.on('uncaughtException', logError('Uncaught exception'));
process.on('warning', logError('Warning'));
process.on('unhandledRejection', logError('Unhandled rejection'));

const startWorker = (app, kind, port, id = ++impress.lastWorkerId) => {
// TODO: implement kind: 'worker'
// 'scheduler' will not be kind of worker
// it will be shared for all apps
const workerData = { id, kind, path: app.path, port };
const options = { trackUnmanagedFds: true, workerData };
const worker = new Worker(WORKER_PATH, options);
app.threads.set(id, worker);

worker.on('exit', (code) => {
if (code !== 0) startWorker(app, kind, port, id);
else app.threads.delete(id);
if (app.threads.size === 0) {
impress.applications.delete(app.path);
if (impress.applications.size === 0) impress.finalized();
}
});

const handlers = {
event: ({ name }) => {
if (name === 'started') {
app.ready++;
app.pool.add(worker);
if (app.threads.size === app.ready) {
impress.console.info(`App started: ${app.peth}`);
}
}
},

task: (msg) => {
impress.console.log({ id, type: 'task', msg });
//const transferList = msg.port ? [msg.port] : undefined;
//scheduler.postMessage(msg, transferList);
},

invoke: async (msg) => {
impress.console.log({ id, type: 'invoke', msg });
/*
const { name, port, exclusive } = msg;
if (name === 'done') {
if (exclusive) pool.release(worker);
return;
}
if (name !== 'request') return;
const promisedThread = exclusive ? pool.capture() : pool.next();
const next = await promisedThread.catch(() => {
port.postMessage({ error: new Error('No thread available') });
return null;
});
if (!next) return;
next.postMessage(msg, [port]);
*/
},
};

worker.on('message', (msg) => {
const handler = handlers[msg.type];
if (handler) handler(msg);
});
};

const validateConfig = async (config) => {
const schemaPath = path.join(__dirname, 'schemas/config');
let valid = true;
const schemaPath = path.join(__dirname, 'schemas/config');
for (const section of CONFIG_SECTIONS) {
const fileName = path.join(schemaPath, section + '.js');
const schema = await loadSchema(fileName);
const checkResult = schema.check(config[section]);
if (!checkResult.valid) {
for (const err of checkResult.errors) {
console.error(`${err} in application/config/${section}.js`);
impress.console.error(`${err} in application/config/${section}.js`);
}
valid = false;
}
}
if (!valid) exit();
if (!valid) exit('Application server configuration is invalid');
};

(async () => {
const context = metavm.createContext({ process });
const CFG_OPTIONS = { mode: process.env.MODE, context };
const config = await new Config(CFG_PATH, CFG_OPTIONS).catch((err) => {
const loadApplication = async (root) => {
impress.console.info(`Start: ${root}`);
const configPath = path.join(root, 'application/config');
const config = await new Config(configPath, CFG_OPTIONS).catch((err) => {
exit(`Can not read configuration: ${CFG_PATH}\n${err.stack}`);
});

logger = await new Logger({ ...LOG_OPTIONS, ...config.log });
logger.on('error', logError('logger error'));
if (logger.active) global.console = logger.console;

await validateConfig(config);

const { balancer, ports = [], workers = {} } = config.server;
const serversCount = ports.length + (balancer ? 1 : 0);
const schedulerCount = 1;
const schedulerId = serversCount;
const poolSize = workers.pool || 0;
const count = serversCount + schedulerCount + poolSize;
let startTimer = null;
let active = 0;
let starting = 0;
let scheduler = null;
const threads = new Array(count);
const threads = new Map();
const pool = new metautil.Pool({ timeout: workers.wait });

const stop = async () => {
finalization = true;
const closing = logger.close();
for (const worker of threads) {
worker.postMessage({ type: 'event', name: 'stop' });
}
await closing;
};
const app = { path: root, config, threads, pool, ready: 0 };

const start = (id) => {
const workerPath = path.join(__dirname, 'lib/worker.js');
const worker = new Worker(workerPath, { trackUnmanagedFds: true });
threads[id] = worker;
if (id === schedulerId) scheduler = worker;
else if (id > schedulerId) pool.add(worker);

worker.on('exit', (code) => {
active--;
if (code !== 0) start(id);
else if (active === 0) process.exit(0);
else if (active < 0 && id === 0) exit('Application server stopped');
});
if (balancer) startWorker(app, 'balancer', balancer);
for (const port of ports) startWorker(app, 'server', port);
const poolSize = workers.pool || 0;
for (let i = 0; i < poolSize; i++) startWorker(app, 'worker');

worker.on('online', () => {
if (++starting === count) {
startTimer = setTimeout(() => {
if (active !== count) {
console.warn(`Worker ${id} initialization timeout`);
}
}, config.server.timeouts.start);
}
});
impress.applications.set(root, app);
};

const ITC = {
event: ({ name }) => {
if (name !== 'started') return;
active++;
if (active === count && startTimer) {
clearTimeout(startTimer);
startTimer = null;
}
},

task: (msg) => {
if (msg.type !== 'task') return;
const transferList = msg.port ? [msg.port] : undefined;
scheduler.postMessage(msg, transferList);
},

invoke: async (msg) => {
const { name, port, exclusive } = msg;
if (name === 'done') {
if (exclusive) pool.release(worker);
return;
}
if (name !== 'request') return;
const promisedThread = exclusive ? pool.capture() : pool.next();
const next = await promisedThread.catch(() => {
port.postMessage({ error: new Error('No thread available') });
return null;
});
if (!next) return;
next.postMessage(msg, [port]);
},
};

worker.on('message', (msg) => {
const handler = ITC[msg.type];
if (handler) handler(msg);
});
};
const loadApplications = async () => {
const list = await fsp
.readFile('.applications', 'utf8')
.then((data) => data.split('\n').filter((s) => s.length !== 0))
.catch(() => [PATH]);
for (const path of list) {
await loadApplication(path);
}
};

const stopApplication = (root) => {
const app = impress.applications.get(root);
for (const thread of app.threads.values()) {
thread.postMessage({ type: 'event', name: 'stop' });
}
};

for (let id = 0; id < count; id++) start(id);
const stop = async () => {
impress.finalization = true;
const logClosed = impress.logger.close();
const portsClosed = new Promise((resolve) => {
impress.finalized = resolve;
setTimeout(() => {
impress.console.error('Exit with graceful shutdown timeout');
resolve();
}, impress.config.server.timeouts.stop);
});
for (const app of impress.applications.values()) {
stopApplication(app.path);
}
await Promise.allSettled([logClosed, portsClosed]);
exit('Application server stopped');
};

(async () => {
const configPath = path.join(PATH, 'application/config');
const config = await new Config(configPath, CFG_OPTIONS).catch((err) => {
exit(`Can not read configuration: ${CFG_PATH}\n${err.stack}`);
});
await validateConfig(config);
impress.config = config;
const logger = await new Logger({ ...LOG_OPTIONS, ...config.log });
logger.on('error', logError('Logger'));
if (logger.active) impress.console = logger.console;
impress.logger = logger;

process.on('SIGINT', stop);
process.on('SIGTERM', stop);

const startTimer = setTimeout(() => {
impress.console.warn(`Initialization timeout`);
}, config.server.timeouts.start);

await loadApplications();

if (process.stdin.isTTY) {
process.stdin.setRawMode(true);
process.stdin.on('data', (data) => {
const key = data[0];
if (key === CTRL_C) stop();
});
}
initialization = false;
})().catch(logError('initialization'));
clearTimeout(startTimer);
impress.initialization = false;
})().catch(logError('Initialization'));
13 changes: 7 additions & 6 deletions lib/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ const { node, npm, metarhia } = require('./dependencies.js');
const path = require('path');
const events = require('events');
const fs = require('fs');
const { MessageChannel, parentPort, threadId } = require('worker_threads');
const wt = require('worker_threads');
const { MessageChannel, parentPort, threadId } = wt;
const workerData = wt.workerData || { path: process.cwd() };
const metavm = require('metavm');
const metawatch = require('metawatch');
const metautil = require('metautil');
Expand Down Expand Up @@ -36,10 +38,10 @@ const SANDBOX = { ...metavm.COMMON_CONTEXT, Error, node, npm, metarhia };
class Application extends events.EventEmitter {
constructor() {
super();
this.kind = '';
this.kind = workerData.kind;
this.initialization = true;
this.finalization = false;
this.root = process.cwd();
this.root = workerData.path;
this.path = path.join(this.root, 'application');

this.schemas = new Schemas('schemas', this);
Expand All @@ -66,8 +68,7 @@ class Application extends events.EventEmitter {
return path.join(this.path, relative);
}

async init(kind) {
this.kind = kind;
async init() {
this.startWatch();
this.createSandbox();
this.sandbox.application.emit('loading');
Expand All @@ -85,7 +86,7 @@ class Application extends events.EventEmitter {
await Promise.allSettled(this.starts.map((fn) => this.execute(fn)));
this.sandbox.application.emit('started');
await this.api.load();
if (kind === 'scheduler') await this.scheduler.load();
if (this.kind === 'scheduler') await this.scheduler.load();
const { api } = this.sandbox;
if (api.auth) {
const { provider } = api.auth;
Expand Down
Loading

0 comments on commit 4fdfde1

Please sign in to comment.