Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modbus: fix race conditions in gridx/modbus #8517

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,26 +198,36 @@ func (cp *ConfigProvider) configure(conf config) error {
}

func (cp *ConfigProvider) configureMeters(conf config) error {
var mu sync.Mutex
g, _ := errgroup.WithContext(context.Background())

cp.meters = make(map[string]api.Meter)
for id, cc := range conf.Meters {
if cc.Name == "" {
return fmt.Errorf("cannot create %s meter: missing name", humanize.Ordinal(id+1))
}

m, err := meter.NewFromConfig(cc.Type, cc.Other)
if err != nil {
err = fmt.Errorf("cannot create meter '%s': %w", cc.Name, err)
return err
}
cc := cc

if _, exists := cp.meters[cc.Name]; exists {
return fmt.Errorf("duplicate meter name: %s already defined and must be unique", cc.Name)
}
g.Go(func() error {
m, err := meter.NewFromConfig(cc.Type, cc.Other)
if err != nil {
return fmt.Errorf("cannot create meter '%s': %w", cc.Name, err)
}

cp.meters[cc.Name] = m
mu.Lock()
defer mu.Unlock()

if _, exists := cp.meters[cc.Name]; exists {
return fmt.Errorf("duplicate meter name: %s already defined and must be unique", cc.Name)
}

cp.meters[cc.Name] = m
return nil
})
}

return nil
return g.Wait()
}

func (cp *ConfigProvider) configureChargers(conf config) error {
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,7 @@ require (
)

replace github.com/foogod/go-powerwall => github.com/andig/go-powerwall v0.2.1-0.20230525091927-777196024a18

replace github.com/grid-x/modbus => github.com/andig/gridx-modbus v0.0.0-20230618091634-9f0de8712080

replace github.com/volkszaehler/mbmd => "../mbmd"
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ github.com/andig/go-powerwall v0.2.1-0.20230525091927-777196024a18 h1:fbf6hVLSKU
github.com/andig/go-powerwall v0.2.1-0.20230525091927-777196024a18/go.mod h1:MR0vuGPmz9sLXrIAsdkXulW1j6t6svVr2D1nJKaJI78=
github.com/andig/gosunspec v0.0.0-20211108155140-af2e73b86e71 h1:tnjVNZjuz+CK6fdc7ohJpMHjcEGFI5APp0l5T5Ocr/Y=
github.com/andig/gosunspec v0.0.0-20211108155140-af2e73b86e71/go.mod h1:c6P6szcR+ROkqZruOR4f6qbDKFjZX6OitPpj+yJ/r8k=
github.com/andig/gridx-modbus v0.0.0-20230618091634-9f0de8712080 h1:5o2sSgmnJ6Ew/UTvX+h9OHhP/t4rwCD7CNsi/JmuFyc=
github.com/andig/gridx-modbus v0.0.0-20230618091634-9f0de8712080/go.mod h1:qVX2WhsI5xyAoM6I/MV1bXSKBPdLAjp7pCvieO/S0AY=
github.com/andig/mbserver v0.0.0-20230310211055-1d29cbb5820e h1:m/NTP3JWpR7M0ljLxiQU4fzR25jjhe1LDtxLMNcoNJQ=
github.com/andig/mbserver v0.0.0-20230310211055-1d29cbb5820e/go.mod h1:4VtYzTm//oUipwvO3yh0g/udTE7pYJM+U/kyAuFDsgM=
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
Expand Down Expand Up @@ -586,9 +588,6 @@ github.com/graph-gophers/graphql-transport-ws v0.0.2 h1:DbmSkbIGzj8SvHei6n8Mh9eL
github.com/graph-gophers/graphql-transport-ws v0.0.2/go.mod h1:5BVKvFzOd2BalVIBFfnfmHjpJi/MZ5rOj8G55mXvZ8g=
github.com/gregdel/pushover v1.2.0 h1:SLnpvJijUyEZvkJNyrldGhFhryYgQYlThSLpB5Oqt5k=
github.com/gregdel/pushover v1.2.0/go.mod h1:EcaO66Nn1StkpEm1iKtBTV3d2A16SoMsVER1PthX7to=
github.com/grid-x/modbus v0.0.0-20210714071042-7af2b65ec03b/go.mod h1:YaK0rKJenZ74vZFcSSLlAQqtG74PMI68eDjpDCDDmTw=
github.com/grid-x/modbus v0.0.0-20230511111420-e90d491dbd4f h1:kYCn76Qlwsjbw3Bh4kF6ljySsK+uQODzGi3HUIZ+TpU=
github.com/grid-x/modbus v0.0.0-20230511111420-e90d491dbd4f/go.mod h1:qVX2WhsI5xyAoM6I/MV1bXSKBPdLAjp7pCvieO/S0AY=
github.com/grid-x/serial v0.0.0-20191104121038-e24bc9bf6f08/go.mod h1:kdOd86/VGFWRrtkNwf1MPk0u1gIjc4Y7R2j7nhwc7Rk=
github.com/grid-x/serial v0.0.0-20211107191517-583c7356b3aa h1:Rsn6ARgNkXrsXJIzhkE4vQr5Gbx2LvtEMv4BJOK4LyU=
github.com/grid-x/serial v0.0.0-20211107191517-583c7356b3aa/go.mod h1:kdOd86/VGFWRrtkNwf1MPk0u1gIjc4Y7R2j7nhwc7Rk=
Expand Down
19 changes: 12 additions & 7 deletions server/modbus/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ LOOP:
}

