Skip to content

Commit

Permalink
cleanup after all AF requests, add handling for S->C responses
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Jul 30, 2024
1 parent b4b2b9a commit e4fd216
Showing 1 changed file with 110 additions and 36 deletions.
146 changes: 110 additions & 36 deletions lib/Af.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ function format_af_error(errText, statusCode) {
return ret
}

const AreqTimeout = 60000
const AreqTimeout = 60001
const AreqDoneSymbol = Symbol("AreqDone")
const EmptyFn = ()=>{}


function cIdToString(cId) {
Expand Down Expand Up @@ -131,19 +133,17 @@ class Af extends EventEmitter {
const evt = {zclData, msg}
this.emit('ZCL:incomingMsg', evt);

if(zclData.frameCntl.direction === 1) {
let prefix = 'ZCL:'+((frameType === 0 && zclData.cmdId !== 'defaultRsp') ? 'foundation' : 'functional')+':'

// for broadcast responses only
this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt);
let prefix = `ZCL:dir${zclData.frameCntl.direction ? 1 : 0}:`

// for broadcast responses only
this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt);

prefix += msg.srcaddr.toString(16) + ':' + msg.srcendpoint + ':'
prefix += msg.srcaddr.toString(16) + ':' + msg.srcendpoint + ':'

// { groupid, clusterid, srcaddr, srcendpoint, dstendpoint, wasbroadcast, linkquality, securityuse, timestamp, transseqnumber, zclMsg }
this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt);
// { groupid, clusterid, srcaddr, srcendpoint, dstendpoint, wasbroadcast, linkquality, securityuse, timestamp, transseqnumber, zclMsg }
this.emit(prefix + msg.dstendpoint + ':' + zclData.seqNum, evt);

this.emit(prefix + zclData.seqNum, evt);
}
this.emit(prefix + zclData.seqNum, evt);

if (frameType === 0 && zclData.cmdId === 'report')
this.emit('ind:reported', { ep: remoteEp, cId: msg.clusterid, attrs: zclData.payload });
Expand Down Expand Up @@ -213,8 +213,7 @@ class Af extends EventEmitter {
return afParamsExt;
}


