diff --git a/packages/core/src/statements/combine.js b/packages/core/src/statements/combine.js index 479a72bc..ba673379 100644 --- a/packages/core/src/statements/combine.js +++ b/packages/core/src/statements/combine.js @@ -2,11 +2,16 @@ import _ from 'lodash'; import debug from 'debug'; import assert from 'assert'; import hasher from 'node-object-hash'; +import { resolve as resolvePath } from 'path'; +import { tmpdir } from 'os'; +import makeDir from 'make-dir'; +import pathExists from 'path-exists'; +import cacache from 'cacache'; const hashCoerce = hasher({ sort: false, coerce: true }); const core = (id, value) => ({ id, value }); -const database = {}; +export const database = {}; async function saveIn(data, feed) { if (this.isLast()) { @@ -23,10 +28,37 @@ async function saveIn(data, feed) { database[databaseID][id] = value; } } - return feed.send(id); + return feed.send(data); +} + +function cacheSave(data, feed) { + const { ezs } = this; + const cachePath = this.getParam('cachePath'); + const cacheKey = this.getParam('cacheKey'); + if (this.isLast()) { + if (cachePath && cacheKey) { + this.whenFinish.finally(() => feed.close()); + return this.input.end(); + } + return feed.close(); + } + if (cachePath && cacheKey) { + if (!this.input) { + this.input = ezs.createStream(ezs.objectMode()); + this.whenFinish = new Promise( + (resolve) => this.input + .pipe(ezs('pack')) + .pipe(cacache.put.stream(cachePath, cacheKey)) + .on('end', resolve) + ); + } + return ezs.writeTo(this.input, data, () => feed.send(data)); + } + return feed.send(data); } + /** * Takes an `Object` and substitute a field with the corresponding value found in a external pipeline * the internal pipeline must produce a stream of special object (id, value) @@ -70,15 +102,23 @@ async function saveIn(data, feed) { * @param {String} [commands] the external pipeline is described in a object * @param {String} [command] the external pipeline is described in a URL-like command * @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors + * @param {String} [cacheName] Enable cache, with dedicated name * @returns {Object} */ -export default function combine(data, feed) { +export default async function combine(data, feed) { const { ezs } = this; + const cacheName = this.getParam('cacheName'); let whenReady = Promise.resolve(true); if (this.isFirst()) { + if (cacheName && !this.cachePath) { + const location = this.getParam('location'); + this.cachePath = resolvePath(location || tmpdir(), 'memory', `combine/${cacheName}`); + if (!pathExists.sync(this.cachePath)) { + makeDir.sync(this.cachePath); + } + } debug('ezs')('[combine] with sub pipeline.'); const primer = this.getParam('primer', 'n/a'); - const input = ezs.createStream(ezs.objectMode()); const commands = ezs.createCommands({ file: this.getParam('file'), script: this.getParam('script'), @@ -88,11 +128,26 @@ export default function combine(data, feed) { append: this.getParam('append'), }); this.databaseID = hashCoerce.hash({ primer, commands }); + const input = ezs.createStream(ezs.objectMode()); if (!database[this.databaseID]) { database[this.databaseID] = {}; - const statements = ezs.compileCommands(commands, this.getEnv()); - const logger = ezs.createTrap(this.getParam('logger'), this.getEnv()); - const output = ezs.createPipeline(input, statements, logger) + let stream; + if (cacheName) { + const cacheObject = await cacache.get.info(this.cachePath, this.databaseID); + if (cacheObject) { + stream = cacache.get.stream.byDigest(this.cachePath, cacheObject.integrity).pipe(ezs('unpack')); + } + } + if (!stream) { + const statements = ezs.compileCommands(commands, this.getEnv()); + const logger = ezs.createTrap(this.getParam('logger'), this.getEnv()); + stream = ezs.createPipeline(input, statements, logger) + .pipe(ezs(cacheSave, { + cachePath: this.cachePath, + cacheKey: this.databaseID, + })); + } + const output = stream .pipe(ezs(saveIn, null, this.databaseID)) .pipe(ezs.catch()) .on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered @@ -105,42 +160,35 @@ export default function combine(data, feed) { if (this.isLast()) { return feed.close(); } - return whenReady - .then(() => { - const defval = this.getParam('default', null); - const path = this.getParam('path'); - const pathVal = _.get(data, path); - const keys = [].concat(pathVal).filter(Boolean); - if (keys.length === 0) { - return feed.send(data); - } - const values = keys.map((key) => { - if (!database[this.databaseID][key]) { - return null; - } - return core(key, database[this.databaseID][key]); - }); - if (values.length && Array.isArray(pathVal)) { - _.set(data, path, values); - } else if (values.length && !Array.isArray(pathVal)) { - const val = values.shift(); - if (val !== null) { - _.set(data, path, val); - } else if (defval !== null) { - const orig = _.get(data, path); - _.set(data, path, { id: orig, value: defval }); - } else { - const orig = _.get(data, path); - _.set(data, path, { id: orig, value: orig }); - } - } else if (Array.isArray(pathVal)) { - _.set(data, path, pathVal.map((id) => ({ id }))); - } else { - _.set(data, path, { id: pathVal }); - } - return feed.send(data); - }) - .catch((e) => { - feed.stop(e); - }); + await whenReady; + const defval = this.getParam('default', null); + const path = this.getParam('path'); + const pathVal = _.get(data, path); + const keys = [].concat(pathVal).filter(Boolean); + if (keys.length === 0) { + return feed.send(data); + } + const values = keys.map((key) => { + if (!database[this.databaseID][key]) { + return null; + } + return core(key, database[this.databaseID][key]); + }); + // length of the values is always equal to the length of the keys. + if (Array.isArray(pathVal)) { + _.set(data, path, values); + } else { + const val = values.shift(); + if (val !== null) { + _.set(data, path, val); + } else if (defval !== null) { + const orig = _.get(data, path); + _.set(data, path, { id: orig, value: defval }); + } else { + const orig = _.get(data, path); + _.set(data, path, { id: orig, value: orig }); + } + } + return feed.send(data); + } diff --git a/packages/core/test/combine.js b/packages/core/test/combine.js index 4f9206dc..fd8e59ca 100644 --- a/packages/core/test/combine.js +++ b/packages/core/test/combine.js @@ -2,6 +2,7 @@ import assert from 'assert'; import fs from 'fs'; import from from 'from'; import ezs from '../src'; +import { database } from '../src/statements/combine'; ezs.addPath(__dirname); @@ -209,6 +210,47 @@ describe('combine', () => { done(); }); }); + test('with script #4', (done) => { + const input = [ + { a: 1, b: ['a', 'b'] }, + { a: 2 }, + { a: 3, b: 'c' }, + { a: 4, b: 'y' }, + { a: 5, b: 'e' }, + { a: 6, b: 'z' }, + ]; + const output = []; + const script = ` + [use] + plugin = analytics + + [replace] + path = value + value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'}) + + [exploding] + [value] + `; + + from(input) + .pipe(ezs('combine', { path: 'b', default:'n/a', script })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.equal(output[0].b[0].value, 'aa'); + assert.equal(output[0].b[1].value, 'bb'); + assert.equal(output[1].b, undefined); + assert.equal(output[2].b.value, 'cc'); + assert.equal(output[3].b.value, 'n/a'); + assert.equal(output[4].b.value, 'ee'); + assert.equal(output[5].b.value, 'n/a'); + done(); + }); + }); test('with file', (done) => { const input = [ { a: 1, b: 'a' }, @@ -331,6 +373,7 @@ describe('no combine', () => { done(new Error('Error is the right behavior')); }); }); + test.skip('with wrong location', (done) => { const input = [ { a: 1, b: 'a' }, @@ -364,7 +407,7 @@ describe('no combine', () => { }) .on('end', () => { done(new Error('Error is the right behavior')); - }); + }); }); }); @@ -388,20 +431,20 @@ const cacheScript = ` [value] `; - - -test('combine with internal cache with script #1', (done) => { +const nextTest = (done) => { const input = [ - { a: 1, b: 'a' }, - { a: 2, b: 'b' }, - { a: 3, b: 'c' }, - { a: 4, b: 'd' }, - { a: 5, b: 'e' }, - { a: 6, b: 'f' }, + { a: 11, b: 'a' }, + { a: 12, b: 'b' }, + { a: 13, b: 'c' }, + { a: 14, b: 'd' }, + { a: 15, b: 'e' }, + { a: 16, b: 'f' }, ]; + // Clean memory cache to hit the file cache + Object.keys(database).forEach(k => {delete database[k];}); const output = []; from(input) - .pipe(ezs('combine', { path: 'b', script: cacheScript }, env)) + .pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env)) .pipe(ezs.catch()) .on('error', done) .on('data', (chunk) => { @@ -415,13 +458,13 @@ test('combine with internal cache with script #1', (done) => { assert.equal(output[3].b.value, 'dd'); assert.equal(output[4].b.value, 'ee'); assert.equal(output[5].b.value, 'ff'); - assert.equal(env.executed, true); - env.executed = false; + assert.equal(env.executed, false); done(); }); -}); +}; + -test('combine with internal cache with script #2', (done) => { +test('combine with internal cache with script', (done) => { const input = [ { a: 1, b: 'a' }, { a: 2, b: 'b' }, @@ -432,7 +475,7 @@ test('combine with internal cache with script #2', (done) => { ]; const output = []; from(input) - .pipe(ezs('combine', { path: 'b', script: cacheScript }, env)) + .pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env)) .pipe(ezs.catch()) .on('error', done) .on('data', (chunk) => { @@ -446,9 +489,9 @@ test('combine with internal cache with script #2', (done) => { assert.equal(output[3].b.value, 'dd'); assert.equal(output[4].b.value, 'ee'); assert.equal(output[5].b.value, 'ff'); - assert.equal(env.executed, false); - done(); + assert.equal(env.executed, true); + env.executed = false; + nextTest(done); }); }); -