Skip to content

Commit

Permalink
Merge pull request #17 from andig/fix/races
Browse files Browse the repository at this point in the history
Fix races in accessing connection and builder
  • Loading branch information
mlnoga authored Apr 21, 2024
2 parents 744e0ab + 054695c commit 1c5b750
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
5 changes: 2 additions & 3 deletions build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ type DatagramBuilder struct {
// Returns a new DatagramBuilder
func NewDatagramBuilder() (b *DatagramBuilder) {
return &DatagramBuilder{
buffer: bytes.Buffer{},
crc: NewCRC(),
crc: NewCRC(),
}
}

Expand Down Expand Up @@ -69,7 +68,7 @@ func (r *DatagramBuilder) Bytes() []byte {

// Converts the datagram into a string representation for printing
func (r *DatagramBuilder) String() string {
buf := bytes.Buffer{}
var buf bytes.Buffer
buf.WriteByte(byte('['))
for i, b := range r.buffer.Bytes() {
if i != 0 {
Expand Down
67 changes: 43 additions & 24 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,29 @@ package rct
import (
"fmt"
"net"
"sync"
"time"
)

// DialTimeout is the default cache for connecting to a RCT device
var DialTimeout = time.Second * 5
var (
// DialTimeout is the default cache for connecting to a RCT device
DialTimeout = time.Second * 5

// Map of active connections
connectionCache = make(map[string]*Connection)
)

// Connection to a RCT device
type Connection struct {
host string
conn net.Conn
builder *DatagramBuilder
parser *DatagramParser
cache *Cache
mu sync.Mutex
host string
conn net.Conn
parser *DatagramParser
cache *Cache
}

// Map of active connections
var connectionCache map[string]*Connection = make(map[string]*Connection)

// Creates a new connection to a RCT device at the given address
// Creates a new connection to a RCT device at the given address.
// Must not be called concurrently.
func NewConnection(host string, cache time.Duration) (*Connection, error) {
if conn, ok := connectionCache[host]; ok {
if conn.conn != nil { // there might be dead connection in the cache, e.g. when connection was disconnected
Expand All @@ -30,10 +34,9 @@ func NewConnection(host string, cache time.Duration) (*Connection, error) {
}

conn := &Connection{
host: host,
builder: NewDatagramBuilder(),
parser: NewDatagramParser(),
cache: NewCache(cache),
host: host,
parser: NewDatagramParser(),
cache: NewCache(cache),
}

if err := conn.connect(); err != nil {
Expand All @@ -59,7 +62,13 @@ func (c *Connection) Close() {
}

// Sends the given RCT datagram via the connection
func (c *Connection) Send(rdb *DatagramBuilder) (n int, err error) {
func (c *Connection) Send(rdb *DatagramBuilder) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.send(rdb)
}

func (c *Connection) send(rdb *DatagramBuilder) (int, error) {
// ensure active connection
if c.conn == nil {
if err := c.connect(); err != nil {
Expand All @@ -68,8 +77,7 @@ func (c *Connection) Send(rdb *DatagramBuilder) (n int, err error) {
}

// fmt.Printf("Sending %v\n", c.Builder.String())
n, err = c.conn.Write(rdb.Bytes())

n, err := c.conn.Write(rdb.Bytes())
// single retry on error when sending
if err != nil {
// fmt.Printf("Read %d bytes error %v\n", n, err)
Expand All @@ -85,7 +93,14 @@ func (c *Connection) Send(rdb *DatagramBuilder) (n int, err error) {
}

// Receives an RCT response via the connection
func (c *Connection) Receive() (dg *Datagram, err error) {
func (c *Connection) Receive() (*Datagram, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.receive()
}

// Receives an RCT response via the connection
func (c *Connection) receive() (dg *Datagram, err error) {
// ensure active connection
if c.conn == nil {
if err := c.connect(); err != nil {
Expand All @@ -108,17 +123,21 @@ func (c *Connection) Receive() (dg *Datagram, err error) {
}

// Queries the given identifier on the RCT device, returning its value as a datagram
func (c *Connection) Query(id Identifier) (dg *Datagram, err error) {
func (c *Connection) Query(id Identifier) (*Datagram, error) {
c.mu.Lock()
defer c.mu.Unlock()

if dg, ok := c.cache.Get(id); ok {
return dg, nil
}
c.builder.Build(&Datagram{Read, id, nil})
_, err = c.Send(c.builder)
if err != nil {

builder := NewDatagramBuilder()
builder.Build(&Datagram{Read, id, nil})
if _, err := c.send(builder); err != nil {
return nil, err
}

dg, err = c.Receive()
dg, err := c.receive()
if err != nil {
return nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions recoverable.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package rct;
package rct

// Errors caused by a malformed or unexpected packet, which can be potentially be recovered by retrying the transmission
type RecoverableError struct {
Err string
Err string
}

// Prints error to string
func (e RecoverableError) Error() string {
return e.Err
}

// Returns true if the given error is potentially recoverable
func IsRecoverableError(err error) bool {
_, ok:=err.(RecoverableError)
return ok
return e.Err
}

0 comments on commit 1c5b750

Please sign in to comment.