Skip to content

Commit

Permalink
add node.IdleTimeout and close idle connections after a timeout (blu…
Browse files Browse the repository at this point in the history
…environ/mavp2p#33) (#61)

* replace pkg/udplistener with pion/transport/v2/udp

* add node.IdleTimeout and close idle connections after a timeout (bluenviron/mavp2p#33)
  • Loading branch information
aler9 authored Aug 7, 2023
1 parent 49e9135 commit 5510fa9
Show file tree
Hide file tree
Showing 17 changed files with 323 additions and 569 deletions.
10 changes: 2 additions & 8 deletions endpoint_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointUDPBroadcast)(nil)
Expand All @@ -27,13 +26,8 @@ func (rw *readWriterFromFuncs) Write(p []byte) (int, error) {
}

func TestEndpointBroadcast(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointUDPBroadcast{"127.255.255.255:5602", ":5601"}},
Expand All @@ -51,7 +45,7 @@ func TestEndpointBroadcast(t *testing.T) {
require.NoError(t, err)
defer pc.Close()

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err := frame.NewReadWriter(frame.ReadWriterConf{
Expand Down
8 changes: 7 additions & 1 deletion endpoint_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,20 @@ func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) {
}
return "tcp4"
}()

timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout)
nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress())
timedContextClose()

if err != nil {
return nil, err
}

return timednetconn.New(node.conf.WriteTimeout, nconn), nil
return timednetconn.New(
node.conf.IdleTimeout,
node.conf.WriteTimeout,
nconn,
), nil
},
),
}
Expand Down
127 changes: 105 additions & 22 deletions endpoint_client_test.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,37 @@
package gomavlib

import (
"io"
"net"
"testing"
"time"

"github.com/pion/transport/v2/udp"
"github.com/stretchr/testify/require"

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
"github.com/bluenviron/gomavlib/v2/pkg/udplistener"
)

var _ endpointChannelSingle = (*endpointClient)(nil)

