Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
conform write handler pattern to match client_stream_processor (pre-b…
Browse files Browse the repository at this point in the history
…inding the handlers and setting them explicitly when necessary)
  • Loading branch information
stephen-palmer committed Feb 6, 2018
1 parent 87fa097 commit d715097
Showing 1 changed file with 14 additions and 31 deletions.
45 changes: 14 additions & 31 deletions lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ const { promisify } = require('util');
const kSource = Symbol("source");
const kCache = Symbol("cache");
const kSendFileQueue = Symbol("sendFileQueue");
const kReadStateVersion = Symbol("readStateVersion");
const kReadStateCommand = Symbol("readStateCommand");
const kReadStatePutStream = Symbol("readStatePutStream");
const kReadStateNone = Symbol("readStateNone");

class CommandProcessor extends Duplex {

Expand All @@ -22,7 +18,15 @@ class CommandProcessor extends Duplex {
super();
this[kCache] = cache;
this[kSendFileQueue] = [];
this._setWriteHandler(kReadStateVersion);

this._writeHandlers = {
putStream: this._handleWrite.bind(this),
command: this._handleCommand.bind(this),
version: this._handleVersion.bind(this),
none: () => Promise.resolve()
};

this._writeHandler = this._writeHandlers.version;

/**
*
Expand All @@ -44,27 +48,6 @@ class CommandProcessor extends Duplex {
this._registerEventListeners();
}

/**
*
* @param {symbol<kReadStateVersion|kReadStateCommand|kReadStatePutStream>} readState
* @private
*/
_setWriteHandler(readState) {
switch(readState) {
case kReadStateVersion:
this._writeHandler = this._handleVersion;
break;
case kReadStateCommand:
this._writeHandler = this._handleCommand;
break;
case kReadStatePutStream:
this._writeHandler = this._handleWrite;
break;
default:
this._writeHandler = () => Promise.resolve();
}
}

_registerEventListeners() {
const self = this;
this.once('finish', this._printReadStats);
Expand All @@ -82,7 +65,7 @@ class CommandProcessor extends Duplex {
* @private
*/
_write(chunk, encoding, callback) {
this._writeHandler.call(this, chunk)
this._writeHandler(chunk)
.then(() => callback(), err => this._quit(err));
}

Expand Down Expand Up @@ -173,7 +156,7 @@ class CommandProcessor extends Duplex {
async _quit(err) {
this[kSource].unpipe(this);
this[kSource].emit('quit');
this._setWriteHandler(kReadStateNone);
this._writeHandler = this._writeHandlers.none;
if(err) {
helpers.log(consts.LOG_ERR, err);
}
Expand All @@ -186,7 +169,7 @@ class CommandProcessor extends Duplex {
*/
async _handleVersion(data) {
let version = helpers.readUInt32(data);
this._setWriteHandler(kReadStateCommand);
this._writeHandler = this._writeHandlers.command;

let err = null;
if(version !== consts.PROTOCOL_VERSION) {
Expand All @@ -208,7 +191,7 @@ class CommandProcessor extends Duplex {
this._putSent += data.length;
if(this._putSent === this._putSize) {
this._putStream.end();
this._setWriteHandler(kReadStateCommand);
this._writeHandler = this._writeHandlers.command;
this._putSent = 0;
this._putSize = 0;
}
Expand Down Expand Up @@ -349,7 +332,7 @@ class CommandProcessor extends Duplex {
this._putStream = await this._trx.getWriteStream(type, size);
this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream);
this._putSize = size;
this._setWriteHandler(kReadStatePutStream);
this._writeHandler = this._writeHandlers.putStream;
}
}

Expand Down

0 comments on commit d715097

Please sign in to comment.