func (h *handler) HandleCoils(req *mbserver.CoilsRequest) ([]bool, error) {
conn := h.conn.Clone(req.UnitId)

if req.IsWrite {
if h.readOnly {
return nil, mbserver.ErrIllegalFunction
Expand All @@ -110,45 +112,48 @@ func (h *handler) HandleCoils(req *mbserver.CoilsRequest) ([]bool, error) {
u = 0xFF00
}

b, err := h.conn.WriteSingleCoilWithSlave(req.UnitId, req.Addr, u)
b, err := conn.WriteSingleCoil(req.Addr, u)
return h.coilsToResult("write coil", req.Quantity, b, err)
}

h.log.TRACE.Printf("write coils: id %d addr %d qty %d val %v", req.UnitId, req.Addr, req.Quantity, req.Args)
args := coilsToBytes(req.Args)
b, err := h.conn.WriteMultipleCoilsWithSlave(req.UnitId, req.Addr, req.Quantity, args)
b, err := conn.WriteMultipleCoils(req.Addr, req.Quantity, args)
return h.coilsToResult("write coils", req.Quantity, b, err)
}

h.log.TRACE.Printf("read coil: id %d addr %d qty %d", req.UnitId, req.Addr, req.Quantity)
b, err := h.conn.ReadCoilsWithSlave(req.UnitId, req.Addr, req.Quantity)
b, err := conn.ReadCoils(req.Addr, req.Quantity)
return h.coilsToResult("read coil", req.Quantity, b, err)
}

func (h *handler) HandleInputRegisters(req *mbserver.InputRegistersRequest) (res []uint16, err error) {
conn := h.conn.Clone(req.UnitId)
h.log.TRACE.Printf("read input: id %d addr %d qty %d", req.UnitId, req.Addr, req.Quantity)
b, err := h.conn.ReadInputRegistersWithSlave(req.UnitId, req.Addr, req.Quantity)
b, err := conn.ReadInputRegisters(req.Addr, req.Quantity)
return h.exceptionToUint16AndError("read input", b, err)
}

