From ef969fa78aa90a04eae7bb4b9fa9aecc448f9d2a Mon Sep 17 00:00:00 2001 From: Matthew Blasius Date: Wed, 3 May 2023 17:09:13 -0500 Subject: [PATCH] fix: only emit 'end' after unzip is done writing --- src/node/index.js | 6 +++--- test/node/pipe.js | 55 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/src/node/index.js b/src/node/index.js index 0a1bacd59..e6dc73cc9 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -463,13 +463,13 @@ Request.prototype._pipeContinue = function (stream, options) { stream.emit('error', error); }); res.pipe(unzipObject).pipe(stream, options); + // don't emit 'end' until unzipObject has completed writing all its data. + unzipObject.once('end', () => this.emit('end')); } else { res.pipe(stream, options); + res.once('end', () => this.emit('end')); } - res.once('end', () => { - this.emit('end'); - }); }); return stream; }; diff --git a/test/node/pipe.js b/test/node/pipe.js index 649977a5a..c6f11222c 100644 --- a/test/node/pipe.js +++ b/test/node/pipe.js @@ -6,6 +6,8 @@ const app = express(); const fs = require('fs'); const bodyParser = require('body-parser'); let http = require('http'); +const zlib = require('zlib'); +const { pipeline } = require('stream'); if (process.env.HTTP2_TEST) { http = require('http2'); } @@ -17,6 +19,13 @@ app.get('/', (request_, res) => { fs.createReadStream('test/node/fixtures/user.json').pipe(res); }); +app.get('/gzip', (request_, res) => { + res.writeHead(200, { + 'Content-Encoding': 'gzip' + }); + fs.createReadStream('test/node/fixtures/user.json').pipe(zlib.createGzip()).pipe(res); +}); + app.get('/redirect', (request_, res) => { res.set('Location', '/').sendStatus(302); }); @@ -102,6 +111,52 @@ describe('request pipe', () => { request_.pipe(stream); }); + it('should act as a readable stream with unzip', (done) => { + const stream = fs.createWriteStream(destinationPath); + + let responseCalled = false; + const request_ = request.get(base + 
'/gzip'); + request_.type('json'); + + request_.on('response', (res) => { + res.status.should.eql(200); + responseCalled = true; + }); + stream.on('finish', () => { + JSON.parse(fs.readFileSync(destinationPath)).should.eql({ + name: 'tobi' + }); + responseCalled.should.be.true(); + done(); + }); + request_.pipe(stream); + }); + + it('should act as a readable stream with unzip and node.stream.pipeline', (done) => { + const stream = fs.createWriteStream(destinationPath); + + let responseCalled = false; + const request_ = request.get(base + '/gzip'); + request_.type('json'); + + request_.on('response', (res) => { + res.status.should.eql(200); + responseCalled = true; + }); + // pipeline automatically ends streams by default. Since unzipping introduces a transform stream that is + // not monitored by pipeline, we need to make sure request_ does not emit 'end' until the unzip step + // has finished writing data. Otherwise, we'll either end up with truncated data or a 'write after end' error. + pipeline(request_, stream, function (err) { + (!!err).should.be.false(); + responseCalled.should.be.true(); + + JSON.parse(fs.readFileSync(destinationPath)).should.eql({ + name: 'tobi' + }); + done(); + }); + }); + it('should follow redirects', (done) => { const stream = fs.createWriteStream(destinationPath);