Skip to content

Commit

Permalink
Merge pull request #94 from Cloud-Automation/chopped-data-packets
Browse files Browse the repository at this point in the history
Chopped data packets
  • Loading branch information
GermanBluefox authored Nov 25, 2016
2 parents 2041f3b + 962c273 commit 6334ab3
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 67 deletions.
35 changes: 19 additions & 16 deletions src/modbus-tcp-client.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
'use strict'

var stampit = require('stampit')
var Net = require('net')
var ModbusCore = require('./modbus-client-core.js')

module.exports = stampit()
.compose(ModbusCore)
.init(function () {
var reqId = 0
var currentRequestId = reqId
var closedOnPurpose = false
var reconnect = false
var trashRequestId
var buffer = Buffer.alloc(0)
var socket

var init = function () {
Expand All @@ -35,8 +33,13 @@ module.exports = stampit()
this.setState('connect')

if (!socket) {
socket = new Net.Socket()

/* for testing you are able to inject a mocking object
* a simple event object should do the trick */
if (this.injectedSocket) {
socket = this.injectedSocket
} else {
socket = require('net').Socket()
}
socket.on('connect', onSocketConnect)
socket.on('close', onSocketClose)
socket.on('error', onSocketError)
Expand Down Expand Up @@ -76,37 +79,37 @@ module.exports = stampit()
var onSocketData = function (data) {
this.log.debug('received data')

var cnt = 0
buffer = Buffer.concat([buffer, data])

while (cnt < data.length) {
while (buffer.length > 7) {
// 1. extract mbap

var mbap = data.slice(cnt, cnt + 7)
var id = mbap.readUInt16BE(0)
var len = mbap.readUInt16BE(4)
var id = buffer.readUInt16BE(0)
var len = buffer.readUInt16BE(4)

if (id === trashRequestId) {
this.log.debug('current mbap contains trashed request id.')

return
}

cnt += 7

this.log.debug('MBAP extracted')
/* Not all data received yet. */
if (buffer.length < 7 + len - 1) {
break
}

// 2. extract pdu

var pdu = data.slice(cnt, cnt + len - 1)

cnt += pdu.length
var pdu = buffer.slice(7, 7 + len - 1)

this.log.debug('PDU extracted')

// emit data event and let the
// listener handle the pdu

this.emit('data', pdu)

buffer = buffer.slice(pdu.length + 7, buffer.length)
}
}.bind(this)

Expand Down
71 changes: 71 additions & 0 deletions src/modbus-tcp-server-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict'
var stampit = require('stampit')
var logger = require('stampit-log')

module.exports = stampit()
.compose(logger)
.init(function () {
let buffer = Buffer.alloc(0)

let init = function () {
if (!this.socket || this.socketId === undefined || !this.onRequest) {
throw new Error('No Socket defined.')
}

this.socket.on('end', this.onSocketEnd)
this.socket.on('data', this.onSocketData)
this.socket.on('error', this.onSocketError)
}.bind(this)

this.onSocketEnd = function () {
if (this.onEnd) {
this.onEnd()
}

this.log.debug('connection closed, socket', this.socketId)
}.bind(this)

this.onSocketData = function (data) {
this.log.debug('received data socket', this.socketId, data.byteLength)

buffer = Buffer.concat([buffer, data])

while (buffer.length > 8) {
// 1. extract mbap

let len = buffer.readUInt16BE(4)
let request = {
trans_id: buffer.readUInt16BE(0),
protocol_ver: buffer.readUInt16BE(2),
unit_id: buffer.readUInt8(6)
}

// 2. extract pdu

/* received data is not complete yet.
* break loop and wait for more data. */

if (buffer.length < 7 + len - 1) {
break
}

let pdu = buffer.slice(7, 7 + len)

// emit data event and let the
// listener handle the pdu

this.log.debug('PDU extracted.')

this.onRequest({ request: request, pdu: pdu, socket: this.socket })

buffer = buffer.slice(pdu.length + 7, buffer.length)
}
}.bind(this)

this.onSocketError = function (e) {
this.logError('Socker error', e)
}

init()
})

83 changes: 34 additions & 49 deletions src/modbus-tcp-server.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
'use strict'

var stampit = require('stampit')
var ModbusServerCore = require('./modbus-server-core.js')
var StateMachine = require('stampit-state-machine')
var net = require('net')
var ClientSocket = require('./modbus-tcp-server-client.js')

module.exports = stampit()
.compose(ModbusServerCore)
.compose(StateMachine)
.init(function () {
var server
var socketCount = 0
var fifo = []
var clients = []
let server
let socketList = []
let fifo = []
let clients = []

var init = function () {
let init = function () {
if (!this.port) {
this.port = 502
}
Expand Down Expand Up @@ -47,39 +47,6 @@ module.exports = stampit()
this.setState('ready')
}.bind(this)

var onSocketEnd = function (socket, socketId) {
return function () {
this.log.debug('connection closed, socket', socketId)
}.bind(this)
}.bind(this)

var onSocketData = function (socket, socketId) {
return function (data) {
this.log.debug('received data socket', socketId, data.byteLength)

// 1. extract mbap

var mbap = data.slice(0, 0 + 7)
var len = mbap.readUInt16BE(4)
var request = {
trans_id: mbap.readUInt16BE(0),
protocol_ver: mbap.readUInt16BE(2),
unit_id: mbap.readUInt8(6)
}

// 2. extract pdu

var pdu = data.slice(7, 7 + len - 1)

// emit data event and let the
// listener handle the pdu

fifo.push({ request: request, pdu: pdu, socket: socket })

flush()
}.bind(this)
}.bind(this)

var flush = function () {
if (this.inState('processing')) {
return
Expand Down Expand Up @@ -111,19 +78,37 @@ module.exports = stampit()
}.bind(this))
}.bind(this)

var onSocketError = function (socket, socketCount) {
return function (e) {
this.logError('Socker error', e)
var initiateSocket = function (socket) {
let socketId = socketList.length

var requestHandler = function (req) {
fifo.push(req)
flush()
}

var removeHandler = function () {
socketList[socketId] = undefined
/* remove undefined on the end of the array */
for (let i = socketList.length - 1; i >= 0; i -= 1) {
let cur = socketList[i]
if (cur !== undefined) {
break
}

socketList.splice(i, 1)
}
this.log.debug('Client connection closed, remaining clients. ', socketList.length)
}.bind(this)
}.bind(this)

var initiateSocket = function (socket) {
socketCount += 1
let clientSocket = ClientSocket({
socket: socket,
socketId: socketId,
onRequest: requestHandler,
onEnd: removeHandler
})

socket.on('end', onSocketEnd(socket, socketCount))
socket.on('data', onSocketData(socket, socketCount))
socket.on('error', onSocketError(socket, socketCount))
}
socketList.push(clientSocket)
}.bind(this)

this.close = function (cb) {
for (var c in clients) {
Expand Down
6 changes: 4 additions & 2 deletions src/modbus.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

var fs = require('fs')
var path = require('path')
var ModbusCore = require('./modbus-client-core.js')
var ModbusTcpClient = require('./modbus-tcp-client.js')

exports.client = {
tcp: {
core: require('./modbus-tcp-client.js'),
complete: require('./modbus-tcp-client.js')
core: ModbusCore.compose(ModbusTcpClient),
complete: ModbusCore.compose(ModbusTcpClient)
},
serial: {
core: require('./modbus-serial-client.js'),
Expand Down
92 changes: 92 additions & 0 deletions test/modbus-tcp.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* global describe, it */
'use strict'
var assert = require('assert')
var EventEmitter = require('events')

describe('Modbus TCP Tests.', function () {
describe('Server Tests.', function () {
/* We are using the read coils request for the chopped data tests. */

let ClientSocket = require('../src/modbus-tcp-server-client.js')

it('should handle a chopped data request fine.', function (done) {
var em = new EventEmitter()
var requests = [Buffer.from([0x00, 0x01]), // Transaction Identifier
Buffer.from([0x00, 0x00]), // Protocol
Buffer.from([0x00, 0x05]), // Length
Buffer.from([0x01]), // Unit identifier
Buffer.from([0x01]), // PDU Function Code
Buffer.from([0x00, 0x0A]), // Start Address
Buffer.from([0x00, 0x15])] // Quantitiy

var exRequest = {
request: {
trans_id: 1,
protocol_ver: 0,
unit_id: 1
},
pdu: Buffer.from([0x01, 0x00, 0x0A, 0x00, 0x15]),
socket: em }

var callbackCounter = 0
var handler = function (req) {
assert.deepEqual(req, exRequest)
callbackCounter += 1
assert.equal(callbackCounter, 1)
done()
}

ClientSocket({
socket: em,
socketId: 1,
onRequest: handler
})

requests.forEach(function (b) {
em.emit('data', b)
})
})
})

describe('Client Tests.', function () {
var stampit = require('stampit')
var StateMachine = require('stampit-state-machine')
var Logger = require('stampit-log')

it('should handle a chopped data request fine.', function (done) {
var ModbusTcpClient = require('../src/modbus-tcp-client.js')
var injectedSocket = new EventEmitter()
var exResponse = Buffer.from([0x01, 0x02, 0x055, 0x01])

/* dummy method */
injectedSocket.connect = function () { }

/* create the client by composing
* logger, state machine and the tcp client,
* normally the logger and the state machine
* come with the modbus client core. */
var client = stampit()
.compose(Logger)
.compose(StateMachine)
.compose(ModbusTcpClient)({
injectedSocket: injectedSocket
})

/* connect to whatever and confirm */
client.connect()

injectedSocket.emit('connect')

/* fetch send data and compare */
client.on('data', function (data) {
assert.equal(data.compare(exResponse), 0)
done()
})

/* Send header data */
injectedSocket.emit('data', Buffer.from([0x00, 0x01, 0x00, 0x00, 0x00, 0x05, 0x01]))
/* emitting a read coils request (start = 0, count = 10) */
injectedSocket.emit('data', Buffer.from([0x01, 0x02, 0x55, 0x01]))
})
})
})

0 comments on commit 6334ab3

Please sign in to comment.