Skip to content

Commit

Permalink
Merge pull request #400 from Inist-CNRS/combine-cache
Browse files Browse the repository at this point in the history
feat: 🎸 cacheName for combine like expand
  • Loading branch information
touv authored Feb 9, 2024
2 parents 3e4f4de + 0b78a9e commit baf45cf
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 64 deletions.
138 changes: 93 additions & 45 deletions packages/core/src/statements/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ import _ from 'lodash';
import debug from 'debug';
import assert from 'assert';
import hasher from 'node-object-hash';
import { resolve as resolvePath } from 'path';
import { tmpdir } from 'os';
import makeDir from 'make-dir';
import pathExists from 'path-exists';
import cacache from 'cacache';

const hashCoerce = hasher({ sort: false, coerce: true });
const core = (id, value) => ({ id, value });

const database = {};
export const database = {};

async function saveIn(data, feed) {
if (this.isLast()) {
Expand All @@ -23,10 +28,37 @@ async function saveIn(data, feed) {
database[databaseID][id] = value;
}
}
return feed.send(id);
return feed.send(data);
}

function cacheSave(data, feed) {
const { ezs } = this;
const cachePath = this.getParam('cachePath');
const cacheKey = this.getParam('cacheKey');
if (this.isLast()) {
if (cachePath && cacheKey) {
this.whenFinish.finally(() => feed.close());
return this.input.end();
}
return feed.close();
}
if (cachePath && cacheKey) {
if (!this.input) {
this.input = ezs.createStream(ezs.objectMode());
this.whenFinish = new Promise(
(resolve) => this.input
.pipe(ezs('pack'))
.pipe(cacache.put.stream(cachePath, cacheKey))
.on('end', resolve)
);
}
return ezs.writeTo(this.input, data, () => feed.send(data));
}
return feed.send(data);
}



/**
* Takes an `Object` and substitute a field with the corresponding value found in a external pipeline
* the internal pipeline must produce a stream of special object (id, value)
Expand Down Expand Up @@ -70,15 +102,23 @@ async function saveIn(data, feed) {
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [logger] A dedicaded pipeline described in a file to trap or log errors
* @param {String} [cacheName] Enable cache, with dedicated name
* @returns {Object}
*/
export default function combine(data, feed) {
export default async function combine(data, feed) {
const { ezs } = this;
const cacheName = this.getParam('cacheName');
let whenReady = Promise.resolve(true);
if (this.isFirst()) {
if (cacheName && !this.cachePath) {
const location = this.getParam('location');
this.cachePath = resolvePath(location || tmpdir(), 'memory', `combine/${cacheName}`);
if (!pathExists.sync(this.cachePath)) {
makeDir.sync(this.cachePath);
}
}
debug('ezs')('[combine] with sub pipeline.');
const primer = this.getParam('primer', 'n/a');
const input = ezs.createStream(ezs.objectMode());
const commands = ezs.createCommands({
file: this.getParam('file'),
script: this.getParam('script'),
Expand All @@ -88,11 +128,26 @@ export default function combine(data, feed) {
append: this.getParam('append'),
});
this.databaseID = hashCoerce.hash({ primer, commands });
const input = ezs.createStream(ezs.objectMode());
if (!database[this.databaseID]) {
database[this.databaseID] = {};
const statements = ezs.compileCommands(commands, this.getEnv());
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
const output = ezs.createPipeline(input, statements, logger)
let stream;
if (cacheName) {
const cacheObject = await cacache.get.info(this.cachePath, this.databaseID);
if (cacheObject) {
stream = cacache.get.stream.byDigest(this.cachePath, cacheObject.integrity).pipe(ezs('unpack'));
}
}
if (!stream) {
const statements = ezs.compileCommands(commands, this.getEnv());
const logger = ezs.createTrap(this.getParam('logger'), this.getEnv());
stream = ezs.createPipeline(input, statements, logger)
.pipe(ezs(cacheSave, {
cachePath: this.cachePath,
cacheKey: this.databaseID,
}));
}
const output = stream
.pipe(ezs(saveIn, null, this.databaseID))
.pipe(ezs.catch())
.on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered
Expand All @@ -105,42 +160,35 @@ export default function combine(data, feed) {
if (this.isLast()) {
return feed.close();
}
return whenReady
.then(() => {
const defval = this.getParam('default', null);
const path = this.getParam('path');
const pathVal = _.get(data, path);
const keys = [].concat(pathVal).filter(Boolean);
if (keys.length === 0) {
return feed.send(data);
}
const values = keys.map((key) => {
if (!database[this.databaseID][key]) {
return null;
}
return core(key, database[this.databaseID][key]);
});
if (values.length && Array.isArray(pathVal)) {
_.set(data, path, values);
} else if (values.length && !Array.isArray(pathVal)) {
const val = values.shift();
if (val !== null) {
_.set(data, path, val);
} else if (defval !== null) {
const orig = _.get(data, path);
_.set(data, path, { id: orig, value: defval });
} else {
const orig = _.get(data, path);
_.set(data, path, { id: orig, value: orig });
}
} else if (Array.isArray(pathVal)) {
_.set(data, path, pathVal.map((id) => ({ id })));
} else {
_.set(data, path, { id: pathVal });
}
return feed.send(data);
})
.catch((e) => {
feed.stop(e);
});
await whenReady;
const defval = this.getParam('default', null);
const path = this.getParam('path');
const pathVal = _.get(data, path);
const keys = [].concat(pathVal).filter(Boolean);
if (keys.length === 0) {
return feed.send(data);
}
const values = keys.map((key) => {
if (!database[this.databaseID][key]) {
return null;
}
return core(key, database[this.databaseID][key]);
});
// length of the values is always equal to the length of the keys.
if (Array.isArray(pathVal)) {
_.set(data, path, values);
} else {
const val = values.shift();
if (val !== null) {
_.set(data, path, val);
} else if (defval !== null) {
const orig = _.get(data, path);
_.set(data, path, { id: orig, value: defval });
} else {
const orig = _.get(data, path);
_.set(data, path, { id: orig, value: orig });
}
}
return feed.send(data);

}
81 changes: 62 additions & 19 deletions packages/core/test/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import assert from 'assert';
import fs from 'fs';
import from from 'from';
import ezs from '../src';
import { database } from '../src/statements/combine';

ezs.addPath(__dirname);

Expand Down Expand Up @@ -209,6 +210,47 @@ describe('combine', () => {
done();
});
});
test('with script #4', (done) => {
const input = [
{ a: 1, b: ['a', 'b'] },
{ a: 2 },
{ a: 3, b: 'c' },
{ a: 4, b: 'y' },
{ a: 5, b: 'e' },
{ a: 6, b: 'z' },
];
const output = [];
const script = `
[use]
plugin = analytics
[replace]
path = value
value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'})
[exploding]
[value]
`;

from(input)
.pipe(ezs('combine', { path: 'b', default:'n/a', script }))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
output.push(chunk);
})
.on('end', () => {
assert.equal(output.length, 6);
assert.equal(output[0].b[0].value, 'aa');
assert.equal(output[0].b[1].value, 'bb');
assert.equal(output[1].b, undefined);
assert.equal(output[2].b.value, 'cc');
assert.equal(output[3].b.value, 'n/a');
assert.equal(output[4].b.value, 'ee');
assert.equal(output[5].b.value, 'n/a');
done();
});
});
test('with file', (done) => {
const input = [
{ a: 1, b: 'a' },
Expand Down Expand Up @@ -331,6 +373,7 @@ describe('no combine', () => {
done(new Error('Error is the right behavior'));
});
});

test.skip('with wrong location', (done) => {
const input = [
{ a: 1, b: 'a' },
Expand Down Expand Up @@ -364,7 +407,7 @@ describe('no combine', () => {
})
.on('end', () => {
done(new Error('Error is the right behavior'));
});
});
});
});

