diff --git a/CHANGELOG.md b/CHANGELOG.md index 589aef4b..c9d859e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [Unreleased][unreleased] - Fix shutdown while initialization +- Worker-based multitenancy implementation ## [2.6.10][] - 2022-05-09 diff --git a/impress.js b/impress.js index 6e74f0be..f9d7e2f6 100644 --- a/impress.js +++ b/impress.js @@ -2,6 +2,7 @@ process.title = 'impress'; +const fsp = require('fs').promises; const { Worker } = require('worker_threads'); const path = require('path'); const { Config } = require('metaconfiguration'); @@ -9,154 +10,200 @@ const metavm = require('metavm'); const metautil = require('metautil'); const { loadSchema } = require('metaschema'); const { Logger } = require('metalog'); -let logger = null; - -let finalization = false; -let initialization = true; const CONFIG_SECTIONS = ['log', 'scale', 'server', 'sessions']; const PATH = process.cwd(); +const WORKER_PATH = path.join(__dirname, 'lib/worker.js'); const CFG_PATH = path.join(PATH, 'application/config'); const LOG_PATH = path.join(PATH, 'log'); const CTRL_C = 3; const LOG_OPTIONS = { path: LOG_PATH, home: PATH, workerId: 0 }; +const CONTEXT = metavm.createContext({ process }); +const CFG_OPTIONS = { mode: process.env.MODE, context: CONTEXT }; + +const impress = { + logger: null, + config: null, + finalized: () => {}, + finalization: false, + initialization: true, + console, + applications: new Map(), + lastWorkerId: 0, +}; -const exit = async (message = 'Can not start Application server') => { - console.error(metautil.replace(message, PATH, '')); - if (logger) await logger.close(); +const exit = async (message) => { + impress.console.info(message); + if (impress.logger && impress.logger.active) await this.logger.close(); process.exit(1); }; const logError = (type) => (err) => { const msg = err.stack || err.message || 'no stack trace'; - console.error(`${type} error: ${msg}`); - if (finalization) return; - if (initialization) exit(); + impress.console.error(`${type}: ${msg}`); + if (impress.finalization) return; + if (impress.initialization) exit('Can not start Application server'); }; -process.on('uncaughtException', logError('uncaughtException')); -process.on('warning', logError('warning')); -process.on('unhandledRejection', logError('unhandledRejection')); +process.on('uncaughtException', logError('Uncaught exception')); +process.on('warning', logError('Warning')); +process.on('unhandledRejection', logError('Unhandled rejection')); + +const startWorker = (app, kind, port, id = ++impress.lastWorkerId) => { + // TODO: implement kind: 'worker' + // 'scheduler' will not be kind of worker + // it will be shared for all apps + const workerData = { id, kind, path: app.path, port }; + const options = { trackUnmanagedFds: true, workerData }; + const worker = new Worker(WORKER_PATH, options); + app.threads.set(id, worker); + + worker.on('exit', (code) => { + if (code !== 0) startWorker(app, kind, port, id); + else app.threads.delete(id); + if (app.threads.size === 0) { + impress.applications.delete(app.path); + if (impress.applications.size === 0) impress.finalized(); + } + }); + + const handlers = { + event: ({ name }) => { + if (name === 'started') { + app.ready++; + app.pool.add(worker); + if (app.threads.size === app.ready) { + impress.console.info(`App started: ${app.peth}`); + } + } + }, + + task: (msg) => { + impress.console.log({ id, type: 'task', msg }); + //const transferList = msg.port ? [msg.port] : undefined; + //scheduler.postMessage(msg, transferList); + }, + + invoke: async (msg) => { + impress.console.log({ id, type: 'invoke', msg }); + /* + const { name, port, exclusive } = msg; + if (name === 'done') { + if (exclusive) pool.release(worker); + return; + } + if (name !== 'request') return; + const promisedThread = exclusive ? pool.capture() : pool.next(); + const next = await promisedThread.catch(() => { + port.postMessage({ error: new Error('No thread available') }); + return null; + }); + if (!next) return; + next.postMessage(msg, [port]); + */ + }, + }; + + worker.on('message', (msg) => { + const handler = handlers[msg.type]; + if (handler) handler(msg); + }); +}; const validateConfig = async (config) => { - const schemaPath = path.join(__dirname, 'schemas/config'); let valid = true; + const schemaPath = path.join(__dirname, 'schemas/config'); for (const section of CONFIG_SECTIONS) { const fileName = path.join(schemaPath, section + '.js'); const schema = await loadSchema(fileName); const checkResult = schema.check(config[section]); if (!checkResult.valid) { for (const err of checkResult.errors) { - console.error(`${err} in application/config/${section}.js`); + impress.console.error(`${err} in application/config/${section}.js`); } valid = false; } } - if (!valid) exit(); + if (!valid) exit('Application server configuration is invalid'); }; -(async () => { - const context = metavm.createContext({ process }); - const CFG_OPTIONS = { mode: process.env.MODE, context }; - const config = await new Config(CFG_PATH, CFG_OPTIONS).catch((err) => { +const loadApplication = async (root) => { + impress.console.info(`Start: ${root}`); + const configPath = path.join(root, 'application/config'); + const config = await new Config(configPath, CFG_OPTIONS).catch((err) => { exit(`Can not read configuration: ${CFG_PATH}\n${err.stack}`); }); - - logger = await new Logger({ ...LOG_OPTIONS, ...config.log }); - logger.on('error', logError('logger error')); - if (logger.active) global.console = logger.console; - await validateConfig(config); + const { balancer, ports = [], workers = {} } = config.server; - const serversCount = ports.length + (balancer ? 1 : 0); - const schedulerCount = 1; - const schedulerId = serversCount; - const poolSize = workers.pool || 0; - const count = serversCount + schedulerCount + poolSize; - let startTimer = null; - let active = 0; - let starting = 0; - let scheduler = null; - const threads = new Array(count); + const threads = new Map(); const pool = new metautil.Pool({ timeout: workers.wait }); - const stop = async () => { - finalization = true; - const closing = logger.close(); - for (const worker of threads) { - worker.postMessage({ type: 'event', name: 'stop' }); - } - await closing; - }; + const app = { path: root, config, threads, pool, ready: 0 }; - const start = (id) => { - const workerPath = path.join(__dirname, 'lib/worker.js'); - const worker = new Worker(workerPath, { trackUnmanagedFds: true }); - threads[id] = worker; - if (id === schedulerId) scheduler = worker; - else if (id > schedulerId) pool.add(worker); - - worker.on('exit', (code) => { - active--; - if (code !== 0) start(id); - else if (active === 0) process.exit(0); - else if (active < 0 && id === 0) exit('Application server stopped'); - }); + if (balancer) startWorker(app, 'balancer', balancer); + for (const port of ports) startWorker(app, 'server', port); + const poolSize = workers.pool || 0; + for (let i = 0; i < poolSize; i++) startWorker(app, 'worker'); - worker.on('online', () => { - if (++starting === count) { - startTimer = setTimeout(() => { - if (active !== count) { - console.warn(`Worker ${id} initialization timeout`); - } - }, config.server.timeouts.start); - } - }); + impress.applications.set(root, app); +}; - const ITC = { - event: ({ name }) => { - if (name !== 'started') return; - active++; - if (active === count && startTimer) { - clearTimeout(startTimer); - startTimer = null; - } - }, - - task: (msg) => { - if (msg.type !== 'task') return; - const transferList = msg.port ? [msg.port] : undefined; - scheduler.postMessage(msg, transferList); - }, - - invoke: async (msg) => { - const { name, port, exclusive } = msg; - if (name === 'done') { - if (exclusive) pool.release(worker); - return; - } - if (name !== 'request') return; - const promisedThread = exclusive ? pool.capture() : pool.next(); - const next = await promisedThread.catch(() => { - port.postMessage({ error: new Error('No thread available') }); - return null; - }); - if (!next) return; - next.postMessage(msg, [port]); - }, - }; - - worker.on('message', (msg) => { - const handler = ITC[msg.type]; - if (handler) handler(msg); - }); - }; +const loadApplications = async () => { + const list = await fsp + .readFile('.applications', 'utf8') + .then((data) => data.split('\n').filter((s) => s.length !== 0)) + .catch(() => [PATH]); + for (const path of list) { + await loadApplication(path); + } +}; + +const stopApplication = (root) => { + const app = impress.applications.get(root); + for (const thread of app.threads.values()) { + thread.postMessage({ type: 'event', name: 'stop' }); + } +}; - for (let id = 0; id < count; id++) start(id); +const stop = async () => { + impress.finalization = true; + const logClosed = impress.logger.close(); + const portsClosed = new Promise((resolve) => { + impress.finalized = resolve; + setTimeout(() => { + impress.console.error('Exit with graceful shutdown timeout'); + resolve(); + }, impress.config.server.timeouts.stop); + }); + for (const app of impress.applications.values()) { + stopApplication(app.path); + } + await Promise.allSettled([logClosed, portsClosed]); + exit('Application server stopped'); +}; + +(async () => { + const configPath = path.join(PATH, 'application/config'); + const config = await new Config(configPath, CFG_OPTIONS).catch((err) => { + exit(`Can not read configuration: ${CFG_PATH}\n${err.stack}`); + }); + await validateConfig(config); + impress.config = config; + const logger = await new Logger({ ...LOG_OPTIONS, ...config.log }); + logger.on('error', logError('Logger')); + if (logger.active) impress.console = logger.console; + impress.logger = logger; process.on('SIGINT', stop); process.on('SIGTERM', stop); + const startTimer = setTimeout(() => { + impress.console.warn(`Initialization timeout`); + }, config.server.timeouts.start); + + await loadApplications(); + if (process.stdin.isTTY) { process.stdin.setRawMode(true); process.stdin.on('data', (data) => { @@ -164,5 +211,6 @@ const validateConfig = async (config) => { if (key === CTRL_C) stop(); }); } - initialization = false; -})().catch(logError('initialization')); + clearTimeout(startTimer); + impress.initialization = false; +})().catch(logError('Initialization')); diff --git a/lib/application.js b/lib/application.js index 2f3bf2f2..2f88b638 100644 --- a/lib/application.js +++ b/lib/application.js @@ -4,7 +4,9 @@ const { node, npm, metarhia } = require('./dependencies.js'); const path = require('path'); const events = require('events'); const fs = require('fs'); -const { MessageChannel, parentPort, threadId } = require('worker_threads'); +const wt = require('worker_threads'); +const { MessageChannel, parentPort, threadId } = wt; +const workerData = wt.workerData || { path: process.cwd() }; const metavm = require('metavm'); const metawatch = require('metawatch'); const metautil = require('metautil'); @@ -36,10 +38,10 @@ const SANDBOX = { ...metavm.COMMON_CONTEXT, Error, node, npm, metarhia }; class Application extends events.EventEmitter { constructor() { super(); - this.kind = ''; + this.kind = workerData.kind; this.initialization = true; this.finalization = false; - this.root = process.cwd(); + this.root = workerData.path; this.path = path.join(this.root, 'application'); this.schemas = new Schemas('schemas', this); @@ -66,8 +68,7 @@ class Application extends events.EventEmitter { return path.join(this.path, relative); } - async init(kind) { - this.kind = kind; + async init() { this.startWatch(); this.createSandbox(); this.sandbox.application.emit('loading'); @@ -85,7 +86,7 @@ class Application extends events.EventEmitter { await Promise.allSettled(this.starts.map((fn) => this.execute(fn))); this.sandbox.application.emit('started'); await this.api.load(); - if (kind === 'scheduler') await this.scheduler.load(); + if (this.kind === 'scheduler') await this.scheduler.load(); const { api } = this.sandbox; if (api.auth) { const { provider } = api.auth; diff --git a/lib/worker.js b/lib/worker.js index 10b1dd62..3bab4a2b 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,7 +2,7 @@ const path = require('path'); const fsp = require('fs').promises; -const { parentPort, threadId } = require('worker_threads'); +const { parentPort, threadId, workerData } = require('worker_threads'); const { Config } = require('metaconfiguration'); const { Logger } = require('metalog'); const { Server } = require('metacom'); @@ -57,17 +57,12 @@ process.on('unhandledRejection', logError('unhandledRejection')); } } - const { balancer, ports = [] } = config.server; - const servingThreads = ports.length + (balancer ? 1 : 0); + await application.init(); - let kind = 'worker'; - if (threadId <= servingThreads) kind = 'server'; - if (threadId === servingThreads + 1) kind = 'scheduler'; - - await application.init(kind); - - if (kind === 'server') { - application.server = new Server(config.server, application); + const { kind, port } = workerData; + if (kind === 'server' || kind === 'balancer') { + const options = { ...config.server, port, kind }; + application.server = new Server(options, application); } console.info(`Application started in worker ${threadId}`);