forked from BitMEX/api-connectors
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
237 lines (209 loc) · 7.82 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
'use strict';
const _ = require('lodash');
const EventEmitter = require('eventemitter2').EventEmitter2;
const util = require('util');
const debug = require('debug')('BitMEX:realtime-api');
const createSocket = require('./lib/createSocket');
const deltaParser = require('./lib/deltaParser');
const getStreams = require('./lib/getStreams');
const DEFAULT_MAX_TABLE_LEN = 10000;
const endpoints = {
production: 'wss://www.bitmex.com/realtime',
testnet: 'wss://testnet.bitmex.com/realtime'
};
const noSymbolTables = BitMEXClient.noSymbolTables = [
'account',
'affiliate',
'funds',
'insurance',
'margin',
'transact',
'wallet',
'announcement',
'connected',
'chat',
'publicNotifications',
'privateNotifications'
];
module.exports = BitMEXClient;
function BitMEXClient(options) {
const emitter = this;
// We inherit from EventEmitter2, which supports wildcards.
EventEmitter.call(emitter, {
wildcard: true,
delimiter: ':',
maxListeners: Infinity,
newListener: true,
});
if (!options) options = {};
this._data = {}; // internal data store keyed by [tableName][symbol]. Used by deltaParser.
this._keys = {}; // keys store - populated by images on connect
this._maxTableLen = typeof options.maxTableLen === 'number' ? options.maxTableLen : DEFAULT_MAX_TABLE_LEN;
if (!options.endpoint) {
options.endpoint = options.testnet ? endpoints.testnet : endpoints.production;
}
if (process.env.BITMEX_ENDPOINT) options.endpoint = process.env.BITMEX_ENDPOINT;
debug(options)
this._setupListenerTracking();
// Initialize the socket.
this.socket = createSocket(options, emitter);
if (options.apiKeyID) {
this.authenticated = true;
}
// Get valid streams so we can validate our subscriptions.
getStreams(options.endpoint, function(err, streams) {
if (err) throw err;
emitter.initialized = true;
emitter.streams = streams;
emitter.emit('initialize');
});
}
util.inherits(BitMEXClient, EventEmitter);
/**
* Simple data getter. Clones data on the way out so it can be safely modified.
* @param {String} [symbol] Symbol of data to retrieve.
* @param {String} [tableName] Table / stream name.
* @return {Object} All current data. If no tableName is provided, will return an object keyed by
* the table name.
*/
BitMEXClient.prototype.getData = function(symbol, tableName) {
const tableUsesSymbol = noSymbolTables.indexOf(tableName) === -1;
if (!tableUsesSymbol) symbol = '*';
let out;
// Both filters specified, easy return
if (symbol && tableName) {
out = this._data[tableName][symbol] || [];
}
// Since we're keying by [table][symbol], we have to search deep
else if (symbol && !tableName) {
out = Object.keys(this._data).reduce((memo, tableKey) => {
memo[tableKey] = this._data[tableKey][symbol] || [];
}, {});
}
// All table data
else if (!symbol && tableName) {
out = this._data[tableName] || {};
} else {
throw new Error('Pass a symbol, tableName, or both to getData([symbol], [tableName]) - but one must be provided.');
}
return clone(out);
};
/**
* Helper to get data for all symbols, by table.
*/
BitMEXClient.prototype.getTable = function(tableName) {
return this.getData(null, tableName);
};
/**
* Helper to get data for all tables, by symbol.
*/
BitMEXClient.prototype.getSymbol = function(symbol) {
return this.getData(symbol);
};
/**
* Add a stream to listen to. This function calls back with a full dataset with the arity
* (data, symbol, tableName).
*
* To catch errors, attach an 'error' listener to the client itself.
*
* If no tableName is passed, will subscribe to all public tables.
*
* @param {String} symbol Symbol to subscribe to.
* @param {String} [tableName] Table to subscribe to. See README.
* @param {Function} callback Data callback.
*/
BitMEXClient.prototype.addStream = function(symbol, tableName, callback) {
const client = this;
if (!this.initialized) {
return this.once('initialize', () => client.addStream(symbol, tableName, callback));
}
if (!this.socket.opened) {
// Not open yet. Call this when open
return this.socket.once('open', () => client.addStream(symbol, tableName, callback))
}
// Massage arguments.
if (typeof callback !== 'function') throw new Error('A callback must be passed to BitMEXClient#addStream.');
else if (client.streams.all.indexOf(tableName) === -1) {
throw new Error('Unknown table for BitMEX subscription: ' + tableName +
'. Available tables are ' + client.streams.all + '.');
}
addStreamHelper(client, symbol, tableName, callback);
};
BitMEXClient.prototype._setupListenerTracking = function() {
// Keep track of listeners.
const listenerTree = this._listenerTree = {};
this.on('newListener', (eventName) => {
const split = eventName.split(':');
if (split.length !== 3) return; // other events
const [table, , symbol] = split;
if (!listenerTree[table]) listenerTree[table] = {};
if (!listenerTree[table][symbol]) listenerTree[table][symbol] = 0;
listenerTree[table][symbol]++;
});
this.on('removeListener', (eventName) => {
const split = eventName.split(':');
if (split.length !== 3) return; // other events
const [table, , symbol] = split;
listenerTree[table][symbol]--;
});
}
BitMEXClient.prototype.subscriptionCount = function(table, symbol) {
return this._listenerTree[table] && this._listenerTree[table][symbol] || 0;
};
BitMEXClient.prototype.sendSubscribeRequest = function(table, symbol) {
console.log(JSON.stringify({op: 'subscribe', args: `${table}:${symbol}`}))
this.socket.send(JSON.stringify({op: 'subscribe', args: `${table}:${symbol}`}));
};
function addStreamHelper(client, symbol, tableName, callback) {
const tableUsesSymbol = noSymbolTables.indexOf(tableName) === -1;
if (!tableUsesSymbol) symbol = '*';
// Tell BitMEX we want to subscribe to this data. If wildcard, sub to all tables.
let toSubscribe;
if (tableName === '*') {
// This list comes from the getSymbols call, which hits
// https://www.bitmex.com/api/v1/schema/websocketHelp
toSubscribe = client.streams[client.authenticated ? 'all' : 'public'];
} else {
// Normal sub
toSubscribe = [tableName];
}
// For each subscription,
toSubscribe.forEach(function(table) {
// Create a subscription topic.
const subscription = `${table}:*:${symbol}`;
debug('Opening listener to %s.', subscription);
// Add the listener for deltas before subscribing at BitMEX.
// These events come from createSocket, which does minimal data parsing
// to figure out what table and symbol the data is for.
//
// The emitter emits 'partial', 'update', 'insert', and 'delete' events, listen to them all.
client.on(subscription, function(data) {
const [table, action, symbol] = this.event.split(':');
try {
const newData = deltaParser.onAction(action, table, symbol, client, data);
// Shift oldest elements out of the table (FIFO queue) to prevent unbounded memory growth
if (newData.length > client._maxTableLen) {
newData.splice(0, newData.length - client._maxTableLen);
}
callback(newData, symbol, table);
} catch(e) {
client.emit('error', e);
}
});
// If this is the first sub, subscribe to bitmex adapter.
if (client.subscriptionCount(table, symbol) === 1) {
const openSubscription = () => client.sendSubscribeRequest(table, symbol);
// If we reconnect, will need to reopen.
client.on('open', openSubscription);
// If we're already opened, prime the pump (I made that up)
if (client.socket.opened) openSubscription();
}
});
}
function clone(data) {
return data.map(o => Object.assign({}, o));
}
if (require.main === module) {
console.error('This module is not meant to be run directly. Try running example.js instead.');
process.exit(1);
}