func (h *handler) HandleHoldingRegisters(req *mbserver.HoldingRegistersRequest) (res []uint16, err error) {
conn := h.conn.Clone(req.UnitId)

if req.IsWrite {
if h.readOnly {
return nil, mbserver.ErrIllegalFunction
}

if req.WriteFuncCode == gridx.FuncCodeWriteSingleRegister {
h.log.TRACE.Printf("write holding: id %d addr %d val %04x", req.UnitId, req.Addr, req.Args[0])
b, err := h.conn.WriteSingleRegisterWithSlave(req.UnitId, req.Addr, req.Args[0])
b, err := conn.WriteSingleRegister(req.Addr, req.Args[0])
return h.exceptionToUint16AndError("write holding", b, err)
}

h.log.TRACE.Printf("write holding: id %d addr %d qty %d val %0x", req.UnitId, req.Addr, req.Quantity, asBytes(req.Args))
b, err := h.conn.WriteMultipleRegistersWithSlave(req.UnitId, req.Addr, req.Quantity, asBytes(req.Args))
b, err := conn.WriteMultipleRegisters(req.Addr, req.Quantity, asBytes(req.Args))
return h.exceptionToUint16AndError("write multiple holding", b, err)
}

h.log.TRACE.Printf("read holding: id %d addr %d qty %d", req.UnitId, req.Addr, req.Quantity)
b, err := h.conn.ReadHoldingRegistersWithSlave(req.UnitId, req.Addr, req.Quantity)
b, err := conn.ReadHoldingRegisters(req.Addr, req.Quantity)
return h.exceptionToUint16AndError("read holding", b, err)
}
12 changes: 6 additions & 6 deletions server/modbus/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestConcurrentRead(t *testing.T) {
addr := uint16(rand.Int31n(200) + 1)
qty := uint16(rand.Int31n(32) + 1)

b, err := conn.ReadInputRegistersWithSlave(uint8(id), addr, qty)
b, err := conn.Clone(uint8(id)).ReadInputRegisters(addr, qty)
assert.NoError(t, err)

if err == nil {
Expand Down Expand Up @@ -94,24 +94,24 @@ func TestReadCoils(t *testing.T) {
assert.NoError(t, err)

{ // read
b, err := conn.ReadCoilsWithSlave(1, 1, 1)
b, err := conn.ReadCoils(1, 1)
assert.NoError(t, err)
assert.Equal(t, []byte{0x01}, b)

b, err = conn.ReadCoilsWithSlave(1, 1, 2)
b, err = conn.ReadCoils(1, 2)
assert.NoError(t, err)
assert.Equal(t, []byte{0x03}, b)

b, err = conn.ReadCoilsWithSlave(1, 1, 9)
b, err = conn.ReadCoils(1, 9)
assert.NoError(t, err)
assert.Equal(t, []byte{0xFF, 0x01}, b)
}
{ // write
b, err := conn.WriteSingleCoilWithSlave(1, 1, 0xFF00)
b, err := conn.WriteSingleCoil(1, 0xFF00)
assert.NoError(t, err)
assert.Equal(t, []byte{0xFF, 0x00}, b)

b, err = conn.WriteMultipleCoilsWithSlave(1, 1, 9, []byte{0xFF, 0x01})
b, err = conn.WriteMultipleCoils(1, 9, []byte{0xFF, 0x01})
assert.NoError(t, err)
assert.Equal(t, []byte{0x00, 0x09}, b)
}
Expand Down
141 changes: 34 additions & 107 deletions util/modbus/modbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,19 @@ func (s *Settings) String() string {

// Connection decorates a meters.Connection with transparent slave id and error handling
type Connection struct {
slaveID uint8
mu sync.Mutex
conn meters.Connection
delay time.Duration
conn meters.Connection
delay time.Duration
}

func (mb *Connection) prepare(slaveID uint8) {
mb.conn.Slave(slaveID)
// Clone clones the connection with a new slave id
func (mb *Connection) Clone(slaveID uint8) *Connection {
return &Connection{
conn: mb.conn.Clone(slaveID),
delay: mb.delay,
}
}

func (mb *Connection) sleep() {
if mb.delay > 0 {
time.Sleep(mb.delay)
}
Expand Down Expand Up @@ -95,136 +100,59 @@ func (mb *Connection) Timeout(timeout time.Duration) {
mb.conn.Timeout(timeout)
}

// ReadCoils wraps the underlying implementation
func (mb *Connection) ReadCoilsWithSlave(slaveID uint8, address, quantity uint16) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadCoils(address, quantity))
}

// WriteSingleCoil wraps the underlying implementation
func (mb *Connection) WriteSingleCoilWithSlave(slaveID uint8, address, value uint16) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().WriteSingleCoil(address, value))
}

// ReadInputRegisters wraps the underlying implementation
func (mb *Connection) ReadInputRegistersWithSlave(slaveID uint8, address, quantity uint16) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadInputRegisters(address, quantity))
}

// ReadHoldingRegisters wraps the underlying implementation
func (mb *Connection) ReadHoldingRegistersWithSlave(slaveID uint8, address, quantity uint16) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadHoldingRegisters(address, quantity))
}

// WriteSingleRegister wraps the underlying implementation
func (mb *Connection) WriteSingleRegisterWithSlave(slaveID uint8, address, value uint16) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().WriteSingleRegister(address, value))
}

// WriteMultipleRegisters wraps the underlying implementation
func (mb *Connection) WriteMultipleRegistersWithSlave(slaveID uint8, address, quantity uint16, value []byte) ([]byte, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().WriteMultipleRegisters(address, quantity, value))
}

// ReadDiscreteInputs wraps the underlying implementation
func (mb *Connection) ReadDiscreteInputsWithSlave(slaveID uint8, address, quantity uint16) (results []byte, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadDiscreteInputs(address, quantity))
}

