Skip to content
This repository has been archived by the owner on Nov 27, 2017. It is now read-only.

Commit

Permalink
Merge pull request #1142 from chirino/master
Browse files Browse the repository at this point in the history
  • Loading branch information
pure-bot[bot] authored Oct 26, 2017
2 parents 73d98ed + d477351 commit def23ec
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 161 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"tether": "1.4.0",
"ts-helpers": "1.1.2",
"typescript-logging": "0.4.0",
"url": "^0.11.0",
"zone.js": "0.8.4"
},
"devDependencies": {
Expand Down
273 changes: 112 additions & 161 deletions src/app/store/entity/events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Subscription } from 'rxjs/Subscription';
import { ConfigService } from '../../config.service';
import { Restangular } from 'ngx-restangular';
import { log } from '../../logging';
import { resolve } from 'url';
import 'rxjs/add/operator/first';

export class ChangeEvent {
action: string;
Expand All @@ -16,194 +18,143 @@ class MessageEvent {
event: string;
}

const MAX_RETRIES = 5;
const RECONNECT_TIME = 5000;

@Injectable()
export class EventsService {
private eventSource: EventSource;
private webSocket: WebSocket;
private starting = false;
private retries = 0;
private preferredProtocol = null;

messageEvents: Subject<String> = new Subject<String>();
changeEvents: Subject<ChangeEvent> = new Subject<ChangeEvent>();

constructor(private config: ConfigService, private restangular: Restangular) {
this.startConnection();
this.startConnection(this.retries % 2 === 0);
}

private startConnection() {
onFailure(event) {
this.starting = false;
this.retries++;

if ( this.webSocket ) {
this.webSocket.close();
this.webSocket = undefined;
}
if ( this.eventSource ) {
this.eventSource.close();
this.eventSource = undefined;
}

// Initialy retry very quickly.
let reconnectIn = RECONNECT_TIME;
if (this.retries < 3) {
reconnectIn = 1;
}

setTimeout(() => {
log.info('Reconnecting');
switch (this.preferredProtocol) {
// Once we find a protocol that works, keep using it.
case 'ws':
this.startConnection(true);
break;
case 'es':
this.startConnection(false);
break;
default:
// Keep flipping between WS and ES untill we find one that
// works.
this.startConnection(this.retries % 2 === 0);
}
}, reconnectIn);
}

private startConnection(connectUsingWebSockets) {
if (this.starting) {
return;
}
this.starting = true;
// Setup an event stream reservation first..
const createReservation = (retries = 0) => {
let sub: Subscription = undefined;
try {
sub = this.restangular.all('event/reservations').customPOST().subscribe(
response => {
const apiEndpoint = this.config.getSettings().apiEndpoint;
const reservation = response.data;
// try {
// // First try to connect via a WebSocket
// const wsApiEndpoint = apiEndpoint.replace(/^http/, 'ws');
// this.connectWebSocket(
// wsApiEndpoint + '/event/streams.ws/' + reservation,
// );
// log.info('Connecting using web socket');
// this.starting = false;
// } catch (error) {
// // Then fallback to using EventSource
// log.info('Unable to connect web socket, falling back to SSE');
try {
this.connectEventSource(
apiEndpoint + '/event/streams/' + reservation,
);
this.starting = false;
log.info('Connecting using server side events');
} catch (err) {
log.info('Failed to connect to event source');
}
// }
},
error => {
if (sub) {
sub.unsubscribe();
}
// try and reconnect
if (this.eventSource) {
this.eventSource.close();
this.eventSource = undefined;
}
if (this.webSocket) {
this.webSocket.close();
this.webSocket = undefined;
}
if (retries >= MAX_RETRIES) {
log.info(
'Giving up event stream reservation, refresh the page to retry',

try {
this.restangular.all('event/reservations').customPOST().first().subscribe(
response => {
const apiEndpoint = this.config.getSettings().apiEndpoint;
const reservation = response.data;
try {
if ( connectUsingWebSockets ) {
let wsApiEndpoint = resolve(window.location.href, apiEndpoint);
wsApiEndpoint = wsApiEndpoint.replace(/^http/, 'ws');
wsApiEndpoint += '/event/streams.ws/' + reservation,
this.connectWebSocket(wsApiEndpoint);
log.info('Connecting using web socket');
this.starting = false;
} else {
this.connectEventSource(
apiEndpoint + '/event/streams/' + reservation,
);
return;
this.starting = false;
log.info('Connecting using server side events');
}
setTimeout(() => {
log.info('Attempting to fetch new event stream reservation');
createReservation(retries + 1);
}, RECONNECT_TIME);
},
);
} catch (err) {
if (sub) {
sub.unsubscribe();
}
if (retries >= MAX_RETRIES) {
log.info(
'Giving up event stream reservation, refresh the page to retry',
);
return;
}
setTimeout(() => {
log.info('Attempting to fetch new event stream reservation');
createReservation(retries + 1);
}, RECONNECT_TIME);
}
};
createReservation();
} catch (error) {
this.onFailure(error);
}
},
error => {
this.onFailure(error);
},
);
} catch (error) {
this.onFailure(error);
}
}

private connectEventSource(url: string) {
const setupEventSource = (eventSource: EventSource, retries = 0) => {
const onMessage = event => {
log.info('sse.message: ' + JSON.stringify(event.data));
this.messageEvents.next(event.data);
};
const onChangeEvent = event => {
const value = JSON.parse(event.data) as ChangeEvent;
log.info('sse.change-event: ' + JSON.stringify(value));
this.changeEvents.next(value);
};
const onClose = event => {
log.info('sse.close: ' + JSON.stringify(event));
if (retries >= MAX_RETRIES) {
log.info(
'ss.close: Max retries reached, giving up trying to reconnect SSE',
);
return;
}
setTimeout(() => {
log.info('sse.close: attempting to reconnect');
this.eventSource = setupEventSource(
new EventSource(url),
retries + 1,
);
}, RECONNECT_TIME);
};
const onError = event => {
log.info('sse.error: attempting to reconnect');
try {
eventSource.removeEventListener('message', onMessage);
eventSource.removeEventListener('change-event', onChangeEvent);
eventSource.removeEventListener('close', onClose);
eventSource.removeEventListener('error', onError);
eventSource.close();
} catch (err) {
// ignore
}
// This happens frequently, so let's reconnect sooner
setTimeout(() => {
this.startConnection();
}, 1000);
};
eventSource.addEventListener('message', onMessage);
eventSource.addEventListener('change-event', onChangeEvent);
eventSource.addEventListener('close', onClose);
eventSource.addEventListener('error', onError);
return eventSource;
this.eventSource = new EventSource(url);
this.eventSource.addEventListener('message', (event) => {
this.starting = false;
this.preferredProtocol = 'es';
log.info('sse.message: ' + JSON.stringify(event.data));
this.messageEvents.next(event.data);
});
this.eventSource.addEventListener('change-event', (event) => {
const value = JSON.parse(event.data) as ChangeEvent;
log.info('sse.change-event: ' + JSON.stringify(value));
this.changeEvents.next(value);
});
const onError = (event) => {
log.info('sse.close: ' + JSON.stringify(event));
this.onFailure(event);
};
this.eventSource = setupEventSource(new EventSource(url));
this.eventSource.addEventListener('close', onError);
this.eventSource.addEventListener('error', onError);
}

private connectWebSocket(url) {
const setupWebSocket = (ws, retries = 0) => {
const onMessage = event => {
const messageEvent = JSON.parse(event.data) as MessageEvent;
switch (messageEvent.event) {
case 'message':
log.info('ws.message: ' + JSON.stringify(messageEvent.data));
this.messageEvents.next(messageEvent.data);
break;
case 'change-event':
log.info('ws.change-event: ' + JSON.stringify(messageEvent.data));
const value = JSON.parse(messageEvent.data) as ChangeEvent;
this.changeEvents.next(value);
break;
default:
log.info('ws.unknown-message: ' + JSON.stringify(event));
}
};
const onClose = event => {
log.info('ws.onclose: ' + JSON.stringify(event));
if (retries >= MAX_RETRIES) {
log.info(
'ws.onclose: Max retries reached, giving up trying to reconnect websocket',
);
return;
}
setTimeout(() => {
log.info('ws.onclose: attempting to reconnect');
try {
this.webSocket = setupWebSocket(new WebSocket(url), retries + 1);
} catch (err) {
log.info('ws connect failed, getting new registration');
this.webSocket = undefined;
this.startConnection();
}
}, RECONNECT_TIME);
};
ws.onmessage = onMessage;
ws.onclose = onClose;
return ws;
this.webSocket = new WebSocket(url);
this.webSocket.onmessage = (event) => {
const messageEvent = JSON.parse(event.data) as MessageEvent;
switch (messageEvent.event) {
case 'message':
this.starting = false;
this.preferredProtocol = 'ws';
log.info('ws.message: ' + JSON.stringify(messageEvent.data));
this.messageEvents.next(messageEvent.data);
break;
case 'change-event':
log.info('ws.change-event: ' + JSON.stringify(messageEvent.data));
const value = JSON.parse(messageEvent.data) as ChangeEvent;
this.changeEvents.next(value);
break;
default:
log.info('ws.unknown-message: ' + JSON.stringify(event));
}
};
this.webSocket.onclose = (event) => {
log.info('ws.onclose: ' + JSON.stringify(event));
this.onFailure(event);
};
this.webSocket = setupWebSocket(new WebSocket(url));
}
}

0 comments on commit def23ec

Please sign in to comment.