diff --git a/lib/server/command_processor.js b/lib/server/command_processor.js index c2b4cfe..f6fdbd7 100644 --- a/lib/server/command_processor.js +++ b/lib/server/command_processor.js @@ -2,13 +2,19 @@ const helpers = require('./../helpers'); const config = require('config'); const filesize = require('filesize'); const consts = require('./../constants'); -const Duplex = require('stream').Duplex; +const { Duplex, Writable } = require('stream'); const { promisify } = require('util'); const kSource = Symbol("source"); const kCache = Symbol("cache"); const kSendFileQueue = Symbol("sendFileQueue"); +class NullStream extends Writable { + _write(chunk, encoding, cb) { + setImmediate(cb); + } +} + class CommandProcessor extends Duplex { /** @@ -24,7 +30,6 @@ class CommandProcessor extends Duplex { putStream: this._handleWrite.bind(this), command: this._handleCommand.bind(this), version: this._handleVersion.bind(this), - none: () => Promise.resolve() }; this._writeHandler = this._writeHandlers.version; @@ -40,6 +45,7 @@ class CommandProcessor extends Duplex { this._putWhitelist = this._options.putWhitelist; this._whitelistEmpty = (!Array.isArray(this._putWhitelist) || !this._putWhitelist.length); + this._nullStream = new NullStream(); this._putStream = null; this._putSize = 0; this._putSent = 0; @@ -344,14 +350,14 @@ class CommandProcessor extends Duplex { if (this._isWhitelisted(this._trx.clientAddress)) { this._putStream = await this._trx.getWriteStream(type, size); - this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream); - this._putSize = size; - this._writeHandler = this._writeHandlers.putStream; - } - else { - this._writeHandler = this._writeHandlers.none; + } else { + this._putStream = this._nullStream; helpers.log(consts.LOG_DBG, `PUT rejected from non-whitelisted IP: ${this._trx.clientAddress}`); } + + this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream); + this._putSize = size; + this._writeHandler = this._writeHandlers.putStream; } } diff --git a/test/command_processor.js b/test/command_processor.js index ef008db..e45aead 100644 --- a/test/command_processor.js +++ b/test/command_processor.js @@ -19,7 +19,7 @@ describe("CommandProcessor", () => { const p = this.cmdProc._onPut("a", 999); p.catch(function () {}); - assert(spy.called) + assert(spy.called); }); it("should implement PUT when whitelisted (multiple)", async () => { @@ -33,7 +33,7 @@ describe("CommandProcessor", () => { const p = this.cmdProc._onPut("a", 999); p.catch(function () {}); - assert(spy.called) + assert(spy.called); }); it("should implement PUT when whitelist empty", async () => { @@ -47,29 +47,20 @@ describe("CommandProcessor", () => { const p = this.cmdProc._onPut("a", 999); p.catch(function () {}); - assert(spy.called) + assert(spy.called); }); - it("should not implement PUT when not whitelisted", async () => { + it("should allow commands after writing when being whitelisted", async () => { this.cmdProc._whitelistEmpty = false; this.cmdProc._putWhitelist = ["127.0.0.1"]; this.cmdProc._trx = new PutTransaction(); this.cmdProc._trx.clientAddress = "127.0.0.2"; - await this.cmdProc._onPut("a", 999); - assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.none); - }); - - it("should not implement PUT when not whitelisted (multiple)", async () => { - this.cmdProc._whitelistEmpty = false; - this.cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"]; - - this.cmdProc._trx = new PutTransaction(); - this.cmdProc._trx.clientAddress = "127.0.0.2"; - - await this.cmdProc._onPut("a", 999); - assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.none); + await this.cmdProc._onPut("a", 6); + assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.putStream); + this.cmdProc._writeHandler('abcdef'); + assert(this.cmdProc._writeHandlers.command); }); }); });