async send(srcEp, dstEp, cId, rawPayload, opt = {}) {
async _send(cancellationState, srcEp, dstEp, cId, rawPayload, opt = {}) {
// srcEp maybe a local app ep, or a remote ep
if (!srcEp) srcEp = this._controller.getCoord().getDelegator()
let controller = this._controller,
Expand Down Expand Up @@ -273,6 +272,13 @@ class Af extends EventEmitter {

let shouldResend = false
let signalTimeout = Q.defer()

cancellationState.addOnCancel(()=>{
signalTimeout.resolve(null)

controller.request('NWK', 'queuedDelete', {trans: afParams.trans, nwkaddr: dstAddr})
})

try {
let areqCancelable = areqC()

Expand Down Expand Up @@ -398,7 +404,11 @@ class Af extends EventEmitter {
if (!shouldIndirect) {
await indirectSendFn()
}
return await controller.indirectSend(indirectSendFn, { signalTimeout, indirectTimeout: this.indirectTimeout, dstAddr, retries: opt.retries })
const ret = await controller.indirectSend(indirectSendFn, { signalTimeout, indirectTimeout: this.indirectTimeout, dstAddr, retries: opt.retries })
if(ret) {
return ret
}
return {trans: afParams.trans}
} finally {
signalTimeout.resolve(null)
}
Expand Down Expand Up @@ -455,14 +465,14 @@ class Af extends EventEmitter {

zclBuffer = zcl.frame(frameCntl, manufCode, seqNum, cmd, zclData);

const nwkAddr = dstEp.nwkAddr
if (frameCntl.direction === 0 && !cfg.response) { // client-to-server, thus require getting the feedback response

const nwkAddr = dstEp.nwkAddr
assert(typeof nwkAddr === 'number')
if (srcEp === dstEp) // from remote to remote itself
mandatoryEvent = 'ZCL:foundation:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum;
mandatoryEvent = 'ZCL:dir1:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum;
else // from local ep to remote ep
mandatoryEvent = 'ZCL:foundation:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum;
mandatoryEvent = 'ZCL:dir1:' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum;

const foundationCommand = ZclMeta.foundation.get(zclId.foundation(cmd).key)

Expand All @@ -471,17 +481,42 @@ class Af extends EventEmitter {

var afOptions = cfg.afOptions !== undefined ? cfg.afOptions : {}

let dataCnfResp = null
let rsp
try {
rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions)
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
if(areq) {
const sendP = this.send(srcEp, dstEp, cId, zclBuffer, afOptions)

try {
rsp = await Promise.race([sendP, areq.then(()=> areq.then(()=>AreqDoneSymbol))])
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message
}
throw err
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message

if(rsp === AreqDoneSymbol) {
//console.log(`AREQ ${mandatoryEvent} done before transaction was complete`)
sendP.catch(EmptyFn)
sendP.cancel()
} else {
dataCnfResp = rsp
}
} else {
try {
rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions)
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFoundation(" + cmd + ":" + seqNum + ") " + err.message
}
throw err
}
throw err
}

if (!mandatoryEvent) {
Expand All @@ -500,6 +535,10 @@ class Af extends EventEmitter {
throw err
}

if(dataCnfResp) {
await this._controller.request('NWK', 'queuedDelete', {trans: dataCnfResp.trans, nwkaddr: nwkAddr})
}

return rsp[0].zclData
}

Expand Down Expand Up @@ -548,30 +587,58 @@ class Af extends EventEmitter {

zclBuffer = zcl.frame(frameCntl, manufCode, seqNum, cmd, zclData, cId);

if (frameCntl.direction === 0 && !cfg.response) { // client-to-server, thus require getting the feedback response
const nwkAddr = dstEp.nwkAddr
if (!frameCntl.disDefaultRsp && ((frameCntl.direction === 0 && !cfg.response) || cfg.response)) { // client-to-server, thus require getting the feedback response

if (srcEp === dstEp) // from remote to remote itself
mandatoryEvent = 'ZCL:functional:' + dstEp.nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum;
mandatoryEvent = 'ZCL:dir' + (frameCntl.direction ? 0 : 1) + ':' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + seqNum;
else // from local ep to remote ep
mandatoryEvent = 'ZCL:functional:' + dstEp.nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum;
mandatoryEvent = 'ZCL:dir' + (frameCntl.direction ? 0 : 1) + ':' + nwkAddr.toString(16) + ':' + dstEp.getEpId() + ':' + srcEp.getEpId() + ':' + seqNum;


areq = this.waitFor(mandatoryEvent)
}

var afOptions = cfg.afOptions !== undefined ? cfg.afOptions : {}



let dataCnfResp = null
let rsp
try {
rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions)
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
if(areq) {
const sendP = this.send(srcEp, dstEp, cId, zclBuffer, afOptions)

try {
rsp = await Promise.race([sendP, areq.then(()=> areq.then(()=>AreqDoneSymbol))])
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message
}
throw err
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message

if(rsp === AreqDoneSymbol) {
//console.log(`AREQ ${mandatoryEvent} done before transaction was complete`)
sendP.catch(EmptyFn)
sendP.cancel()
} else {
dataCnfResp = rsp
}
} else {
try {
rsp = await this.send(srcEp, dstEp, cId, zclBuffer, afOptions)
} catch (err) {
if (areq) {
await Af.areqCancel(areq)
}
if (err.code == "ETIMEDOUT") {
err.message = "zclFunctional(" + cmd + ":" + seqNum + ") " + err.message
}
throw err
}
throw err
}

if (!areq) {
Expand All @@ -582,6 +649,7 @@ class Af extends EventEmitter {
rsp = await Q.timeout(areq, AreqTimeout)
} catch (err) {
if (err.code == "ETIMEDOUT") {
console.log({mandatoryEvent})
if (areq) {
await Af.areqCancel(areq)
}
Expand All @@ -590,6 +658,10 @@ class Af extends EventEmitter {
throw err
}

if(dataCnfResp) {
await this._controller.request('NWK', 'queuedDelete', {trans: dataCnfResp.trans, nwkaddr: nwkAddr})
}

return rsp[0].zclData
}

Expand Down Expand Up @@ -871,6 +943,8 @@ Af.msgHandlers = [
{ evt: 'AF:incomingMsgExt', hdlr: 'incomingMsgExt' }
];

Af.prototype.send = Q.canceller(Af.prototype._send)

/*************************************************************************************************/
/*** module.exports ***/
/*************************************************************************************************/
Expand Down

0 comments on commit e4fd216

Please sign in to comment.