From 19b4ab2a8715e33830dc3ff53a4f0bccd735608b Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Fri, 2 Aug 2024 10:33:17 +0200 Subject: [PATCH] allow statements to use the server fuse --- packages/core/src/server/knownPipeline.js | 20 ++++++++++---------- packages/core/src/statements/loop.js | 8 ++++++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/packages/core/src/server/knownPipeline.js b/packages/core/src/server/knownPipeline.js index 8195be31..d62c86f2 100644 --- a/packages/core/src/server/knownPipeline.js +++ b/packages/core/src/server/knownPipeline.js @@ -31,23 +31,23 @@ const knownPipeline = (ezs) => (request, response, next) => { return next(); } request.catched = true; - debug('ezs')(`Create middleware 'knownPipeline' for ${request.method} ${request.pathName}`); + const { headers, fusible, method, pathName } = request; + const { query } = request.urlParsed; - const { headers, fusible } = request; + debug('ezs')(`Create middleware 'knownPipeline' for ${method} ${pathName}`); const triggerError = errorHandler(request, response); - const { query } = request.urlParsed; - const files = ezs.memoize(`knownPipeline>${request.pathName}`, - () => request.pathName + const files = ezs.memoize(`knownPipeline>${pathName}`, + () => pathName .slice(1) .split(',') .map((file) => join(request.serverPath, dirname(file), basename(file, '.ini').concat('.ini'))) .filter((file) => isFile(file))); if (files.length === 0) { - triggerError(new Error(`Cannot find ${request.pathName}`), 404); + triggerError(new Error(`Cannot find ${pathName}`), 404); return false; } debug('ezs')( - `PID ${process.pid} will execute ${request.pathName} commands with ${sizeof(query)}B of global parameters`, + `PID ${process.pid} will execute ${pathName} commands with ${sizeof(query)}B of global parameters`, ); const meta = ezs.memoize(`executePipeline>${files}`, @@ -67,7 +67,7 @@ const knownPipeline = (ezs) => (request, response, next) => { response.socket.setNoDelay(false); - if (request.method !== 'POST') { + if (method !== 'POST') { response.writeHead(200); response.end(); return true; @@ -80,7 +80,7 @@ const knownPipeline = (ezs) => (request, response, next) => { metricsEnable, } = settings; const execMode = server ? 'dispatch' : delegate; - const environment = { ...query, headers }; + const environment = { ...query, headers, request: { fusible, method, pathName } }; const statements = files.map((file) => ezs(execMode, { file, server }, environment)); const prepend2Pipeline = ezs.parseCommand(onlyOne(prepend)); if (prepend2Pipeline) { @@ -95,7 +95,7 @@ const knownPipeline = (ezs) => (request, response, next) => { statements.push(ezs('tracer', { print: '.', last: '!' })); } if (metricsEnable) { - ezs.use({metrics: metricsHandle(request.pathName)}); + ezs.use({metrics: metricsHandle(pathName)}); statements.unshift(ezs('metrics', { bucket: 'input' })); statements.push(ezs('metrics', { bucket: 'output' })); } diff --git a/packages/core/src/statements/loop.js b/packages/core/src/statements/loop.js index 8b5cbaa3..cb31d9a2 100644 --- a/packages/core/src/statements/loop.js +++ b/packages/core/src/statements/loop.js @@ -1,3 +1,5 @@ +import breaker from './breaker'; + async function loopFunc(data, feed) { const { ezs } = this; if (this.isLast()) { @@ -8,6 +10,8 @@ async function loopFunc(data, feed) { const depth = this.getParam('depth'); const maxDepth = this.getParam('maxDepth'); const reverse = this.getParam('reverse'); + const fusible = this.getParam('fusible'); + const control = fusible ? ezs(breaker, { fusible }) : ezs('transit'); const tests = [] .concat(getParam('test', false, data)) .map((i) => Boolean(i)) @@ -15,6 +19,7 @@ async function loopFunc(data, feed) { const input = ezs.createStream(ezs.objectMode()); const statements = ezs.compileCommands(commands, getEnv()); const output = ezs.createPipeline(input, statements) + .pipe(control) .pipe(ezs(loopFunc, { reverse, depth: depth + 1, maxDepth }, this.getEnv())) .pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error @@ -44,6 +49,7 @@ async function loopFunc(data, feed) { * @param {String} [commands] the external pipeline is described in an object * @param {String} [command] the external pipeline is described in an URL-like command * @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors + * @param {String} [fusible] Can be set with the ezs server fusible see env('request.fusible') * @returns {Object} */ export default function loop(data, feed) { @@ -61,6 +67,7 @@ export default function loop(data, feed) { }); const maxDepth = Number(this.getParam('maxDepth', 100000)); const reverse = Boolean(this.getParam('reverse', false)); + const fusible = this.getParam('fusible', false); const tests = [] .concat(this.getParam('test', false)) .map((i) => Boolean(i)) @@ -73,6 +80,7 @@ export default function loop(data, feed) { reverse, depth: 1, maxDepth, + fusible, }, { commands, getEnv: this.getEnv,