Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Websocket transport #2

Merged
merged 5 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bazel-*
1 change: 1 addition & 0 deletions js/grpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ closure_js_library(
":grpc",
":options",
"//js/grpc/transport:fetch",
"//js/grpc/transport:websocket",
"//js/grpc/transport:xhr",
"@io_bazel_rules_closure//closure/library",
"@io_bazel_rules_closure//closure/library/labs/useragent:browser",
Expand Down
36 changes: 32 additions & 4 deletions js/grpc/api.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
goog.module('grpc.Api');

const FetchTransport = goog.require('grpc.transport.Fetch');
const GrpcEndpoint = goog.require('grpc.Endpoint');
const GrpcOptions = goog.require('grpc.Options');
const Transport = goog.require('grpc.Transport');
const WebSocketTransport = goog.require('grpc.transport.WebSocket');
const XhrTransport = goog.require('grpc.transport.Xhr');
const browser = goog.require('goog.labs.userAgent.browser');

Expand All @@ -19,23 +21,49 @@ class Api {
*/
constructor(opt_options, opt_transport) {

const options = opt_options || new GrpcOptions();
/**
* @private @const {!GrpcOptions}
*/
this.options_ = opt_options || new GrpcOptions();

/**
* @const @private
* @type {!Transport}
*/
this.transport_ = opt_transport || (
fetchSupported() ? new FetchTransport(options) : new XhrTransport(options));
fetchSupported() ? new FetchTransport(this.options_) : new XhrTransport(this.options_));
}

/**
* @param {?GrpcEndpoint=} opt_endpoint Optional endpoint config allows caller
* to select a per-call transport.
* @return {!Transport}
*/
getTransport() {
getTransport(opt_endpoint) {
if (opt_endpoint && opt_endpoint.transport) {
return this.getTransportByType(/** @type {!Transport.Type } */(opt_endpoint.transport));
}
return this.transport_;
}


/**
* Returns a transport by name.
* @param {!Transport.Type} type
* @return {!Transport}
*/
getTransportByType(type) {
switch (type) {
case 'fetch':
return new FetchTransport(this.options_);
case 'xhr':
return new XhrTransport(this.options_);
case 'websocket':
return new WebSocketTransport(this.options_);
default:
throw new Error(`unknown transport type: ${type}`);
}
}

}

/**
Expand Down
4 changes: 3 additions & 1 deletion js/grpc/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
*/
goog.module('grpc.Endpoint');


/**
* Metadata about a procedure endpoint that is used to make a call.
* All attributes are optional. 'path' is a prefix such as '/api/v1' used to route
* at the server.
*
* An optional transport name can be requested. Allowed values are enum Transport.Type.
*
* @typedef{{
path:(string|undefined),
host:(string|undefined),
port:(number|undefined),
transport:(string|undefined),
}}
*/
var Endpoint;
Expand Down
15 changes: 13 additions & 2 deletions js/grpc/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,23 @@ const Transport = function () { };
* @param {string} name The name of the procedure to call
* @param {!function(INPUT):!ByteSource} encoder A serializer function that can encode input messages.
* @param {!function(!ByteSource):OUTPUT} decoder A serializer function that can decode output messages.
* @param {!Observer<OUTPUT>} observer An observer used to recieve events.
* @param {!Observer<OUTPUT>} observer An observer used to recieve responses.
* @param {?Endpoint=} opt_endpoint Optional additional endpoint configuration.
* @return {!Observer<INPUT>} The input observer that the caller should provide events to.
* @return {!Observer<INPUT>} The input observer that the caller should supply
* with requests.
* @template INPUT
* @template OUTPUT
*/
Transport.prototype.call = function (name, encoder, decoder, observer, opt_endpoint) { };

/**
* @public
* @enum {string}
*/
Transport.Type = {
XHR: 'xhr',
FETCH: 'fetch',
WEBSOCKET: 'websocket',
};

exports = Transport;
17 changes: 17 additions & 0 deletions js/grpc/transport/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ closure_js_library(
],
)

closure_js_library(
name = "websocket",
srcs = [
"websocket.js",
"websocket/observer.js",
],
deps = [
":base_observer",
"//js/grpc",
"//js/grpc:options",
"//js/grpc/transport/chunk",
"@io_bazel_rules_closure//closure/library",
"@io_bazel_rules_closure//closure/library/net:websocket",
"@io_bazel_rules_closure//closure/protobuf:jspb",
],
)

closure_js_test(
name = "xhr_test",
srcs = [
Expand Down
8 changes: 6 additions & 2 deletions js/grpc/transport/base_observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class Observer {
this.complete_ = false;
}


/**
* @override
*/
Expand Down Expand Up @@ -239,6 +238,12 @@ class Observer {
return this.status_;
}

/**
* @returns {!EventHandler}
*/
getHandler() {
return this.handler_;
}

/**
* Convert the protobuf encoded bytes to a grpc-request frame.
Expand All @@ -255,7 +260,6 @@ class Observer {
return data;
}


/**
* Convert the raw string to an ArrayBuffer
* @param {string} str The string to conver
Expand Down
38 changes: 37 additions & 1 deletion js/grpc/transport/chunk/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,40 @@ function hasEnoughBytes(buffer, position, byteCount) {
return buffer.byteLength - position >= byteCount;
}

exports = { Parser, parseHeaders };
/**
* A validation function.
*
* @param {number} char
* @returns {boolean}
*/
function isAllowedControlChars(char) {
return char === 0x9 || char === 0xa || char === 0xd;
}

/**
* @param {number} val
* @returns {boolean}
*/
function isValidHeaderAscii(val) {
return isAllowedControlChars(val) || (val >= 0x20 && val <= 0x7e);
}

/**
* Encode the string in ascii as a uint8array.
*
* @param {string} input
* @returns {!Uint8Array}
*/
function encodeASCII(input) {
const encoded = new Uint8Array(input.length);
for (let i = 0; i !== input.length; ++i) {
const charCode = input.charCodeAt(i);
if (!isValidHeaderAscii(charCode)) {
throw new Error("Metadata contains invalid ASCII");
}
encoded[i] = charCode;
}
return encoded;
}

exports = { Parser, parseHeaders, encodeASCII };
2 changes: 1 addition & 1 deletion js/grpc/transport/fetch.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @fileoverview Test transport implementation.
* @fileoverview Fetch transport implementation.
*
*/
goog.module('grpc.transport.Fetch');
Expand Down
39 changes: 39 additions & 0 deletions js/grpc/transport/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* @fileoverview Websocket transport implementation.
*
*/
goog.module('grpc.transport.WebSocket');

const Observer = goog.require('grpc.transport.websocket.Observer');
const Options = goog.require('grpc.Options');
const Transport = goog.require('grpc.Transport');


/**
* Transport implementation that uses the WebSocket API.
*
* @struct
* @implements {Transport}
* @template T
* @template E
*/
class WebSocket {

/**
* @param {!Options} options
*/
constructor(options) {
/** @const @private @type{!Options} */
this.options_ = options;
}

/**
* @override
*/
call(name, encoder, decoder, observer, opt_endpoint) {
return new Observer(this.options_, name, encoder, decoder, observer, opt_endpoint);
}

}

exports = WebSocket;
Loading