diff --git a/lib/Af.js b/lib/Af.js index fa63e14..17dabb8 100644 --- a/lib/Af.js +++ b/lib/Af.js @@ -32,7 +32,9 @@ function format_af_error(errText, statusCode) { return ret } -const AreqTimeout = 60000 +const AreqTimeout = 60001 +const AreqDoneSymbol = Symbol("AreqDone") +const EmptyFn = ()=>{} function cIdToString(cId) { @@ -131,19 +133,17 @@ class Af extends EventEmitter { const evt = {zclData, msg} this.emit('ZCL:incomingMsg', evt); - if(zclData.frameCntl.direction === 1) { - let prefix = 'ZCL:'+((frameType === 0 && zclData.cmdId !== 'defaultRsp') ? 'foundation' : 'functional')+':' - - // for broadcast responses only - this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt); + let prefix = `ZCL:dir${zclData.frameCntl.direction ? 1 : 0}:` + + // for broadcast responses only + this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt); - prefix += msg.srcaddr.toString(16) + ':' + msg.srcendpoint + ':' + prefix += msg.srcaddr.toString(16) + ':' + msg.srcendpoint + ':' - // { groupid, clusterid, srcaddr, srcendpoint, dstendpoint, wasbroadcast, linkquality, securityuse, timestamp, transseqnumber, zclMsg } - this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt); + // { groupid, clusterid, srcaddr, srcendpoint, dstendpoint, wasbroadcast, linkquality, securityuse, timestamp, transseqnumber, zclMsg } + this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt); - this.emit(prefix + zclData.seqNum, evt); - } + this.emit(prefix + zclData.seqNum, evt); if (frameType === 0 && zclData.cmdId === 'report') this.emit('ind:reported', { ep: remoteEp, cId: msg.clusterid, attrs: zclData.payload }); @@ -213,8 +213,7 @@ class Af extends EventEmitter { return afParamsExt; } - - async send(srcEp, dstEp, cId, rawPayload, opt = {}) { + async _send(cancellationState, srcEp, dstEp, cId, rawPayload, opt = {}) { // srcEp maybe a local app ep, or a remote ep if (!srcEp) srcEp = this._controller.getCoord().getDelegator() let controller = this._controller, @@ -273,6 +272,13 @@ class Af extends EventEmitter { let shouldResend = false let signalTimeout = Q.defer() + + cancellationState.addOnCancel(()=>{ + signalTimeout.resolve(null) + + controller.request('NWK', 'queuedDelete', {trans: afParams.trans, nwkaddr: dstAddr}) + }) + try { let areqCancelable = areqC() @@ -398,7 +404,11 @@ class Af extends EventEmitter { if (!shouldIndirect) { await indirectSendFn() } - return await controller.indirectSend(indirectSendFn, { signalTimeout, indirectTimeout: this.indirectTimeout, dstAddr, retries: opt.retries }) + const ret = await controller.indirectSend(indirectSendFn, { signalTimeout, indirectTimeout: this.indirectTimeout, dstAddr, retries: opt.retries }) + if(ret) { + return ret + } + return {trans: afParams.trans} } finally { signalTimeout.resolve(null) } @@ -455,14 +465,14 @@ class Af extends EventEmitter { zclBuffer = zcl.frame(frameCntl, manufCode, seqNum, cmd, zclData); + const nwkAddr = dstEp.nwkAddr if (frameCntl.direction === 0 && !cfg.response) { // client-to-server, thus require getting the feedback response - const nwkAddr = dstEp.nwkAddr assert(typeof nwkAddr === 'number') if (srcEp === dstEp) // from remote to remote itself - mandatoryEvent = 'ZCL:foundation:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum; + mandatoryEvent = 'ZCL:dir1:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum; else // from local ep to remote ep - mandatoryEvent = 'ZCL:foundation:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum; + mandatoryEvent = 'ZCL:dir1:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum; const foundationCommand = ZclMeta.foundation.get(zclId.foundation(cmd).key) @@ -471,17 +481,42 @@ class Af extends EventEmitter { var afOptions = cfg.afOptions !== undefined ? cfg.afOptions : {} + let dataCnfResp = null let rsp - try { - rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions) - } catch (err) { - if (areq) { - await Af.areqCancel(areq) + if(areq) { + const sendP = this.send(srcEp, dstEp, cId, zclBuffer, afOptions) + + try { + rsp = await Promise.race([sendP, areq.then(()=> areq.then(()=>AreqDoneSymbol))]) + } catch (err) { + if (areq) { + await Af.areqCancel(areq) + } + if (err.code == "ETIMEDOUT") { + err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message + } + throw err } - if (err.code == "ETIMEDOUT") { - err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message + + if(rsp === AreqDoneSymbol) { + //console.log(`AREQ ${mandatoryEvent} done before transaction was complete`) + sendP.catch(EmptyFn) + sendP.cancel() + } else { + dataCnfResp = rsp + } + } else { + try { + rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions) + } catch (err) { + if (areq) { + await Af.areqCancel(areq) + } + if (err.code == "ETIMEDOUT") { + err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message + } + throw err } - throw err } if (!mandatoryEvent) { @@ -500,6 +535,10 @@ class Af extends EventEmitter { throw err } + if(dataCnfResp) { + await this._controller.request('NWK', 'queuedDelete', {trans: dataCnfResp.trans, nwkaddr: nwkAddr}) + } + return rsp[0].zclData } @@ -548,12 +587,13 @@ class Af extends EventEmitter { zclBuffer = zcl.frame(frameCntl, manufCode, seqNum, cmd, zclData, cId); - if (frameCntl.direction === 0 && !cfg.response) { // client-to-server, thus require getting the feedback response + const nwkAddr = dstEp.nwkAddr + if (!frameCntl.disDefaultRsp && ((frameCntl.direction === 0 && !cfg.response) || cfg.response)) { // client-to-server, thus require getting the feedback response if (srcEp === dstEp) // from remote to remote itself - mandatoryEvent = 'ZCL:functional:' + dstEp.nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum; + mandatoryEvent = 'ZCL:dir' + (frameCntl.direction ? 0 : 1) + ':' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum; else // from local ep to remote ep - mandatoryEvent = 'ZCL:functional:' + dstEp.nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum; + mandatoryEvent = 'ZCL:dir' + (frameCntl.direction ? 0 : 1) + ':' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum; areq = this.waitFor(mandatoryEvent) @@ -561,17 +601,44 @@ class Af extends EventEmitter { var afOptions = cfg.afOptions !== undefined ? cfg.afOptions : {} + + + let dataCnfResp = null let rsp - try { - rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions) - } catch (err) { - if (areq) { - await Af.areqCancel(areq) + if(areq) { + const sendP = this.send(srcEp, dstEp, cId, zclBuffer, afOptions) + + try { + rsp = await Promise.race([sendP, areq.then(()=> areq.then(()=>AreqDoneSymbol))]) + } catch (err) { + if (areq) { + await Af.areqCancel(areq) + } + if (err.code == "ETIMEDOUT") { + err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message + } + throw err } - if (err.code == "ETIMEDOUT") { - err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message + + if(rsp === AreqDoneSymbol) { + //console.log(`AREQ ${mandatoryEvent} done before transaction was complete`) + sendP.catch(EmptyFn) + sendP.cancel() + } else { + dataCnfResp = rsp + } + } else { + try { + rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions) + } catch (err) { + if (areq) { + await Af.areqCancel(areq) + } + if (err.code == "ETIMEDOUT") { + err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message + } + throw err } - throw err } if (!areq) { @@ -582,6 +649,7 @@ class Af extends EventEmitter { rsp = await Q.timeout(areq, AreqTimeout) } catch (err) { if (err.code == "ETIMEDOUT") { + console.log({mandatoryEvent}) if (areq) { await Af.areqCancel(areq) } @@ -590,6 +658,10 @@ class Af extends EventEmitter { throw err } + if(dataCnfResp) { + await this._controller.request('NWK', 'queuedDelete', {trans: dataCnfResp.trans, nwkaddr: nwkAddr}) + } + return rsp[0].zclData } @@ -871,6 +943,8 @@ Af.msgHandlers = [ { evt: 'AF:incomingMsgExt', hdlr: 'incomingMsgExt' } ]; +Af.prototype.send = Q.canceller(Af.prototype._send) + /*************************************************************************************************/ /*** module.exports ***/ /*************************************************************************************************/