Skip to content

Commit

Permalink
feat: 🎸 add trap system for externals pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Oct 11, 2023
1 parent f168e4c commit 3e71a4d
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 20 deletions.
6 changes: 4 additions & 2 deletions packages/core/src/catcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ export default class Catcher extends SafeTransform {
if (typeof this.func === 'function') {
const e = this.func(chunk);
if (e instanceof Error) {
this.emit('error', e);
this.emit('error', e); // catch and stop
} else if (e === false) {
this.push(chunk); // no catch
}
}
} else {
this.push(chunk);
this.push(chunk); // no catch
}
done();
}
Expand Down
44 changes: 42 additions & 2 deletions packages/core/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,53 @@ ezs.createCommands = (params) => {
return commands;
};
ezs.writeTo = writeTo;
ezs.createPipeline = (input, commands) => commands.reduce((amont, aval) => amont.pipe(aval), input);
ezs.createPipeline = (input, commands, trap) => {
const output = commands.reduce((amont, aval) => amont.pipe(aval), input);
if (!trap || !trap.write) {
return output;
}
return output
.pipe(ezs.catch((e) => {
trap.write({
type: 'Run-time warning',
scope: 'data',
message: e.message.split('\n').shift(),
messageFull: e.message,
sourceError: e.sourceError,
sourceChunk: e.sourceChunk,
});
return false; // do not catch the error
}))
.once('error', (e) => {
trap.write({
type: 'Fatal run-time error',
scope: 'statements',
message: e.message,
sourceError: e.sourceError,
sourceChunk: e.sourceChunk,
});
trap.end();
})
.once('end', () => {
trap.end();
});
};
ezs.compress = (options) => compressStream(ezs, options);
ezs.uncompress = (options) => uncompressStream(ezs, options);
ezs.createStream = (options) => new PassThrough(options);
ezs.createServer = (port, path) => Server.createServer(ezs, port, path);
ezs.createCluster = (port, path) => Server.createCluster(ezs, port, path);

ezs.createTrap = (file, env) => {
if (!file) {
return;
}
const input = ezs.createStream(ezs.objectMode());
ezs.createPipeline(input, ezs.compileCommands(ezs.createCommands({ file }), env))
.once('error', () => true)
.once('end', () => true)
.on('data', () => true);
return input;
};
ezs.use(Statements);