func TestEndpointClient(t *testing.T) {
for _, ca := range []string{"tcp", "udp"} {
t.Run(ca, func(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

var ln net.Listener
var err error
if ca == "tcp" {
var err error
ln, err = net.Listen("tcp", "127.0.0.1:5601")
require.NoError(t, err)
} else {
ln, err = udplistener.New("udp", "127.0.0.1:5601")
}
require.NoError(t, err)
defer ln.Close()

connOpened := make(chan net.Conn)
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5601")
require.NoError(t, err)

go func() {
conn, err := ln.Accept()
ln, err = udp.Listen("udp", addr)
require.NoError(t, err)
connOpened <- conn
}()
}

defer ln.Close()

var e EndpointConf
if ca == "tcp" {
Expand All @@ -48,7 +41,7 @@ func TestEndpointClient(t *testing.T) {
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{e},
Expand All @@ -62,7 +55,6 @@ func TestEndpointClient(t *testing.T) {
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

var conn net.Conn
var rw *frame.ReadWriter

for i := 0; i < 3; i++ {
Expand All @@ -77,10 +69,11 @@ func TestEndpointClient(t *testing.T) {
node.WriteMessageAll(msg)

if i == 0 {
conn = <-connOpened
conn, err := ln.Accept()
require.NoError(t, err)
defer conn.Close()

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err = frame.NewReadWriter(frame.ReadWriterConf{
Expand Down Expand Up @@ -128,3 +121,93 @@ func TestEndpointClient(t *testing.T) {
})
}
}

func TestEndpointClientIdleTimeout(t *testing.T) {
for _, ca := range []string{"tcp"} {
t.Run(ca, func(t *testing.T) {
var ln net.Listener
var err error
ln, err = net.Listen("tcp", "127.0.0.1:5603")
require.NoError(t, err)

defer ln.Close()

var e EndpointConf
if ca == "tcp" {
e = EndpointTCPClient{"127.0.0.1:5603"}
} else {
e = EndpointUDPClient{"127.0.0.1:5603"}
}

node, err := NewNode(NodeConf{
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{e},
HeartbeatDisable: true,
IdleTimeout: 500 * time.Millisecond,
})
require.NoError(t, err)
defer node.Close()

evt := <-node.Events()
require.Equal(t, &EventChannelOpen{
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

msg := &MessageHeartbeat{
Type: 1,
Autopilot: 2,
BaseMode: 3,
CustomMode: 6,
SystemStatus: 4,
MavlinkVersion: 5,
}
node.WriteMessageAll(msg)

conn, err := ln.Accept()
require.NoError(t, err)

dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err := frame.NewReadWriter(frame.ReadWriterConf{
ReadWriter: conn,
DialectRW: dialectRW,
OutVersion: frame.V2,
OutSystemID: 11,
})
require.NoError(t, err)

fr, err := rw.Read()
require.NoError(t, err)
require.Equal(t, &frame.V2Frame{
SequenceID: 0,
SystemID: 10,
ComponentID: 1,
Message: msg,
Checksum: fr.GetChecksum(),
}, fr)

closed := make(chan struct{})

go func() {
_, err = rw.Read()
require.Equal(t, io.EOF, err)
conn.Close()
close(closed)
}()

select {
case <-closed:
case <-time.After(1 * time.Second):
t.Errorf("should not happen")
}

// the client reconnects to the server due to autoReconnector
conn, err = ln.Accept()
require.NoError(t, err)
conn.Close()
})
}
}
10 changes: 2 additions & 8 deletions endpoint_custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointCustom)(nil)
Expand Down Expand Up @@ -65,15 +64,10 @@ func (e *dummyEndpoint) Write(p []byte) (int, error) {
}

func TestEndpointCustom(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

de := newDummyEndpoint()

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointCustom{de}},
Expand All @@ -87,7 +81,7 @@ func TestEndpointCustom(t *testing.T) {
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down
19 changes: 4 additions & 15 deletions endpoint_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointSerial)(nil)
Expand All @@ -22,13 +21,8 @@ func TestEndpointSerial(t *testing.T) {
return de, nil
}

dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointSerial{
Expand All @@ -49,7 +43,7 @@ func TestEndpointSerial(t *testing.T) {

de := <-endpointCreated

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down Expand Up @@ -120,13 +114,8 @@ func TestEndpointSerialReconnect(t *testing.T) {
return de, nil
}

dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointSerial{
Expand All @@ -147,7 +136,7 @@ func TestEndpointSerialReconnect(t *testing.T) {

de := <-endpointCreated

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down
32 changes: 23 additions & 9 deletions endpoint_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"net"
"time"

"github.com/pion/transport/v2/udp"

"github.com/bluenviron/gomavlib/v2/pkg/timednetconn"
"github.com/bluenviron/gomavlib/v2/pkg/udplistener"
)

type endpointServerConf interface {
Expand Down Expand Up @@ -53,6 +54,7 @@ type endpointServer struct {
conf endpointServerConf
listener net.Listener
writeTimeout time.Duration
idleTimeout time.Duration

// in
terminate chan struct{}
Expand All @@ -72,20 +74,29 @@ func initEndpointServer(node *Node, conf endpointServerConf) (Endpoint, error) {
return nil, fmt.Errorf("invalid address")
}

var listener net.Listener
var ln net.Listener
if conf.isUDP() {
listener, err = udplistener.New("udp4", conf.getAddress())
addr, err := net.ResolveUDPAddr("udp4", conf.getAddress())
if err != nil {
return nil, err
}

ln, err = udp.Listen("udp4", addr)
if err != nil {
return nil, err
}
} else {
listener, err = net.Listen("tcp4", conf.getAddress())
}
if err != nil {
return nil, err
ln, err = net.Listen("tcp4", conf.getAddress())
if err != nil {
return nil, err
}
}

t := &endpointServer{
conf: conf,
writeTimeout: node.conf.WriteTimeout,
listener: listener,
idleTimeout: node.conf.IdleTimeout,
listener: ln,
terminate: make(chan struct{}),
}
return t, nil
Expand Down Expand Up @@ -118,7 +129,10 @@ func (t *endpointServer) accept() (string, io.ReadWriteCloser, error) {
return "tcp"
}(), nconn.RemoteAddr())

conn := timednetconn.New(t.writeTimeout, nconn)
conn := timednetconn.New(
t.idleTimeout,
t.writeTimeout,
nconn)

return label, conn, nil
}
Loading

0 comments on commit 5510fa9

Please sign in to comment.