Expand All @@ -388,20 +431,20 @@ const cacheScript = `
[value]
`;



test('combine with internal cache with script #1', (done) => {
const nextTest = (done) => {
const input = [
{ a: 1, b: 'a' },
{ a: 2, b: 'b' },
{ a: 3, b: 'c' },
{ a: 4, b: 'd' },
{ a: 5, b: 'e' },
{ a: 6, b: 'f' },
{ a: 11, b: 'a' },
{ a: 12, b: 'b' },
{ a: 13, b: 'c' },
{ a: 14, b: 'd' },
{ a: 15, b: 'e' },
{ a: 16, b: 'f' },
];
// Clean memory cache to hit the file cache
Object.keys(database).forEach(k => {delete database[k];});
const output = [];
from(input)
.pipe(ezs('combine', { path: 'b', script: cacheScript }, env))
.pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
Expand All @@ -415,13 +458,13 @@ test('combine with internal cache with script #1', (done) => {
assert.equal(output[3].b.value, 'dd');
assert.equal(output[4].b.value, 'ee');
assert.equal(output[5].b.value, 'ff');
assert.equal(env.executed, true);
env.executed = false;
assert.equal(env.executed, false);
done();
});
});
};


test('combine with internal cache with script #2', (done) => {
test('combine with internal cache with script', (done) => {
const input = [
{ a: 1, b: 'a' },
{ a: 2, b: 'b' },
Expand All @@ -432,7 +475,7 @@ test('combine with internal cache with script #2', (done) => {
];
const output = [];
from(input)
.pipe(ezs('combine', { path: 'b', script: cacheScript }, env))
.pipe(ezs('combine', { path: 'b', script: cacheScript, cacheName }, env))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
Expand All @@ -446,9 +489,9 @@ test('combine with internal cache with script #2', (done) => {
assert.equal(output[3].b.value, 'dd');
assert.equal(output[4].b.value, 'ee');
assert.equal(output[5].b.value, 'ff');
assert.equal(env.executed, false);
done();
assert.equal(env.executed, true);
env.executed = false;
nextTest(done);
});
});


0 comments on commit baf45cf

Please sign in to comment.