Skip to content

Commit

Permalink
Merge pull request #385 from Inist-CNRS/dedupe
Browse files Browse the repository at this point in the history
feat: 🎸 add [dedupe]
  • Loading branch information
touv authored Nov 27, 2023
2 parents be3f1e5 + 8d6c69a commit 9e1d0ca
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 0 deletions.
37 changes: 37 additions & 0 deletions packages/core/src/statements/dedupe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import _ from 'lodash';
/**
 * Take `Object`, and check that the object identifier has not already been
 * used by a previously received object.
 *
 * The first object carrying a given identifier is sent on unchanged. An object
 * whose identifier was already seen, or whose identifier field is missing, is
 * replaced in the output by an `Error`; when `ignore` is enabled such an object
 * is dropped instead and a warning is written to the console.
 *
 * Identifiers are remembered as object keys, so values that stringify to the
 * same key (e.g. `1` and `'1'`) are considered duplicates of each other.
 *
 * @param {String} [path = uri] path containing the object Identifier
 *                              (if several paths are given, only the first one is used)
 * @param {Boolean} [ignore = false] Just ignore duplicate object
 * @returns {Object}
 */
// NOTE(review): the function is still named `identify` although it implements
// [dedupe]; the name is kept so that nothing relying on it changes.
export default async function identify(data, feed) {
    // Nothing to deduplicate once the end of the stream is signalled.
    if (this.isLast()) {
        return feed.close();
    }
    const pathName = this.getParam('path', 'uri');
    const ignore = Boolean(this.getParam('ignore', false));
    // Read by index rather than shift() so the parameter array is left untouched.
    const path = Array.isArray(pathName) ? pathName[0] : pathName;
    if (!this.previousURI) {
        // Prototype-less dictionary: identifiers such as `__proto__` are stored
        // as ordinary keys instead of being swallowed by Object.prototype.
        this.previousURI = Object.create(null);
    }
    // `undefined` is what _.get yields for an unresolved path, so no sentinel
    // string (which could collide with a real identifier) is needed.
    const uri = _.get(data, path);
    if (uri === undefined) {
        if (ignore) {
            console.warn(`WARNING: ${path} field not exists, item #${this.getIndex()} was ignored`);
            return feed.end();
        }
        return feed.send(new Error(`${path} field not exists, enable to dedupe.`));
    }
    if (this.previousURI[uri] === true) {
        if (ignore) {
            console.warn(`WARNING: ${uri} already exists, item #${this.getIndex()} was ignored`);
            return feed.end();
        }
        return feed.send(new Error(`Duplicate identifier: ${uri} already exists`));
    }
    this.previousURI[uri] = true;
    return feed.send(data);
}
2 changes: 2 additions & 0 deletions packages/core/src/statements/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import exchange from './exchange';
import swing from './swing';
import loop from './loop';
import map from './map';
import dedupe from './dedupe';
import identify from './identify';
import throttle from './throttle';
import combine from './combine';
Expand Down Expand Up @@ -68,6 +69,7 @@ export default {
swing,
loop,
map,
dedupe,
identify,
throttle,
expand,
Expand Down
142 changes: 142 additions & 0 deletions packages/core/test/dedupe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import assert from 'assert';
import from from 'from';
import ezs from '../src';
import statements from '../src/statements';

ezs.use(statements);

describe('[dedupe]', () => {
    /**
     * Fixture in which only the first two items share the same `a` value.
     * Built on demand so that no test can affect another through shared objects.
     */
    const oneDuplicate = () => [
        { a: 1, b: 'a' },
        { a: 1, b: 'b' },
        { a: 3, b: 'c' },
        { a: 4, b: 'd' },
        { a: 5, b: 'e' },
        { a: 6, b: 'f' },
    ];

    /**
     * Fixture in which every `a` value appears twice in a row.
     * None of the items has a `uri` field.
     */
    const pairedDuplicates = () => [
        { a: 1, b: 'a' },
        { a: 1, b: 'b' },
        { a: 3, b: 'c' },
        { a: 3, b: 'd' },
        { a: 4, b: 'e' },
        { a: 4, b: 'f' },
    ];

    /**
     * Pipe `input` through the [dedupe] statement and resolve with every chunk
     * it emitted. A stream 'error' event rejects the promise, so a broken
     * pipeline fails the test immediately instead of timing out.
     *
     * @param {Array} input items to feed into the statement
     * @param {...Object} params optional statement parameters, passed to ezs as-is
     * @returns {Promise<Array>} emitted chunks, in order
     */
    const dedupe = (input, ...params) => new Promise((resolve, reject) => {
        const output = [];
        from(input)
            .pipe(ezs('dedupe', ...params))
            .on('data', (chunk) => {
                output.push(chunk);
            })
            .on('error', reject)
            .on('end', () => resolve(output));
    });

    it('with error (1)', async () => {
        const output = await dedupe(oneDuplicate(), { path: ['a', 'c'] });
        assert.strictEqual(output.length, 6);
        assert.ok(output[1] instanceof Error);
        assert.ok(output[1].message.includes('Duplicate identifier: 1 already exists'));
    });

    it('with error (2)', async () => {
        const output = await dedupe(pairedDuplicates(), { path: 'a' });
        assert.strictEqual(output.length, 6);
        assert.ok(output[1] instanceof Error);
        assert.ok(output[3] instanceof Error);
        assert.ok(output[5] instanceof Error);
    });

    it('with ignore (1)', async () => {
        const output = await dedupe(oneDuplicate(), { path: ['a', 'c'], ignore: true });
        assert.strictEqual(output.length, 5);
        // Only the second item carrying `a: 1` is dropped.
        assert.deepStrictEqual(output, [
            { a: 1, b: 'a' },
            { a: 3, b: 'c' },
            { a: 4, b: 'd' },
            { a: 5, b: 'e' },
            { a: 6, b: 'f' },
        ]);
    });

    it('with ignore (2)', async () => {
        const output = await dedupe(pairedDuplicates(), { path: 'a', ignore: true });
        assert.strictEqual(output.length, 3);
        // The first item of each pair is kept.
        assert.deepStrictEqual(output, [
            { a: 1, b: 'a' },
            { a: 3, b: 'c' },
            { a: 4, b: 'e' },
        ]);
    });

    it('with no uri', async () => {
        // No `path` parameter: the statement looks for the default `uri` field.
        const output = await dedupe(pairedDuplicates());
        assert.strictEqual(output.length, 6);
        assert.ok(output[0] instanceof Error);
        assert.ok(output[0].message.includes('uri field not exists, enable to dedupe.'));
        assert.ok(output[1] instanceof Error);
        assert.ok(output[2] instanceof Error);
        assert.ok(output[3] instanceof Error);
        assert.ok(output[4] instanceof Error);
        assert.ok(output[5] instanceof Error);
    });

    it('with no uri (ignore)', async () => {
        const output = await dedupe(pairedDuplicates(), { ignore: true });
        assert.strictEqual(output.length, 0);
    });
});

0 comments on commit 9e1d0ca

Please sign in to comment.