This repository has been archived by the owner on Mar 30, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPusher.cs
392 lines (330 loc) · 13.9 KB
/
Pusher.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Nito.AsyncEx;
namespace PusherClient
{
/* TODO: Write tests
* - Websocket disconnect
- Connection lost, not cleanly closed
- MustConnectOverSSL = 4000,
- App does not exist
- App disabled
- Over connection limit
- Path not found
- Client over rate limie
- Conditions for client event triggering
*/
// TODO: Implement connection fallback strategy
/// <summary>
/// The Pusher Client object
/// </summary>
public class Pusher : EventEmitter, IPusher, ITriggerChannels
{
/// <summary>
/// Fires when a connection has been established with the Pusher Server
/// </summary>
public event ConnectedEventHandler Connected;
/// <summary>
/// Fires when the connection is disconnection from the Pusher Server
/// </summary>
public event ConnectedEventHandler Disconnected;
/// <summary>
/// Fires when the connection state changes
/// </summary>
public event ConnectionStateChangedEventHandler ConnectionStateChanged;
/// <summary>
/// Fire when an error occurs
/// </summary>
public event ErrorEventHandler Error;
/// <summary>
/// The TraceSource instance to be used for logging
/// </summary>
public static TraceSource Trace = new TraceSource(nameof(Pusher));
private readonly string _applicationKey;
private readonly PusherOptions _options;
private readonly List<string> _pendingChannelSubscriptions = new List<string>();
private readonly AsyncLock _mutex = new AsyncLock();
private Connection _connection;
/// <summary>
/// Gets the Socket ID
/// </summary>
public string SocketID => _connection?.SocketId;
/// <summary>
/// Gets the current connection state
/// </summary>
public ConnectionState State => _connection?.State ?? ConnectionState.NotConnected;
/// <summary>
/// Gets the channels in use by the Client
/// </summary>
public ConcurrentDictionary<string, Channel> Channels { get; private set; } = new ConcurrentDictionary<string, Channel>();
/// <summary>
/// Gets the Options in use by the Client
/// </summary>
internal PusherOptions Options => _options;
/// <summary>
/// Initializes a new instance of the <see cref="Pusher" /> class.
/// </summary>
/// <param name="applicationKey">The application key.</param>
/// <param name="options">The options.</param>
public Pusher(string applicationKey, PusherOptions options = null)
{
if (string.IsNullOrWhiteSpace(applicationKey))
throw new ArgumentException(ErrorConstants.ApplicationKeyNotSet, nameof(applicationKey));
_applicationKey = applicationKey;
_options = options ?? new PusherOptions { Encrypted = false };
}
void IPusher.ConnectionStateChanged(ConnectionState state)
{
if (state == ConnectionState.Connected)
{
SubscribeExistingChannels();
if (Connected != null)
Connected(this);
}
else if (state == ConnectionState.Disconnected)
{
MarkChannelsAsUnsubscribed();
if (Disconnected != null)
Disconnected(this);
if (ConnectionStateChanged != null)
ConnectionStateChanged(this, state);
}
else
{
if (ConnectionStateChanged != null)
ConnectionStateChanged(this, state);
}
}
void IPusher.ErrorOccured(PusherException pusherException)
{
RaiseError(pusherException);
}
void IPusher.EmitPusherEvent(string eventName, string data)
{
EmitEvent(eventName, data);
}
void IPusher.EmitChannelEvent(string channelName, string eventName, string data)
{
if (Channels.ContainsKey(channelName))
{
Channels[channelName].EmitEvent(eventName, data);
}
}
void IPusher.AddMember(string channelName, string member)
{
if (Channels.Keys.Contains(channelName) && Channels[channelName] is PresenceChannel)
{
((PresenceChannel)Channels[channelName]).AddMember(member);
}
}
void IPusher.RemoveMember(string channelName, string member)
{
if (Channels.Keys.Contains(channelName) && Channels[channelName] is PresenceChannel)
{
((PresenceChannel)Channels[channelName]).RemoveMember(member);
}
}
void IPusher.SubscriptionSuceeded(string channelName, string data)
{
if (_pendingChannelSubscriptions.Contains(channelName))
_pendingChannelSubscriptions.Remove(channelName);
if (Channels.Keys.Contains(channelName))
{
Channels[channelName].SubscriptionSucceeded(data);
}
}
/// <summary>
/// Start the connection to the Pusher Server. When completed, the <see cref="Connected"/> event will fire.
/// </summary>
public ConnectionState Connect()
{
return AsyncContext.Run(ConnectAsync);
}
/// <summary>
/// Start the connection to the Pusher Server asynchronously. When completed, the <see cref="Connected"/> event will fire.
/// </summary>
public async Task<ConnectionState> ConnectAsync()
{
if (_connection != null)
{
Trace.TraceEvent(TraceEventType.Warning, 0, ErrorConstants.ConnectionAlreadyConnected);
return ConnectionState.AlreadyConnected;
}
// Prevent multiple concurrent connections
var connectionResult = ConnectionState.Connecting;
using (await _mutex.LockAsync())
{
// Ensure we only ever attempt to connect once
if (_connection != null)
{
Trace.TraceEvent(TraceEventType.Warning, 0, ErrorConstants.ConnectionAlreadyConnected);
return ConnectionState.AlreadyConnected;
}
// TODO: Fallback to secure?
var url = ConstructUrl();
_connection = new Connection(this, url);
connectionResult = await _connection.Connect();
}
return connectionResult;
}
private string ConstructUrl()
{
var scheme = _options.Encrypted ? Constants.SECURE_SCHEMA : Constants.INSECURE_SCHEMA;
return $"{scheme}{_options.Host}/app/{_applicationKey}?protocol={Settings.Default.ProtocolVersion}&client={Settings.Default.ClientName}&version={Settings.Default.VersionNumber}";
}
/// <summary>
/// Start the disconnection from the Pusher Server. When completed, the <see cref="Disconnected"/> event will fire.
/// </summary>
public ConnectionState Disconnect()
{
return AsyncContext.Run(DisconnectAsync);
}
/// <summary>
/// Start the disconnection from the Pusher Server asynchronously. When completed, the <see cref="Disconnected"/> event will fire.
/// </summary>
public async Task<ConnectionState> DisconnectAsync()
{
ConnectionState connectionResult = ConnectionState.Disconnecting;
if (_connection != null)
{
MarkChannelsAsUnsubscribed();
connectionResult = await _connection.Disconnect();
}
else
{
connectionResult = ConnectionState.Disconnected;
}
return connectionResult;
}
/// <summary>
/// Subscribes to the given channel, unless the channel already exists, in which case the xisting channel will be returned.
/// </summary>
/// <param name="channelName">The name of the Channel to subsribe to</param>
/// <returns>The Channel that is being subscribed to</returns>
public Channel Subscribe(string channelName)
{
return AsyncContext.Run(() => SubscribeAsync(channelName));
}
/// <summary>
/// Subscribes to the given channel asynchronously, unless the channel already exists, in which case the xisting channel will be returned.
/// </summary>
/// <param name="channelName">The name of the Channel to subsribe to</param>
/// <returns>The Channel that is being subscribed to</returns>
public async Task<Channel> SubscribeAsync(string channelName)
{
if (string.IsNullOrWhiteSpace(channelName))
{
throw new ArgumentException("The channel name cannot be null or whitespace", nameof(channelName));
}
if (AlreadySubscribed(channelName))
{
Trace.TraceEvent(TraceEventType.Warning, 0, "Channel '" + channelName + "' is already subscribed to. Subscription event has been ignored.");
return Channels[channelName];
}
_pendingChannelSubscriptions.Add(channelName);
return await SubscribeToChannel(channelName);
}
private async Task<Channel> SubscribeToChannel(string channelName)
{
var channelType = GetChannelType(channelName);
if (!Channels.ContainsKey(channelName))
CreateChannel(channelType, channelName);
if (State == ConnectionState.Connected)
{
if (channelType == ChannelTypes.Presence || channelType == ChannelTypes.Private)
{
var jsonAuth = _options.Authorizer.Authorize(channelName, _connection.SocketId);
var template = new { auth = string.Empty, channel_data = string.Empty };
var message = JsonConvert.DeserializeAnonymousType(jsonAuth, template);
await _connection.Send(JsonConvert.SerializeObject(new { @event = Constants.CHANNEL_SUBSCRIBE, data = new { channel = channelName, auth = message.auth, channel_data = message.channel_data } }));
}
else
{
// No need for auth details. Just send subscribe event
await _connection.Send(JsonConvert.SerializeObject(new { @event = Constants.CHANNEL_SUBSCRIBE, data = new { channel = channelName } }));
}
}
return Channels[channelName];
}
private static ChannelTypes GetChannelType(string channelName)
{
// If private or presence channel, check that auth endpoint has been set
var channelType = ChannelTypes.Public;
if (channelName.ToLowerInvariant().StartsWith(Constants.PRIVATE_CHANNEL))
{
channelType = ChannelTypes.Private;
}
else if (channelName.ToLowerInvariant().StartsWith(Constants.PRESENCE_CHANNEL))
{
channelType = ChannelTypes.Presence;
}
return channelType;
}
private void CreateChannel(ChannelTypes type, string channelName)
{
switch (type)
{
case ChannelTypes.Public:
Channels[channelName] = new Channel(channelName, this);
break;
case ChannelTypes.Private:
AuthEndpointCheck();
Channels[channelName] = new PrivateChannel(channelName, this);
break;
case ChannelTypes.Presence:
AuthEndpointCheck();
Channels[channelName] = new PresenceChannel(channelName, this);
break;
}
}
private void AuthEndpointCheck()
{
if (_options.Authorizer == null)
{
var pusherException = new PusherException("You must set a ChannelAuthorizer property to use private or presence channels", ErrorCodes.ChannelAuthorizerNotSet);
RaiseError(pusherException);
throw pusherException;
}
}
void ITriggerChannels.Trigger(string channelName, string eventName, object obj)
{
AsyncContext.Run(() => _connection.Send(JsonConvert.SerializeObject(new { @event = eventName, channel = channelName, data = obj })));
}
void ITriggerChannels.Unsubscribe(string channelName)
{
if (_connection.IsConnected)
AsyncContext.Run(() => _connection.Send(JsonConvert.SerializeObject(new { @event = Constants.CHANNEL_UNSUBSCRIBE, data = new { channel = channelName } })));
}
private void RaiseError(PusherException error)
{
var handler = Error;
if (handler != null)
handler(this, error);
else
Pusher.Trace.TraceEvent(TraceEventType.Error, 0, error.ToString());
}
private bool AlreadySubscribed(string channelName)
{
return _pendingChannelSubscriptions.Contains(channelName) || (Channels.ContainsKey(channelName) && Channels[channelName].IsSubscribed);
}
private void MarkChannelsAsUnsubscribed()
{
foreach (var channel in Channels)
{
channel.Value.Unsubscribe();
}
}
private void SubscribeExistingChannels()
{
foreach (var channel in Channels)
{
AsyncContext.Run(() => SubscribeToChannel(channel.Key));
}
}
}
}