// WriteMultipleCoils wraps the underlying implementation
func (mb *Connection) WriteMultipleCoilsWithSlave(slaveID uint8, address, quantity uint16, value []byte) (results []byte, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().WriteMultipleCoils(address, quantity, value))
}

// ReadWriteMultipleRegisters wraps the underlying implementation
func (mb *Connection) ReadWriteMultipleRegistersWithSlave(slaveID uint8, readAddress, readQuantity, writeAddress, writeQuantity uint16, value []byte) (results []byte, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadWriteMultipleRegisters(readAddress, readQuantity, writeAddress, writeQuantity, value))
}

// MaskWriteRegister wraps the underlying implementation
func (mb *Connection) MaskWriteRegisterWithSlave(slaveID uint8, address, andMask, orMask uint16) (results []byte, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().MaskWriteRegister(address, andMask, orMask))
}

// ReadFIFOQueue wraps the underlying implementation
func (mb *Connection) ReadFIFOQueueWithSlave(slaveID uint8, address uint16) (results []byte, err error) {
mb.mu.Lock()
defer mb.mu.Unlock()
mb.prepare(slaveID)
return mb.handle(mb.conn.ModbusClient().ReadFIFOQueue(address))
}

func (mb *Connection) ReadCoils(address, quantity uint16) ([]byte, error) {
return mb.ReadCoilsWithSlave(mb.slaveID, address, quantity)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadCoils(address, quantity))
}

func (mb *Connection) WriteSingleCoil(address, quantity uint16) ([]byte, error) {
return mb.WriteSingleCoilWithSlave(mb.slaveID, address, quantity)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().WriteSingleCoil(address, quantity))
}

func (mb *Connection) ReadInputRegisters(address, quantity uint16) ([]byte, error) {
return mb.ReadInputRegistersWithSlave(mb.slaveID, address, quantity)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadInputRegisters(address, quantity))
}

func (mb *Connection) ReadHoldingRegisters(address, quantity uint16) ([]byte, error) {
return mb.ReadHoldingRegistersWithSlave(mb.slaveID, address, quantity)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadHoldingRegisters(address, quantity))
}

func (mb *Connection) WriteSingleRegister(address, value uint16) ([]byte, error) {
return mb.WriteSingleRegisterWithSlave(mb.slaveID, address, value)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().WriteSingleRegister(address, value))
}

func (mb *Connection) WriteMultipleRegisters(address, quantity uint16, value []byte) ([]byte, error) {
return mb.WriteMultipleRegistersWithSlave(mb.slaveID, address, quantity, value)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().WriteMultipleRegisters(address, quantity, value))
}

func (mb *Connection) ReadDiscreteInputs(address, quantity uint16) (results []byte, err error) {
return mb.ReadDiscreteInputsWithSlave(mb.slaveID, address, quantity)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadDiscreteInputs(address, quantity))
}

func (mb *Connection) WriteMultipleCoils(address, quantity uint16, value []byte) (results []byte, err error) {
return mb.WriteMultipleCoilsWithSlave(mb.slaveID, address, quantity, value)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().WriteMultipleCoils(address, quantity, value))
}

func (mb *Connection) ReadWriteMultipleRegisters(readAddress, readQuantity, writeAddress, writeQuantity uint16, value []byte) (results []byte, err error) {
return mb.ReadWriteMultipleRegistersWithSlave(mb.slaveID, readAddress, readQuantity, writeAddress, writeQuantity, value)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadWriteMultipleRegisters(readAddress, readQuantity, writeAddress, writeQuantity, value))
}

func (mb *Connection) MaskWriteRegister(address, andMask, orMask uint16) (results []byte, err error) {
return mb.MaskWriteRegisterWithSlave(mb.slaveID, address, andMask, orMask)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().MaskWriteRegister(address, andMask, orMask))
}

func (mb *Connection) ReadFIFOQueue(address uint16) (results []byte, err error) {
return mb.ReadFIFOQueueWithSlave(mb.slaveID, address)
mb.sleep()
return mb.handle(mb.conn.ModbusClient().ReadFIFOQueue(address))
}

var (
Expand Down Expand Up @@ -299,8 +227,7 @@ func NewConnection(uri, device, comset string, baudrate int, proto Protocol, sla
}

slaveConn := &Connection{
slaveID: slaveID,
conn: conn,
conn: conn.Clone(slaveID),
}

return slaveConn, nil
Expand Down