From 0ddc907091f88553b0c65f04b2c5e940701b72b0 Mon Sep 17 00:00:00 2001 From: hadiChan <861883474@qq.com> Date: Thu, 8 Jun 2017 21:17:56 +0800 Subject: [PATCH] fixbug --- lib/MessageEventEmitter.js | 24 +++ lib/serverRpcEvent.js | 1 + middleware/restfulPushServer/PushBaseEvent.js | 30 +-- middleware/restfulPushServer/PushEvent.js | 200 ++++++++++++------ 4 files changed, 176 insertions(+), 79 deletions(-) diff --git a/lib/MessageEventEmitter.js b/lib/MessageEventEmitter.js index b458fa6..7847ef1 100644 --- a/lib/MessageEventEmitter.js +++ b/lib/MessageEventEmitter.js @@ -7,14 +7,21 @@ const logger = require('./logger.js') class MessageEventEmitter extends EventEmitter { constructor (gwcid) { super() + this.isConstructored = true this.gwcid = gwcid this.processRequest = Object.create(null) } isWsOpen () { + if (!this.isConstructored) { + return false + } return this.ws && this.ws.readyState === WebSocket.OPEN } // 收到消息的时候 onMessage (body) { + if (!this.isConstructored) { + return false + } if (this.ws.readyState !== WebSocket.OPEN) { logger.error(`${this.gwcid}Has been closed, on onMessage`) return @@ -44,6 +51,9 @@ class MessageEventEmitter extends EventEmitter { } // 处理请求 request (headers, body, start) { + if (!this.isConstructored) { + return Promise.reject(new Error('Has been destroyed')) + } var requestId requestId = headers.request_id = headers.request_id || workerUtil.createRequestId() return ddvRowraw.stringifyPromise(headers, body, start) @@ -59,6 +69,9 @@ class MessageEventEmitter extends EventEmitter { } // 收到请求结果-处理响应 onMessageResponse (res) { + if (!this.isConstructored) { + return false + } var requestId, code, e, t if (!(res.headers && (requestId = res.headers.request_id || res.headers.requestId || res.headers.requestid))) { logger.error(res) @@ -78,5 +91,16 @@ class MessageEventEmitter extends EventEmitter { } } } + // 销毁 + destroy () { + process.nextTick(() => { + var key + for (key in this) { + if (!this.hasOwnProperty(key)) continue + delete this[key] + } + key = void 0 + }) + } } module.exports = MessageEventEmitter diff --git a/lib/serverRpcEvent.js b/lib/serverRpcEvent.js index d186bf9..c12f76a 100644 --- a/lib/serverRpcEvent.js +++ b/lib/serverRpcEvent.js @@ -248,6 +248,7 @@ class ServerRpcEvent { var indexs // 清理定时器 Array.isArray(this.saveGwcidTimers) && this.saveGwcidTimers.forEach(timer => clearTimeout(timer)) + this.saveGwcidTimers = [] if (index !== void 0 && isSave !== true) { // 加入保存gwcid 的 indexs 中 this.saveGwcidListsIndexs.indexOf(index) > -1 || this.saveGwcidListsIndexs.push(index) diff --git a/middleware/restfulPushServer/PushBaseEvent.js b/middleware/restfulPushServer/PushBaseEvent.js index 06c55fa..8b1ee63 100644 --- a/middleware/restfulPushServer/PushBaseEvent.js +++ b/middleware/restfulPushServer/PushBaseEvent.js @@ -1,6 +1,7 @@ 'use strict' const logger = require('../../lib/logger.js') const MessageEventEmitter = require('../../lib/MessageEventEmitter.js') +const PushError = require('./PushError') class PushBaseEvent extends MessageEventEmitter { constructor (ws, req, options) { super(req.gwcid) @@ -21,12 +22,14 @@ class PushBaseEvent extends MessageEventEmitter { // 初始化 wsEventBaseInit () { this.ws.on('message', this.onMessage.bind(this)) - this.ws.on('close', this.onClose.bind(this)) // 获取文件事件 this.on('protocol::push', this.onMessagePush.bind(this)) } // 推送类型的信息 onMessagePush (res) { + if (!this.isConstructored) { + return false + } if (!(res.method && res.path && this.emit(['push', res.method.toLowerCase(), res.path], res.headers, res.body, res))) { logger.error(`[gwcid:${this.gwcid}]onMessagePush error`) this.send(`Push request not found, not find method:${res.method}`) @@ -34,13 +37,12 @@ class PushBaseEvent extends MessageEventEmitter { logger.error(e) }) } - } - // 收到消息的时候 - onClose () { - } // 关闭ws close () { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } return new Promise((resolve, reject) => { this.ws.close.apply(this.ws, arguments) resolve() @@ -48,6 +50,9 @@ class PushBaseEvent extends MessageEventEmitter { } // 发送 send (data, options) { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } return new Promise((resolve, reject) => { return this.sendWs(data, options, e => { e ? reject(e) : resolve() @@ -56,23 +61,22 @@ class PushBaseEvent extends MessageEventEmitter { } // 发送ws sendWs () { + if (!this.isConstructored) { + return false + } return this.ws.send.apply(this.ws, arguments) } // 销毁 destroy () { + if (!this.isConstructored) { + return false + } this.close() .catch(e => { logger.error(`[gwcid:${this.gwcid}] Failed to close at the time of destroy`) }) .then(() => { - process.nextTick(() => { - var key - for (key in this) { - if (!this.hasOwnProperty(key)) continue - delete this[key] - } - key = void 0 - }) + super.destroy() }) } } diff --git a/middleware/restfulPushServer/PushEvent.js b/middleware/restfulPushServer/PushEvent.js index 8d82bdf..3b9d334 100644 --- a/middleware/restfulPushServer/PushEvent.js +++ b/middleware/restfulPushServer/PushEvent.js @@ -17,6 +17,7 @@ class PushEvent extends PushBaseEvent { super(ws, req, serverRpcEvent.options) // 如果队列没有这个对象就加入这个对象 wsConnQueue[this.connId] = wsConnQueue[this.connId] || this + this.pushOpenUpgradeTimer = [] this.setConfigInfo() this.pushEventInit() this.serverRpcEvent = serverRpcEvent @@ -34,6 +35,9 @@ class PushEvent extends PushBaseEvent { } // 设置配置信息 setConfigInfo () { + if (!this.isConstructored) { + return false + } // 解析url let urlObj = url.parse(this.options.rpcEvent.apiUrl) this.options.apiUrlOpt = Object.create(null) @@ -44,78 +48,69 @@ class PushEvent extends PushBaseEvent { } // 关闭推送 pushClose (headers, body, res) { - // 请求id - var requestId - // 头部信息 + if (!this.isConstructored) { + return + } var headersObj = Object.create(null) - var headersString - // 配置信息 - var opt = Object.create(null) - // 地址信息 - var urlObj = url.parse(this.options.rpcEvent.apiUrl + this.options.rpcEvent.onRtmpBoxHeartbeat) - // 是否是Buffer - var isBuffer = res.headers.bodytype === 'buffer' - var promise + var isBuffer - requestId = res.headers && (res.headers.request_id || res.headers.requestId || res.headers.requestid) + if (!this.isWsOpen()) { + logger.error(new PushError(`${this.gwcid}Has been closed, on pushOpen`, 'HAS_BEEN_CLOSED')) + return + } + // 请求id + headersObj.request_id = res.headers && (res.headers.request_id || res.headers.requestId || res.headers.requestid) // 全局链接id headersObj.gwcid = this.gwcid // 服务器唯一识别号 headersObj.serverGuid = this.serverGuid - headersString = querystring.stringify(headersObj) + // 判断当前是否使用buffer模式返回 + isBuffer = this.bodytype === 'buffer' || (this.bodytype === 'auto' && res.headers.bodytype === 'buffer') - if (!this.isWsOpen()) { - logger.error(new PushError(`${this.gwcid}Has been closed, on pushClose`, 'HAS_BEEN_CLOSED')) + if (!(res.headers && headersObj.request_id)) { + logger.error(new PushError('requestId Not FOUND')) return } - if (res.headers && requestId) { - promise = ddvRowraw.stringifyPromise( - { - 'request_id': requestId - }, + request( + [this.options.rpcEvent.apiUrl, this.options.rpcEvent.onPushClose], + querystring.stringify({ + // 全局链接id + gwcid: this.gwcid, + // 服务器唯一识别号 + serverGuid: this.serverGuid + }), + 'PUT' + ) + .then(res => { + this.clearPushOpenUpgradeTimeout() + + this.isPushOpening = false + + return ddvRowraw.stringifyPromise( + headersObj, (isBuffer ? Buffer.alloc(0) : ''), - 'PUSH/1.0 200 OK' + 'PUSH/1.0 200 PUSH_CLOSEED' ) .then(raw => { - this.send(raw) - isBuffer = void 0 + headersObj = isBuffer = void 0 + return this.send(raw) }) .catch(e => { - logger.error(new PushError('error')) - }) - } else { - promise = Promise.resolve() - } - - promise.then(() => { - if (!this.isPushOpened) { - return - } - this.isPushOpened = false - - opt.method = 'PUT' - opt.path = urlObj.path || '/' - opt.headers = Object.create(null) - opt.headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' - opt.headers['Content-Length'] = Buffer.byteLength(headersString, 'utf8') - Object.assign(opt, this.options.apiUrlOpt) - - return request(opt) - .then(({headers, statusCode, statusMessage, body}) => { - if (statusCode >= 200 && statusCode < 300) { - return {headers, body} - } else { - logger.error(statusCode) - logger.error(statusMessage) - logger.error(body.toString()) - return Promise.reject(new PushError(statusMessage, (statusMessage || '').toUpperCase())) - } + logger.error('error:send data to client') + logger.error(e) }) }) + .catch(e => { + logger.error('onPushClose fail') + logger.error(e) + }) } // 打开推送 pushOpen (headers, body, res) { + if (!this.isConstructored) { + return + } var headersObj = Object.create(null) var statR = '' var rawR = '' @@ -184,7 +179,7 @@ class PushEvent extends PushBaseEvent { headersObj = statR = rawR = void 0 } else { - this.pushPing(headers, body, res) + this.pushPing() .then(res => { this.isPushOpened = true ddvRowraw.stringifyPromise( @@ -201,24 +196,40 @@ class PushEvent extends PushBaseEvent { }) .catch(e => { // 发送到客户端,发送失败 - console.log(321321321231, e) + ddvRowraw.stringifyPromise( + headersObj, + rawR, + `PUSH/1.0 ${e.statusCode || 500} ${e.errorId || e.statusMessage || 'UNKNOWN_ERROR'}` + ) + .then(raw => this.send(raw)) + .catch(e => { + logger.error('error:send data to client') + logger.error(e) + }) }) } } - pushPing (headers, body, res) { + pushPing () { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } var opt = Object.create(null) + let isBuffer = this.bodytype === 'buffer' if (!this.pingDataOptSign) { - this.getPingData(headers, body, res) + this.getPingData() } Object.assign(opt, this.pingDataOptSign) // 生成唯一请求id opt.request_id = workerUtil.createRequestId() - // 获取onPushOpen地址 - opt.path = this.options.rpcEvent.onPushOpen + // 获取地址并判断是否打开过,是:onPushOpenUpgrade地址,否onPushOpen地址 + opt.path = this.isPushOpened ? this.options.rpcEvent.onPushOpenUpgrade : this.options.rpcEvent.onPushOpen - return this.request(opt, (res.headers.bodytype === 'buffer' ? (Buffer.alloc(0)) : ''), 'PING /v1_0/sign PUSH/1.0') + return this.request(opt, (isBuffer ? (Buffer.alloc(0)) : ''), 'PING /v1_0/sign PUSH/1.0') .then(res => { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } let pingDataHKey = Object.keys(this.pingDataH || []) pingDataHKey.forEach((key, index) => { @@ -238,7 +249,7 @@ class PushEvent extends PushBaseEvent { logger.error(new PushError(`headers sign fail, ${key}, ${res.headers[lowkey]}, ${res.headers}`, 'HEADERS_SIGN_FAIL')) return false } - lowkey = void 0 + lowkey = isBuffer = void 0 }) // 撮合发送协议 端口 主机 信息 Object.assign(opt, this.options.apiUrlOpt) @@ -261,19 +272,51 @@ class PushEvent extends PushBaseEvent { } return request(opt, this.pingDataRaw) .then(({headers, statusCode, statusMessage, body}) => { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } + var bodyObj, time // 判断发送请求是否正常 if (statusCode >= 200 && statusCode < 300) { + try { + bodyObj = JSON.parse(body) + } catch (e) { + return Promise.reject(e) + } + // 获取检查时间 + time = bodyObj.data.upgradeTime || this.options.rpcEvent.upgradeTime || 5 * 1000 + this.clearPushOpenUpgradeTimeout() + this.pushOpenUpgradeTimer.push(setTimeout(() => { + this.pushPing() + .catch(e => { + let opt = { + request_id: workerUtil.createRequestId() + } + this.request(opt, (this.bodytype === 'buffer' ? (Buffer.alloc(0)) : ''), 'CLOSE /v1_0/init PUSH/1.0') + .catch(e => { + return this.close() + }) + }) + }, time)) + + bodyObj = time = void 0 return {headers, body, res} } else { logger.error(statusCode) logger.error(statusMessage) logger.error(body.toString()) - return Promise.reject(new PushError(statusMessage, (statusMessage || '').toUpperCase())) + let e = new PushError(statusMessage, (statusMessage || '').toUpperCase()) + e.errorId = e.statusMessage = statusMessage + e.statusCode = statusCode + return Promise.reject(e) } }) }) } - getPingData (headers, body, res) { + getPingData () { + if (!this.isConstructored) { + return + } // 上线时间-是第一次连接push-open的时间戳 this.pushTimeOnLine = this.pushTimeOnLine || workerUtil.time() // 构建发送php的参数对象 @@ -283,7 +326,7 @@ class PushEvent extends PushBaseEvent { // 本推送服务器的 guid this.pingData.serverGuid = this.serverGuid // 长连接连接类型 - this.pingData.bodyType = res.headers.bodytype + this.pingData.bodyType = this.bodytype // 第一次上线时间 this.pingData.timeOnline = this.pushTimeOnLine // 把参数序列化转为buffer缓存区数据 @@ -314,11 +357,12 @@ class PushEvent extends PushBaseEvent { this.pingDataOptSign.method = 'PUT' // 头 this.pingDataOptSign.headers = JSON.stringify(this.pingDataH) - - headers = res = body = void 0 } // 打开推送ping pushPingHeartbeat (headers, body, res) { + if (!this.isConstructored) { + return + } var requestId if (!this.isWsOpen()) { logger.error(new PushError(`${this.gwcid}Has been closed, on pushPingHeartbeat`, 'HAS_BEEN_CLOSED')) @@ -346,6 +390,9 @@ class PushEvent extends PushBaseEvent { } // 代理访问api服务器 onApiModelProxy (res) { + if (!this.isConstructored) { + return + } var requestId, resBody if (!this.isWsOpen()) { logger.error(new PushError(`${this.gwcid}Has been closed, on onApiModelProxy`, 'HAS_BEEN_CLOSED')) @@ -359,6 +406,9 @@ class PushEvent extends PushBaseEvent { apiModelProxy(res, this.options) // 有请求结果 .then(({headers, statusCode, statusMessage, body}) => { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } if (Buffer.isBuffer(resBody)) { resBody = Buffer.concat([resBody, body]) } else { @@ -373,6 +423,9 @@ class PushEvent extends PushBaseEvent { }) // 中途异常 .catch(e => { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } // 序列化流 return ddvRowraw.stringifyPromise({ 'request_id': requestId, @@ -390,6 +443,9 @@ class PushEvent extends PushBaseEvent { } // 发送信息个用户,信息来源rpc sendMsgToUser (headers, body) { + if (!this.isConstructored) { + return Promise.reject(new PushError('Has been destroyed', 'HAS_BEEN_DESTROYED')) + } if (!this.isWsOpen()) { let e = new PushError(`${this.gwcid}Has been closed, on sendMsgToUser`, 'HAS_BEEN_CLOSED') e.errorId = 'HAS_BEEN_CLOSED' @@ -403,6 +459,7 @@ class PushEvent extends PushBaseEvent { body = body.toString('utf-8') } var pushPath = '/' + if (headers && headers['push-path']) { pushPath = (headers['push-path'].charAt(0) === '/' ? '' : '/') + headers['push-path'] } @@ -412,13 +469,24 @@ class PushEvent extends PushBaseEvent { return Promise.reject(new PushError('send to user fail', 'SEND_TO_USER_FAIL')) }) } - // 收到消息的时候 + // 退出 onWsConnQueueClose () { + if (!this.isConstructored) { + return + } if (wsConnQueue[this.connId]) { workerUtil.isFunction(wsConnQueue[this.connId].destroy) && wsConnQueue[this.connId].destroy() delete wsConnQueue[this.connId] } } + clearPushOpenUpgradeTimeout () { + if (!this.isConstructored) { + return + } + // 清空 + Array.isArray(this.pushOpenUpgradeTimer) && this.pushOpenUpgradeTimer.forEach(timer => clearTimeout(timer)) + this.pushOpenUpgradeTimer = [] + } } module.exports = PushEvent