module.exports = ezs;
4 changes: 3 additions & 1 deletion packages/core/src/statements/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ async function saveIn(data, feed) {
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function combine(data, feed) {
Expand All @@ -90,7 +91,8 @@ export default function combine(data, feed) {
if (!database[this.databaseID]) {
database[this.databaseID] = {};
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
.pipe(ezs(saveIn, null, this.databaseID))
.pipe(ezs.catch())
.on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/statements/delegate.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import debug from 'debug';
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function delegate(data, feed) {
Expand All @@ -25,7 +26,8 @@ export default function delegate(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(this.input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(this.input, statements, logger)
.pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error
this.whenFinish = feed.flow(output);
}
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/statements/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async function mergeWith(data, feed) {
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @param {String} [cacheName] Enable cache, with dedicated name
* @param {String} [token] add token values in the subpipeline (optional)
* @returns {Object}
Expand Down Expand Up @@ -137,7 +138,8 @@ export default async function expand(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
.pipe(ezs(mergeWith, { path }, {
store: this.store,
cachePath: this.cachePath,
Expand Down Expand Up @@ -188,7 +190,8 @@ export default async function expand(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
.pipe(ezs(mergeWith, { path }, {
store: this.store,
cachePath: this.cachePath,
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/statements/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import _ from 'lodash';
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function fork(data, feed) {
Expand All @@ -34,7 +35,8 @@ export default function fork(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
output = ezs.createPipeline(this.input, statements);
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
output = ezs.createPipeline(this.input, statements, logger);
}
catch(e) {
return feed.stop(e);
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/statements/loop.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async function loopFunc(data, feed) {
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in an object
* @param {String} [command] the external pipeline is described in an URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function loop(data, feed) {
Expand All @@ -66,7 +67,8 @@ export default function loop(data, feed) {
.map((i) => (reverse ? !i : i));
const statements = ezs.compileCommands(commands, this.getEnv());
const input = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
.pipe(ezs(loopFunc, {
reverse,
depth: 1,
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/statements/map.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import from from 'from';
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in an object
* @param {String} [command] the external pipeline is described in an URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function map(data, feed) {
Expand All @@ -38,7 +39,8 @@ export default function map(data, feed) {
return feed.send(data);
}
const newValue = [];
const output = ezs.createPipeline(from(value), this.createStatements());
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(from(value), this.createStatements(), logger);
return output
.pipe(ezs.catch())
.on('error', (error) => {
Expand Down
8 changes: 5 additions & 3 deletions packages/core/src/statements/parallel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import merge from 'merge2';
import debug from 'debug';
import settings from '../settings';

export const duplexer = (ezs, commands, environment) => () => {
export const duplexer = (ezs, commands, environment, logger) => () => {
const input = ezs.createStream(ezs.objectMode());
const streams = ezs.compileCommands(commands, environment);
const output = ezs.createPipeline(input, streams);
const output = ezs.createPipeline(input, streams, logger);
const duplex = [input, output];
return duplex;
};
Expand All @@ -17,6 +17,7 @@ export const duplexer = (ezs, commands, environment) => () => {
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function parallel(data, feed) {
Expand All @@ -36,7 +37,8 @@ export default function parallel(data, feed) {
return feed.stop(new Error('Invalid parmeter for [parallel]'));
}
debug('ezs')(`[parallel] start with #${concurrency} workers.`);
const handles = Array(concurrency).fill(true).map(duplexer(ezs, commands, environment));
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const handles = Array(concurrency).fill(true).map(duplexer(ezs, commands, environment, logger));
this.ins = handles.map((h) => h[0]);
this.outs = handles.map((h) => h[1]);
const funnel = merge(this.outs, ezs.objectMode())
Expand Down
14 changes: 13 additions & 1 deletion packages/core/src/statements/singleton.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import debug from 'debug';
import { addedDiff } from 'deep-object-diff';

/**
* Takes only the first `Object` delegate processing to a external pipeline
*
* @name parallel
* @param {String} [file] the external pipeline is described in a file
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @returns {Object}
*/
export default function singleton(data, feed) {
if (this.isLast()) {
return feed.close();
Expand All @@ -20,7 +31,8 @@ export default function singleton(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
ezs.createPipeline(input, statements, logger)
.pipe(ezs.catch())
.on('error', (e) => feed.stop(e))
.on('data', (chunk) => {
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/statements/spawn.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in an object
* @param {String} [command] the external pipeline is described in an URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @param {String} [cache] Use a specific ezs statement to run commands (advanced)
* @returns {Object}
*/
Expand All @@ -32,7 +33,8 @@ export default async function spawn(data, feed) {
statements = ezs.compileCommands(commands, this.getEnv());
}
const input = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
.pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error
ezs.writeTo(input, data, () => input.end());
await feed.flow(output);
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/statements/swing.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import debug from 'debug';
* @param {String} [test] if test is true
* @param {String} [reverse=false] reverse the test
* @param {String} [file] the external pipeline is described in a file
* @param {String} [script] the external pipeline is described in a string of
* characters
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in an object
* @param {String} [command] the external pipeline is described in an URL-like
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* command
* @returns {Object}
*/
Expand All @@ -30,7 +30,8 @@ export default function swing(data, feed) {
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(this.input, statements)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(this.input, statements, logger)
.pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error
feed.timeout = 0; // special case : test could be never true all the time, and so the feed will never receive any data
this.whenFinish = feed.flow(output);
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/statements/time.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ export default function time(data, feed) {
const streams = ezs.compileCommands(commands, environment);
this.input = new PassThrough({ objectMode: true });

const output = ezs.createPipeline(this.input, streams)
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(this.input, streams, logger)
.pipe(ezs.catch((e) => feed.write(e)))
.once('error', (e) => feed.stop(e))
.on('data', () => feed.write());
Expand Down
90 changes: 90 additions & 0 deletions packages/core/test/fork.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,94 @@ describe('fork)', () => {



it.only('#6 (trap & standalone)', (done) => {
const script = `
[aie]
`;
const env = {
trap: false,
}
from([
{ a: 1, b: 9 },
{ a: 2, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
])
.pipe(ezs('fork', {
script,
standalone: true,
logger: './trap.ini',
},
env,
))
.pipe(ezs.catch())
.on('error', (e) => {
try {
expect(e.message).toEqual(expect.stringContaining('aie'));
setTimeout(
() => {
expect(env.trap).toEqual(true);
done();
},
500,
);
} catch(ee) {
done(ee);
}
})
.on('data', () => true)
.on('end', () => {
done(new Error('Error is the right behavior'));
});
});

it.only('#7 (trap)', (done) => {
const script = `
[boum]
`;
const env = {
trap: false,
}
from([
{ a: 1, b: 9 },
{ a: 2, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
{ a: 1, b: 9 },
])
.pipe(ezs('fork', {
script,
standalone: true,
logger: './trap.ini',
},
env,
))
.pipe(ezs.catch())
.on('error', (e) => {
try {
expect(e.message).toEqual(expect.stringContaining('Boom!'));
setTimeout(
() => {
expect(env.trap).toEqual(true);
done();
},
500,
);
} catch(ee) {
done(ee);
}
})
.on('data', () => true)
.on('end', () => {
done(new Error('Error is the right behavior'));
});
});







});
Loading

0 comments on commit 3e71a4d

Please sign in to comment.