diff --git a/lib/http.js b/lib/http.js index 46cc92f..9f2e578 100644 --- a/lib/http.js +++ b/lib/http.js @@ -17,6 +17,8 @@ // - **Event: 'connection' (socket, [endpoint])**: there's a second argument if the negotiation of // HTTP/2 was successful: the reference to the [Endpoint](endpoint.html) object tied to the // socket. +// - **Event: 'endpoint' (endpoint)**: the endpoint created by the new connection. Emitted +// before any data has been transmitted. // // - **http2.createServer(options, [requestListener])**: additional option: // - **log**: an optional [bunyan](https://github.com/trentm/node-bunyan) logger object @@ -38,6 +40,7 @@ // - **http2.request(options, [callback])**: additional option: // - **plain**: if `true`, the client will not try to build a TLS tunnel, instead it will use // the raw TCP stream for HTTP/2 +// - **onEndpoint**: A event handler for the endpoint event. // // - **Class: http2.ClientRequest** // - **Event: 'socket' (socket)**: in case of an HTTP/2 incoming message, `socket` is a reference @@ -45,6 +48,8 @@ // - **Event: 'push' (promise)**: signals the intention of a server push associated to this // request. `promise` is an IncomingPromise. If there's no listener for this event, the server // push is cancelled. +// - **Event: 'endpoint'**: Emited when the endpoint for the request has been created. If the +// request is reusing a connection, the existing Endpoint is emitted. // - **request.setPriority(priority)**: assign a priority to this request. `priority` is a number // between 0 (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. // @@ -446,7 +451,15 @@ Server.prototype = Object.create(EventEmitter.prototype, { constructor: { value: // Starting HTTP/2 Server.prototype._start = function _start(socket) { - var endpoint = new Endpoint(this._log, 'SERVER', this._settings); + var self = this; + var endpoint = new Endpoint({ + log: this._log, + role: 'SERVER', + settings: this._settings, + onEndpoint: function (endpoint) { + self.emit('endpoint', endpoint); + } + }); this._log.info({ e: endpoint, client: socket.remoteAddress + ':' + socket.remotePort, @@ -841,6 +854,11 @@ Agent.prototype.request = function request(options, callback) { var request = new OutgoingRequest(this._log); + // * dealing with endpoint event handler + if (typeof options.onEndpoint === 'function') { + request.addListener('endpoint', options.onEndpoint); + } + if (callback) { request.on('response', callback); } @@ -854,17 +872,27 @@ Agent.prototype.request = function request(options, callback) { // * There's an existing HTTP/2 connection to this host if (key in this.endpoints) { var endpoint = this.endpoints[key]; + this.emit('endpoint', endpoint); + request._start(endpoint.createStream(), options); } // * HTTP/2 over plain TCP else if (options.plain) { - endpoint = new Endpoint(this._log, 'CLIENT', this._settings); + endpoint = new Endpoint({ + log: this._log, + role: 'CLIENT', + settings: this._settings, + onEndpoint: options.onEndpoint + }); + endpoint.socket = net.connect({ host: options.host, port: options.port, localAddress: options.localAddress }); + this.endpoints[key] = endpoint; + endpoint.pipe(endpoint.socket).pipe(endpoint); request._start(endpoint.createStream(), options); } @@ -879,7 +907,7 @@ Agent.prototype.request = function request(options, callback) { options.ciphers = options.ciphers || cipherSuites; var httpsRequest = https.request(options); - httpsRequest.on('socket', function(socket) { + httpsRequest.on('socket', function (socket) { var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol; if (negotiatedProtocol != null) { // null in >=0.11.0, undefined in <0.11.0 negotiated(); @@ -895,7 +923,13 @@ Agent.prototype.request = function request(options, callback) { if (negotiatedProtocol === protocol.VERSION) { httpsRequest.socket.emit('agentRemove'); unbundleSocket(httpsRequest.socket); - endpoint = new Endpoint(self._log, 'CLIENT', self._settings); + endpoint = new Endpoint({ + log: self._log, + role: 'CLIENT', + settings: self._settings, + onEndpoint: options.onEndpoint + }); + endpoint.socket = httpsRequest.socket; endpoint.pipe(endpoint.socket).pipe(endpoint); } diff --git a/lib/protocol/connection.js b/lib/protocol/connection.js index 18f26ae..e28663e 100644 --- a/lib/protocol/connection.js +++ b/lib/protocol/connection.js @@ -14,7 +14,12 @@ exports.Connection = Connection; // Public API // ---------- -// * **new Connection(log, firstStreamId, settings)**: create a new Connection +// * **new Connection(config)**: create a new Connection +// - `config.log`: bunyan logger of the parent +// - `config.firstStreamId`: the ID of the first outbound stream +// - `config.settings`: initial HTTP/2 settings +// - `config.onSentFrame`: Event handler for sentFrame event +// - `config.onReceivedFrame`: Event handler for receivedFrame event // // * **Event: 'error' (type)**: signals a connection level error made by the other end // @@ -23,6 +28,10 @@ exports.Connection = Connection; // // * **Event: 'stream' (stream)**: signals that there's an incoming stream // +// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote. +// +// * **Event: 'receivedFrame' (frame)**: signals a frame has been received from the remote. +// // * **createStream(): stream**: initiate a new stream // // * **set(settings, callback)**: change the value of one or more settings according to the @@ -36,15 +45,27 @@ exports.Connection = Connection; // ----------- // The main aspects of managing the connection are: -function Connection(log, firstStreamId, settings) { +function Connection(config) { // * initializing the base class Flow.call(this, 0); + // * save settings object + this._config = config || {}; + + // * attach frame events + if (typeof this._config.onSentFrame === 'function') { + this.addListener('sentFrame', this._config.onSentFrame); + } + + if (typeof this._config.onReceivedFrame === 'function') { + this.addListener('receivedFrame', this._config.onReceivedFrame); + } + // * logging: every method uses the common logger object - this._log = log.child({ component: 'connection' }); + this._log = this._config.log.child({component: 'connection'}); // * stream management - this._initializeStreamManagement(firstStreamId); + this._initializeStreamManagement(this._config.firstStreamId); // * lifecycle management this._initializeLifecycleManagement(); @@ -53,7 +74,7 @@ function Connection(log, firstStreamId, settings) { this._initializeFlowControl(); // * settings management - this._initializeSettingsManagement(settings); + this._initializeSettingsManagement(this._config.connectionSettings); // * multiplexing this._initializeMultiplexing(); @@ -308,6 +329,8 @@ priority_loop: continue; } + this.emit('sentFrame', frame); + nextBucket.push(stream); if (frame.stream === undefined) { @@ -371,6 +394,7 @@ Connection.prototype._receive = function _receive(frame, done) { // * and writes it to the `stream`'s `upstream` stream.upstream.write(frame); + this.emit('receivedFrame', frame); done(); }; @@ -451,13 +475,17 @@ Connection.prototype.set = function set(settings, callback) { } }); - // * Sending out the SETTINGS frame - this.push({ + var settingsFrame = { type: 'SETTINGS', - flags: { ACK: false }, + flags: {ACK: false}, stream: 0, settings: settings - }); + }; + + this.emit('sentFrame', settingsFrame); + + // * Sending out the SETTINGS frame + this.push(settingsFrame); for (var name in settings) { this.emit('SENDING_' + name, settings[name]); } diff --git a/lib/protocol/endpoint.js b/lib/protocol/endpoint.js index a218db0..2cb84e9 100644 --- a/lib/protocol/endpoint.js +++ b/lib/protocol/endpoint.js @@ -16,12 +16,12 @@ exports.Endpoint = Endpoint; // Public API // ---------- -// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint. +// - **new Endpoint(config)**: create a new Endpoint. // -// - `log`: bunyan logger of the parent -// - `role`: 'CLIENT' or 'SERVER' -// - `settings`: initial HTTP/2 settings -// - `filters`: a map of functions that filter the traffic between components (for debugging or +// - `config.log`: bunyan logger of the parent +// - `config.role`: 'CLIENT' or 'SERVER' +// - `config.settings`: initial HTTP/2 settings +// - `config.filters`: a map of functions that filter the traffic between components (for debugging or // intentional failure injection). // // Filter functions get three arguments: @@ -37,6 +37,11 @@ exports.Endpoint = Endpoint; // // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection // +// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote. Raised from underlying connection. +// +// * **Event: 'receivedFrame' (frame)**: signals a frame has been received to the remote. Raised from +// underlying connection. +// // * **Event: 'error' (type)**: signals an error // // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) @@ -47,16 +52,25 @@ exports.Endpoint = Endpoint; // ----------- // The process of initialization: -function Endpoint(log, role, settings, filters) { +function Endpoint(config) { Duplex.call(this); + this._config = config || {}; + + // * Handle onEndpoint + if (typeof this._config.onEndpoint === 'function') { + this.addListener('endpoint', this._config.onEndpoint); + } + + this.emit('endpoint', this); + // * Initializing logging infrastructure - this._log = log.child({ component: 'endpoint', e: this }); + this._log = this._config.log.child({component: 'endpoint', e: this}); // * First part of the handshake process: sending and receiving the client connection header // prelude. - assert((role === 'CLIENT') || role === 'SERVER'); - if (role === 'CLIENT') { + assert((this._config.role === 'CLIENT') || this._config.role === 'SERVER'); + if (this._config.role === 'CLIENT') { this._writePrelude(); } else { this._readPrelude(); @@ -65,7 +79,7 @@ function Endpoint(log, role, settings, filters) { // * Initialization of component. This includes the second part of the handshake process: // sending the first SETTINGS frame. This is done by the connection class right after // initialization. - this._initializeDataFlow(role, settings, filters || {}); + this._initializeDataFlow(this._config.role, this._config.settings, this._config.filters || {}); // * Initialization of management code. this._initializeManagement(); @@ -169,6 +183,8 @@ function pipeAndFilter(stream1, stream2, filter) { Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) { var firstStreamId, compressorRole, decompressorRole; + var self = this; + if (role === 'CLIENT') { firstStreamId = 1; compressorRole = 'REQUEST'; @@ -183,7 +199,17 @@ Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, sett this._deserializer = new Deserializer(this._log); this._compressor = new Compressor(this._log, compressorRole); this._decompressor = new Decompressor(this._log, decompressorRole); - this._connection = new Connection(this._log, firstStreamId, settings); + this._connection = new Connection({ + log: this._log, + firstStreamId: firstStreamId, + connectionSettings: settings, + onSentFrame: function (frame) { + self.emit('sentFrame', frame); + }, + onReceivedFrame: function (frame) { + self.emit('receivedFrame', frame); + } + }); pipeAndFilter(this._connection, this._compressor, filters.beforeCompression); pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization); diff --git a/test/connection.js b/test/connection.js index 2c68857..0002c8f 100644 --- a/test/connection.js +++ b/test/connection.js @@ -65,7 +65,7 @@ describe('connection.js', function() { describe('invalid operation', function() { describe('unsolicited ping answer', function() { it('should be ignored', function() { - var connection = new Connection(util.log, 1, settings); + var connection = new Connection({log: util.log, firstStreamId: 1, connectionSettings: settings}); connection._receivePing({ stream: 0, @@ -82,8 +82,8 @@ describe('connection.js', function() { describe('test scenario', function() { var c, s; beforeEach(function() { - c = new Connection(util.log.child({ role: 'client' }), 1, settings); - s = new Connection(util.log.child({ role: 'client' }), 2, settings); + c = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 1, connectionSettings: settings}); + s = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 2, connectionSettings: settings}); c.pipe(s).pipe(c); }); diff --git a/test/endpoint.js b/test/endpoint.js index bdd2569..b8a326f 100644 --- a/test/endpoint.js +++ b/test/endpoint.js @@ -13,8 +13,16 @@ describe('endpoint.js', function() { describe('scenario', function() { describe('connection setup', function() { it('should work as expected', function(done) { - var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings); - var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings); + var c = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'CLIENT', + settings: settings + }); + var s = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'SERVER', + settings: settings + }); util.log.debug('Test initialization over, starting piping.'); c.pipe(s).pipe(c); @@ -30,8 +38,16 @@ describe('endpoint.js', function() { describe('`e`', function() { var format = endpoint.serializers.e; it('should assign a unique ID to each endpoint', function() { - var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings); - var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings); + var c = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'CLIENT', + settings: settings + }); + var s = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'SERVER', + settings: settings + }); expect(format(c)).to.not.equal(format(s)); expect(format(c)).to.equal(format(c)); expect(format(s)).to.equal(format(s)); diff --git a/test/http.js b/test/http.js index 42bc68e..ddb4db5 100644 --- a/test/http.js +++ b/test/http.js @@ -506,6 +506,57 @@ describe('http.js', function() { }); }); }); + describe('raise events from all sent and received frames',function(){ + it('should work as expected', function (done) { + var path = '/x'; + var receiveFrameTypes = []; + var sentFrameTypes = []; + var message = 'Hello world!'; + var server = http2.createServer(options, function (request, response) { + expect(request.url).to.equal(path); + + request.on('data', util.noop); + request.once('end', function () { + response.end(message); + }); + }); + + server.listen(1245, function () { + var request = http2.request({ + protocol: 'https:', + host: 'localhost', + port: 1245, + path: path, + + onEndpoint: function (endpoint) { + endpoint.on('sentFrame', function (frame) { + sentFrameTypes.push(frame.type); + }); + + endpoint.on('receivedFrame', function (frame) { + receiveFrameTypes.push(frame.type); + + }); + } + }); + + request.end(); + request.on('response', function (response) { + + response.once('finish', function () { + expect(sentFrameTypes).to.include('SETTINGS'); + expect(sentFrameTypes).to.include('HEADERS'); + + expect(receiveFrameTypes).to.include('SETTINGS'); + expect(receiveFrameTypes).to.include('HEADERS'); + expect(receiveFrameTypes).to.include('DATA'); + + done(); + }); + }); + }); + }); + }); describe('server push', function() { it('should work as expected', function(done) { var path = '/x';