-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.ts
174 lines (157 loc) · 4.18 KB
/
server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import {
serve as serveHTTP,
serveTLS as serveHTTPS,
Server,
ServerRequest,
HTTPOptions,
HTTPSOptions,
} from "https://deno.land/[email protected]/http/server.ts";
import { Queue } from "./queue.ts";
import {
acceptWebSocket,
WebSocket,
WebSocketEvent,
} from "https://deno.land/[email protected]/ws/mod.ts";
/**
 * A single entry of the server's event queue: a WebSocket event paired with
 * the socket it was received from.
 */
type WebSocketServerEvent = {
// Event produced while iterating over `socket`
event: WebSocketEvent;
// Socket that produced `event`
socket: WebSocket;
};
/**
 * WebSocket server that funnels the events of all of its sockets into a
 * single queue, which is consumed by async-iterating the server instance.
 *
 * When constructed with an HTTP server, every incoming HTTP request is
 * upgraded to a WebSocket. When constructed without one, the caller keeps
 * control of its own HTTP server and hands individual requests to
 * `handleUpgrade()`.
 */
export class WebSocketServer {
  /** Sockets that were accepted and whose event stream has not ended yet. */
  public sockets: Set<WebSocket>;
  /** Events collected from every tracked socket, waiting to be iterated. */
  private queue: Queue<WebSocketServerEvent>;

  /**
   * @param httpServer Optional HTTP server to take over; when given, all of
   *   its requests are upgraded to WebSockets.
   */
  constructor(private httpServer?: Server) {
    this.sockets = new Set();
    this.queue = new Queue();
    if (this.httpServer) {
      // Upgrade all incoming HTTP requests to WebSockets
      // (in other words hijack the HTTP server).
      // The accept loop runs in the background; log a failure instead of
      // leaving an unhandled promise rejection behind.
      this.upgradeAllRequests().catch((err) => console.error(err));
    }
    // Otherwise upgrade requests are handled through handleUpgrade(),
    // leaving the HTTP server free to handle other requests.
  }

  /**
   * Stops the event queue, closes every tracked socket and finally closes
   * the HTTP server (if one was given to the constructor).
   *
   * Closing is best effort: a socket that fails to close does not prevent
   * the remaining sockets or the HTTP server from being closed.
   */
  async close(): Promise<void> {
    this.queue.stop();
    // Close all sockets before killing the server
    // to allow close frames to be sent through the sockets
    const closings = [...this.sockets].map(async (socket) => {
      try {
        // Awaiting inside the try block is what makes a rejected close()
        // (not only a synchronous throw) end up in the catch clause.
        await socket.close();
      } catch (_err) {
        // Ignored on purpose: the socket could not be closed cleanly.
      }
    });
    await Promise.all(closings);
    if (this.httpServer) {
      this.httpServer.close();
    }
  }

  /** Upgrades every request received by the HTTP server to a WebSocket. */
  private async upgradeAllRequests(): Promise<void> {
    if (!this.httpServer) {
      return;
    }
    for await (const request of this.httpServer) {
      // Not awaited, so a pending handshake does not hold up the next
      // request. handleUpgrade() can still reject when sending the error
      // response fails, hence the explicit catch.
      this.handleUpgrade(request).catch((err) => console.error(err));
    }
  }

  /**
   * Upgrades an HTTP request to a WebSocket and starts collecting the events
   * of the new socket.
   *
   * @param req HTTP request to upgrade
   * @returns The accepted socket, or `undefined` when the upgrade failed and
   *   a `400` response was sent instead. Rejects if sending that response
   *   fails.
   */
  public async handleUpgrade(
    req: ServerRequest,
  ): Promise<WebSocket | undefined> {
    const { conn, r: bufReader, w: bufWriter, headers } = req;
    let socket: WebSocket;
    try {
      socket = await acceptWebSocket(
        { conn, bufReader, bufWriter, headers },
      );
    } catch (err) {
      console.error(err);
      await req.respond({
        status: 400, // Bad request
        // String() also copes with thrown values that lack toString()
        body: String(err),
      });
      return undefined;
    }
    this.trackSocket(socket);
    // Runs in the background until the socket's event stream ends; it
    // handles its own errors, so it is safe not to await it here.
    this.handleSocketEvents(socket);
    return socket;
  }

  /** Starts tracking an accepted socket. */
  private trackSocket(socket: WebSocket) {
    this.sockets.add(socket);
  }

  /** Stops tracking a socket. */
  private untrackSocket(socket: WebSocket) {
    this.sockets.delete(socket);
  }

  /**
   * Adds the events of a socket to the queue until its event stream ends,
   * then stops tracking the socket.
   */
  private async handleSocketEvents(socket: WebSocket): Promise<void> {
    // TODO add a connection event which also exposes the http request?
    try {
      for await (const event of socket) {
        const wsEvent: WebSocketServerEvent = { socket, event };
        this.queue.add(wsEvent);
      }
      // When socket is closed, the for await loop will be finished
    } catch (err) {
      console.error(err);
      // The loop broke due to an error: try to close the socket so the
      // connection is not left half-open.
      try {
        await socket.close();
      } catch (_closeErr) {
        // Ignored on purpose: the socket is already unusable.
      }
    } finally {
      // Untrack on every exit path so a failed socket does not stay in
      // `sockets` forever.
      this.untrackSocket(socket);
    }
  }

  /** Iterates over the queued events of all sockets. */
  [Symbol.asyncIterator](): AsyncIterableIterator<WebSocketServerEvent> {
    return this.queue.iterate();
  }
}
/**
 * Creates a WebSocket server on top of a new HTTP server.
 *
 * The HTTP server is started with `addr` and handed over to a
 * `WebSocketServer` instance.
 *
 * TODO add example usage here
 *
 * @param addr Address or options the HTTP server is started with
 * @return Server instance that can be async-iterated for socket events
 */
export function serve(addr: string | HTTPOptions): WebSocketServer {
  return new WebSocketServer(serveHTTP(addr));
}
/**
 * Starts a WebSocket server and passes every socket event to `handler`.
 *
 * The handler is called once per event and its return value is not awaited.
 * The returned promise resolves when the server's event stream ends.
 *
 * TODO add example usage here
 *
 * @param addr Address or options the server is started with
 * @param handler Callback invoked with each socket event
 */
export async function listenAndServe(
  addr: string | HTTPOptions,
  handler: (wsEvent: WebSocketServerEvent) => void,
): Promise<void> {
  for await (const incoming of serve(addr)) {
    handler(incoming);
  }
}
/**
 * Creates a secure WebSocket server (WSS) on top of a new HTTPS server.
 *
 * The HTTPS server is started with `options` and handed over to a
 * `WebSocketServer` instance.
 *
 * TODO add example usage here
 *
 * @param options Options the HTTPS server is started with
 * @return Server instance that can be async-iterated for socket events
 */
export function serveTLS(options: HTTPSOptions): WebSocketServer {
  return new WebSocketServer(serveHTTPS(options));
}
/**
 * Starts a secure WebSocket server (WSS) and passes every socket event to
 * `handler`.
 *
 * The handler is called once per event and its return value is not awaited.
 * The returned promise resolves when the server's event stream ends.
 *
 * TODO add example usage here
 *
 * @param options Options the HTTPS server is started with
 * @param handler Callback invoked with each socket event
 */
export async function listenAndServeTLS(
  options: HTTPSOptions,
  handler: (wsEvent: WebSocketServerEvent) => void,
): Promise<void> {
  for await (const incoming of serveTLS(options)) {
    handler(incoming);
  }
}