diff --git a/packages/basics/src/csv-parse.js b/packages/basics/src/csv-parse.js index c45cc6a7..6bb5dc79 100644 --- a/packages/basics/src/csv-parse.js +++ b/packages/basics/src/csv-parse.js @@ -11,9 +11,13 @@ function CSVParse(data, feed) { this.whenFinish = feed.flow(this.input); } if (this.isLast()) { - this.decoder.end(); + writeTo( + this.input, + this.decoder.end(), + () => this.input.end(), + ); this.whenFinish.finally(() => feed.close()); - return this.input.end(); + return ; } writeTo( this.input, diff --git a/packages/basics/src/txt-parse.js b/packages/basics/src/txt-parse.js index f9bf79f9..465f1997 100644 --- a/packages/basics/src/txt-parse.js +++ b/packages/basics/src/txt-parse.js @@ -3,14 +3,16 @@ import { StringDecoder } from 'string_decoder'; function TXTParse(data, feed) { if (!this.decoder) { this.decoder = new StringDecoder('utf8'); + this.remainder = ''; + this.counter = 0; } if (this.isLast()) { - this.decoder.end(); + this.remainder += this.decoder.end(); + if (this.remainder && this.counter > 1) { + feed.write(this.remainder); + } return feed.end(); } - - this.remainder = this.remainder || ''; - let separator; try { const val = '"'.concat(this.getParam('separator', '\n')).concat('"'); @@ -32,6 +34,7 @@ function TXTParse(data, feed) { lines.forEach((line) => { feed.write(line); }); + this.counter += lines.length; feed.end(); } diff --git a/packages/basics/test/basics.js b/packages/basics/test/basics.js index fc45df88..f63bc9e3 100644 --- a/packages/basics/test/basics.js +++ b/packages/basics/test/basics.js @@ -38,6 +38,31 @@ describe('test', () => { done(); }); }); + it('CSVParse #2', (done) => { + const res = []; + from([ + 'a,b,c\nd,', + 'e,f\ng,', + Buffer.from([0xE2]), + Buffer.from([0x82]), + Buffer.from([0xAC]), + Buffer.from(','), + Buffer.from([0xC2]), + Buffer.from([0xA2]), + + ]) + .pipe(ezs('CSVParse')) + .on('data', (chunk) => { + assert(typeof chunk === 'object'); + res.push(chunk); + }) + .on('end', () => { + assert.equal(res.length, 3); + assert.equal(res[2][1], '€'); + assert.equal(res[2][2], '¢'); + done(); + }); + }); it('CSVString#1', (done) => { const res = []; diff --git a/packages/basics/test/txt-parse.js b/packages/basics/test/txt-parse.js index 9c92cb2b..59899bfd 100644 --- a/packages/basics/test/txt-parse.js +++ b/packages/basics/test/txt-parse.js @@ -81,6 +81,28 @@ describe('TXTParse', () => { done(); }); }); + it('should generate with a separator parameter and multi bytes char', (done) => { + let res = []; + from([ + 'a*b*', + Buffer.from([0xE2]), + Buffer.from([0x82]), + Buffer.from([0xAC]), + '*', + Buffer.from([0xC2]), + Buffer.from([0xA2]), + ]) + .pipe(ezs('TXTParse', { separator: '*' })) + .on('data', (data) => { + res = [...res, data]; + }) + .on('end', () => { + expect(res).toStrictEqual(['a', 'b', '€', '¢']); + done(); + }); + }); + + it('should not generate with a tab separator', (done) => { let res = []; diff --git a/packages/core/src/statements/concat.js b/packages/core/src/statements/concat.js index 8639c9c2..2837e75a 100644 --- a/packages/core/src/statements/concat.js +++ b/packages/core/src/statements/concat.js @@ -1,3 +1,4 @@ +import { StringDecoder } from 'string_decoder'; /** * Take all `String`, concat them and throw just one. * @@ -33,20 +34,23 @@ * @returns {String} */ export default function concat(data, feed) { + if (!this.decoder) { + this.decoder = new StringDecoder('utf8'); + } const beginWith = this.getParam('beginWith', ''); const joinWith = this.getParam('joinWith', ''); const endWith = this.getParam('endWith', ''); if (this.buffer === undefined) { - this.buffer = beginWith; + this.buffer = []; } if (this.isLast()) { - feed.send(this.buffer.concat(endWith)); + this.buffer.push(this.decoder.end()); + feed.send(beginWith.concat(this.buffer.filter(Boolean).join(joinWith)).concat(endWith)); return feed.close(); } - if (!this.isFirst()) { - this.buffer = this.buffer.concat(joinWith); - } - this.buffer = this.buffer.concat(data); + const value = Buffer.isBuffer(data) ? this.decoder.write(data) : data; + this.buffer.push(value); return feed.end(); } + diff --git a/packages/core/src/statements/unpack.js b/packages/core/src/statements/unpack.js index b4769aa0..bb7556f9 100644 --- a/packages/core/src/statements/unpack.js +++ b/packages/core/src/statements/unpack.js @@ -1,4 +1,6 @@ import debug from 'debug'; +import { StringDecoder } from 'string_decoder'; + const eol = '\n'; @@ -9,17 +11,21 @@ const eol = '\n'; * @returns {object} */ export default function unpack(data, feed) { + if (!this.decoder) { + this.decoder = new StringDecoder('utf8'); + this.remainder = ''; + } if (this.isLast()) { - if (this.remainder) { - feed.write(JSON.parse(this.remainder)); + const lastchunk = [this.remainder, this.decoder.end()].filter(Boolean).join(''); + if (lastchunk) { + feed.write(JSON.parse(lastchunk)); } return feed.close(); } - this.remainder = this.remainder || ''; let lines; if (Buffer.isBuffer(data)) { - lines = data.toString().split(eol); + lines = this.decoder.write(data).split(eol); } else if (typeof data === 'string') { lines = data.split(eol); } else { diff --git a/packages/core/test/statements.js b/packages/core/test/statements.js index 7cfa8b07..1e8bfba1 100644 --- a/packages/core/test/statements.js +++ b/packages/core/test/statements.js @@ -143,7 +143,30 @@ describe('statements', () => { done(); }); }); + it('concat#2', (done) => { + const result = []; + from([ + Buffer.from([0xE2]), + Buffer.from([0x82]), + Buffer.from([0xAC]), + Buffer.from([0xC2]), + Buffer.from([0xA2]), + ]) + .pipe(ezs('concat', { + beginWith: '<', + joinWith: '|', + endWith: '>', + })) + .on('data', (chunk) => { + result.push(chunk); + }) + .on('end', () => { + assert.equal(result.length, 1); + assert.equal(result[0], '<€|¢>'); + done(); + }); + }); it('transit#1', (done) => { let index = 0; const data = [ @@ -319,6 +342,7 @@ describe('statements', () => { res.push(chunk); }) .on('end', () => { + console.log({res}); assert.equal(res.length, 0); done(); }); @@ -359,6 +383,31 @@ describe('statements', () => { done(new Error('Error is the right behavior')); }); }); + it('unpack#5', (done) => { + const res = []; + from([ + Buffer.from('"'), + Buffer.from([0xC2]), + Buffer.from([0xA2]), + Buffer.from('"'), + Buffer.from('\n'), + Buffer.from('"'), + Buffer.from([0xE2]), + Buffer.from([0x82]), + Buffer.from([0xAC]), + Buffer.from('"'), + Buffer.from('\n'), + ]) + .pipe(ezs('unpack')) + .on('data', (chunk) => { + res.push(chunk); + }) + .on('end', () => { + assert.equal(res[0], '¢'); + assert.equal(res[1], '€'); + done(); + }); + }); it('truncate#1', (done) => { const res = []; from(['aa', 'bb', 'cc', 'dd', 'ee'])