Skip to content

Commit

Permalink
Merge pull request #1 from RV-Argonaut/fix-destroy
Browse files Browse the repository at this point in the history
fix: properly destroy streams when an error occurs
  • Loading branch information
ferm10n authored Mar 28, 2023
2 parents eeb838d + c93856c commit 18b7655
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 79 deletions.
19 changes: 7 additions & 12 deletions src/MIDParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ class MIDParser extends Transform {
* This transforms MID.payload (Buffer) in a MID.payload (Object).
* This class uses the implemented MIDs in 'node-open-protocol/src/mid' for parsing MIDs.
* In case of a not implemented MID, MID.payload is converted in to a String.
* @param opts parameters to Transform stream
* @param {Omit<import('stream').TransformOptions, 'writableObjectMode' | 'readableObjectMode'>} opts parameters to Transform stream
*/
constructor(opts) {
constructor(opts = {}) {
debug("new MIDParser");

opts = opts || {};

opts.writableObjectMode = true;
opts.readableObjectMode = true;

super(opts);
super({
...opts,
writableObjectMode: true,
readableObjectMode: true,
});
}

_transform(chunk, encoding, cb) {
Expand Down Expand Up @@ -67,10 +66,6 @@ class MIDParser extends Transform {
cb();
}
}

_destroy() {
//no-op, needed to handle older node versions
}
}

module.exports = MIDParser;
45 changes: 22 additions & 23 deletions src/MIDSerializer.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,37 @@ class MIDSerializer extends Transform {
* This transforms MID.payload (object) in MID.payload (Buffer).
* This class uses the implemented MIDs in 'node-open-protocol/src/mid' for serializing MIDs.
* In case of not a implemented MID, MID.payload (String | Buffer) is converted in a Buffer.
* @param opts parameters to Transform stream
* @param {Omit<import('stream').TransformOptions, 'writableObjectMode' | 'readableObjectMode'>} opts parameters to Transform stream
*/
constructor(opts) {
constructor(opts = {}) {
debug("new MIDSerializer");

opts = opts || {};
opts.writableObjectMode = true;
opts.readableObjectMode = true;
super(opts);
super({
...opts,
writableObjectMode: true,
readableObjectMode: true,
});
}

_transform(chunk, encoding, cb) {
debug("MIDSerializer _transform", chunk);

if(mids[chunk.mid]){

mids[chunk.mid].serializer(chunk, null, (err, data) => {

if(err){
cb(new Error(`Error on serializer [${err}]`));
debug('MIDSerializer _transform err-serializer', chunk, err);
return;
}

this.push(data);
cb();
});

try {
mids[chunk.mid].serializer(chunk, null, (err, data) => {
if(err){
cb(new Error(`Error on serializer [${err}]`));
debug('MIDSerializer _transform err-serializer', chunk, err);
return;
}

this.push(data);
cb();
});
} catch (err) {
cb(new Error(`Unexpected error on serializer [${err}]`));
debug('MIDSerializer _transform err-serializer', chunk, err);
}
}else{

if(chunk.payload === undefined){
Expand All @@ -66,10 +69,6 @@ class MIDSerializer extends Transform {
cb();
}
}

_destroy() {
//no-op, needed to handle older node versions
}
}

module.exports = MIDSerializer;
55 changes: 34 additions & 21 deletions src/linkLayer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,59 @@ class LinkLayer extends Duplex {

/**
* Create a new object LinkLayer
*
* @typedef {{
* stream: import('net').Socket,
* timeOut?: number,
* retryTimes?: number,
* rawData?: boolean,
* disableMidParsing?: Record<number, boolean>,
* }} LinkLayerOptions
*
* @throws {error}
* @param {object} opts
* @param {stream} opts.stream
* @param {number} opts.timeOut
* @param {number} opts.retryTimes
* @param {boolean} opts.rawData
* @param {boolean} opts.disableMidParsing
* @param {Omit<import('stream').DuplexOptions, 'readableObjectMode' | 'writableObjectMode'> & LinkLayerOptions} [opts]
*/
constructor(opts) {
debug("new LinkLayer", opts);

opts = opts || {};
opts.readableObjectMode = true;
opts.writableObjectMode = true;

super(opts);

if (opts.stream === undefined) {
if (!opts || !opts.stream) {
debug("LinkLayer constructor err-socket-undefined");
throw new Error("[LinkLayer] Socket is undefined");
}

/**
* @type {import('stream').DuplexOptions & Required<LinkLayerOptions>}
*/
const _opts = {
stream: opts.stream,
timeOut: opts.timeOut || 3000,
retryTimes: opts.retryTimes || 3,
rawData: opts.rawData || false,
disableMidParsing: opts.disableMidParsing || {},

readableObjectMode: true,
writableObjectMode: true,
};
debug("new LinkLayer", _opts);

super(_opts);

//Create instances of manipulators
this.opParser = new OpenProtocolParser({
rawData: opts.rawData
rawData: _opts.rawData
});
this.opSerializer = new OpenProtocolSerializer();
this.midParser = new MIDParser();
this.midSerializer = new MIDSerializer();
//Create instances of manipulators

this.stream = opts.stream;
this.timeOut = opts.timeOut || 3000;
this.retryTimes = opts.retryTimes || 3;
this.stream = _opts.stream;
this.timeOut = _opts.timeOut;
this.retryTimes = _opts.retryTimes;

//Raw Data
this.rawData = opts.rawData || false;
this.rawData = _opts.rawData;

//Disable MID Parsing
this.disableMidParsing = opts.disableMidParsing || {};
this.disableMidParsing = _opts.disableMidParsing;

this.linkLayerActive = false;
this.partsOfMessage = [];
Expand Down
19 changes: 9 additions & 10 deletions src/openProtocolParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ class OpenProtocolParser extends Transform {
* @class OpenProtocolParser
* @description This class performs the parsing of the MID header.
* This transforms MID (Buffer) in MID (Object).
* @param {Object} opts an object with the option passed to the constructor
* @param {Partial<Omit<import('stream').TransformOptions, 'readableObjectMode' | 'decodeStrings'> & {
* rawData?: boolean
* }>} opts an object with the option passed to the constructor
*/
constructor(opts) {
opts = opts || {};
opts.readableObjectMode = true;
opts.decodeStrings = true;
constructor(opts = {}) {

super(opts);
super({
...opts,
decodeStrings: true,
readableObjectMode: true,
});

this.rawData = opts.rawData || false;
this._nBuffer = null;
Expand Down Expand Up @@ -237,10 +240,6 @@ class OpenProtocolParser extends Transform {
}
cb();
}

_destroy() {
//no-op, needed to handle older node versions
}
}

module.exports = OpenProtocolParser;
15 changes: 6 additions & 9 deletions src/openProtocolSerializer.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ class OpenProtocolSerializer extends Transform {
* @class OpenProtocolSerializer
* @description This class performs the serialization of the MID header.
* This transforms MID (object) in MID (Buffer).
* @param {Object} opts an object with the option passed to the constructor
* @param {Omit<import('stream').TransformOptions, 'writableObjectMode'>} opts an object with the option passed to the constructor
*/
constructor(opts) {
opts = opts || {};
opts.writableObjectMode = true;
super(opts);
constructor(opts = {}) {
super({
...opts,
writableObjectMode: true,
});
debug("new openProtocolSerializer");
}

Expand Down Expand Up @@ -159,10 +160,6 @@ class OpenProtocolSerializer extends Transform {

cb();
}

_destroy() {
//no-op, needed to handle older node versions
}
}

module.exports = OpenProtocolSerializer;
9 changes: 5 additions & 4 deletions src/sessionControlClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ class SessionControlClient extends EventEmitter {

/**
* @throws {error}
* @param {*} opts
* @param {object} opts
* @param {object} [opts.defaultRevisions = {}]
* @param {boolean} [opts.linkLayerActivate] true = activate LinkLayer / false = not activate LinkLayer / undefined = autoNegotiation LinkLayer
* @param {boolean} [opts.genericMode] true activate / false or undefined not activate
* @param {number} [opts.keepAlive = 10000]
*
* @param {stream} opts.stream
* @param {import('net').Socket} opts.stream
* @param {boolean} [opts.rawData]
* @param {object} [opts.disableMidParsing = {}]
* @param {number} [opts.timeOut = 3000]
Expand Down Expand Up @@ -1116,9 +1116,10 @@ class SessionControlClient extends EventEmitter {
this.midInProcess.doCallback(err);
}

this._sendKeepAlive();
this.inOperation = false;
this._sendingProcess();

// if a serialization error happened, then the streams are destroyed and no longer usable
this.close(err);
}

}
Expand Down

0 comments on commit 18b7655

Please sign in to comment.