Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle ws errors #28

Merged
merged 8 commits into from
Jul 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions lib/ws-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,15 @@ WebsocketChannel.prototype.connect = function(uri, _token) {
this._uri = uri;
this._token = token;

var headers = {'x-mesh-token': token};
var websocket = new Websocket(uri, {headers: headers});
var opts = {
headers: {'x-mesh-token': token},
// TODO: per-message deflate could be useful for some of our messages, and
// it is even enabled by default, but it is also a little bit buggy, so we
// disable it so we don't end up with 'write after end' errors from the
// underlying socket.
perMessageDeflate: false,
};
var websocket = new Websocket(uri, opts);

this._attach(websocket);

Expand All @@ -88,7 +95,6 @@ WebsocketChannel.prototype.accept = function(websocket) {
assert(this._token);
this._uri = null; // TODO(bert) set this
this._attach(websocket);

return this;
};

Expand Down Expand Up @@ -123,15 +129,12 @@ WebsocketChannel.prototype._detach = function() {

this._debug('detach with state %j', this._websocket.readyState);

this._unscheduleSendAck();

this._websocket.removeListener('message', this._handleMessage);
this._websocket.removeListener('error', this._handleError);
this._websocket.removeListener('close', this._handleDisconnect);
this._websocket.removeListener('open', this._onOpen);

if (this._websocket.readystate !== Websocket.CLOSED)
this._closeWebsocket(this._websocket);
this._closeWebsocket(this._websocket);

this._websocket = null;
};
Expand All @@ -141,7 +144,6 @@ WebsocketChannel.prototype.close = function(callback) {
this._debug('close');

if (this._websocket) {
this._unscheduleSendAck();
this._flushSendQueue();
this._closeWebsocket(this._websocket);
}
Expand All @@ -156,6 +158,9 @@ WebsocketChannel.prototype.close = function(callback) {
} else {
var error = new Error('Messages were discarded');
this._debug('close: %s', error.message);
this._debug('sendSeq: %j, recvAck: %j, sentAck: %j, recvSeq: %j',
this._sentSeq, this._receivedAck, this._sentAck,
this._receivedSeq);
process.nextTick(function() {
callback(error);
});
Expand Down Expand Up @@ -230,6 +235,15 @@ WebsocketChannel.prototype.unref = function() {
WebsocketChannel.prototype._onOpen = function() {
this._debug('connection open');

var ws = this._websocket;
var self = this;
process.nextTick(function() {
// clients tend to emit 'connect', servers emit 'connection', but
// WebsocketChannel implements both a client and a server interface.
self.emit('connection', ws);
self.emit('connect', ws);
});

this._socket = this._websocket._socket;

if (this._unref && this._callbacksPending === 0)
Expand All @@ -240,8 +254,6 @@ WebsocketChannel.prototype._onOpen = function() {


WebsocketChannel.prototype._send = function(packet) {
this._unscheduleSendAck();

// Set the packet's sequence number and insert into the send queue.
var seq = packet.seq = ++this._seq;
this._sendQueue[seq] = packet;
Expand All @@ -266,7 +278,7 @@ WebsocketChannel.prototype._scheduleSendAck = function() {
if (this._receivedSeq === this._sentAck)
return;
// Don't schedule if other packets are scheduled.
if (this._sentSeq === this._seq)
if (this._sentSeq !== this._seq)
return;
// Don't schedule if the ack timer is already running.
if (this._ackTimer)
Expand Down Expand Up @@ -371,18 +383,22 @@ WebsocketChannel.prototype._flushSendQueue = function() {

this._debug('flush %d msgs', this._seq - this._sentSeq);

while (this._sentSeq < this._seq) {
while (this._sentSeq < this._seq && this._isOpen()) {
packet = sendQueue[++this._sentSeq];
packet.ack = this._sentAck = this._receivedSeq;
this._websocket.send(JSON.stringify(packet), afterSend);
}

// Send an ack-only packet if necessary.
if (this._sentAck !== this._receivedSeq) {
if (this._sentAck !== this._receivedSeq && this._isOpen()) {
packet = {ack: this._sentAck = this._receivedSeq};
this._websocket.send(JSON.stringify(packet), afterSend);
}

if (packet) {
this._unscheduleSendAck();
}

function afterSend(err) {
self._debug('sent: err?', err);
if (err)
Expand Down
160 changes: 160 additions & 0 deletions test/test-ws-errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
'use strict';

var Server = require('./mock-server');
var WebsocketChannel = require('../ws-channel');
var tap = require('tap');

var NULL_WS_PKT = new Buffer([0, 0, 0, 0]);
var server;
var wsURL;
var client;
var clients = [];

tap.test('create server', function(t) {
server = new Server('test-control', echo('server'), onListening);
server.channel.on('connection', addClient);
function onListening(url) {
wsURL = url;
t.assert(wsURL, 'server is listening at ' + wsURL);
t.end();
}
function addClient(ws) {
t.comment('new WS client:', ws._socket.remotePort);
clients.push(ws);
}
});

tap.test('create client', function(t) {
client = new WebsocketChannel(echo('client'));
t.assert(client, 'client channel should exist');
client.on('error', connect);
client.once('connect', function(ws) {
t.assert(ws, 'client should connect');
t.end();
});
connect();

function connect(err) {
if (err) {
t.comment('reconnecting due to error: ', err);
}
client.connect(wsURL);
}
});

tap.test('sabotage first websocket', function(t) {
sabotage(t, 1, client, clients);
});

tap.test('verify reconnected', function(t) {
t.plan(2);
server.request('Hello?', function(res) {
t.equal(res, 'Hello?');
});
client.request('Is there anybody in there?', function(res) {
t.equal(res, 'Is there anybody in there?');
});
});

tap.test('sabotage second websocket', function(t) {
sabotage(t, 2, client, clients);
});

tap.test('verify reconnected', function(t) {
t.plan(2);
server.request('Just nod if you can hear me.', function(res) {
t.equal(res, 'Just nod if you can hear me.');
});
client.request('Is there anyone home?', function(res) {
t.equal(res, 'Is there anyone home?');
});
});

tap.test('sabotage third websocket', function(t) {
sabotage(t, 3, client, clients);
});

tap.test('verify reconnected', function(t) {
t.plan(2);
server.request('Come on now', function(res) {
t.equal(res, 'Come on now');
});
client.request('I hear you\'re feeling down.', function(res) {
t.equal(res, 'I hear you\'re feeling down.');
});
});

tap.test('delay', function(t) {
t.comment('let both sides send any unsent ACKs');
setTimeout(t.end, 100);
});

tap.test('gracefully disconnect client', function(t) {
inspect(t, 'client', client);
inspect(t, 'server', client);
client.close(function(err) {
t.ifError(err, 'should be no errors when disconnecting');
t.end();
});
client.on('error', function(err) {
console.error('client error:', err);
});
server.channel.on('error', function(err) {
console.error('server error:', err);
});
});

tap.test('shutdown server', function(t) {
inspect(t, 'client', client);
inspect(t, 'server', client);
server.stop(function() {
t.pass('server shut down');
t.end();
});
});

tap.test('count the bodies', function(t) {
t.equal(clients.length, 4, 'should be 4 different connection attempts');
t.end();
});

function echo(name) {
return _echo;
function _echo(req, callback) {
console.log('# %s:', name, req);
callback(req);
}
}

function sabotage(t, count, client, clients) {
var latestClient = clients[count - 1];
t.plan(5);
t.equal(clients.length, count, 'should be latest client');
client.once('error', function(err) {
t.ok(err, 'client side gets an error event');
});
server.channel.once('connection', function(ws) {
t.ok(ws, 'server side gets a new client');
});
client.once('connect', function(ws) {
t.ok(ws, 'client should reconnect');
});
lightOnFire(t, latestClient);
}

function inspect(t, name, ch) {
t.comment('%s => sendSeq: %j, recvAck: %j, sentAck: %j, recvSeq: %j',
name, ch._sentSeq, ch._receivedAck, ch._sentAck, ch._receivedSeq);
}


function lightOnFire(t, ws) {
t.assert(ws && ws._socket, 'given a socket to light on fire');
t.comment('burning ws: %d', ws._socket.remotePort);
// throw a bad WS frame directly onto the socket to confuse the protocol's
// state machine.... then set it on fire.
if (ws._socket) {
ws._socket.end(NULL_WS_PKT);
}
ws.terminate();
}
5 changes: 3 additions & 2 deletions test/test-ws-reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ function onError(err) {
// The tolerable errors are ECONNRESET and "write after end"
// either which may happen after the client kills the websocket
// when it reconnects.
assert(/write after end/i.test(err.message) ||
err.code === 'ECONNRESET');
assert.ifError(!/write after end/i.test(err.message) &&
err.code !== 'ECONNRESET' &&
err);
}