From f79a52a028b287d7fc6fc0d566ba7bf8f728a436 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Thu, 8 Feb 2024 17:30:33 +0100 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20cacheName=20for=20co?= =?UTF-8?q?mbine=20like=20expand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/statements/combine.js | 60 ++++++++++++++++++++++--- packages/core/test/combine.js | 12 +++-- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/packages/core/src/statements/combine.js b/packages/core/src/statements/combine.js index 479a72bc1..f057c15aa 100644 --- a/packages/core/src/statements/combine.js +++ b/packages/core/src/statements/combine.js @@ -2,6 +2,11 @@ import _ from 'lodash'; import debug from 'debug'; import assert from 'assert'; import hasher from 'node-object-hash'; +import { resolve as resolvePath } from 'path'; +import { tmpdir } from 'os'; +import makeDir from 'make-dir'; +import pathExists from 'path-exists'; +import cacache from 'cacache'; const hashCoerce = hasher({ sort: false, coerce: true }); const core = (id, value) => ({ id, value }); @@ -23,10 +28,30 @@ async function saveIn(data, feed) { database[databaseID][id] = value; } } - return feed.send(id); + return feed.send(data); +} + +async function cacheSave(data, feed) { + const { ezs } = this; + if (this.isLast()) { + return feed.close(); + } + const cachePath = this.getParam('cachePath'); + const cacheKey = this.getParam('cacheKey'); + if (cachePath && cacheKey) { + if (this.isFirst()) { + this.input = ezs.createStream(ezs.objectMode()); + this.input + .pipe(ezs('pack')) + .pipe(cacache.put.stream(cachePath, cacheKey)); + } + return ezs.writeTo(this.input, data, () => feed.send(data)); + } + feed.send(data); } + /** * Takes an `Object` and substitute a field with the corresponding value found in a external pipeline * the internal pipeline must produce a stream of special object (id, value) @@ -70,15 +95,23 @@ async function saveIn(data, feed) { * @param {String} [commands] the external pipeline is described in a object * @param {String} [command] the external pipeline is described in a URL-like command * @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors + * @param {String} [cacheName] Enable cache, with dedicated name * @returns {Object} */ -export default function combine(data, feed) { +export default async function combine(data, feed) { const { ezs } = this; + const cacheName = this.getParam('cacheName'); let whenReady = Promise.resolve(true); if (this.isFirst()) { + if (cacheName && !this.cachePath) { + const location = this.getParam('location'); + this.cachePath = resolvePath(location || tmpdir(), 'memory', `combine/${cacheName}`); + if (!pathExists.sync(this.cachePath)) { + makeDir.sync(this.cachePath); + } + } debug('ezs')('[combine] with sub pipeline.'); const primer = this.getParam('primer', 'n/a'); - const input = ezs.createStream(ezs.objectMode()); const commands = ezs.createCommands({ file: this.getParam('file'), script: this.getParam('script'), @@ -88,11 +121,26 @@ export default function combine(data, feed) { append: this.getParam('append'), }); this.databaseID = hashCoerce.hash({ primer, commands }); + const input = ezs.createStream(ezs.objectMode()); if (!database[this.databaseID]) { database[this.databaseID] = {}; - const statements = ezs.compileCommands(commands, this.getEnv()); - const logger = ezs.createTrap(this.getParam('logger'), this.getEnv()); - const output = ezs.createPipeline(input, statements, logger) + let stream; + if (cacheName) { + const cacheObject = await cacache.get.info(this.cachePath, this.databaseID); + if (cacheObject) { + stream = cacache.get.stream.byDigest(this.cachePath, cacheObject.integrity).pipe(ezs('unpack')); + } + } + if (!stream) { + const statements = ezs.compileCommands(commands, this.getEnv()); + const logger = ezs.createTrap(this.getParam('logger'), this.getEnv()); + stream = ezs.createPipeline(input, statements, logger) + .pipe(ezs(cacheSave, { + cachePath: this.cachePath, + cacheKey: this.databaseID, + })); + } + const output = stream .pipe(ezs(saveIn, null, this.databaseID)) .pipe(ezs.catch()) .on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered diff --git a/packages/core/test/combine.js b/packages/core/test/combine.js index 4f9206dc8..e136a54a8 100644 --- a/packages/core/test/combine.js +++ b/packages/core/test/combine.js @@ -364,7 +364,7 @@ describe('no combine', () => { }) .on('end', () => { done(new Error('Error is the right behavior')); - }); + }); }); }); @@ -390,7 +390,7 @@ const cacheScript = ` -test('combine with internal cache with script #1', (done) => { +test.only('combine with internal cache with script #1', (done) => { const input = [ { a: 1, b: 'a' }, { a: 2, b: 'b' }, @@ -401,7 +401,7 @@ test('combine with internal cache with script #1', (done) => { ]; const output = []; from(input) - .pipe(ezs('combine', { path: 'b', script: cacheScript }, env)) + .pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env)) .pipe(ezs.catch()) .on('error', done) .on('data', (chunk) => { @@ -421,7 +421,7 @@ test('combine with internal cache with script #1', (done) => { }); }); -test('combine with internal cache with script #2', (done) => { +test.only('combine with internal cache with script #2', (done) => { const input = [ { a: 1, b: 'a' }, { a: 2, b: 'b' }, @@ -432,7 +432,7 @@ test('combine with internal cache with script #2', (done) => { ]; const output = []; from(input) - .pipe(ezs('combine', { path: 'b', script: cacheScript }, env)) + .pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env)) .pipe(ezs.catch()) .on('error', done) .on('data', (chunk) => { @@ -450,5 +450,3 @@ test('combine with internal cache with script #2', (done) => { done(); }); }); - - From fbeb53184028cacf3fc9ac6fa0096601f76aeded Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Fri, 9 Feb 2024 12:17:00 +0100 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=F0=9F=90=9B=20improve=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/statements/combine.js | 16 +++--- packages/core/test/combine.js | 75 ++++++++++++++++++++----- 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/packages/core/src/statements/combine.js b/packages/core/src/statements/combine.js index f057c15aa..30a18e805 100644 --- a/packages/core/src/statements/combine.js +++ b/packages/core/src/statements/combine.js @@ -11,7 +11,7 @@ import cacache from 'cacache'; const hashCoerce = hasher({ sort: false, coerce: true }); const core = (id, value) => ({ id, value }); -const database = {}; +export const database = {}; async function saveIn(data, feed) { if (this.isLast()) { @@ -31,7 +31,7 @@ async function saveIn(data, feed) { return feed.send(data); } -async function cacheSave(data, feed) { +function cacheSave(data, feed) { const { ezs } = this; if (this.isLast()) { return feed.close(); @@ -40,6 +40,7 @@ async function cacheSave(data, feed) { const cacheKey = this.getParam('cacheKey'); if (cachePath && cacheKey) { if (this.isFirst()) { + // console.log('cache set', cachePath, cacheKey); this.input = ezs.createStream(ezs.objectMode()); this.input .pipe(ezs('pack')) @@ -126,7 +127,9 @@ export default async function combine(data, feed) { database[this.databaseID] = {}; let stream; if (cacheName) { + // console.log('cache get', this.cachePath, this.databaseID); const cacheObject = await cacache.get.info(this.cachePath, this.databaseID); + // console.log({cacheObject}); if (cacheObject) { stream = cacache.get.stream.byDigest(this.cachePath, cacheObject.integrity).pipe(ezs('unpack')); } @@ -168,9 +171,10 @@ export default async function combine(data, feed) { } return core(key, database[this.databaseID][key]); }); - if (values.length && Array.isArray(pathVal)) { + // length of the values is always equal to the length of the keys. + if (Array.isArray(pathVal)) { _.set(data, path, values); - } else if (values.length && !Array.isArray(pathVal)) { + } else { const val = values.shift(); if (val !== null) { _.set(data, path, val); @@ -181,10 +185,6 @@ export default async function combine(data, feed) { const orig = _.get(data, path); _.set(data, path, { id: orig, value: orig }); } - } else if (Array.isArray(pathVal)) { - _.set(data, path, pathVal.map((id) => ({ id }))); - } else { - _.set(data, path, { id: pathVal }); } return feed.send(data); }) diff --git a/packages/core/test/combine.js b/packages/core/test/combine.js index e136a54a8..fd8e59ca5 100644 --- a/packages/core/test/combine.js +++ b/packages/core/test/combine.js @@ -2,6 +2,7 @@ import assert from 'assert'; import fs from 'fs'; import from from 'from'; import ezs from '../src'; +import { database } from '../src/statements/combine'; ezs.addPath(__dirname); @@ -209,6 +210,47 @@ describe('combine', () => { done(); }); }); + test('with script #4', (done) => { + const input = [ + { a: 1, b: ['a', 'b'] }, + { a: 2 }, + { a: 3, b: 'c' }, + { a: 4, b: 'y' }, + { a: 5, b: 'e' }, + { a: 6, b: 'z' }, + ]; + const output = []; + const script = ` + [use] + plugin = analytics + + [replace] + path = value + value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'}) + + [exploding] + [value] + `; + + from(input) + .pipe(ezs('combine', { path: 'b', default:'n/a', script })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.equal(output[0].b[0].value, 'aa'); + assert.equal(output[0].b[1].value, 'bb'); + assert.equal(output[1].b, undefined); + assert.equal(output[2].b.value, 'cc'); + assert.equal(output[3].b.value, 'n/a'); + assert.equal(output[4].b.value, 'ee'); + assert.equal(output[5].b.value, 'n/a'); + done(); + }); + }); test('with file', (done) => { const input = [ { a: 1, b: 'a' }, @@ -331,6 +373,7 @@ describe('no combine', () => { done(new Error('Error is the right behavior')); }); }); + test.skip('with wrong location', (done) => { const input = [ { a: 1, b: 'a' }, @@ -388,17 +431,17 @@ const cacheScript = ` [value] `; - - -test.only('combine with internal cache with script #1', (done) => { +const nextTest = (done) => { const input = [ - { a: 1, b: 'a' }, - { a: 2, b: 'b' }, - { a: 3, b: 'c' }, - { a: 4, b: 'd' }, - { a: 5, b: 'e' }, - { a: 6, b: 'f' }, + { a: 11, b: 'a' }, + { a: 12, b: 'b' }, + { a: 13, b: 'c' }, + { a: 14, b: 'd' }, + { a: 15, b: 'e' }, + { a: 16, b: 'f' }, ]; + // Clean memory cache to hit the file cache + Object.keys(database).forEach(k => {delete database[k];}); const output = []; from(input) .pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env)) @@ -415,13 +458,13 @@ test.only('combine with internal cache with script #1', (done) => { assert.equal(output[3].b.value, 'dd'); assert.equal(output[4].b.value, 'ee'); assert.equal(output[5].b.value, 'ff'); - assert.equal(env.executed, true); - env.executed = false; + assert.equal(env.executed, false); done(); }); -}); +}; -test.only('combine with internal cache with script #2', (done) => { + +test('combine with internal cache with script', (done) => { const input = [ { a: 1, b: 'a' }, { a: 2, b: 'b' }, @@ -446,7 +489,9 @@ test.only('combine with internal cache with script #2', (done) => { assert.equal(output[3].b.value, 'dd'); assert.equal(output[4].b.value, 'ee'); assert.equal(output[5].b.value, 'ff'); - assert.equal(env.executed, false); - done(); + assert.equal(env.executed, true); + env.executed = false; + nextTest(done); }); }); + From 0b78a9ef9b05eaa0bbfbe20678413cdc402e3058 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Fri, 9 Feb 2024 18:32:01 +0100 Subject: [PATCH 3/3] =?UTF-8?q?test:=20=F0=9F=92=8D=20fix=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/statements/combine.js | 90 ++++++++++++------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/packages/core/src/statements/combine.js b/packages/core/src/statements/combine.js index 30a18e805..ba673379e 100644 --- a/packages/core/src/statements/combine.js +++ b/packages/core/src/statements/combine.js @@ -33,22 +33,28 @@ async function saveIn(data, feed) { function cacheSave(data, feed) { const { ezs } = this; + const cachePath = this.getParam('cachePath'); + const cacheKey = this.getParam('cacheKey'); if (this.isLast()) { + if (cachePath && cacheKey) { + this.whenFinish.finally(() => feed.close()); + return this.input.end(); + } return feed.close(); } - const cachePath = this.getParam('cachePath'); - const cacheKey = this.getParam('cacheKey'); if (cachePath && cacheKey) { - if (this.isFirst()) { - // console.log('cache set', cachePath, cacheKey); + if (!this.input) { this.input = ezs.createStream(ezs.objectMode()); - this.input - .pipe(ezs('pack')) - .pipe(cacache.put.stream(cachePath, cacheKey)); + this.whenFinish = new Promise( + (resolve) => this.input + .pipe(ezs('pack')) + .pipe(cacache.put.stream(cachePath, cacheKey)) + .on('end', resolve) + ); } return ezs.writeTo(this.input, data, () => feed.send(data)); } - feed.send(data); + return feed.send(data); } @@ -127,9 +133,7 @@ export default async function combine(data, feed) { database[this.databaseID] = {}; let stream; if (cacheName) { - // console.log('cache get', this.cachePath, this.databaseID); const cacheObject = await cacache.get.info(this.cachePath, this.databaseID); - // console.log({cacheObject}); if (cacheObject) { stream = cacache.get.stream.byDigest(this.cachePath, cacheObject.integrity).pipe(ezs('unpack')); } @@ -156,39 +160,35 @@ export default async function combine(data, feed) { if (this.isLast()) { return feed.close(); } - return whenReady - .then(() => { - const defval = this.getParam('default', null); - const path = this.getParam('path'); - const pathVal = _.get(data, path); - const keys = [].concat(pathVal).filter(Boolean); - if (keys.length === 0) { - return feed.send(data); - } - const values = keys.map((key) => { - if (!database[this.databaseID][key]) { - return null; - } - return core(key, database[this.databaseID][key]); - }); - // length of the values is always equal to the length of the keys. - if (Array.isArray(pathVal)) { - _.set(data, path, values); - } else { - const val = values.shift(); - if (val !== null) { - _.set(data, path, val); - } else if (defval !== null) { - const orig = _.get(data, path); - _.set(data, path, { id: orig, value: defval }); - } else { - const orig = _.get(data, path); - _.set(data, path, { id: orig, value: orig }); - } - } - return feed.send(data); - }) - .catch((e) => { - feed.stop(e); - }); + await whenReady; + const defval = this.getParam('default', null); + const path = this.getParam('path'); + const pathVal = _.get(data, path); + const keys = [].concat(pathVal).filter(Boolean); + if (keys.length === 0) { + return feed.send(data); + } + const values = keys.map((key) => { + if (!database[this.databaseID][key]) { + return null; + } + return core(key, database[this.databaseID][key]); + }); + // length of the values is always equal to the length of the keys. + if (Array.isArray(pathVal)) { + _.set(data, path, values); + } else { + const val = values.shift(); + if (val !== null) { + _.set(data, path, val); + } else if (defval !== null) { + const orig = _.get(data, path); + _.set(data, path, { id: orig, value: defval }); + } else { + const orig = _.get(data, path); + _.set(data, path, { id: orig, value: orig }); + } + } + return feed.send(data); + }