diff --git a/packages/core/src/statements/dedupe.js b/packages/core/src/statements/dedupe.js new file mode 100644 index 00000000..56ecc1a8 --- /dev/null +++ b/packages/core/src/statements/dedupe.js @@ -0,0 +1,37 @@ +import _ from 'lodash'; +/** + * Take `Object`, and check that the object identifier has not already been used previously + * + * @param {String} [path = uri] path containing the object Identifier + * @param {Boolean} [ignore = false] Just ignore duplicate object + * @returns {Object} + */ +export default async function identify(data, feed) { + const pathName = this.getParam('path', 'uri'); + const ignore = Boolean(this.getParam('ignore', false)); + const path = Array.isArray(pathName) ? pathName.shift() : pathName; + const check = 'no uri!'; + const uri = _.get(data, path, check); + if (!this.previousURI) { + this.previousURI = {}; + } + if (this.isLast()) { + return feed.close(); + } + if (uri === check) { + if (ignore) { + console.warn(`WARNING: ${path} field not exists, item #${this.getIndex()} was ignored` ); + return feed.end(); + } + return feed.send(new Error(`${path} field not exists, enable to dedupe.`)); + } + if (this.previousURI[uri] === true) { + if (ignore) { + console.warn(`WARNING: ${uri} already exists, item #${this.getIndex()} was ignored` ); + return feed.end(); + } + return feed.send(new Error(`Duplicate identifier: ${uri} already exists`)); + } + this.previousURI[uri] = true; + return feed.send(data); +} diff --git a/packages/core/test/dedupe.js b/packages/core/test/dedupe.js new file mode 100644 index 00000000..518e54cf --- /dev/null +++ b/packages/core/test/dedupe.js @@ -0,0 +1,142 @@ +import assert from 'assert'; +import from from 'from'; +import ezs from '../src'; +import statements from '../src/statements'; + +ezs.use(statements); + +describe('[dedupe]', () => { + it('with error (1)', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe', { path: ['a', 'c'] })) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.ok(output[1] instanceof Error); + assert.ok(output[1].message.includes('Duplicate identifier: 1 already exists')); + done(); + }); + }); + it('with error (2)', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 3, b: 'd' }, + { a: 4, b: 'e' }, + { a: 4, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe', { path: 'a' })) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.ok(output[1] instanceof Error); + assert.ok(output[3] instanceof Error); + assert.ok(output[5] instanceof Error); + done(); + }); + }); + it('with ignore (1)', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe', { path: ['a', 'c'], ignore: true })) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 5); + done(); + }); + }); + it('with ignore (2)', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 3, b: 'd' }, + { a: 4, b: 'e' }, + { a: 4, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe', { path: 'a', ignore: true })) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 3); + done(); + }); + }); + it('with no uri', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 3, b: 'd' }, + { a: 4, b: 'e' }, + { a: 4, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe')) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.ok(output[0] instanceof Error); + assert.ok(output[0].message.includes('uri field not exists, enable to dedupe.')); + assert.ok(output[1] instanceof Error); + assert.ok(output[2] instanceof Error); + assert.ok(output[3] instanceof Error); + assert.ok(output[4] instanceof Error); + assert.ok(output[5] instanceof Error); + done(); + }); + }); + it('with no uri (ignore)', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 1, b: 'b' }, + { a: 3, b: 'c' }, + { a: 3, b: 'd' }, + { a: 4, b: 'e' }, + { a: 4, b: 'f' }, + ]; + const output = []; + from(input) + .pipe(ezs('dedupe', { ignore: true })) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 0); + done(); + }); + }); +}); +